Compare commits

...

120 Commits

Author SHA1 Message Date
thekiwismarthome 9430811cda Merge pull request #6 from thekiwismarthome/2.3.0---Product-Barcode-Scanner-w/-OpenFoodFacts-Lookup
2.3.0   product barcode scanner w/ open food facts lookup
2026-03-03 17:52:21 +13:00
thekiwismarthome 2b11632253 feat: Improve image download robustness with a User-Agent header and extended timeout, and enhance WebP conversion by ensuring images are in RGB mode. 2026-03-01 22:24:14 +13:00
thekiwismarthome ae7717a8eb feat: Add WebSocket endpoint to download, process, and save product images locally as WebP. 2026-03-01 21:20:09 +13:00
thekiwismarthome 433c03035b feat: Add websocket command to search for products by barcode. 2026-03-01 20:15:29 +13:00
thekiwismarthome 8eb403ed8e docs: extensively update README with detailed features, requirements, and installation instructions. 2026-02-28 22:29:19 +13:00
thekiwismarthome 03fb9a9a67 Merge pull request #5 from thekiwismarthome/v2.1.0---Themes
V2.1.0   themes
2026-02-28 22:16:05 +13:00
thekiwismarthome ae133ae59b feat: Implement comprehensive access control for shopping list and item WebSocket handlers and refine country code validation. 2026-02-28 10:34:40 +13:00
thekiwismarthome 2a8b12a07e Add compiled Python bytecode files for the shopping list manager custom component. 2026-02-28 00:23:52 +13:00
thekiwismarthome a36933c4c6 Merge pull request #4 from thekiwismarthome/v2.0.5-Country-Specific-Product-Catalogues
V2.0.5 country specific product catalogues
2026-02-27 17:57:50 +13:00
thekiwismarthome 402881c687 feat: Add barcode_type field to the Card model and its WebSocket handlers for creation and updates. 2026-02-27 17:35:49 +13:00
thekiwismarthome d412764cba fix: Change the default value of the loyalty card 'private' field to true in the websocket handler. 2026-02-27 14:49:30 +13:00
thekiwismarthome 752f9e5622 feat: add loyalty card management with dedicated storage, data model, and websocket API endpoints. 2026-02-27 13:00:08 +13:00
thekiwismarthome 86896ba4af feat: Introduce private shopping lists with owner and member management, and add Home Assistant user lookup. 2026-02-26 14:41:43 +13:00
thekiwismarthome 36a8939ebc feat: Implement backup/restore functionality for user data via new WebSocket handlers and automatic config backups. 2026-02-26 09:53:05 +13:00
thekiwismarthome 57b6d52ddf feat: Add country selection for product catalogs, including new WebSocket handlers and catalog reloading logic. 2026-02-26 06:51:26 +13:00
thekiwismarthome 03249df651 Merge pull request #3 from thekiwismarthome/v2.0.4---Lots-of-Enhancements
V2.0.4 - Bug Fixes and New Features
2026-02-25 22:40:58 +13:00
thekiwismarthome 11180db0e3 feat: rebuild product search engine immediately after adding a new product 2026-02-25 21:54:33 +13:00
thekiwismarthome 9bdaea0b1b refactor: Replace storage.get_all_products() with storage.get_products() for product retrieval. 2026-02-25 08:19:29 +13:00
thekiwismarthome ec0f44f109 style: update color codes for Health & Beauty and Baby categories. 2026-02-19 11:32:39 +13:00
thekiwismarthome 88b3f2d435 Update handlers.py 2026-02-17 20:58:37 +13:00
thekiwismarthome d93ea86d72 duplicate websocket_search_products in handlers.py 2026-02-17 20:13:17 +13:00
thekiwismarthome 5b3dcb65b4 another fix 2026-02-17 19:14:23 +13:00
thekiwismarthome 94cdede3b9 non-admin event handler 2026-02-17 18:57:40 +13:00
thekiwismarthome e6117073be Sync Bug 2026-02-17 14:53:06 +13:00
thekiwismarthome e76fad5a92 Correct Increment Handler 2026-02-16 22:17:16 +13:00
thekiwismarthome bcfde6df9a Fix the Broken Increment Handler 2026-02-16 22:00:20 +13:00
thekiwismarthome 85e0e68af9 spelling error 2026-02-16 21:39:31 +13:00
thekiwismarthome 11eb698c20 Product Items VACA Sync Bug Fix 2026-02-16 21:27:23 +13:00
thekiwismarthome 0e3fcd56f5 Update __init__.py 2026-02-16 20:14:23 +13:00
thekiwismarthome 9d8fd3f63e Update handlers.py 2026-02-16 20:09:54 +13:00
thekiwismarthome c9c1d16f08 Update handlers.py 2026-02-16 20:05:22 +13:00
thekiwismarthome 3b0cff3476 Update handlers.py 2026-02-16 20:00:58 +13:00
thekiwismarthome 9c98972b40 Merge pull request #2 from thekiwismarthome/v2.0.0-phase2-backend-foundation
V2.0.0 phase2 backend foundation
2026-02-14 16:16:23 +13:00
thekiwismarthome 575e59225f Update handlers.py 2026-02-14 15:44:46 +13:00
thekiwismarthome 960319231f Update __init__.py 2026-02-14 07:42:23 +13:00
thekiwismarthome 4dbae38bd9 Update __init__.py 2026-02-14 07:37:41 +13:00
thekiwismarthome 06eae30773 Update handlers.py 2026-02-14 07:36:09 +13:00
thekiwismarthome 0ad377114b Update storage.py 2026-02-14 07:29:27 +13:00
thekiwismarthome bc584c647a Update storage.py 2026-02-14 07:27:08 +13:00
thekiwismarthome 70dc3f1693 Update storage.py 2026-02-14 07:25:07 +13:00
thekiwismarthome b9ae304ba0 Update __init__.py 2026-02-14 07:22:53 +13:00
thekiwismarthome 18040e5017 Update __init__.py 2026-02-14 07:21:19 +13:00
thekiwismarthome 76d0eed1d6 Update storage.py 2026-02-14 07:12:31 +13:00
thekiwismarthome fd841270d8 Create search.py 2026-02-14 07:11:59 +13:00
thekiwismarthome 30859a7d26 Update manifest.json 2026-02-14 07:11:25 +13:00
thekiwismarthome 91615964c5 Update storage.py 2026-02-14 07:05:08 +13:00
thekiwismarthome 61a7678e24 Update category_loader.py 2026-02-14 07:04:22 +13:00
thekiwismarthome c7310d4213 Update storage.py 2026-02-14 07:01:47 +13:00
thekiwismarthome f8324ccf8f Update manifest.json 2026-02-14 06:59:55 +13:00
thekiwismarthome 9a74fd9027 Update catalog_loader.py 2026-02-14 06:59:16 +13:00
thekiwismarthome 1b05185dfe Update models.py 2026-02-14 06:52:29 +13:00
thekiwismarthome 94f771d0e3 Update storage.py 2026-02-14 06:51:55 +13:00
thekiwismarthome e8ba135d9d Add files via upload 2026-02-13 22:47:57 +13:00
thekiwismarthome 02cddeafc1 Update products_catalog_nz.json 2026-02-13 22:19:59 +13:00
thekiwismarthome 5378f79ac4 Update config_flow.py 2026-02-13 21:53:39 +13:00
thekiwismarthome d22b234f68 Update __init__.py 2026-02-13 21:38:33 +13:00
thekiwismarthome b4ea6bc7f0 Update config_flow.py 2026-02-13 21:36:37 +13:00
thekiwismarthome e09f9004a6 Update storage.py 2026-02-13 21:21:58 +13:00
thekiwismarthome b349a3142a Update storage.py 2026-02-13 21:19:57 +13:00
thekiwismarthome 5fab64bd4d Update __init__.py 2026-02-13 21:19:07 +13:00
thekiwismarthome d1573846cd Update config_flow.py 2026-02-13 21:18:13 +13:00
thekiwismarthome b2da5e62bd Update storage.py 2026-02-13 20:54:33 +13:00
thekiwismarthome f0c2bb6f29 Update __init__.py 2026-02-13 20:52:57 +13:00
thekiwismarthome ab5bd34c07 Update __init__.py 2026-02-13 20:19:04 +13:00
thekiwismarthome f80efe7582 Update storage.py 2026-02-13 20:16:50 +13:00
thekiwismarthome e21d136369 Update catalog_loader.py 2026-02-13 20:12:09 +13:00
thekiwismarthome 8689e8aa3f Create catalog_loader.py 2026-02-13 20:11:56 +13:00
thekiwismarthome 5f5066e6d8 Update products_catalog_nz.json 2026-02-13 15:46:28 +13:00
thekiwismarthome 0345c091be Create products_catalog_nz.json 2026-02-13 15:08:58 +13:00
thekiwismarthome 073218b851 Update const.py 2026-02-13 15:07:18 +13:00
thekiwismarthome 0258bc8690 Update images.py 2026-02-13 15:06:11 +13:00
thekiwismarthome 5a7b8203f6 Create __init__.py 2026-02-13 15:00:39 +13:00
thekiwismarthome c7f91ac37c Update manifest.json 2026-02-13 15:00:11 +13:00
thekiwismarthome 7dda89c5cb Create images.py 2026-02-13 14:54:38 +13:00
thekiwismarthome 95b8a99304 Update config_flow.py 2026-02-13 14:53:47 +13:00
thekiwismarthome acce175d0a Update __init__.py 2026-02-13 14:25:46 +13:00
thekiwismarthome 4a9a12ea7a Update const.py 2026-02-13 13:57:19 +13:00
thekiwismarthome 980a99c7ff Update const.py 2026-02-13 13:56:07 +13:00
thekiwismarthome 3829039a8a Delete custom_components/shopping_list_manager/websocket_api.py 2026-02-13 13:54:38 +13:00
thekiwismarthome 5af3e83fde Delete custom_components/shopping_list_manager/manager.py 2026-02-13 13:54:23 +13:00
thekiwismarthome 4ce63938e9 Update models.py 2026-02-13 13:53:36 +13:00
thekiwismarthome c67cc4b445 Update manifest.json 2026-02-13 13:15:02 +13:00
thekiwismarthome bdff875fcb Update hacs.json 2026-02-13 13:14:37 +13:00
thekiwismarthome 5c33f01e51 Update hacs.json 2026-02-13 13:07:06 +13:00
thekiwismarthome 4f56c655ca Update const.py 2026-02-13 12:49:59 +13:00
thekiwismarthome d0806aaf69 Update storage.py 2026-02-13 12:45:03 +13:00
thekiwismarthome 8bc67b5331 Update category_loader.py 2026-02-13 12:42:51 +13:00
thekiwismarthome ab59be6f1d Update const.py 2026-02-13 12:42:22 +13:00
thekiwismarthome 296a8ad5fd Update storage.py 2026-02-13 12:24:19 +13:00
thekiwismarthome 16b594f11a Update __init__.py 2026-02-13 10:48:44 +13:00
thekiwismarthome 3303776cd0 Update handlers.py 2026-02-13 10:48:10 +13:00
thekiwismarthome 832d29b0f9 Create storage.py 2026-02-13 10:35:26 +13:00
thekiwismarthome b91d027d08 Update manifest.json 2026-02-13 10:34:35 +13:00
thekiwismarthome a8ee630b7c Update manifest.json 2026-02-13 10:34:04 +13:00
thekiwismarthome 4135231a7e Create __init__.py 2026-02-13 10:33:19 +13:00
thekiwismarthome 903d2577c0 Create handlers.py 2026-02-13 10:32:33 +13:00
thekiwismarthome d3b19e61c5 Add files via upload 2026-02-13 10:32:00 +13:00
thekiwismarthome 0ec00d38ec Create category_loader.py 2026-02-13 10:31:46 +13:00
thekiwismarthome b6bfa78eb5 Add files via upload 2026-02-13 10:31:02 +13:00
thekiwismarthome 9022ed0dc8 Merge pull request #1 from thekiwismarthome/Phase-2.5-–-List-Ownership-&-Visibility-Architecture
Phase 2.5 – list ownership & visibility architecture
2026-02-12 18:38:16 +13:00
thekiwismarthome 8b2197794a Update manager.py 2026-02-12 12:03:08 +13:00
thekiwismarthome 0eb06f240d Update websocket_api.py 2026-02-12 12:02:49 +13:00
thekiwismarthome b6941a1499 Update __init__.py 2026-02-12 12:02:22 +13:00
thekiwismarthome 78829aefb0 Update websocket_api.py 2026-02-12 11:38:28 +13:00
thekiwismarthome c114645f88 Update manager.py 2026-02-12 11:37:55 +13:00
thekiwismarthome 5e043d45f2 v1.5.0 - Added dropdown list for Lists 2026-02-10 23:40:30 +13:00
thekiwismarthome 17cb680f1f Downgrade version from 1.4.0 to 1.0.0 2026-02-10 11:06:50 +13:00
thekiwismarthome 94450a4450 Update installation instructions and add Shopping List Card
Added a section for the Shopping List Card integration.
2026-02-10 11:06:20 +13:00
thekiwismarthome ae11549469 Revise installation instructions in README.md
Updated installation instructions and added manual installation steps.
2026-02-10 09:21:20 +13:00
thekiwismarthome 5429b97605 Delete custom_components/shopping_list_manager/frontend directory
splitting Integration and Card
2026-02-10 09:09:40 +13:00
thekiwismarthome 0664f5331a Update manifest.json 2026-02-06 23:56:43 +13:00
thekiwismarthome 30d8d8defd Bump version to 1.3.0 in manifest.json 2026-02-06 23:52:37 +13:00
thekiwismarthome 07329323bf Refactor configuration handling and improve product filtering
Refactor setConfig method for improved validation and normalization of configuration. Update fuzzy matching and rendering logic for inactive products.
2026-02-06 23:51:53 +13:00
thekiwismarthome 7cf703a9ac Update manifest.json 2026-02-06 16:19:06 +13:00
thekiwismarthome 1edf4b8eea Update manifest.json 2026-02-06 16:08:21 +13:00
thekiwismarthome 6376c5148d Update README.md 2026-02-06 09:48:41 +13:00
thekiwismarthome 54cbcd5298 Bump version to 1.1.0 in manifest.json 2026-02-06 09:22:47 +13:00
thekiwismarthome 5c093d2f3b Delete shopping_list_card.js 2026-02-06 09:19:53 +13:00
thekiwismarthome aef90630db Update shopping_list_card.js 2026-02-06 09:19:27 +13:00
thekiwismarthome a79e4e7171 Create shopping_list_card.js 2026-02-06 09:19:03 +13:00
35 changed files with 31145 additions and 2907 deletions
+84 -22
View File
@@ -1,36 +1,98 @@
# Shopping List Manager Integration for Home Assistant
The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant.
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
---
## Features
### 🛒 Multi-List Management
- Create and manage multiple shopping lists
- Private or shared lists with per-member access control
- Active list state shared across all connected devices and users
- List total price calculation
### 📦 Items
- Add, update, check, and delete items with quantity and unit
- Atomic quantity increment / decrement
- Bulk check and clear checked items
- Per-item pricing, notes, and category assignment
### 🔍 Product Catalog
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
- Fuzzy search with alias matching
- Recently-used product suggestions
- Custom product creation
- Allergen filtering and product substitute groups
- Product images (WebP, 200×200px, optimised)
### 🗂️ Categories
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
- Category colour coding and emoji icons
- Per-list category ordering
### 💳 Loyalty Cards
- Store loyalty and rewards card data
- Private or shared card access per user
### 🔄 Real-Time Events
- All changes fire events on the Home Assistant bus
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
---
## Requirements
| Component | Minimum Version |
|---|---|
| Home Assistant | 2024.1 |
| HACS | 2.x |
---
## Installation ## Installation
### 1. Install via HACS ### Via HACS (Recommended)
[![Open your Home Assistant instance and open a repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration) [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
Click the button above or manually add the custom repository in HACS. 1. Click the button above
Restart HA 2. Confirm adding the repository to HACS
3. Install **Shopping List Manager** from **HACS → Integrations**
4. Restart Home Assistant
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
### 2. Add the Card Resource ### Manual Installation
After installing via HACS, add the frontend resource: 1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory
2. Restart Home Assistant
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
1. Go to Settings → Dashboards ---
2. Click ⋮ (three dots, top right) → Resources
3. Click "+ Add Resource"
4. URL: `/local/community/shopping-list-manager/shopping_list_card.js`
5. Resource type: JavaScript Module
6. Click "Create"
### 3. Add the Integration ## Lovelace Card
[![Open your Home Assistant instance and start setting up a new integration.](https://my.home-assistant.io/badges/config_flow_start.svg)](https://my.home-assistant.io/redirect/config_flow_start/?domain=shopping_list_manager) Install the companion card to get the full shopping UI:
Click the button above or manually add via Settings → Devices & Services. [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin)
### 4. Add Card to Dashboard ---
```yaml
type: custom:shopping-list-card
title: Shopping List
list_id: groceries
```
## Documentation
Use the ⚙️ cog button in the card to configure settings. Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki).
## Support & Feedback
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues)
- [Home Assistant Community Forum](https://community.home-assistant.io)
---
## License
MIT — see [LICENSE](LICENSE) for details.
@@ -1,49 +1,283 @@
""" """Shopping List Manager integration for Home Assistant."""
Shopping List Manager - Home Assistant Custom Integration
Clean-slate architecture with enforced invariants
"""
import logging import logging
import os
from pathlib import Path
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.components import websocket_api as ha_websocket from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN from .const import DOMAIN, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, EVENT_ITEM_CHECKED, EVENT_ITEM_DELETED, EVENT_LIST_UPDATED, EVENT_LIST_DELETED
from .manager import ShoppingListManager from .storage import ShoppingListStorage
# Import websocket handler functions directly from .utils.images import ImageHandler
from .websocket_api import (
websocket_add_product,
websocket_set_qty,
websocket_get_products,
websocket_get_active,
websocket_delete_product,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# Track storage instance globally
DATA_STORAGE = f"{DOMAIN}_storage"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Shopping List Manager component from yaml (not used)."""
# This integration doesn't support YAML configuration
# All setup is done via config entries (UI configuration)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Shopping List Manager from a config entry.""" """Set up Shopping List Manager from a config entry."""
# Initialize the manager _LOGGER.info("Setting up Shopping List Manager")
manager = ShoppingListManager(hass)
await manager.async_load()
# Store manager in hass.data # Get component path for loading data files
component_path = os.path.dirname(__file__)
config_path = hass.config.path()
# Get country from options (or fall back to data, or default to NZ)
country = entry.options.get("country") or entry.data.get("country", "NZ")
_LOGGER.info("Using country: %s", country)
# Initialize storage with country
storage = ShoppingListStorage(hass, component_path, country)
await storage.async_load()
# Initialize image handler
image_handler = ImageHandler(hass, config_path)
# Store instances in hass.data
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN]["manager"] = manager hass.data[DOMAIN][DATA_STORAGE] = storage
hass.data[DOMAIN]["image_handler"] = image_handler
hass.data[DOMAIN]["country"] = country
# Register WebSocket commands using Home Assistant's websocket_api # Register update listener for options changes
ha_websocket.async_register_command(hass, websocket_add_product) entry.async_on_unload(entry.add_update_listener(update_listener))
ha_websocket.async_register_command(hass, websocket_set_qty)
ha_websocket.async_register_command(hass, websocket_get_products)
ha_websocket.async_register_command(hass, websocket_get_active)
ha_websocket.async_register_command(hass, websocket_delete_product)
_LOGGER.info("Shopping List Manager setup complete - registered 5 WebSocket commands") # Register WebSocket commands
await _async_register_websocket_handlers(hass, storage)
# Register frontend resources
await _async_register_frontend(hass)
# CRITICAL: Register event listeners so non-admin users can subscribe
# Without these, non-admin users cannot receive real-time updates
def _dummy_listener(event):
"""Dummy listener to enable event subscription for all users."""
pass
hass.bus.async_listen(EVENT_ITEM_ADDED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_CHECKED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_DELETED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_DELETED, _dummy_listener)
_LOGGER.info("Shopping List Manager setup complete with event subscriptions enabled")
return True return True
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
# Reload the integration when options change
await hass.config_entries.async_reload(entry.entry_id)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload Shopping List Manager.""" """Unload a config entry."""
hass.data[DOMAIN].pop("manager", None) _LOGGER.info("Unloading Shopping List Manager")
# Clean up hass.data
if DOMAIN in hass.data:
hass.data[DOMAIN].pop(DATA_STORAGE, None)
return True return True
async def _async_register_websocket_handlers(
hass: HomeAssistant,
storage: ShoppingListStorage
) -> None:
"""Register WebSocket API handlers."""
from homeassistant.components import websocket_api
from .websocket import handlers
# Lists handlers
websocket_api.async_register_command(
hass,
handlers.websocket_subscribe,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_lists,
)
websocket_api.async_register_command(
hass,
handlers.websocket_create_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_active_list,
)
# Items handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_check_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_increment_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_reorder_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_bulk_check_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_clear_checked_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_list_total,
)
# Products handlers
websocket_api.async_register_command(
hass,
handlers.websocket_download_product_image,
)
websocket_api.async_register_command(
hass,
handlers.websocket_search_by_barcode,
)
websocket_api.async_register_command(
hass,
handlers.websocket_search_products,
)
websocket_api.async_register_command(
hass,
handlers.ws_get_products_by_ids,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_product_suggestions,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_product,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_product,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_product_substitutes,
)
# Categories handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_categories,
)
# Integration settings handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_integration_settings,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_country,
)
# Backup / Restore handlers
websocket_api.async_register_command(
hass,
handlers.websocket_export_data,
)
websocket_api.async_register_command(
hass,
handlers.websocket_import_data,
)
# List members handler
websocket_api.async_register_command(
hass,
handlers.websocket_update_list_members,
)
# HA users handler
websocket_api.async_register_command(
hass,
handlers.websocket_get_ha_users,
)
# Loyalty card handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_loyalty_cards,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card_members,
)
_LOGGER.debug("WebSocket handlers registered")
async def _async_register_frontend(hass: HomeAssistant) -> None:
"""Register frontend resources."""
# Since frontend is a separate HACS module, we don't need to register it here
# The frontend card registers itself independently
_LOGGER.debug("Frontend resources skipped (separate HACS module)")
def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
"""Get the storage instance from hass.data.
Helper function for WebSocket handlers to access storage.
"""
return hass.data[DOMAIN][DATA_STORAGE]
@@ -1,5 +1,7 @@
"""Config flow for Shopping List Manager.""" """Config flow for Shopping List Manager."""
import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.core import callback
from .const import DOMAIN from .const import DOMAIN
@@ -16,10 +18,76 @@ class ShoppingListManagerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="single_instance_allowed") return self.async_abort(reason="single_instance_allowed")
if user_input is not None: if user_input is not None:
# Create entry with default country
return self.async_create_entry( return self.async_create_entry(
title="Shopping List Manager", title="Shopping List Manager",
data={} data={"country": "NZ"},
options={
"country": "NZ",
"enable_price_tracking": True,
"enable_image_search": True,
"metric_units_only": True,
}
) )
# Show simple form # Show simple setup form
return self.async_show_form(step_id="user") return self.async_show_form(
step_id="user",
description_placeholders={
"info": "Country and other settings can be configured after setup via the Configure button."
}
)
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return OptionsFlowHandler(config_entry)
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Handle options flow for Shopping List Manager."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
# Update options
return self.async_create_entry(title="", data=user_input)
# Get current settings
current_country = self.config_entry.options.get(
"country",
self.config_entry.data.get("country", "NZ")
)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required("country", default=current_country): vol.In({
"NZ": "New Zealand",
"AU": "Australia",
"US": "United States",
"GB": "United Kingdom",
"CA": "Canada",
}),
vol.Optional(
"enable_price_tracking",
default=self.config_entry.options.get("enable_price_tracking", True)
): bool,
vol.Optional(
"enable_image_search",
default=self.config_entry.options.get("enable_image_search", True)
): bool,
vol.Optional(
"metric_units_only",
default=self.config_entry.options.get("metric_units_only", True)
): bool,
}),
description_placeholders={
"info": "Changing country will reload the product catalog on next restart."
}
)
@@ -1,11 +1,131 @@
"""Constants for Shopping List Manager.""" """Constants for Shopping List Manager."""
# Domain
DOMAIN = "shopping_list_manager" DOMAIN = "shopping_list_manager"
# Storage keys # Storage Keys
STORAGE_VERSION = 1 STORAGE_VERSION = 2
STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products" STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
STORAGE_KEY_ACTIVE = f"{DOMAIN}.active_list" STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
# WebSocket Commands - Lists
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
# WebSocket Commands - Users
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
# WebSocket Commands - Items
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
WS_TYPE_ITEMS_ADD = f"{DOMAIN}/items/add"
WS_TYPE_ITEMS_UPDATE = f"{DOMAIN}/items/update"
WS_TYPE_ITEMS_CHECK = f"{DOMAIN}/items/check"
WS_TYPE_ITEMS_DELETE = f"{DOMAIN}/items/delete"
WS_TYPE_ITEMS_REORDER = f"{DOMAIN}/items/reorder"
WS_TYPE_ITEMS_BULK_CHECK = f"{DOMAIN}/items/bulk_check"
WS_TYPE_ITEMS_CLEAR_CHECKED = f"{DOMAIN}/items/clear_checked"
WS_TYPE_ITEMS_GET_TOTAL = f"{DOMAIN}/items/get_total"
# WebSocket Commands - Products
WS_TYPE_PRODUCTS_SEARCH = f"{DOMAIN}/products/search"
WS_TYPE_PRODUCTS_SUGGESTIONS = f"{DOMAIN}/products/suggestions"
WS_TYPE_PRODUCTS_ADD = f"{DOMAIN}/products/add"
WS_TYPE_PRODUCTS_UPDATE = f"{DOMAIN}/products/update"
WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
# WebSocket Commands - Categories
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
# WebSocket Commands - Loyalty Cards
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
# WebSocket Commands - Subscriptions
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
# WebSocket Commands - Barcode (Phase 5)
WS_TYPE_BARCODE_SCAN = f"{DOMAIN}/barcode/scan"
WS_TYPE_BARCODE_ADD = f"{DOMAIN}/barcode/add_to_list"
# WebSocket Commands - OpenFoodFacts (Phase 5)
WS_TYPE_OFF_FETCH = f"{DOMAIN}/openfoodfacts/fetch"
WS_TYPE_OFF_IMPORT = f"{DOMAIN}/openfoodfacts/import"
# Events # Events
EVENT_SHOPPING_LIST_UPDATED = f"{DOMAIN}_updated" EVENT_ITEM_ADDED = f"{DOMAIN}_item_added"
EVENT_ITEM_UPDATED = f"{DOMAIN}_item_updated"
EVENT_ITEM_CHECKED = f"{DOMAIN}_item_checked"
EVENT_ITEM_DELETED = f"{DOMAIN}_item_deleted"
EVENT_LIST_UPDATED = f"{DOMAIN}_list_updated"
EVENT_LIST_DELETED = f"{DOMAIN}_list_deleted"
# Image Configuration
IMAGE_FORMAT = "webp"
IMAGE_SIZE = 200 # 200x200px
IMAGE_QUALITY = 85
IMAGE_MAX_SIZE_KB = 15
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
# Placeholder image (inline SVG)
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
# Metric Units (always metric, regardless of country)
METRIC_UNITS = {
"weight": ["kg", "g"],
"volume": ["L", "mL"],
"count": ["units", "pack", "loaf", "dozen", "ea", "pkt", "tray", "bottle", "can", "bunch", "pottle", "roll", "sachet", "tub", "bar"]
}
# Default quantities for common products (NZ-focused, can be country-specific later)
DEFAULT_QUANTITIES = {
"milk": {"quantity": 2, "unit": "L"},
"bread": {"quantity": 1, "unit": "loaf"},
"butter": {"quantity": 500, "unit": "g"},
"eggs": {"quantity": 12, "unit": "ea"},
"cheese": {"quantity": 500, "unit": "g"},
"yogurt": {"quantity": 1, "unit": "kg"},
"flour": {"quantity": 1.5, "unit": "kg"},
"sugar": {"quantity": 1.5, "unit": "kg"},
"rice": {"quantity": 1, "unit": "kg"},
"pasta": {"quantity": 500, "unit": "g"},
"chicken breast": {"quantity": 1, "unit": "kg"},
"beef mince": {"quantity": 500, "unit": "g"},
"sausages": {"quantity": 500, "unit": "g"},
"bacon": {"quantity": 500, "unit": "g"},
"apples": {"quantity": 1, "unit": "kg"},
"bananas": {"quantity": 1, "unit": "kg"},
"potatoes": {"quantity": 2, "unit": "kg"},
"onions": {"quantity": 1, "unit": "kg"},
"carrots": {"quantity": 1, "unit": "kg"},
"tomatoes": {"quantity": 500, "unit": "g"},
"lettuce": {"quantity": 1, "unit": "ea"},
"capsicum": {"quantity": 1, "unit": "ea"},
"broccoli": {"quantity": 1, "unit": "ea"},
"cereal": {"quantity": 1, "unit": "pack"},
"baked beans": {"quantity": 1, "unit": "can"},
"tuna": {"quantity": 1, "unit": "can"},
"olive oil": {"quantity": 1, "unit": "L"},
"coffee": {"quantity": 200, "unit": "g"},
"tea bags": {"quantity": 100, "unit": "ea"},
"toilet paper": {"quantity": 12, "unit": "roll"},
"paper towels": {"quantity": 2, "unit": "roll"},
"dishwashing liquid": {"quantity": 500, "unit": "mL"},
"laundry powder": {"quantity": 2, "unit": "kg"}
}
# Paths
CATEGORIES_FILE = "categories.json"
PRODUCTS_CATALOG_FILE = "products_catalog.json"
IMAGES_PATH = "images/products"
@@ -0,0 +1,62 @@
"""Product catalog loader for Shopping List Manager."""
import json
import logging
from typing import List, Dict, Any
import aiofiles
_LOGGER = logging.getLogger(__name__)
async def load_product_catalog(component_path: str, country_code: str = "NZ") -> List[Dict[str, Any]]:
"""Load product catalog from JSON file asynchronously.
Args:
component_path: Path to the component directory
country_code: Country code (e.g., 'NZ', 'AU', 'US')
Returns:
List of product dictionaries
"""
import os
# Try country-specific catalog first
if country_code:
catalog_file = os.path.join(
component_path, "data", f"products_catalog_{country_code.lower()}.json"
)
if not os.path.exists(catalog_file):
_LOGGER.warning(
"No country-specific catalog found for %s at %s",
country_code,
catalog_file
)
return []
else:
return []
try:
# Use aiofiles for async file reading
async with aiofiles.open(catalog_file, "r", encoding="utf-8") as f:
content = await f.read()
data = json.loads(content)
_LOGGER.info(
"Loaded product catalog version %s for region %s",
data.get("version", "unknown"),
data.get("region", "default")
)
products = data.get("products", [])
_LOGGER.info("Loaded %d products from catalog", len(products))
return products
except FileNotFoundError:
_LOGGER.error("Product catalog file not found: %s", catalog_file)
return []
except json.JSONDecodeError as err:
_LOGGER.error("Failed to parse product catalog file: %s", err)
return []
except Exception as err:
_LOGGER.error("Unexpected error loading product catalog: %s", err)
return []
@@ -0,0 +1,110 @@
{
"version": "1.0.0",
"region": "NZ",
"categories": [
{
"id": "produce",
"name": "Fruit & Veg",
"icon": "mdi:fruit-cherries",
"color": "#4CAF50",
"sort_order": 1,
"system": true
},
{
"id": "dairy",
"name": "Dairy & Eggs",
"icon": "mdi:cheese",
"color": "#FFC107",
"sort_order": 2,
"system": true
},
{
"id": "meat",
"name": "Meat & Seafood",
"icon": "mdi:food-steak",
"color": "#F44336",
"sort_order": 3,
"system": true
},
{
"id": "bakery",
"name": "Bakery",
"icon": "mdi:bread-slice",
"color": "#FF9800",
"sort_order": 4,
"system": true
},
{
"id": "frozen",
"name": "Frozen Foods",
"icon": "mdi:snowflake",
"color": "#2196F3",
"sort_order": 5,
"system": true
},
{
"id": "pantry",
"name": "Pantry",
"icon": "mdi:package-variant",
"color": "#795548",
"sort_order": 6,
"system": true
},
{
"id": "beverages",
"name": "Drinks",
"icon": "mdi:cup",
"color": "#00BCD4",
"sort_order": 7,
"system": true
},
{
"id": "snacks",
"name": "Snacks & Biscuits",
"icon": "mdi:food-apple",
"color": "#E91E63",
"sort_order": 8,
"system": true
},
{
"id": "household",
"name": "Household",
"icon": "mdi:spray-bottle",
"color": "#9C27B0",
"sort_order": 9,
"system": true
},
{
"id": "health",
"name": "Health & Beauty",
"icon": "mdi:heart-pulse",
"color": "#009688",
"sort_order": 10,
"system": true
},
{
"id": "pet",
"name": "Pet Supplies",
"icon": "mdi:paw",
"color": "#FF5722",
"sort_order": 11,
"system": true
},
{
"id": "baby",
"name": "Baby",
"icon": "mdi:baby-face",
"color": "#F48FB1",
"sort_order": 12,
"system": true
},
{
"id": "other",
"name": "Other",
"icon": "mdi:dots-horizontal",
"color": "#9E9E9E",
"sort_order": 99,
"system": true
}
]
}
@@ -0,0 +1,98 @@
"""Category loader utility."""
import json
import logging
import os
from typing import List, Dict, Any
import aiofiles
_LOGGER = logging.getLogger(__name__)
async def load_categories(component_path: str, country_code: str = None) -> List[Dict[str, Any]]:
"""Load categories from JSON file asynchronously.
Args:
component_path: Path to the component directory
country_code: Country code from HA config (e.g., 'NZ', 'AU', 'US')
If None, loads default categories.json
Returns:
List of category dictionaries
"""
import os
# Try country-specific file first if country_code provided
if country_code:
country_file = os.path.join(
component_path, "data", f"categories_{country_code.lower()}.json"
)
if os.path.exists(country_file):
categories_file = country_file
_LOGGER.debug("Using country-specific categories: %s", country_code)
else:
_LOGGER.debug(
"No country-specific categories found for %s, using default",
country_code
)
categories_file = os.path.join(component_path, "data", "categories.json")
else:
categories_file = os.path.join(component_path, "data", "categories.json")
try:
async with aiofiles.open(categories_file, "r", encoding="utf-8") as f:
content = await f.read()
data = json.loads(content)
_LOGGER.info(
"Loaded categories version %s for region %s",
data.get("version", "unknown"),
data.get("region", "default")
)
return data.get("categories", [])
except FileNotFoundError:
_LOGGER.error("Categories file not found: %s", categories_file)
return _get_fallback_categories()
except json.JSONDecodeError as err:
_LOGGER.error("Failed to parse categories file: %s", err)
return _get_fallback_categories()
except Exception as err:
_LOGGER.error("Unexpected error loading categories: %s", err)
return _get_fallback_categories()
def _get_fallback_categories() -> List[Dict[str, Any]]:
"""Get minimal fallback categories if file loading fails.
Returns:
List of basic category dictionaries
"""
_LOGGER.warning("Using fallback categories")
return [
{
"id": "produce",
"name": "Produce",
"icon": "mdi:fruit-cherries",
"color": "#4CAF50",
"sort_order": 1,
"system": True
},
{
"id": "dairy",
"name": "Dairy",
"icon": "mdi:cheese",
"color": "#FFC107",
"sort_order": 2,
"system": True
},
{
"id": "other",
"name": "Other",
"icon": "mdi:dots-horizontal",
"color": "#9E9E9E",
"sort_order": 99,
"system": True
}
]
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -1,300 +0,0 @@
"""Core Shopping List Manager with invariant enforcement."""
import asyncio
import logging
from typing import Dict, Optional, List
from homeassistant.core import HomeAssistant
from homeassistant.helpers import storage
from .const import (
DOMAIN,
EVENT_SHOPPING_LIST_UPDATED,
STORAGE_KEY_ACTIVE,
STORAGE_KEY_PRODUCTS,
STORAGE_VERSION,
)
from .models import Product, ActiveItem, InvariantError, validate_invariant
_LOGGER = logging.getLogger(__name__)
class ShoppingListManager:
"""
Manages multiple independent shopping lists.
Each list_id gets its own pair of storage files:
- shopping_list_manager.{list_id}.products
- shopping_list_manager.{list_id}.active_list
The default "groceries" list uses the original flat keys for backward compat:
- shopping_list_manager.products → groceries products
- shopping_list_manager.active_list → groceries active
Architecture principles:
1. Products and active_list are separate concerns per list
2. Products are authoritative, persistent data
3. Active list is ephemeral state
4. Invariant (active ⊆ products) enforced on every mutation
5. Lock ensures atomic operations per list
"""
def __init__(self, hass: HomeAssistant):
"""Initialize the manager."""
self.hass = hass
# Per-list in-memory caches: list_id -> {key: Product}
self._products: Dict[str, Dict[str, Product]] = {}
self._active_list: Dict[str, Dict[str, ActiveItem]] = {}
# Per-list locks
self._locks: Dict[str, asyncio.Lock] = {}
# Per-list storage Store instances (created lazily, except groceries)
self._store_products: Dict[str, storage.Store] = {}
self._store_active: Dict[str, storage.Store] = {}
# Pre-create stores for the default "groceries" list using the original flat keys
# for backward compatibility — existing data just works
self._store_products["groceries"] = storage.Store(
hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS # "shopping_list_manager.products"
)
self._store_active["groceries"] = storage.Store(
hass, STORAGE_VERSION, STORAGE_KEY_ACTIVE # "shopping_list_manager.active_list"
)
def _lock_for(self, list_id: str) -> asyncio.Lock:
"""Get or create lock for a list."""
if list_id not in self._locks:
self._locks[list_id] = asyncio.Lock()
return self._locks[list_id]
def _store_products_for(self, list_id: str) -> storage.Store:
"""Get or create products Store for a list."""
if list_id not in self._store_products:
# Non-groceries lists use namespaced keys
key = f"{DOMAIN}.{list_id}.products"
self._store_products[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
return self._store_products[list_id]
def _store_active_for(self, list_id: str) -> storage.Store:
"""Get or create active Store for a list."""
if list_id not in self._store_active:
key = f"{DOMAIN}.{list_id}.active_list"
self._store_active[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
return self._store_active[list_id]
async def _ensure_loaded(self, list_id: str) -> None:
"""Lazily load a list from storage if not yet in memory."""
if list_id in self._products:
return # already loaded
products_data = await self._store_products_for(list_id).async_load()
self._products[list_id] = {
key: Product.from_dict(data) for key, data in (products_data or {}).items()
}
active_data = await self._store_active_for(list_id).async_load()
self._active_list[list_id] = {
key: ActiveItem.from_dict(data) for key, data in (active_data or {}).items()
}
# Repair any orphaned active items
await self._async_repair_invariant(list_id)
_LOGGER.info(
"Loaded list '%s': %d products, %d active",
list_id, len(self._products[list_id]), len(self._active_list[list_id])
)
async def async_load(self) -> None:
"""Pre-load the default groceries list for backward compat."""
async with self._lock_for("groceries"):
await self._ensure_loaded("groceries")
async def _async_repair_invariant(self, list_id: str) -> None:
"""Remove active items whose product no longer exists."""
orphaned = [k for k in self._active_list[list_id] if k not in self._products[list_id]]
if orphaned:
_LOGGER.warning(
"List '%s': removing %d orphaned active items: %s",
list_id, len(orphaned), orphaned
)
for k in orphaned:
del self._active_list[list_id][k]
await self._async_save_active(list_id)
async def _async_save_products(self, list_id: str) -> None:
"""Persist products to storage."""
data = {key: p.to_dict() for key, p in self._products[list_id].items()}
await self._store_products_for(list_id).async_save(data)
async def _async_save_active(self, list_id: str) -> None:
"""Persist active list to storage."""
data = {key: a.to_dict() for key, a in self._active_list[list_id].items()}
await self._store_active_for(list_id).async_save(data)
def _fire_update_event(self) -> None:
"""Fire event to notify listeners of changes."""
self.hass.bus.async_fire(EVENT_SHOPPING_LIST_UPDATED)
# ========================================================================
# PUBLIC API - All operations enforce invariants
# ========================================================================
async def async_add_product(
self,
list_id: str,
key: str,
name: str,
category: str = "other",
unit: str = "pcs",
image: str = ""
) -> Product:
"""
Add or update a product in a list's catalog.
This operation:
- Creates/updates product metadata
- Does NOT modify quantities
- Is idempotent
- Persists to storage
Args:
list_id: List identifier
key: Unique product identifier
name: Display name
category: Product category
unit: Unit of measurement
image: Image URL
Returns:
The created/updated Product
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
product = Product(
key=key,
name=name,
category=category,
unit=unit,
image=image
)
self._products[list_id][key] = product
await self._async_save_products(list_id)
_LOGGER.debug("List '%s': added/updated product %s (%s)", list_id, name, key)
self._fire_update_event()
return product
async def async_set_qty(self, list_id: str, key: str, qty: int) -> None:
"""
Set quantity for a product on the shopping list.
This operation:
- REQUIRES product to exist (enforces invariant)
- qty > 0: adds/updates active_list
- qty == 0: removes from active_list
- Persists state
- Fires update event
Args:
list_id: List identifier
key: Product key (must exist in catalog)
qty: New quantity (0 to remove, >0 to add/update)
Raises:
InvariantError: If product doesn't exist
ValueError: If qty is negative
"""
if qty < 0:
raise ValueError(f"Quantity cannot be negative: {qty}")
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
# INVARIANT ENFORCEMENT: Product must exist
if key not in self._products[list_id]:
raise InvariantError(
f"Cannot set quantity for unknown product '{key}' in list '{list_id}'. "
f"Product must be created first with add_product."
)
# Update or remove from active list
if qty > 0:
self._active_list[list_id][key] = ActiveItem(qty=qty)
_LOGGER.debug("List '%s': set qty for %s: %d", list_id, key, qty)
else:
# qty == 0: remove from list
if key in self._active_list[list_id]:
del self._active_list[list_id][key]
_LOGGER.debug("List '%s': removed %s from active list", list_id, key)
await self._async_save_active(list_id)
self._fire_update_event()
async def async_delete_product(self, list_id: str, key: str) -> None:
"""
Delete a product from the catalog.
This operation:
- Removes product from catalog
- Removes from active list (maintains invariant)
- Persists both changes
Args:
list_id: List identifier
key: Product key to delete
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
if key not in self._products[list_id]:
_LOGGER.warning("List '%s': attempted to delete non-existent product: %s", list_id, key)
return
# Remove from catalog
del self._products[list_id][key]
# Remove from active list (maintain invariant)
if key in self._active_list[list_id]:
del self._active_list[list_id][key]
await self._async_save_products(list_id)
await self._async_save_active(list_id)
_LOGGER.debug("List '%s': deleted product: %s", list_id, key)
self._fire_update_event()
async def async_get_products(self, list_id: str) -> Dict[str, dict]:
"""
Get all products in a list's catalog.
Args:
list_id: List identifier
Returns:
Dictionary of product key -> product data
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
return {key: product.to_dict() for key, product in self._products[list_id].items()}
async def async_get_active(self, list_id: str) -> Dict[str, dict]:
"""
Get active shopping list (quantities only).
Args:
list_id: List identifier
Returns:
Dictionary of product key -> active item data (qty only)
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
return {key: item.to_dict() for key, item in self._active_list[list_id].items()}
# NOTE: The following methods were removed as they're not used by the websocket API
# and would need updating to support per-list structure:
# - async_get_full_state()
# - get_product()
# - get_active_qty()
@@ -1,12 +1,17 @@
{ {
"domain": "shopping_list_manager", "domain": "shopping_list_manager",
"name": "Shopping List Manager", "name": "Shopping List Manager",
"version": "1.0.0", "version": "2.0.0",
"documentation": "https://github.com/thekiwismarthome/shopping-list-manager", "documentation": "https://github.com/thekiwismarthome/shopping-list-manager",
"issue_tracker": "https://github.com/thekiwismarthome/shopping-list-manager/issues", "issue_tracker": "https://github.com/thekiwismarthome/shopping-list-manager/issues",
"requirements": [], "requirements": [
"Pillow>=10.0.0",
"aiofiles>=23.0.0",
"rapidfuzz>=3.0.0"
],
"dependencies": [], "dependencies": [],
"codeowners": ["@thekiwismarthome"], "codeowners": ["@thekiwismarthome"],
"config_flow": true, "config_flow": true,
"iot_class": "local_push" "iot_class": "local_push",
"integration_type": "service"
} }
+120 -86
View File
@@ -1,104 +1,138 @@
"""Data models for Shopping List Manager.""" """Data models for Shopping List Manager."""
from dataclasses import dataclass, asdict from dataclasses import dataclass, field, asdict
from typing import Dict from datetime import datetime
from typing import Optional, List, Dict, Any
import uuid
def generate_id() -> str:
"""Generate a unique ID."""
return str(uuid.uuid4())
def current_timestamp() -> str:
"""Get current ISO timestamp."""
return datetime.utcnow().isoformat() + "Z"
@dataclass
class Category:
"""Category model."""
id: str
name: str
icon: str
color: str
sort_order: int
system: bool = True
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@dataclass @dataclass
class Product: class Product:
""" """Product model."""
Product catalog entry - authoritative product definition. id: str
Products exist independently of the shopping list.
They define WHAT can be shopped, not HOW MUCH is needed.
"""
key: str
name: str name: str
category: str = "other" category_id: str
unit: str = "pcs" aliases: List[str] = field(default_factory=list)
image: str = "" default_unit: str = "units"
default_quantity: float = 1
price: Optional[float] = None
currency: Optional[str] = None
price_per_unit: Optional[float] = None
price_updated: Optional[str] = None
image_url: Optional[str] = None
image_source: Optional[str] = None
barcode: Optional[str] = None
brands: List[str] = field(default_factory=list)
nutrition: Optional[Dict[str, Any]] = None
user_frequency: int = 0
last_used: Optional[str] = None
custom: bool = False
source: str = "user"
tags: List[str] = field(default_factory=list)
collections: List[str] = field(default_factory=list)
taxonomy: Dict[str, Any] = field(default_factory=dict)
allergens: List[str] = field(default_factory=list)
substitution_group: str = ""
priority_level: int = 0
image_hint: str = ""
def to_dict(self) -> dict: def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary for storage/transmission.""" """Convert to dictionary."""
return { return asdict(self)
"key": self.key,
"name": self.name,
"category": self.category,
"unit": self.unit,
"image": self.image
}
@staticmethod
def from_dict(data: dict) -> 'Product':
"""Create Product from dictionary."""
return Product(
key=data["key"],
name=data["name"],
category=data.get("category", "other"),
unit=data.get("unit", "pcs"),
image=data.get("image", "")
)
def __post_init__(self):
"""Validate product data."""
if not self.key:
raise ValueError("Product key cannot be empty")
if not self.name:
raise ValueError("Product name cannot be empty")
@dataclass @dataclass
class ActiveItem: class Item:
""" """Shopping list item model."""
Shopping list state - quantity only. id: str
list_id: str
name: str
category_id: str
product_id: Optional[str] = None
quantity: float = 1
unit: str = "units"
note: Optional[str] = None
checked: bool = False
checked_at: Optional[str] = None
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
image_url: Optional[str] = None
order_index: int = 0
price: Optional[float] = None
estimated_total: Optional[float] = None
barcode: Optional[str] = None
Contains NO product metadata, only references products by key. def to_dict(self) -> Dict[str, Any]:
qty > 0 means "on the list" """Convert to dictionary."""
qty == 0 means "not on the list" (should be removed) return asdict(self)
"""
qty: int
def to_dict(self) -> dict: def calculate_total(self) -> None:
"""Convert to dictionary for storage/transmission.""" """Calculate estimated total from quantity and price."""
return {"qty": self.qty} if self.price is not None:
self.estimated_total = self.quantity * self.price
@staticmethod
def from_dict(data: dict) -> 'ActiveItem':
"""Create ActiveItem from dictionary."""
return ActiveItem(qty=data["qty"])
def __post_init__(self):
"""Validate quantity."""
if self.qty < 0:
raise ValueError("Quantity cannot be negative")
class InvariantError(Exception): @dataclass
""" class LoyaltyCard:
Raised when the core data model invariant is violated. """Loyalty card model."""
id: str
name: str
number: str
barcode: str = ""
barcode_type: str = "barcode" # "barcode" or "qrcode"
logo: str = ""
notes: str = ""
color: str = "#9fa8da"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
Invariant: Every key in active_list MUST exist in products. def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
If this exception is raised, the system is in an inconsistent state return asdict(self)
and must be repaired before continuing.
"""
pass
def validate_invariant(products: Dict[str, Product], @dataclass
active_list: Dict[str, ActiveItem]) -> None: class ShoppingList:
""" """Shopping list model."""
Validate the core data model invariant. id: str
name: str
icon: str = "mdi:cart"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
item_order: List[str] = field(default_factory=list)
category_order: List[str] = field(default_factory=list)
active: bool = False
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
Args: def to_dict(self) -> Dict[str, Any]:
products: Product catalog dictionary """Convert to dictionary."""
active_list: Active shopping list dictionary return asdict(self)
Raises:
InvariantError: If any key in active_list doesn't exist in products
"""
for key in active_list:
if key not in products:
raise InvariantError(
f"Invariant violated: active_list contains unknown product key '{key}'. "
f"This product must be added to the catalog first."
)
@@ -0,0 +1,752 @@
"""Storage management for Shopping List Manager."""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from .utils.search import ProductSearch
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from .const import (
STORAGE_VERSION,
STORAGE_KEY_LISTS,
STORAGE_KEY_ITEMS,
STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS,
)
from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
from .data.category_loader import load_categories
_LOGGER = logging.getLogger(__name__)
class ShoppingListStorage:
"""Handle storage for shopping lists."""
def __init__(self, hass: HomeAssistant, component_path: str, country: str = "NZ") -> None:
"""Initialize storage.
Args:
hass: Home Assistant instance
component_path: Path to the component directory
country: Country code (NZ, AU, US, GB, CA, etc.)
"""
self.hass = hass
self._component_path = component_path
self._country = country # Store country
self._store_lists = Store(hass, STORAGE_VERSION, STORAGE_KEY_LISTS)
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
self._lists: Dict[str, ShoppingList] = {}
self._items: Dict[str, List[Item]] = {}
self._products: Dict[str, Product] = {}
self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None
async def async_load(self) -> None:
"""Load data from storage."""
# Load lists
lists_data = await self._store_lists.async_load()
if lists_data:
self._lists = {
list_id: ShoppingList(**list_data)
for list_id, list_data in lists_data.items()
}
_LOGGER.debug("Loaded %d lists", len(self._lists))
else:
# Create default list if none exist
default_list = ShoppingList(
id=generate_id(),
name="Shopping List",
icon="mdi:cart",
active=True
)
self._lists[default_list.id] = default_list
await self._save_lists()
_LOGGER.info("Created default shopping list")
# Load items
items_data = await self._store_items.async_load()
if items_data:
self._items = {
list_id: [Item(**item_data) for item_data in items]
for list_id, items in items_data.items()
}
_LOGGER.debug("Loaded items for %d lists", len(self._items))
# Load products
products_data = await self._store_products.async_load()
if products_data:
self._products = {
product_id: Product(**product_data)
for product_id, product_data in products_data.items()
}
_LOGGER.debug("Loaded %d products", len(self._products))
# Load categories
categories_data = await self._store_categories.async_load()
if categories_data:
self._categories = [Category(**cat_data) for cat_data in categories_data]
_LOGGER.debug("Loaded %d categories", len(self._categories))
else:
# Initialize with default categories from JSON file
default_categories = await load_categories(self._component_path, self._country) # Use self._country
self._categories = [Category(**cat) for cat in default_categories]
await self._save_categories()
_LOGGER.info(
"Initialized %d default categories for country: %s",
len(self._categories),
self._country # Use self._country
)
# Load product catalog if products are empty
if not self._products:
_LOGGER.info("Loading product catalog for country: %s", self._country)
catalog_products = await load_product_catalog(self._component_path, self._country) # Use self._country
if catalog_products:
_LOGGER.info("Importing %d products from catalog", len(catalog_products))
# ... rest of import code ...
for prod_data in catalog_products:
try:
# Create Product from catalog data
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
continue
await self._save_products()
_LOGGER.info("Successfully imported %d products from catalog", len(self._products))
# Load loyalty cards
loyalty_data = await self._store_loyalty_cards.async_load()
if loyalty_data:
self._loyalty_cards = {
card_id: LoyaltyCard(**card_data)
for card_id, card_data in loyalty_data.items()
}
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
# Initialize search engine after products are loaded
if self._products:
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Initialized product search engine with %d products", len(self._products))
else:
self._search_engine = None
_LOGGER.warning("No products loaded, search engine not initialized")
# Lists methods
async def _save_lists(self) -> None:
"""Save lists to storage."""
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
await self._store_lists.async_save(data)
def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
"""Get lists visible to the specified user.
Global lists (owner_id=None) are visible to everyone.
Private lists are visible to their owner, anyone in allowed_users, and admins.
"""
all_lists = list(self._lists.values())
if is_admin or user_id is None:
return all_lists
return [
lst for lst in all_lists
if lst.owner_id is None
or lst.owner_id == user_id
or user_id in (lst.allowed_users or [])
]
def get_list(self, list_id: str) -> Optional[ShoppingList]:
"""Get a specific list."""
return self._lists.get(list_id)
def get_active_list(self) -> Optional[ShoppingList]:
"""Get the active list."""
for lst in self._lists.values():
if lst.active:
return lst
return None
async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
"""Create a new list. Pass owner_id to make the list private to that user."""
new_list = ShoppingList(
id=generate_id(),
name=name,
icon=icon,
category_order=[cat.id for cat in self._categories],
owner_id=owner_id,
)
self._lists[new_list.id] = new_list
self._items[new_list.id] = []
await self._save_lists()
await self._write_config_backup()
_LOGGER.info("Created new list: %s", name)
return new_list
async def update_list(self, list_id: str, **kwargs) -> Optional[ShoppingList]:
"""Update a list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
for key, value in kwargs.items():
if hasattr(lst, key):
setattr(lst, key, value)
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated list: %s", list_id)
return lst
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
"""Update the allowed_users for a private list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
lst.allowed_users = allowed_users
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated members for list: %s", list_id)
return lst
async def delete_list(self, list_id: str) -> bool:
"""Delete a list."""
if list_id not in self._lists:
return False
del self._lists[list_id]
if list_id in self._items:
del self._items[list_id]
await self._save_lists()
await self._save_items()
_LOGGER.info("Deleted list: %s", list_id)
return True
async def set_active_list(self, list_id: str) -> bool:
"""Set the active list."""
if list_id not in self._lists:
return False
# Deactivate all lists
for lst in self._lists.values():
lst.active = False
# Activate the specified list
self._lists[list_id].active = True
await self._save_lists()
_LOGGER.debug("Set active list: %s", list_id)
return True
# Items methods
async def _save_items(self) -> None:
"""Save items to storage."""
data = {
list_id: [item.to_dict() for item in items]
for list_id, items in self._items.items()
}
await self._store_items.async_save(data)
def get_items(self, list_id: str) -> List[Item]:
"""Get items for a list."""
return self._items.get(list_id, [])
async def add_item(self, list_id: str, **kwargs) -> Optional[Item]:
"""Add an item to a list."""
if list_id not in self._lists:
return None
new_item = Item(
id=generate_id(),
list_id=list_id,
**kwargs
)
new_item.calculate_total()
if list_id not in self._items:
self._items[list_id] = []
self._items[list_id].append(new_item)
# Update product frequency if product_id provided
if new_item.product_id and new_item.product_id in self._products:
product = self._products[new_item.product_id]
product.user_frequency += 1
from .models import current_timestamp
product.last_used = current_timestamp()
await self._save_products()
await self._save_items()
_LOGGER.debug("Added item to list %s: %s", list_id, new_item.name)
return new_item
async def update_item(self, item_id: str, **kwargs) -> Optional[Item]:
"""Update an item."""
for list_id, items in self._items.items():
for item in items:
if item.id == item_id:
for key, value in kwargs.items():
if hasattr(item, key):
setattr(item, key, value)
from .models import current_timestamp
item.updated_at = current_timestamp()
item.calculate_total()
await self._save_items()
_LOGGER.debug("Updated item: %s", item_id)
return item
return None
async def check_item(self, item_id: str, checked: bool) -> Optional[Item]:
"""Check or uncheck an item."""
for items in self._items.values():
for item in items:
if item.id == item_id:
item.checked = checked
from .models import current_timestamp
item.checked_at = current_timestamp() if checked else None
item.updated_at = current_timestamp()
await self._save_items()
_LOGGER.debug("Checked item: %s = %s", item_id, checked)
return item
return None
async def delete_item(self, item_id: str) -> bool:
"""Delete an item."""
for list_id, items in self._items.items():
for i, item in enumerate(items):
if item.id == item_id:
del self._items[list_id][i]
await self._save_items()
_LOGGER.debug("Deleted item: %s", item_id)
return True
return False
async def bulk_check_items(self, item_ids: List[str], checked: bool) -> int:
"""Bulk check/uncheck items."""
count = 0
from .models import current_timestamp
timestamp = current_timestamp()
for items in self._items.values():
for item in items:
if item.id in item_ids:
item.checked = checked
item.checked_at = timestamp if checked else None
item.updated_at = timestamp
count += 1
if count > 0:
await self._save_items()
_LOGGER.debug("Bulk checked %d items", count)
return count
async def clear_checked_items(self, list_id: str) -> int:
"""Clear all checked items from a list."""
if list_id not in self._items:
return 0
original_count = len(self._items[list_id])
self._items[list_id] = [item for item in self._items[list_id] if not item.checked]
removed_count = original_count - len(self._items[list_id])
if removed_count > 0:
await self._save_items()
_LOGGER.info("Cleared %d checked items from list %s", removed_count, list_id)
return removed_count
def get_list_total(self, list_id: str) -> Dict[str, Any]:
"""Get total price for a list."""
items = self.get_items(list_id)
total = 0.0
item_count = 0
for item in items:
if not item.checked and item.price is not None:
total += item.quantity * item.price
item_count += 1
return {
"total": round(total, 2),
"currency": self.hass.config.currency,
"item_count": item_count
}
# Products methods
async def _save_products(self) -> None:
"""Save products to storage."""
data = {product_id: product.to_dict() for product_id, product in self._products.items()}
await self._store_products.async_save(data)
def get_products(self) -> List[Product]:
"""Get all products."""
return list(self._products.values())
def get_product(self, product_id: str) -> Optional[Product]:
"""Get a specific product."""
return self._products.get(product_id)
def search_products(
self,
query: str,
limit: int = 10,
exclude_allergens: Optional[List[str]] = None,
include_tags: Optional[List[str]] = None,
substitution_group: Optional[str] = None,
) -> List[Product]:
"""Search products with enhanced fuzzy matching and filters.
Args:
query: Search query
limit: Maximum results
exclude_allergens: Allergens to exclude
include_tags: Tags to include
substitution_group: Filter by substitution group
Returns:
List of matching products
"""
if not self._search_engine:
_LOGGER.warning("Search engine not initialized")
return []
# Convert products dict to format search engine expects
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
search_engine = ProductSearch(products_dict)
results = search_engine.search(
query=query,
limit=limit,
exclude_allergens=exclude_allergens,
include_tags=include_tags,
substitution_group=substitution_group,
)
# Convert back to Product objects
return [self._products[r["id"]] for r in results if r["id"] in self._products]
def find_product_substitutes(self, product_id: str, limit: int = 5) -> List[Product]:
"""Find substitute products.
Args:
product_id: Product to find substitutes for
limit: Maximum substitutes
Returns:
List of substitute products
"""
if not self._search_engine:
return []
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
search_engine = ProductSearch(products_dict)
results = search_engine.find_substitutes(product_id, limit)
return [self._products[r["id"]] for r in results if r["id"] in self._products]
def get_product_suggestions(self, limit: int = 20) -> List[Product]:
"""Get product suggestions based on usage frequency."""
products = list(self._products.values())
products.sort(key=lambda p: p.user_frequency, reverse=True)
return products[:limit]
async def add_product(self, **kwargs) -> Product:
"""Add a new product."""
new_product = Product(
id=generate_id(),
currency=self.hass.config.currency,
**kwargs
)
self._products[new_product.id] = new_product
await self._save_products()
await self._write_config_backup()
# Rebuild search engine so the new product is immediately searchable
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Added product: %s", new_product.name)
return new_product
async def reload_catalog(self, country_code: str) -> int:
"""Replace catalog-sourced products with those from a new country's catalog.
Products with source='user' are preserved."""
catalog_ids = [
pid for pid, p in self._products.items()
if getattr(p, 'source', 'user') == 'catalog'
]
for pid in catalog_ids:
del self._products[pid]
self._country = country_code
catalog_products = await load_product_catalog(self._component_path, country_code)
count = 0
for prod_data in catalog_products:
try:
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
count += 1
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
return count
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
"""Update a product."""
if product_id not in self._products:
return None
product = self._products[product_id]
for key, value in kwargs.items():
if hasattr(product, key):
setattr(product, key, value)
await self._save_products()
await self._write_config_backup()
_LOGGER.debug("Updated product: %s", product_id)
return product
# ---------------------------------------------------------------------------
# Backup / Restore
# ---------------------------------------------------------------------------
async def export_user_data(self) -> dict:
"""Return a serialisable snapshot of all user-created data."""
user_products = [
p.to_dict() for p in self._products.values()
if getattr(p, "source", "user") == "user"
]
lists = [lst.to_dict() for lst in self._lists.values()]
items = {
list_id: [item.to_dict() for item in items_list]
for list_id, items_list in self._items.items()
}
return {
"slm_backup_version": "1.0",
"exported_at": datetime.now(timezone.utc).isoformat(),
"country": self._country,
"user_products": user_products,
"lists": lists,
"items": items,
}
async def import_user_data(self, data: dict) -> dict:
"""Merge a backup into live storage. Skips anything already present by ID."""
imported_products = 0
imported_lists = 0
imported_items = 0
for prod_data in data.get("user_products", []):
prod_id = prod_data.get("id")
if prod_id and prod_id not in self._products:
try:
self._products[prod_id] = Product(**prod_data)
imported_products += 1
except Exception as err:
_LOGGER.warning("Skipped product during import: %s", err)
if imported_products:
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
for list_data in data.get("lists", []):
list_id = list_data.get("id")
if list_id and list_id not in self._lists:
try:
lst = ShoppingList(**list_data)
lst.active = False
self._lists[list_id] = lst
imported_lists += 1
except Exception as err:
_LOGGER.warning("Skipped list during import: %s", err)
backup_items = data.get("items", {})
for list_id, items_list in backup_items.items():
if list_id in self._lists and list_id not in self._items:
try:
self._items[list_id] = [Item(**d) for d in items_list]
imported_items += len(self._items[list_id])
except Exception as err:
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
if imported_lists or imported_items:
await self._save_lists()
await self._save_items()
_LOGGER.info(
"Import complete: %d products, %d lists, %d items",
imported_products, imported_lists, imported_items,
)
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
async def _write_config_backup(self) -> None:
"""Silently write a backup JSON to the HA config directory."""
try:
backup_path = os.path.join(
self.hass.config.config_dir,
"shopping_list_manager_backup.json",
)
data = await self.export_user_data()
def _write() -> None:
with open(backup_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
await self.hass.async_add_executor_job(_write)
_LOGGER.debug("Auto-backup written to %s", backup_path)
except Exception as err:
_LOGGER.warning("Failed to write config backup: %s", err)
# Categories methods
async def _save_categories(self) -> None:
"""Save categories to storage."""
data = [cat.to_dict() for cat in self._categories]
await self._store_categories.async_save(data)
def get_categories(self) -> List[Category]:
"""Get all categories."""
return self._categories
# Loyalty card methods
async def _save_loyalty_cards(self) -> None:
"""Save loyalty cards to storage."""
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
await self._store_loyalty_cards.async_save(data)
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
"""Get loyalty cards visible to the specified user.
Global cards (owner_id=None) are visible to everyone.
Private cards are visible to their owner, anyone in allowed_users, and admins.
"""
all_cards = list(self._loyalty_cards.values())
if is_admin or user_id is None:
return all_cards
return [
card for card in all_cards
if card.owner_id is None
or card.owner_id == user_id
or user_id in (card.allowed_users or [])
]
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
"""Get a specific loyalty card."""
return self._loyalty_cards.get(card_id)
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
"""Create a new loyalty card."""
from .models import current_timestamp
new_card = LoyaltyCard(
id=generate_id(),
owner_id=owner_id,
**kwargs
)
self._loyalty_cards[new_card.id] = new_card
await self._save_loyalty_cards()
_LOGGER.debug("Created loyalty card: %s", new_card.name)
return new_card
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
"""Update a loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
for key, value in kwargs.items():
if hasattr(card, key):
setattr(card, key, value)
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated loyalty card: %s", card_id)
return card
async def delete_loyalty_card(self, card_id: str) -> bool:
"""Delete a loyalty card."""
if card_id not in self._loyalty_cards:
return False
del self._loyalty_cards[card_id]
await self._save_loyalty_cards()
_LOGGER.debug("Deleted loyalty card: %s", card_id)
return True
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
"""Update the allowed_users for a private loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
card.allowed_users = allowed_users
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
return card
@@ -0,0 +1 @@
"""Utilities for Shopping List Manager."""
@@ -0,0 +1,109 @@
"""Image handling utilities for Shopping List Manager."""
import logging
import os
from pathlib import Path
from typing import Optional
_LOGGER = logging.getLogger(__name__)
class ImageHandler:
"""Handle product images with URL and local file support."""
def __init__(self, hass, config_path: str):
"""Initialize image handler.
Args:
hass: Home Assistant instance
config_path: Path to HA config directory
"""
self.hass = hass
# Images stored in /config/www/shopping_list_manager/images/
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images"
self._local_images_dir.mkdir(parents=True, exist_ok=True)
_LOGGER.info("Image directory: %s", self._local_images_dir)
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
"""Get image URL for a product.
Priority:
1. External URL (if provided)
2. Local file match
3. Placeholder
Args:
product_name: Name of product to find image for
external_url: Optional external image URL
Returns:
Image URL (external, local, or placeholder)
"""
# Priority 1: Use external URL if provided
if external_url:
return external_url
# Priority 2: Look for local file
local_url = self._find_local_image(product_name)
if local_url:
return local_url
# Priority 3: Placeholder
return self._get_placeholder_url()
def _find_local_image(self, product_name: str) -> Optional[str]:
"""Find local image file for product.
Searches for files matching product name (case-insensitive).
Supports: .webp, .jpg, .jpeg, .png
Args:
product_name: Product name to search for
Returns:
Local URL if found, None otherwise
"""
# Normalize product name for filename matching
normalized_name = product_name.lower().replace(" ", "_")
# Supported extensions
extensions = [".webp", ".jpg", ".jpeg", ".png"]
for ext in extensions:
# Check exact match
image_file = self._local_images_dir / f"{normalized_name}{ext}"
if image_file.exists():
return f"/local/shopping_list_manager/images/{normalized_name}{ext}"
# Check for files starting with the product name
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
return f"/local/shopping_list_manager/images/{file.name}"
return None
def _get_placeholder_url(self) -> str:
"""Get placeholder image URL.
Returns:
URL to placeholder image
"""
# Use a simple colored placeholder
# You can replace this with a real placeholder image later
return "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
def list_available_images(self) -> list:
"""List all available local images.
Returns:
List of (filename, product_name_guess) tuples
"""
images = []
extensions = [".webp", ".jpg", ".jpeg", ".png"]
for ext in extensions:
for image_file in self._local_images_dir.glob(f"*{ext}"):
# Guess product name from filename
product_name = image_file.stem.replace("_", " ").title()
images.append((image_file.name, product_name))
return sorted(images)
@@ -0,0 +1,192 @@
"""Enhanced product search utilities."""
import logging
from typing import List, Dict, Any, Optional
from rapidfuzz import fuzz, process
_LOGGER = logging.getLogger(__name__)
class ProductSearch:
"""Advanced product search with fuzzy matching and filtering."""
def __init__(self, products: Dict[str, Any]):
"""Initialize search with product catalog.
Args:
products: Dictionary of product_id -> Product objects
"""
self.products = products
def search(
self,
query: str,
limit: int = 10,
exclude_allergens: Optional[List[str]] = None,
include_tags: Optional[List[str]] = None,
substitution_group: Optional[str] = None,
taxonomy_filters: Optional[Dict[str, Any]] = None,
min_score: int = 60,
) -> List[Dict[str, Any]]:
"""Advanced product search with multiple filters.
Args:
query: Search query string
limit: Maximum results to return
exclude_allergens: List of allergens to exclude (e.g., ["milk", "gluten"])
include_tags: Only include products with these tags
substitution_group: Filter by substitution group
taxonomy_filters: Filter by taxonomy (e.g., {"dietary": ["vegan"]})
min_score: Minimum fuzzy match score (0-100)
Returns:
List of matching products with scores
"""
query_lower = query.lower().strip()
if not query_lower:
return []
candidates = []
for product_id, product in self.products.items():
# Apply allergen filter
if exclude_allergens:
if any(
allergen in product.get("allergens", [])
for allergen in exclude_allergens
):
continue
# Apply tag filter
if include_tags:
if not any(
tag in product.get("tags", [])
for tag in include_tags
):
continue
# Apply substitution group filter
if substitution_group:
if product.get("substitution_group") != substitution_group:
continue
# Apply taxonomy filters
if taxonomy_filters:
product_taxonomy = product.get("taxonomy", {})
matches_taxonomy = True
for key, values in taxonomy_filters.items():
if key not in product_taxonomy:
matches_taxonomy = False
break
product_values = product_taxonomy[key]
if isinstance(product_values, list):
if not any(v in product_values for v in values):
matches_taxonomy = False
break
else:
if product_values not in values:
matches_taxonomy = False
break
if not matches_taxonomy:
continue
# Calculate match score
score = self._calculate_score(query_lower, product)
if score >= min_score:
candidates.append({
"product": product,
"score": score,
})
# Sort by score (descending), then by user frequency, then by priority
candidates.sort(
key=lambda x: (
x["score"],
x["product"].get("user_frequency", 0),
x["product"].get("priority_level", 0),
),
reverse=True
)
# Return top results
return [c["product"] for c in candidates[:limit]]
def _calculate_score(self, query: str, product: Dict[str, Any]) -> int:
"""Calculate fuzzy match score for a product.
Args:
query: Search query
product: Product dictionary
Returns:
Score from 0-100
"""
product_name = product.get("name", "").lower()
aliases = [a.lower() for a in product.get("aliases", [])]
# Exact match gets highest score
if query == product_name:
return 100
# Check aliases for exact match
if query in aliases:
return 95
# Check if query is substring of product name
if query in product_name:
return 90
# Check if query is substring of any alias
for alias in aliases:
if query in alias:
return 85
# Fuzzy match on product name
name_score = fuzz.WRatio(query, product_name)
# Fuzzy match on aliases
alias_scores = [fuzz.WRatio(query, alias) for alias in aliases]
best_alias_score = max(alias_scores) if alias_scores else 0
# Return best score
return max(name_score, best_alias_score)
def find_substitutes(self, product_id: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Find substitute products for a given product.
Args:
product_id: ID of product to find substitutes for
limit: Maximum substitutes to return
Returns:
List of substitute products
"""
if product_id not in self.products:
return []
product = self.products[product_id]
substitution_group = product.get("substitution_group")
if not substitution_group:
return []
# Find all products in the same substitution group
substitutes = []
for pid, p in self.products.items():
if pid != product_id and p.get("substitution_group") == substitution_group:
substitutes.append(p)
# Sort by priority and frequency
substitutes.sort(
key=lambda x: (
x.get("priority_level", 0),
x.get("user_frequency", 0),
),
reverse=True
)
return substitutes[:limit]
@@ -0,0 +1 @@
"""WebSocket API handlers for Shopping List Manager."""
File diff suppressed because it is too large Load Diff
@@ -1,256 +0,0 @@
"""WebSocket API for Shopping List Manager."""
import logging
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN
from .models import InvariantError
_LOGGER = logging.getLogger(__name__)
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/add_product",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
vol.Required("name"): str,
vol.Optional("category", default="other"): str,
vol.Optional("unit", default="pcs"): str,
vol.Optional("image", default=""): str,
})
@websocket_api.async_response
async def websocket_add_product(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Add or update a product in the catalog.
Does NOT modify quantity - use set_qty for that.
Request:
{
"type": "shopping_list_manager/add_product",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
}
Response:
{
"success": true,
"result": {
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
}
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
product = await manager.async_add_product(
list_id=list_id,
key=msg["key"],
name=msg["name"],
category=msg.get("category", "other"),
unit=msg.get("unit", "pcs"),
image=msg.get("image", "")
)
connection.send_result(msg["id"], product.to_dict())
except Exception as err:
_LOGGER.error("Error adding product to list '%s': %s", list_id, err)
connection.send_error(msg["id"], "add_product_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/set_qty",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
vol.Required("qty"): vol.All(int, vol.Range(min=0)),
})
@websocket_api.async_response
async def websocket_set_qty(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Set quantity for a product on the shopping list.
Product MUST exist in catalog first.
qty = 0 removes from list.
qty > 0 adds/updates on list.
Request:
{
"type": "shopping_list_manager/set_qty",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk",
"qty": 2
}
Response:
{
"success": true
}
Error (if product doesn't exist):
{
"success": false,
"error": {
"code": "invariant_violation",
"message": "Cannot set quantity for unknown product 'milk'..."
}
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
await manager.async_set_qty(
list_id=list_id,
key=msg["key"],
qty=msg["qty"]
)
connection.send_result(msg["id"], {"success": True})
except InvariantError as err:
# This is expected if frontend tries to set qty for non-existent product
_LOGGER.warning("Invariant violation in set_qty (list '%s'): %s", list_id, err)
connection.send_error(msg["id"], "invariant_violation", str(err))
except Exception as err:
_LOGGER.error("Error setting quantity in list '%s': %s", list_id, err)
connection.send_error(msg["id"], "set_qty_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/get_products",
vol.Optional("list_id", default="groceries"): str,
})
@websocket_api.async_response
async def websocket_get_products(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Get all products in the catalog.
Request:
{
"type": "shopping_list_manager/get_products",
"list_id": "groceries" # optional, defaults to "groceries"
}
Response:
{
"milk": {
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
},
...
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
products = await manager.async_get_products(list_id=list_id)
connection.send_result(msg["id"], products)
except Exception as err:
_LOGGER.error("Error getting products for list '%s': %s", list_id, err)
connection.send_error(msg["id"], "get_products_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/get_active",
vol.Optional("list_id", default="groceries"): str,
})
@websocket_api.async_response
async def websocket_get_active(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Get active shopping list (quantities only).
Request:
{
"type": "shopping_list_manager/get_active",
"list_id": "groceries" # optional, defaults to "groceries"
}
Response:
{
"milk": {"qty": 2},
"bread": {"qty": 1},
...
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
active = await manager.async_get_active(list_id=list_id)
connection.send_result(msg["id"], active)
except Exception as err:
_LOGGER.error("Error getting active list for '%s': %s", list_id, err)
connection.send_error(msg["id"], "get_active_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/delete_product",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
})
@websocket_api.async_response
async def websocket_delete_product(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Delete a product from catalog (and remove from active list).
Request:
{
"type": "shopping_list_manager/delete_product",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk"
}
Response:
{
"success": true
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
await manager.async_delete_product(list_id=list_id, key=msg["key"])
connection.send_result(msg["id"], {"success": True})
except Exception as err:
_LOGGER.error("Error deleting product from list '%s': %s", list_id, err)
connection.send_error(msg["id"], "delete_product_failed", str(err))
File diff suppressed because it is too large Load Diff