Compare commits

..

2 Commits

Author SHA1 Message Date
thekiwismarthome 9a5a0bbcaf Bump version to 1.3.0 in manifest.json 2026-02-06 21:51:21 +13:00
thekiwismarthome 40d29aee3a Improve setConfig method for better validation
Enhance configuration handling with validation and normalization.
2026-02-06 21:50:23 +13:00
35 changed files with 3038 additions and 31059 deletions
+102 -65
View File
@@ -1,98 +1,135 @@
# Shopping List Manager Integration for Home Assistant # Shopping List Manager
The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant. A custom Home Assistant integration that provides an enhanced shopping list experience, including a companion Lovelace card for managing items directly from your dashboard.
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
--- ---
## Features ## Features
### 🛒 Multi-List Management - 📋 Manage shopping list items from Home Assistant
- Create and manage multiple shopping lists - 🔌 WebSocket-based backend (no polling entities)
- Private or shared lists with per-member access control - 🖥️ Custom Lovelace card
- Active list state shared across all connected devices and users - ⚙️ UI-based configuration (Config Flow)
- List total price calculation - 🚀 Compatible with Home Assistant **2024.8+**
### 📦 Items
- Add, update, check, and delete items with quantity and unit
- Atomic quantity increment / decrement
- Bulk check and clear checked items
- Per-item pricing, notes, and category assignment
### 🔍 Product Catalog
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
- Fuzzy search with alias matching
- Recently-used product suggestions
- Custom product creation
- Allergen filtering and product substitute groups
- Product images (WebP, 200×200px, optimised)
### 🗂️ Categories
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
- Category colour coding and emoji icons
- Per-list category ordering
### 💳 Loyalty Cards
- Store loyalty and rewards card data
- Private or shared card access per user
### 🔄 Real-Time Events
- All changes fire events on the Home Assistant bus
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
--- ---
## Requirements ## 1. Installation (HACS)
| Component | Minimum Version | [![Open your Home Assistant instance and open a repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](
|---|---| https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration
| Home Assistant | 2024.1 | )
| HACS | 2.x |
Click the button above **or** follow the manual steps below.
1. Open **HACS**
2. Go to **Integrations**
3. Click **⋮ → Custom repositories**
4. Add this repository:
- **Repository:** `https://github.com/thekiwismarthome/shopping-list-manager`
- **Category:** Integration
5. Install **Shopping List Manager**
6. **Restart Home Assistant**
--- ---
## Installation ## 2. Install the Lovelace Card Resource (Required)
### Via HACS (Recommended) The Lovelace card JavaScript file is included with the integration, but **must be copied manually** to the `www` directory so Home Assistant can load it.
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration) ### Step 1: Copy the card file
1. Click the button above Run the following command (via SSH, Terminal add-on, or container shell):
2. Confirm adding the repository to HACS
3. Install **Shopping List Manager** from **HACS → Integrations**
4. Restart Home Assistant
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
### Manual Installation ```bash
mkdir -p /config/www/community/shopping_list_card && \
1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory cp /config/custom_components/shopping_list_manager/frontend/shopping_list_card.js \
2. Restart Home Assistant /config/www/community/shopping_list_card/shopping_list_card.js
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager** ```
--- ---
## Lovelace Card ### Step 2: Add the resource to Home Assistant
Install the companion card to get the full shopping UI: 1. Go to **Settings → Dashboards**
2. Click **⋮ (top right) → Resources**
3. Click **Add Resource**
4. Enter:
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin) ```text
URL: /local/community/shopping_list_card/shopping_list_card.js
Type: JavaScript Module
```
5. Click **Create**
6. Refresh your browser (**Ctrl + F5**)
--- ---
## Documentation ## 3. Add the Integration
Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki). [![Open your Home Assistant instance and start setting up a new integration.](https://my.home-assistant.io/badges/config_flow_start.svg)](
https://my.home-assistant.io/redirect/config_flow_start/?domain=shopping_list_manager
)
## Support & Feedback Click the button above **or** add it manually:
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues) 1. Go to **Settings → Devices & Services**
- [Home Assistant Community Forum](https://community.home-assistant.io) 2. Click **Add Integration**
3. Search for **Shopping List Manager**
4. Follow the setup steps
No YAML configuration is required.
---
## 4. Add the Card to a Dashboard
Add a **Manual** card to your dashboard and use the following YAML:
```yaml
type: custom:shopping-list-card
title: Shopping List
list_id: groceries
```
Use the **⚙️ cog button** in the card to configure additional settings.
---
## Updating
- HACS updates will update the **integration**
- If the Lovelace card JavaScript changes in a future release, you must **repeat the copy command** above
This is expected behavior for single-repository integrations.
---
## Compatibility Notes
- Designed for **Home Assistant 2024.8+**
- Uses WebSocket APIs
- Fully compatible with the **Services → Actions** change introduced in Home Assistant 2024.8
---
## Troubleshooting
If the card does not load:
1. Ensure Home Assistant was restarted after installation
2. Verify the file exists at:
```
/config/www/community/shopping_list_card/shopping_list_card.js
```
3. Confirm the resource URL is correct
4. Perform a hard browser refresh (**Ctrl + F5**)
--- ---
## License ## License
MIT — see [LICENSE](LICENSE) for details. MIT License
@@ -1,279 +1,49 @@
"""Shopping List Manager integration for Home Assistant.""" """
Shopping List Manager - Home Assistant Custom Integration
Clean-slate architecture with enforced invariants
"""
import logging import logging
import os
from pathlib import Path
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType from homeassistant.components import websocket_api as ha_websocket
from .const import DOMAIN, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, EVENT_ITEM_CHECKED, EVENT_ITEM_DELETED, EVENT_LIST_UPDATED, EVENT_LIST_DELETED from .const import DOMAIN
from .storage import ShoppingListStorage from .manager import ShoppingListManager
from .utils.images import ImageHandler # Import websocket handler functions directly
from .websocket_api import (
websocket_add_product,
websocket_set_qty,
websocket_get_products,
websocket_get_active,
websocket_delete_product,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# Track storage instance globally
DATA_STORAGE = f"{DOMAIN}_storage"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Shopping List Manager component from yaml (not used)."""
# This integration doesn't support YAML configuration
# All setup is done via config entries (UI configuration)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Shopping List Manager from a config entry.""" """Set up Shopping List Manager from a config entry."""
_LOGGER.info("Setting up Shopping List Manager") # Initialize the manager
manager = ShoppingListManager(hass)
await manager.async_load()
# Get component path for loading data files # Store manager in hass.data
component_path = os.path.dirname(__file__)
config_path = hass.config.path()
# Get country from options (or fall back to data, or default to NZ)
country = entry.options.get("country") or entry.data.get("country", "NZ")
_LOGGER.info("Using country: %s", country)
# Initialize storage with country
storage = ShoppingListStorage(hass, component_path, country)
await storage.async_load()
# Initialize image handler
image_handler = ImageHandler(hass, config_path)
# Store instances in hass.data
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][DATA_STORAGE] = storage hass.data[DOMAIN]["manager"] = manager
hass.data[DOMAIN]["image_handler"] = image_handler
hass.data[DOMAIN]["country"] = country
# Register update listener for options changes # Register WebSocket commands using Home Assistant's websocket_api
entry.async_on_unload(entry.add_update_listener(update_listener)) ha_websocket.async_register_command(hass, websocket_add_product)
ha_websocket.async_register_command(hass, websocket_set_qty)
ha_websocket.async_register_command(hass, websocket_get_products)
ha_websocket.async_register_command(hass, websocket_get_active)
ha_websocket.async_register_command(hass, websocket_delete_product)
# Register WebSocket commands _LOGGER.info("Shopping List Manager setup complete - registered 5 WebSocket commands")
await _async_register_websocket_handlers(hass, storage)
# Register frontend resources
await _async_register_frontend(hass)
# CRITICAL: Register event listeners so non-admin users can subscribe
# Without these, non-admin users cannot receive real-time updates
def _dummy_listener(event):
"""Dummy listener to enable event subscription for all users."""
pass
hass.bus.async_listen(EVENT_ITEM_ADDED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_CHECKED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_DELETED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_DELETED, _dummy_listener)
_LOGGER.info("Shopping List Manager setup complete with event subscriptions enabled")
return True return True
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
# Reload the integration when options change
await hass.config_entries.async_reload(entry.entry_id)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry.""" """Unload Shopping List Manager."""
_LOGGER.info("Unloading Shopping List Manager") hass.data[DOMAIN].pop("manager", None)
# Clean up hass.data
if DOMAIN in hass.data:
hass.data[DOMAIN].pop(DATA_STORAGE, None)
return True return True
async def _async_register_websocket_handlers(
hass: HomeAssistant,
storage: ShoppingListStorage
) -> None:
"""Register WebSocket API handlers."""
from homeassistant.components import websocket_api
from .websocket import handlers
# Lists handlers
websocket_api.async_register_command(
hass,
handlers.websocket_subscribe,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_lists,
)
websocket_api.async_register_command(
hass,
handlers.websocket_create_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_list,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_active_list,
)
# Items handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_check_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_increment_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_item,
)
websocket_api.async_register_command(
hass,
handlers.websocket_reorder_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_bulk_check_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_clear_checked_items,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_list_total,
)
# Products handlers
websocket_api.async_register_command(
hass,
handlers.websocket_search_by_barcode,
)
websocket_api.async_register_command(
hass,
handlers.websocket_search_products,
)
websocket_api.async_register_command(
hass,
handlers.ws_get_products_by_ids,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_product_suggestions,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_product,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_product,
)
websocket_api.async_register_command(
hass,
handlers.websocket_get_product_substitutes,
)
# Categories handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_categories,
)
# Integration settings handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_integration_settings,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_country,
)
# Backup / Restore handlers
websocket_api.async_register_command(
hass,
handlers.websocket_export_data,
)
websocket_api.async_register_command(
hass,
handlers.websocket_import_data,
)
# List members handler
websocket_api.async_register_command(
hass,
handlers.websocket_update_list_members,
)
# HA users handler
websocket_api.async_register_command(
hass,
handlers.websocket_get_ha_users,
)
# Loyalty card handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_loyalty_cards,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card_members,
)
_LOGGER.debug("WebSocket handlers registered")
async def _async_register_frontend(hass: HomeAssistant) -> None:
"""Register frontend resources."""
# Since frontend is a separate HACS module, we don't need to register it here
# The frontend card registers itself independently
_LOGGER.debug("Frontend resources skipped (separate HACS module)")
def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
"""Get the storage instance from hass.data.
Helper function for WebSocket handlers to access storage.
"""
return hass.data[DOMAIN][DATA_STORAGE]
@@ -1,7 +1,5 @@
"""Config flow for Shopping List Manager.""" """Config flow for Shopping List Manager."""
import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries
from homeassistant.core import callback
from .const import DOMAIN from .const import DOMAIN
@@ -18,76 +16,10 @@ class ShoppingListManagerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
return self.async_abort(reason="single_instance_allowed") return self.async_abort(reason="single_instance_allowed")
if user_input is not None: if user_input is not None:
# Create entry with default country
return self.async_create_entry( return self.async_create_entry(
title="Shopping List Manager", title="Shopping List Manager",
data={"country": "NZ"}, data={}
options={
"country": "NZ",
"enable_price_tracking": True,
"enable_image_search": True,
"metric_units_only": True,
}
) )
# Show simple setup form # Show simple form
return self.async_show_form( return self.async_show_form(step_id="user")
step_id="user",
description_placeholders={
"info": "Country and other settings can be configured after setup via the Configure button."
}
)
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return OptionsFlowHandler(config_entry)
class OptionsFlowHandler(config_entries.OptionsFlow):
"""Handle options flow for Shopping List Manager."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
async def async_step_init(self, user_input=None):
"""Manage the options."""
if user_input is not None:
# Update options
return self.async_create_entry(title="", data=user_input)
# Get current settings
current_country = self.config_entry.options.get(
"country",
self.config_entry.data.get("country", "NZ")
)
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required("country", default=current_country): vol.In({
"NZ": "New Zealand",
"AU": "Australia",
"US": "United States",
"GB": "United Kingdom",
"CA": "Canada",
}),
vol.Optional(
"enable_price_tracking",
default=self.config_entry.options.get("enable_price_tracking", True)
): bool,
vol.Optional(
"enable_image_search",
default=self.config_entry.options.get("enable_image_search", True)
): bool,
vol.Optional(
"metric_units_only",
default=self.config_entry.options.get("metric_units_only", True)
): bool,
}),
description_placeholders={
"info": "Changing country will reload the product catalog on next restart."
}
)
@@ -1,131 +1,11 @@
"""Constants for Shopping List Manager.""" """Constants for Shopping List Manager."""
# Domain
DOMAIN = "shopping_list_manager" DOMAIN = "shopping_list_manager"
# Storage Keys # Storage keys
STORAGE_VERSION = 2 STORAGE_VERSION = 1
STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products" STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories" STORAGE_KEY_ACTIVE = f"{DOMAIN}.active_list"
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
# WebSocket Commands - Lists
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
# WebSocket Commands - Users
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
# WebSocket Commands - Items
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
WS_TYPE_ITEMS_ADD = f"{DOMAIN}/items/add"
WS_TYPE_ITEMS_UPDATE = f"{DOMAIN}/items/update"
WS_TYPE_ITEMS_CHECK = f"{DOMAIN}/items/check"
WS_TYPE_ITEMS_DELETE = f"{DOMAIN}/items/delete"
WS_TYPE_ITEMS_REORDER = f"{DOMAIN}/items/reorder"
WS_TYPE_ITEMS_BULK_CHECK = f"{DOMAIN}/items/bulk_check"
WS_TYPE_ITEMS_CLEAR_CHECKED = f"{DOMAIN}/items/clear_checked"
WS_TYPE_ITEMS_GET_TOTAL = f"{DOMAIN}/items/get_total"
# WebSocket Commands - Products
WS_TYPE_PRODUCTS_SEARCH = f"{DOMAIN}/products/search"
WS_TYPE_PRODUCTS_SUGGESTIONS = f"{DOMAIN}/products/suggestions"
WS_TYPE_PRODUCTS_ADD = f"{DOMAIN}/products/add"
WS_TYPE_PRODUCTS_UPDATE = f"{DOMAIN}/products/update"
WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
# WebSocket Commands - Categories
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
# WebSocket Commands - Loyalty Cards
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
# WebSocket Commands - Subscriptions
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
# WebSocket Commands - Barcode (Phase 5)
WS_TYPE_BARCODE_SCAN = f"{DOMAIN}/barcode/scan"
WS_TYPE_BARCODE_ADD = f"{DOMAIN}/barcode/add_to_list"
# WebSocket Commands - OpenFoodFacts (Phase 5)
WS_TYPE_OFF_FETCH = f"{DOMAIN}/openfoodfacts/fetch"
WS_TYPE_OFF_IMPORT = f"{DOMAIN}/openfoodfacts/import"
# Events # Events
EVENT_ITEM_ADDED = f"{DOMAIN}_item_added" EVENT_SHOPPING_LIST_UPDATED = f"{DOMAIN}_updated"
EVENT_ITEM_UPDATED = f"{DOMAIN}_item_updated"
EVENT_ITEM_CHECKED = f"{DOMAIN}_item_checked"
EVENT_ITEM_DELETED = f"{DOMAIN}_item_deleted"
EVENT_LIST_UPDATED = f"{DOMAIN}_list_updated"
EVENT_LIST_DELETED = f"{DOMAIN}_list_deleted"
# Image Configuration
IMAGE_FORMAT = "webp"
IMAGE_SIZE = 200 # 200x200px
IMAGE_QUALITY = 85
IMAGE_MAX_SIZE_KB = 15
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
# Placeholder image (inline SVG)
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
# Metric Units (always metric, regardless of country)
METRIC_UNITS = {
"weight": ["kg", "g"],
"volume": ["L", "mL"],
"count": ["units", "pack", "loaf", "dozen", "ea", "pkt", "tray", "bottle", "can", "bunch", "pottle", "roll", "sachet", "tub", "bar"]
}
# Default quantities for common products (NZ-focused, can be country-specific later)
DEFAULT_QUANTITIES = {
"milk": {"quantity": 2, "unit": "L"},
"bread": {"quantity": 1, "unit": "loaf"},
"butter": {"quantity": 500, "unit": "g"},
"eggs": {"quantity": 12, "unit": "ea"},
"cheese": {"quantity": 500, "unit": "g"},
"yogurt": {"quantity": 1, "unit": "kg"},
"flour": {"quantity": 1.5, "unit": "kg"},
"sugar": {"quantity": 1.5, "unit": "kg"},
"rice": {"quantity": 1, "unit": "kg"},
"pasta": {"quantity": 500, "unit": "g"},
"chicken breast": {"quantity": 1, "unit": "kg"},
"beef mince": {"quantity": 500, "unit": "g"},
"sausages": {"quantity": 500, "unit": "g"},
"bacon": {"quantity": 500, "unit": "g"},
"apples": {"quantity": 1, "unit": "kg"},
"bananas": {"quantity": 1, "unit": "kg"},
"potatoes": {"quantity": 2, "unit": "kg"},
"onions": {"quantity": 1, "unit": "kg"},
"carrots": {"quantity": 1, "unit": "kg"},
"tomatoes": {"quantity": 500, "unit": "g"},
"lettuce": {"quantity": 1, "unit": "ea"},
"capsicum": {"quantity": 1, "unit": "ea"},
"broccoli": {"quantity": 1, "unit": "ea"},
"cereal": {"quantity": 1, "unit": "pack"},
"baked beans": {"quantity": 1, "unit": "can"},
"tuna": {"quantity": 1, "unit": "can"},
"olive oil": {"quantity": 1, "unit": "L"},
"coffee": {"quantity": 200, "unit": "g"},
"tea bags": {"quantity": 100, "unit": "ea"},
"toilet paper": {"quantity": 12, "unit": "roll"},
"paper towels": {"quantity": 2, "unit": "roll"},
"dishwashing liquid": {"quantity": 500, "unit": "mL"},
"laundry powder": {"quantity": 2, "unit": "kg"}
}
# Paths
CATEGORIES_FILE = "categories.json"
PRODUCTS_CATALOG_FILE = "products_catalog.json"
IMAGES_PATH = "images/products"
@@ -1,62 +0,0 @@
"""Product catalog loader for Shopping List Manager."""
import json
import logging
from typing import List, Dict, Any
import aiofiles
_LOGGER = logging.getLogger(__name__)
async def load_product_catalog(component_path: str, country_code: str = "NZ") -> List[Dict[str, Any]]:
"""Load product catalog from JSON file asynchronously.
Args:
component_path: Path to the component directory
country_code: Country code (e.g., 'NZ', 'AU', 'US')
Returns:
List of product dictionaries
"""
import os
# Try country-specific catalog first
if country_code:
catalog_file = os.path.join(
component_path, "data", f"products_catalog_{country_code.lower()}.json"
)
if not os.path.exists(catalog_file):
_LOGGER.warning(
"No country-specific catalog found for %s at %s",
country_code,
catalog_file
)
return []
else:
return []
try:
# Use aiofiles for async file reading
async with aiofiles.open(catalog_file, "r", encoding="utf-8") as f:
content = await f.read()
data = json.loads(content)
_LOGGER.info(
"Loaded product catalog version %s for region %s",
data.get("version", "unknown"),
data.get("region", "default")
)
products = data.get("products", [])
_LOGGER.info("Loaded %d products from catalog", len(products))
return products
except FileNotFoundError:
_LOGGER.error("Product catalog file not found: %s", catalog_file)
return []
except json.JSONDecodeError as err:
_LOGGER.error("Failed to parse product catalog file: %s", err)
return []
except Exception as err:
_LOGGER.error("Unexpected error loading product catalog: %s", err)
return []
@@ -1,110 +0,0 @@
{
"version": "1.0.0",
"region": "NZ",
"categories": [
{
"id": "produce",
"name": "Fruit & Veg",
"icon": "mdi:fruit-cherries",
"color": "#4CAF50",
"sort_order": 1,
"system": true
},
{
"id": "dairy",
"name": "Dairy & Eggs",
"icon": "mdi:cheese",
"color": "#FFC107",
"sort_order": 2,
"system": true
},
{
"id": "meat",
"name": "Meat & Seafood",
"icon": "mdi:food-steak",
"color": "#F44336",
"sort_order": 3,
"system": true
},
{
"id": "bakery",
"name": "Bakery",
"icon": "mdi:bread-slice",
"color": "#FF9800",
"sort_order": 4,
"system": true
},
{
"id": "frozen",
"name": "Frozen Foods",
"icon": "mdi:snowflake",
"color": "#2196F3",
"sort_order": 5,
"system": true
},
{
"id": "pantry",
"name": "Pantry",
"icon": "mdi:package-variant",
"color": "#795548",
"sort_order": 6,
"system": true
},
{
"id": "beverages",
"name": "Drinks",
"icon": "mdi:cup",
"color": "#00BCD4",
"sort_order": 7,
"system": true
},
{
"id": "snacks",
"name": "Snacks & Biscuits",
"icon": "mdi:food-apple",
"color": "#E91E63",
"sort_order": 8,
"system": true
},
{
"id": "household",
"name": "Household",
"icon": "mdi:spray-bottle",
"color": "#9C27B0",
"sort_order": 9,
"system": true
},
{
"id": "health",
"name": "Health & Beauty",
"icon": "mdi:heart-pulse",
"color": "#009688",
"sort_order": 10,
"system": true
},
{
"id": "pet",
"name": "Pet Supplies",
"icon": "mdi:paw",
"color": "#FF5722",
"sort_order": 11,
"system": true
},
{
"id": "baby",
"name": "Baby",
"icon": "mdi:baby-face",
"color": "#F48FB1",
"sort_order": 12,
"system": true
},
{
"id": "other",
"name": "Other",
"icon": "mdi:dots-horizontal",
"color": "#9E9E9E",
"sort_order": 99,
"system": true
}
]
}
@@ -1,98 +0,0 @@
"""Category loader utility."""
import json
import logging
import os
from typing import List, Dict, Any
import aiofiles
_LOGGER = logging.getLogger(__name__)
async def load_categories(component_path: str, country_code: str = None) -> List[Dict[str, Any]]:
"""Load categories from JSON file asynchronously.
Args:
component_path: Path to the component directory
country_code: Country code from HA config (e.g., 'NZ', 'AU', 'US')
If None, loads default categories.json
Returns:
List of category dictionaries
"""
import os
# Try country-specific file first if country_code provided
if country_code:
country_file = os.path.join(
component_path, "data", f"categories_{country_code.lower()}.json"
)
if os.path.exists(country_file):
categories_file = country_file
_LOGGER.debug("Using country-specific categories: %s", country_code)
else:
_LOGGER.debug(
"No country-specific categories found for %s, using default",
country_code
)
categories_file = os.path.join(component_path, "data", "categories.json")
else:
categories_file = os.path.join(component_path, "data", "categories.json")
try:
async with aiofiles.open(categories_file, "r", encoding="utf-8") as f:
content = await f.read()
data = json.loads(content)
_LOGGER.info(
"Loaded categories version %s for region %s",
data.get("version", "unknown"),
data.get("region", "default")
)
return data.get("categories", [])
except FileNotFoundError:
_LOGGER.error("Categories file not found: %s", categories_file)
return _get_fallback_categories()
except json.JSONDecodeError as err:
_LOGGER.error("Failed to parse categories file: %s", err)
return _get_fallback_categories()
except Exception as err:
_LOGGER.error("Unexpected error loading categories: %s", err)
return _get_fallback_categories()
def _get_fallback_categories() -> List[Dict[str, Any]]:
"""Get minimal fallback categories if file loading fails.
Returns:
List of basic category dictionaries
"""
_LOGGER.warning("Using fallback categories")
return [
{
"id": "produce",
"name": "Produce",
"icon": "mdi:fruit-cherries",
"color": "#4CAF50",
"sort_order": 1,
"system": True
},
{
"id": "dairy",
"name": "Dairy",
"icon": "mdi:cheese",
"color": "#FFC107",
"sort_order": 2,
"system": True
},
{
"id": "other",
"name": "Other",
"icon": "mdi:dots-horizontal",
"color": "#9E9E9E",
"sort_order": 99,
"system": True
}
]
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,300 @@
"""Core Shopping List Manager with invariant enforcement."""
import asyncio
import logging
from typing import Dict, Optional, List
from homeassistant.core import HomeAssistant
from homeassistant.helpers import storage
from .const import (
DOMAIN,
EVENT_SHOPPING_LIST_UPDATED,
STORAGE_KEY_ACTIVE,
STORAGE_KEY_PRODUCTS,
STORAGE_VERSION,
)
from .models import Product, ActiveItem, InvariantError, validate_invariant
_LOGGER = logging.getLogger(__name__)
class ShoppingListManager:
"""
Manages multiple independent shopping lists.
Each list_id gets its own pair of storage files:
- shopping_list_manager.{list_id}.products
- shopping_list_manager.{list_id}.active_list
The default "groceries" list uses the original flat keys for backward compat:
- shopping_list_manager.products groceries products
- shopping_list_manager.active_list groceries active
Architecture principles:
1. Products and active_list are separate concerns per list
2. Products are authoritative, persistent data
3. Active list is ephemeral state
4. Invariant (active products) enforced on every mutation
5. Lock ensures atomic operations per list
"""
def __init__(self, hass: HomeAssistant):
"""Initialize the manager."""
self.hass = hass
# Per-list in-memory caches: list_id -> {key: Product}
self._products: Dict[str, Dict[str, Product]] = {}
self._active_list: Dict[str, Dict[str, ActiveItem]] = {}
# Per-list locks
self._locks: Dict[str, asyncio.Lock] = {}
# Per-list storage Store instances (created lazily, except groceries)
self._store_products: Dict[str, storage.Store] = {}
self._store_active: Dict[str, storage.Store] = {}
# Pre-create stores for the default "groceries" list using the original flat keys
# for backward compatibility — existing data just works
self._store_products["groceries"] = storage.Store(
hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS # "shopping_list_manager.products"
)
self._store_active["groceries"] = storage.Store(
hass, STORAGE_VERSION, STORAGE_KEY_ACTIVE # "shopping_list_manager.active_list"
)
def _lock_for(self, list_id: str) -> asyncio.Lock:
"""Get or create lock for a list."""
if list_id not in self._locks:
self._locks[list_id] = asyncio.Lock()
return self._locks[list_id]
def _store_products_for(self, list_id: str) -> storage.Store:
"""Get or create products Store for a list."""
if list_id not in self._store_products:
# Non-groceries lists use namespaced keys
key = f"{DOMAIN}.{list_id}.products"
self._store_products[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
return self._store_products[list_id]
def _store_active_for(self, list_id: str) -> storage.Store:
"""Get or create active Store for a list."""
if list_id not in self._store_active:
key = f"{DOMAIN}.{list_id}.active_list"
self._store_active[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
return self._store_active[list_id]
async def _ensure_loaded(self, list_id: str) -> None:
"""Lazily load a list from storage if not yet in memory."""
if list_id in self._products:
return # already loaded
products_data = await self._store_products_for(list_id).async_load()
self._products[list_id] = {
key: Product.from_dict(data) for key, data in (products_data or {}).items()
}
active_data = await self._store_active_for(list_id).async_load()
self._active_list[list_id] = {
key: ActiveItem.from_dict(data) for key, data in (active_data or {}).items()
}
# Repair any orphaned active items
await self._async_repair_invariant(list_id)
_LOGGER.info(
"Loaded list '%s': %d products, %d active",
list_id, len(self._products[list_id]), len(self._active_list[list_id])
)
async def async_load(self) -> None:
"""Pre-load the default groceries list for backward compat."""
async with self._lock_for("groceries"):
await self._ensure_loaded("groceries")
async def _async_repair_invariant(self, list_id: str) -> None:
"""Remove active items whose product no longer exists."""
orphaned = [k for k in self._active_list[list_id] if k not in self._products[list_id]]
if orphaned:
_LOGGER.warning(
"List '%s': removing %d orphaned active items: %s",
list_id, len(orphaned), orphaned
)
for k in orphaned:
del self._active_list[list_id][k]
await self._async_save_active(list_id)
async def _async_save_products(self, list_id: str) -> None:
"""Persist products to storage."""
data = {key: p.to_dict() for key, p in self._products[list_id].items()}
await self._store_products_for(list_id).async_save(data)
async def _async_save_active(self, list_id: str) -> None:
"""Persist active list to storage."""
data = {key: a.to_dict() for key, a in self._active_list[list_id].items()}
await self._store_active_for(list_id).async_save(data)
def _fire_update_event(self) -> None:
"""Fire event to notify listeners of changes."""
self.hass.bus.async_fire(EVENT_SHOPPING_LIST_UPDATED)
# ========================================================================
# PUBLIC API - All operations enforce invariants
# ========================================================================
async def async_add_product(
self,
list_id: str,
key: str,
name: str,
category: str = "other",
unit: str = "pcs",
image: str = ""
) -> Product:
"""
Add or update a product in a list's catalog.
This operation:
- Creates/updates product metadata
- Does NOT modify quantities
- Is idempotent
- Persists to storage
Args:
list_id: List identifier
key: Unique product identifier
name: Display name
category: Product category
unit: Unit of measurement
image: Image URL
Returns:
The created/updated Product
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
product = Product(
key=key,
name=name,
category=category,
unit=unit,
image=image
)
self._products[list_id][key] = product
await self._async_save_products(list_id)
_LOGGER.debug("List '%s': added/updated product %s (%s)", list_id, name, key)
self._fire_update_event()
return product
async def async_set_qty(self, list_id: str, key: str, qty: int) -> None:
"""
Set quantity for a product on the shopping list.
This operation:
- REQUIRES product to exist (enforces invariant)
- qty > 0: adds/updates active_list
- qty == 0: removes from active_list
- Persists state
- Fires update event
Args:
list_id: List identifier
key: Product key (must exist in catalog)
qty: New quantity (0 to remove, >0 to add/update)
Raises:
InvariantError: If product doesn't exist
ValueError: If qty is negative
"""
if qty < 0:
raise ValueError(f"Quantity cannot be negative: {qty}")
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
# INVARIANT ENFORCEMENT: Product must exist
if key not in self._products[list_id]:
raise InvariantError(
f"Cannot set quantity for unknown product '{key}' in list '{list_id}'. "
f"Product must be created first with add_product."
)
# Update or remove from active list
if qty > 0:
self._active_list[list_id][key] = ActiveItem(qty=qty)
_LOGGER.debug("List '%s': set qty for %s: %d", list_id, key, qty)
else:
# qty == 0: remove from list
if key in self._active_list[list_id]:
del self._active_list[list_id][key]
_LOGGER.debug("List '%s': removed %s from active list", list_id, key)
await self._async_save_active(list_id)
self._fire_update_event()
async def async_delete_product(self, list_id: str, key: str) -> None:
"""
Delete a product from the catalog.
This operation:
- Removes product from catalog
- Removes from active list (maintains invariant)
- Persists both changes
Args:
list_id: List identifier
key: Product key to delete
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
if key not in self._products[list_id]:
_LOGGER.warning("List '%s': attempted to delete non-existent product: %s", list_id, key)
return
# Remove from catalog
del self._products[list_id][key]
# Remove from active list (maintain invariant)
if key in self._active_list[list_id]:
del self._active_list[list_id][key]
await self._async_save_products(list_id)
await self._async_save_active(list_id)
_LOGGER.debug("List '%s': deleted product: %s", list_id, key)
self._fire_update_event()
async def async_get_products(self, list_id: str) -> Dict[str, dict]:
"""
Get all products in a list's catalog.
Args:
list_id: List identifier
Returns:
Dictionary of product key -> product data
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
return {key: product.to_dict() for key, product in self._products[list_id].items()}
async def async_get_active(self, list_id: str) -> Dict[str, dict]:
"""
Get active shopping list (quantities only).
Args:
list_id: List identifier
Returns:
Dictionary of product key -> active item data (qty only)
"""
async with self._lock_for(list_id):
await self._ensure_loaded(list_id)
return {key: item.to_dict() for key, item in self._active_list[list_id].items()}
# NOTE: The following methods were removed as they're not used by the websocket API
# and would need updating to support per-list structure:
# - async_get_full_state()
# - get_product()
# - get_active_qty()
@@ -1,17 +1,12 @@
{ {
"domain": "shopping_list_manager", "domain": "shopping_list_manager",
"name": "Shopping List Manager", "name": "Shopping List Manager",
"version": "2.0.0", "version": "1.3.0",
"documentation": "https://github.com/thekiwismarthome/shopping-list-manager", "documentation": "https://github.com/thekiwismarthome/shopping-list-manager",
"issue_tracker": "https://github.com/thekiwismarthome/shopping-list-manager/issues", "issue_tracker": "https://github.com/thekiwismarthome/shopping-list-manager/issues",
"requirements": [ "requirements": [],
"Pillow>=10.0.0",
"aiofiles>=23.0.0",
"rapidfuzz>=3.0.0"
],
"dependencies": [], "dependencies": [],
"codeowners": ["@thekiwismarthome"], "codeowners": ["@thekiwismarthome"],
"config_flow": true, "config_flow": true,
"iot_class": "local_push", "iot_class": "local_push"
"integration_type": "service"
} }
+87 -121
View File
@@ -1,138 +1,104 @@
"""Data models for Shopping List Manager.""" """Data models for Shopping List Manager."""
from dataclasses import dataclass, field, asdict from dataclasses import dataclass, asdict
from datetime import datetime from typing import Dict
from typing import Optional, List, Dict, Any
import uuid
def generate_id() -> str:
"""Generate a unique ID."""
return str(uuid.uuid4())
def current_timestamp() -> str:
"""Get current ISO timestamp."""
return datetime.utcnow().isoformat() + "Z"
@dataclass
class Category:
"""Category model."""
id: str
name: str
icon: str
color: str
sort_order: int
system: bool = True
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@dataclass @dataclass
class Product: class Product:
"""Product model.""" """
id: str Product catalog entry - authoritative product definition.
name: str
category_id: str
aliases: List[str] = field(default_factory=list)
default_unit: str = "units"
default_quantity: float = 1
price: Optional[float] = None
currency: Optional[str] = None
price_per_unit: Optional[float] = None
price_updated: Optional[str] = None
image_url: Optional[str] = None
image_source: Optional[str] = None
barcode: Optional[str] = None
brands: List[str] = field(default_factory=list)
nutrition: Optional[Dict[str, Any]] = None
user_frequency: int = 0
last_used: Optional[str] = None
custom: bool = False
source: str = "user"
tags: List[str] = field(default_factory=list)
collections: List[str] = field(default_factory=list)
taxonomy: Dict[str, Any] = field(default_factory=dict)
allergens: List[str] = field(default_factory=list)
substitution_group: str = ""
priority_level: int = 0
image_hint: str = ""
def to_dict(self) -> Dict[str, Any]: Products exist independently of the shopping list.
"""Convert to dictionary.""" They define WHAT can be shopped, not HOW MUCH is needed.
return asdict(self) """
key: str
name: str
category: str = "other"
unit: str = "pcs"
image: str = ""
def to_dict(self) -> dict:
"""Convert to dictionary for storage/transmission."""
return {
"key": self.key,
"name": self.name,
"category": self.category,
"unit": self.unit,
"image": self.image
}
@staticmethod
def from_dict(data: dict) -> 'Product':
"""Create Product from dictionary."""
return Product(
key=data["key"],
name=data["name"],
category=data.get("category", "other"),
unit=data.get("unit", "pcs"),
image=data.get("image", "")
)
def __post_init__(self):
"""Validate product data."""
if not self.key:
raise ValueError("Product key cannot be empty")
if not self.name:
raise ValueError("Product name cannot be empty")
@dataclass @dataclass
class Item: class ActiveItem:
"""Shopping list item model.""" """
id: str Shopping list state - quantity only.
list_id: str
name: str
category_id: str
product_id: Optional[str] = None
quantity: float = 1
unit: str = "units"
note: Optional[str] = None
checked: bool = False
checked_at: Optional[str] = None
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
image_url: Optional[str] = None
order_index: int = 0
price: Optional[float] = None
estimated_total: Optional[float] = None
barcode: Optional[str] = None
def to_dict(self) -> Dict[str, Any]: Contains NO product metadata, only references products by key.
"""Convert to dictionary.""" qty > 0 means "on the list"
return asdict(self) qty == 0 means "not on the list" (should be removed)
"""
qty: int
def calculate_total(self) -> None: def to_dict(self) -> dict:
"""Calculate estimated total from quantity and price.""" """Convert to dictionary for storage/transmission."""
if self.price is not None: return {"qty": self.qty}
self.estimated_total = self.quantity * self.price
@staticmethod
def from_dict(data: dict) -> 'ActiveItem':
"""Create ActiveItem from dictionary."""
return ActiveItem(qty=data["qty"])
def __post_init__(self):
"""Validate quantity."""
if self.qty < 0:
raise ValueError("Quantity cannot be negative")
@dataclass class InvariantError(Exception):
class LoyaltyCard: """
"""Loyalty card model.""" Raised when the core data model invariant is violated.
id: str
name: str
number: str
barcode: str = ""
barcode_type: str = "barcode" # "barcode" or "qrcode"
logo: str = ""
notes: str = ""
color: str = "#9fa8da"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]: Invariant: Every key in active_list MUST exist in products.
"""Convert to dictionary."""
return asdict(self) If this exception is raised, the system is in an inconsistent state
and must be repaired before continuing.
"""
pass
@dataclass def validate_invariant(products: Dict[str, Product],
class ShoppingList: active_list: Dict[str, ActiveItem]) -> None:
"""Shopping list model.""" """
id: str Validate the core data model invariant.
name: str
icon: str = "mdi:cart"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
item_order: List[str] = field(default_factory=list)
category_order: List[str] = field(default_factory=list)
active: bool = False
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]: Args:
"""Convert to dictionary.""" products: Product catalog dictionary
return asdict(self) active_list: Active shopping list dictionary
Raises:
InvariantError: If any key in active_list doesn't exist in products
"""
for key in active_list:
if key not in products:
raise InvariantError(
f"Invariant violated: active_list contains unknown product key '{key}'. "
f"This product must be added to the catalog first."
)
@@ -1,752 +0,0 @@
"""Storage management for Shopping List Manager."""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any
from .utils.search import ProductSearch
from homeassistant.core import HomeAssistant
from homeassistant.helpers.storage import Store
from .const import (
STORAGE_VERSION,
STORAGE_KEY_LISTS,
STORAGE_KEY_ITEMS,
STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS,
)
from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
from .data.category_loader import load_categories
_LOGGER = logging.getLogger(__name__)
class ShoppingListStorage:
"""Handle storage for shopping lists."""
def __init__(self, hass: HomeAssistant, component_path: str, country: str = "NZ") -> None:
"""Initialize storage.
Args:
hass: Home Assistant instance
component_path: Path to the component directory
country: Country code (NZ, AU, US, GB, CA, etc.)
"""
self.hass = hass
self._component_path = component_path
self._country = country # Store country
self._store_lists = Store(hass, STORAGE_VERSION, STORAGE_KEY_LISTS)
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
self._lists: Dict[str, ShoppingList] = {}
self._items: Dict[str, List[Item]] = {}
self._products: Dict[str, Product] = {}
self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None
async def async_load(self) -> None:
"""Load data from storage."""
# Load lists
lists_data = await self._store_lists.async_load()
if lists_data:
self._lists = {
list_id: ShoppingList(**list_data)
for list_id, list_data in lists_data.items()
}
_LOGGER.debug("Loaded %d lists", len(self._lists))
else:
# Create default list if none exist
default_list = ShoppingList(
id=generate_id(),
name="Shopping List",
icon="mdi:cart",
active=True
)
self._lists[default_list.id] = default_list
await self._save_lists()
_LOGGER.info("Created default shopping list")
# Load items
items_data = await self._store_items.async_load()
if items_data:
self._items = {
list_id: [Item(**item_data) for item_data in items]
for list_id, items in items_data.items()
}
_LOGGER.debug("Loaded items for %d lists", len(self._items))
# Load products
products_data = await self._store_products.async_load()
if products_data:
self._products = {
product_id: Product(**product_data)
for product_id, product_data in products_data.items()
}
_LOGGER.debug("Loaded %d products", len(self._products))
# Load categories
categories_data = await self._store_categories.async_load()
if categories_data:
self._categories = [Category(**cat_data) for cat_data in categories_data]
_LOGGER.debug("Loaded %d categories", len(self._categories))
else:
# Initialize with default categories from JSON file
default_categories = await load_categories(self._component_path, self._country) # Use self._country
self._categories = [Category(**cat) for cat in default_categories]
await self._save_categories()
_LOGGER.info(
"Initialized %d default categories for country: %s",
len(self._categories),
self._country # Use self._country
)
# Load product catalog if products are empty
if not self._products:
_LOGGER.info("Loading product catalog for country: %s", self._country)
catalog_products = await load_product_catalog(self._component_path, self._country) # Use self._country
if catalog_products:
_LOGGER.info("Importing %d products from catalog", len(catalog_products))
# ... rest of import code ...
for prod_data in catalog_products:
try:
# Create Product from catalog data
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
continue
await self._save_products()
_LOGGER.info("Successfully imported %d products from catalog", len(self._products))
# Load loyalty cards
loyalty_data = await self._store_loyalty_cards.async_load()
if loyalty_data:
self._loyalty_cards = {
card_id: LoyaltyCard(**card_data)
for card_id, card_data in loyalty_data.items()
}
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
# Initialize search engine after products are loaded
if self._products:
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Initialized product search engine with %d products", len(self._products))
else:
self._search_engine = None
_LOGGER.warning("No products loaded, search engine not initialized")
# Lists methods
async def _save_lists(self) -> None:
"""Save lists to storage."""
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
await self._store_lists.async_save(data)
def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
"""Get lists visible to the specified user.
Global lists (owner_id=None) are visible to everyone.
Private lists are visible to their owner, anyone in allowed_users, and admins.
"""
all_lists = list(self._lists.values())
if is_admin or user_id is None:
return all_lists
return [
lst for lst in all_lists
if lst.owner_id is None
or lst.owner_id == user_id
or user_id in (lst.allowed_users or [])
]
def get_list(self, list_id: str) -> Optional[ShoppingList]:
"""Get a specific list."""
return self._lists.get(list_id)
def get_active_list(self) -> Optional[ShoppingList]:
"""Get the active list."""
for lst in self._lists.values():
if lst.active:
return lst
return None
async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
"""Create a new list. Pass owner_id to make the list private to that user."""
new_list = ShoppingList(
id=generate_id(),
name=name,
icon=icon,
category_order=[cat.id for cat in self._categories],
owner_id=owner_id,
)
self._lists[new_list.id] = new_list
self._items[new_list.id] = []
await self._save_lists()
await self._write_config_backup()
_LOGGER.info("Created new list: %s", name)
return new_list
async def update_list(self, list_id: str, **kwargs) -> Optional[ShoppingList]:
"""Update a list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
for key, value in kwargs.items():
if hasattr(lst, key):
setattr(lst, key, value)
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated list: %s", list_id)
return lst
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
"""Update the allowed_users for a private list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
lst.allowed_users = allowed_users
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated members for list: %s", list_id)
return lst
async def delete_list(self, list_id: str) -> bool:
"""Delete a list."""
if list_id not in self._lists:
return False
del self._lists[list_id]
if list_id in self._items:
del self._items[list_id]
await self._save_lists()
await self._save_items()
_LOGGER.info("Deleted list: %s", list_id)
return True
async def set_active_list(self, list_id: str) -> bool:
"""Set the active list."""
if list_id not in self._lists:
return False
# Deactivate all lists
for lst in self._lists.values():
lst.active = False
# Activate the specified list
self._lists[list_id].active = True
await self._save_lists()
_LOGGER.debug("Set active list: %s", list_id)
return True
# Items methods
async def _save_items(self) -> None:
"""Save items to storage."""
data = {
list_id: [item.to_dict() for item in items]
for list_id, items in self._items.items()
}
await self._store_items.async_save(data)
def get_items(self, list_id: str) -> List[Item]:
"""Get items for a list."""
return self._items.get(list_id, [])
async def add_item(self, list_id: str, **kwargs) -> Optional[Item]:
"""Add an item to a list."""
if list_id not in self._lists:
return None
new_item = Item(
id=generate_id(),
list_id=list_id,
**kwargs
)
new_item.calculate_total()
if list_id not in self._items:
self._items[list_id] = []
self._items[list_id].append(new_item)
# Update product frequency if product_id provided
if new_item.product_id and new_item.product_id in self._products:
product = self._products[new_item.product_id]
product.user_frequency += 1
from .models import current_timestamp
product.last_used = current_timestamp()
await self._save_products()
await self._save_items()
_LOGGER.debug("Added item to list %s: %s", list_id, new_item.name)
return new_item
async def update_item(self, item_id: str, **kwargs) -> Optional[Item]:
"""Update an item."""
for list_id, items in self._items.items():
for item in items:
if item.id == item_id:
for key, value in kwargs.items():
if hasattr(item, key):
setattr(item, key, value)
from .models import current_timestamp
item.updated_at = current_timestamp()
item.calculate_total()
await self._save_items()
_LOGGER.debug("Updated item: %s", item_id)
return item
return None
async def check_item(self, item_id: str, checked: bool) -> Optional[Item]:
"""Check or uncheck an item."""
for items in self._items.values():
for item in items:
if item.id == item_id:
item.checked = checked
from .models import current_timestamp
item.checked_at = current_timestamp() if checked else None
item.updated_at = current_timestamp()
await self._save_items()
_LOGGER.debug("Checked item: %s = %s", item_id, checked)
return item
return None
async def delete_item(self, item_id: str) -> bool:
"""Delete an item."""
for list_id, items in self._items.items():
for i, item in enumerate(items):
if item.id == item_id:
del self._items[list_id][i]
await self._save_items()
_LOGGER.debug("Deleted item: %s", item_id)
return True
return False
async def bulk_check_items(self, item_ids: List[str], checked: bool) -> int:
"""Bulk check/uncheck items."""
count = 0
from .models import current_timestamp
timestamp = current_timestamp()
for items in self._items.values():
for item in items:
if item.id in item_ids:
item.checked = checked
item.checked_at = timestamp if checked else None
item.updated_at = timestamp
count += 1
if count > 0:
await self._save_items()
_LOGGER.debug("Bulk checked %d items", count)
return count
async def clear_checked_items(self, list_id: str) -> int:
"""Clear all checked items from a list."""
if list_id not in self._items:
return 0
original_count = len(self._items[list_id])
self._items[list_id] = [item for item in self._items[list_id] if not item.checked]
removed_count = original_count - len(self._items[list_id])
if removed_count > 0:
await self._save_items()
_LOGGER.info("Cleared %d checked items from list %s", removed_count, list_id)
return removed_count
def get_list_total(self, list_id: str) -> Dict[str, Any]:
"""Get total price for a list."""
items = self.get_items(list_id)
total = 0.0
item_count = 0
for item in items:
if not item.checked and item.price is not None:
total += item.quantity * item.price
item_count += 1
return {
"total": round(total, 2),
"currency": self.hass.config.currency,
"item_count": item_count
}
# Products methods
async def _save_products(self) -> None:
"""Save products to storage."""
data = {product_id: product.to_dict() for product_id, product in self._products.items()}
await self._store_products.async_save(data)
def get_products(self) -> List[Product]:
"""Get all products."""
return list(self._products.values())
def get_product(self, product_id: str) -> Optional[Product]:
"""Get a specific product."""
return self._products.get(product_id)
def search_products(
self,
query: str,
limit: int = 10,
exclude_allergens: Optional[List[str]] = None,
include_tags: Optional[List[str]] = None,
substitution_group: Optional[str] = None,
) -> List[Product]:
"""Search products with enhanced fuzzy matching and filters.
Args:
query: Search query
limit: Maximum results
exclude_allergens: Allergens to exclude
include_tags: Tags to include
substitution_group: Filter by substitution group
Returns:
List of matching products
"""
if not self._search_engine:
_LOGGER.warning("Search engine not initialized")
return []
# Convert products dict to format search engine expects
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
search_engine = ProductSearch(products_dict)
results = search_engine.search(
query=query,
limit=limit,
exclude_allergens=exclude_allergens,
include_tags=include_tags,
substitution_group=substitution_group,
)
# Convert back to Product objects
return [self._products[r["id"]] for r in results if r["id"] in self._products]
def find_product_substitutes(self, product_id: str, limit: int = 5) -> List[Product]:
"""Find substitute products.
Args:
product_id: Product to find substitutes for
limit: Maximum substitutes
Returns:
List of substitute products
"""
if not self._search_engine:
return []
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
search_engine = ProductSearch(products_dict)
results = search_engine.find_substitutes(product_id, limit)
return [self._products[r["id"]] for r in results if r["id"] in self._products]
def get_product_suggestions(self, limit: int = 20) -> List[Product]:
"""Get product suggestions based on usage frequency."""
products = list(self._products.values())
products.sort(key=lambda p: p.user_frequency, reverse=True)
return products[:limit]
async def add_product(self, **kwargs) -> Product:
"""Add a new product."""
new_product = Product(
id=generate_id(),
currency=self.hass.config.currency,
**kwargs
)
self._products[new_product.id] = new_product
await self._save_products()
await self._write_config_backup()
# Rebuild search engine so the new product is immediately searchable
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Added product: %s", new_product.name)
return new_product
async def reload_catalog(self, country_code: str) -> int:
"""Replace catalog-sourced products with those from a new country's catalog.
Products with source='user' are preserved."""
catalog_ids = [
pid for pid, p in self._products.items()
if getattr(p, 'source', 'user') == 'catalog'
]
for pid in catalog_ids:
del self._products[pid]
self._country = country_code
catalog_products = await load_product_catalog(self._component_path, country_code)
count = 0
for prod_data in catalog_products:
try:
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
count += 1
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
return count
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
"""Update a product."""
if product_id not in self._products:
return None
product = self._products[product_id]
for key, value in kwargs.items():
if hasattr(product, key):
setattr(product, key, value)
await self._save_products()
await self._write_config_backup()
_LOGGER.debug("Updated product: %s", product_id)
return product
# ---------------------------------------------------------------------------
# Backup / Restore
# ---------------------------------------------------------------------------
async def export_user_data(self) -> dict:
"""Return a serialisable snapshot of all user-created data."""
user_products = [
p.to_dict() for p in self._products.values()
if getattr(p, "source", "user") == "user"
]
lists = [lst.to_dict() for lst in self._lists.values()]
items = {
list_id: [item.to_dict() for item in items_list]
for list_id, items_list in self._items.items()
}
return {
"slm_backup_version": "1.0",
"exported_at": datetime.now(timezone.utc).isoformat(),
"country": self._country,
"user_products": user_products,
"lists": lists,
"items": items,
}
async def import_user_data(self, data: dict) -> dict:
"""Merge a backup into live storage. Skips anything already present by ID."""
imported_products = 0
imported_lists = 0
imported_items = 0
for prod_data in data.get("user_products", []):
prod_id = prod_data.get("id")
if prod_id and prod_id not in self._products:
try:
self._products[prod_id] = Product(**prod_data)
imported_products += 1
except Exception as err:
_LOGGER.warning("Skipped product during import: %s", err)
if imported_products:
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
for list_data in data.get("lists", []):
list_id = list_data.get("id")
if list_id and list_id not in self._lists:
try:
lst = ShoppingList(**list_data)
lst.active = False
self._lists[list_id] = lst
imported_lists += 1
except Exception as err:
_LOGGER.warning("Skipped list during import: %s", err)
backup_items = data.get("items", {})
for list_id, items_list in backup_items.items():
if list_id in self._lists and list_id not in self._items:
try:
self._items[list_id] = [Item(**d) for d in items_list]
imported_items += len(self._items[list_id])
except Exception as err:
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
if imported_lists or imported_items:
await self._save_lists()
await self._save_items()
_LOGGER.info(
"Import complete: %d products, %d lists, %d items",
imported_products, imported_lists, imported_items,
)
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
async def _write_config_backup(self) -> None:
"""Silently write a backup JSON to the HA config directory."""
try:
backup_path = os.path.join(
self.hass.config.config_dir,
"shopping_list_manager_backup.json",
)
data = await self.export_user_data()
def _write() -> None:
with open(backup_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
await self.hass.async_add_executor_job(_write)
_LOGGER.debug("Auto-backup written to %s", backup_path)
except Exception as err:
_LOGGER.warning("Failed to write config backup: %s", err)
# Categories methods
async def _save_categories(self) -> None:
"""Save categories to storage."""
data = [cat.to_dict() for cat in self._categories]
await self._store_categories.async_save(data)
def get_categories(self) -> List[Category]:
"""Get all categories."""
return self._categories
# Loyalty card methods
async def _save_loyalty_cards(self) -> None:
"""Save loyalty cards to storage."""
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
await self._store_loyalty_cards.async_save(data)
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
"""Get loyalty cards visible to the specified user.
Global cards (owner_id=None) are visible to everyone.
Private cards are visible to their owner, anyone in allowed_users, and admins.
"""
all_cards = list(self._loyalty_cards.values())
if is_admin or user_id is None:
return all_cards
return [
card for card in all_cards
if card.owner_id is None
or card.owner_id == user_id
or user_id in (card.allowed_users or [])
]
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
"""Get a specific loyalty card."""
return self._loyalty_cards.get(card_id)
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
"""Create a new loyalty card."""
from .models import current_timestamp
new_card = LoyaltyCard(
id=generate_id(),
owner_id=owner_id,
**kwargs
)
self._loyalty_cards[new_card.id] = new_card
await self._save_loyalty_cards()
_LOGGER.debug("Created loyalty card: %s", new_card.name)
return new_card
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
"""Update a loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
for key, value in kwargs.items():
if hasattr(card, key):
setattr(card, key, value)
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated loyalty card: %s", card_id)
return card
async def delete_loyalty_card(self, card_id: str) -> bool:
"""Delete a loyalty card."""
if card_id not in self._loyalty_cards:
return False
del self._loyalty_cards[card_id]
await self._save_loyalty_cards()
_LOGGER.debug("Deleted loyalty card: %s", card_id)
return True
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
"""Update the allowed_users for a private loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
card.allowed_users = allowed_users
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
return card
@@ -1 +0,0 @@
"""Utilities for Shopping List Manager."""
@@ -1,109 +0,0 @@
"""Image handling utilities for Shopping List Manager."""
import logging
import os
from pathlib import Path
from typing import Optional
_LOGGER = logging.getLogger(__name__)
class ImageHandler:
"""Handle product images with URL and local file support."""
def __init__(self, hass, config_path: str):
"""Initialize image handler.
Args:
hass: Home Assistant instance
config_path: Path to HA config directory
"""
self.hass = hass
# Images stored in /config/www/shopping_list_manager/images/
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images"
self._local_images_dir.mkdir(parents=True, exist_ok=True)
_LOGGER.info("Image directory: %s", self._local_images_dir)
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
"""Get image URL for a product.
Priority:
1. External URL (if provided)
2. Local file match
3. Placeholder
Args:
product_name: Name of product to find image for
external_url: Optional external image URL
Returns:
Image URL (external, local, or placeholder)
"""
# Priority 1: Use external URL if provided
if external_url:
return external_url
# Priority 2: Look for local file
local_url = self._find_local_image(product_name)
if local_url:
return local_url
# Priority 3: Placeholder
return self._get_placeholder_url()
def _find_local_image(self, product_name: str) -> Optional[str]:
"""Find local image file for product.
Searches for files matching product name (case-insensitive).
Supports: .webp, .jpg, .jpeg, .png
Args:
product_name: Product name to search for
Returns:
Local URL if found, None otherwise
"""
# Normalize product name for filename matching
normalized_name = product_name.lower().replace(" ", "_")
# Supported extensions
extensions = [".webp", ".jpg", ".jpeg", ".png"]
for ext in extensions:
# Check exact match
image_file = self._local_images_dir / f"{normalized_name}{ext}"
if image_file.exists():
return f"/local/shopping_list_manager/images/{normalized_name}{ext}"
# Check for files starting with the product name
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
return f"/local/shopping_list_manager/images/{file.name}"
return None
def _get_placeholder_url(self) -> str:
"""Get placeholder image URL.
Returns:
URL to placeholder image
"""
# Use a simple colored placeholder
# You can replace this with a real placeholder image later
return "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
def list_available_images(self) -> list:
"""List all available local images.
Returns:
List of (filename, product_name_guess) tuples
"""
images = []
extensions = [".webp", ".jpg", ".jpeg", ".png"]
for ext in extensions:
for image_file in self._local_images_dir.glob(f"*{ext}"):
# Guess product name from filename
product_name = image_file.stem.replace("_", " ").title()
images.append((image_file.name, product_name))
return sorted(images)
@@ -1,192 +0,0 @@
"""Enhanced product search utilities."""
import logging
from typing import List, Dict, Any, Optional
from rapidfuzz import fuzz, process
_LOGGER = logging.getLogger(__name__)
class ProductSearch:
"""Advanced product search with fuzzy matching and filtering."""
def __init__(self, products: Dict[str, Any]):
"""Initialize search with product catalog.
Args:
products: Dictionary of product_id -> Product objects
"""
self.products = products
def search(
self,
query: str,
limit: int = 10,
exclude_allergens: Optional[List[str]] = None,
include_tags: Optional[List[str]] = None,
substitution_group: Optional[str] = None,
taxonomy_filters: Optional[Dict[str, Any]] = None,
min_score: int = 60,
) -> List[Dict[str, Any]]:
"""Advanced product search with multiple filters.
Args:
query: Search query string
limit: Maximum results to return
exclude_allergens: List of allergens to exclude (e.g., ["milk", "gluten"])
include_tags: Only include products with these tags
substitution_group: Filter by substitution group
taxonomy_filters: Filter by taxonomy (e.g., {"dietary": ["vegan"]})
min_score: Minimum fuzzy match score (0-100)
Returns:
List of matching products with scores
"""
query_lower = query.lower().strip()
if not query_lower:
return []
candidates = []
for product_id, product in self.products.items():
# Apply allergen filter
if exclude_allergens:
if any(
allergen in product.get("allergens", [])
for allergen in exclude_allergens
):
continue
# Apply tag filter
if include_tags:
if not any(
tag in product.get("tags", [])
for tag in include_tags
):
continue
# Apply substitution group filter
if substitution_group:
if product.get("substitution_group") != substitution_group:
continue
# Apply taxonomy filters
if taxonomy_filters:
product_taxonomy = product.get("taxonomy", {})
matches_taxonomy = True
for key, values in taxonomy_filters.items():
if key not in product_taxonomy:
matches_taxonomy = False
break
product_values = product_taxonomy[key]
if isinstance(product_values, list):
if not any(v in product_values for v in values):
matches_taxonomy = False
break
else:
if product_values not in values:
matches_taxonomy = False
break
if not matches_taxonomy:
continue
# Calculate match score
score = self._calculate_score(query_lower, product)
if score >= min_score:
candidates.append({
"product": product,
"score": score,
})
# Sort by score (descending), then by user frequency, then by priority
candidates.sort(
key=lambda x: (
x["score"],
x["product"].get("user_frequency", 0),
x["product"].get("priority_level", 0),
),
reverse=True
)
# Return top results
return [c["product"] for c in candidates[:limit]]
def _calculate_score(self, query: str, product: Dict[str, Any]) -> int:
"""Calculate fuzzy match score for a product.
Args:
query: Search query
product: Product dictionary
Returns:
Score from 0-100
"""
product_name = product.get("name", "").lower()
aliases = [a.lower() for a in product.get("aliases", [])]
# Exact match gets highest score
if query == product_name:
return 100
# Check aliases for exact match
if query in aliases:
return 95
# Check if query is substring of product name
if query in product_name:
return 90
# Check if query is substring of any alias
for alias in aliases:
if query in alias:
return 85
# Fuzzy match on product name
name_score = fuzz.WRatio(query, product_name)
# Fuzzy match on aliases
alias_scores = [fuzz.WRatio(query, alias) for alias in aliases]
best_alias_score = max(alias_scores) if alias_scores else 0
# Return best score
return max(name_score, best_alias_score)
def find_substitutes(self, product_id: str, limit: int = 5) -> List[Dict[str, Any]]:
"""Find substitute products for a given product.
Args:
product_id: ID of product to find substitutes for
limit: Maximum substitutes to return
Returns:
List of substitute products
"""
if product_id not in self.products:
return []
product = self.products[product_id]
substitution_group = product.get("substitution_group")
if not substitution_group:
return []
# Find all products in the same substitution group
substitutes = []
for pid, p in self.products.items():
if pid != product_id and p.get("substitution_group") == substitution_group:
substitutes.append(p)
# Sort by priority and frequency
substitutes.sort(
key=lambda x: (
x.get("priority_level", 0),
x.get("user_frequency", 0),
),
reverse=True
)
return substitutes[:limit]
@@ -1 +0,0 @@
"""WebSocket API handlers for Shopping List Manager."""
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,256 @@
"""WebSocket API for Shopping List Manager."""
import logging
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback
from .const import DOMAIN
from .models import InvariantError
_LOGGER = logging.getLogger(__name__)
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/add_product",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
vol.Required("name"): str,
vol.Optional("category", default="other"): str,
vol.Optional("unit", default="pcs"): str,
vol.Optional("image", default=""): str,
})
@websocket_api.async_response
async def websocket_add_product(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Add or update a product in the catalog.
Does NOT modify quantity - use set_qty for that.
Request:
{
"type": "shopping_list_manager/add_product",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
}
Response:
{
"success": true,
"result": {
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
}
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
product = await manager.async_add_product(
list_id=list_id,
key=msg["key"],
name=msg["name"],
category=msg.get("category", "other"),
unit=msg.get("unit", "pcs"),
image=msg.get("image", "")
)
connection.send_result(msg["id"], product.to_dict())
except Exception as err:
_LOGGER.error("Error adding product to list '%s': %s", list_id, err)
connection.send_error(msg["id"], "add_product_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/set_qty",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
vol.Required("qty"): vol.All(int, vol.Range(min=0)),
})
@websocket_api.async_response
async def websocket_set_qty(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Set quantity for a product on the shopping list.
Product MUST exist in catalog first.
qty = 0 removes from list.
qty > 0 adds/updates on list.
Request:
{
"type": "shopping_list_manager/set_qty",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk",
"qty": 2
}
Response:
{
"success": true
}
Error (if product doesn't exist):
{
"success": false,
"error": {
"code": "invariant_violation",
"message": "Cannot set quantity for unknown product 'milk'..."
}
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
await manager.async_set_qty(
list_id=list_id,
key=msg["key"],
qty=msg["qty"]
)
connection.send_result(msg["id"], {"success": True})
except InvariantError as err:
# This is expected if frontend tries to set qty for non-existent product
_LOGGER.warning("Invariant violation in set_qty (list '%s'): %s", list_id, err)
connection.send_error(msg["id"], "invariant_violation", str(err))
except Exception as err:
_LOGGER.error("Error setting quantity in list '%s': %s", list_id, err)
connection.send_error(msg["id"], "set_qty_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/get_products",
vol.Optional("list_id", default="groceries"): str,
})
@websocket_api.async_response
async def websocket_get_products(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Get all products in the catalog.
Request:
{
"type": "shopping_list_manager/get_products",
"list_id": "groceries" # optional, defaults to "groceries"
}
Response:
{
"milk": {
"key": "milk",
"name": "Milk",
"category": "dairy",
"unit": "pcs",
"image": ""
},
...
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
products = await manager.async_get_products(list_id=list_id)
connection.send_result(msg["id"], products)
except Exception as err:
_LOGGER.error("Error getting products for list '%s': %s", list_id, err)
connection.send_error(msg["id"], "get_products_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/get_active",
vol.Optional("list_id", default="groceries"): str,
})
@websocket_api.async_response
async def websocket_get_active(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Get active shopping list (quantities only).
Request:
{
"type": "shopping_list_manager/get_active",
"list_id": "groceries" # optional, defaults to "groceries"
}
Response:
{
"milk": {"qty": 2},
"bread": {"qty": 1},
...
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
active = await manager.async_get_active(list_id=list_id)
connection.send_result(msg["id"], active)
except Exception as err:
_LOGGER.error("Error getting active list for '%s': %s", list_id, err)
connection.send_error(msg["id"], "get_active_failed", str(err))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/delete_product",
vol.Optional("list_id", default="groceries"): str,
vol.Required("key"): str,
})
@websocket_api.async_response
async def websocket_delete_product(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""
Delete a product from catalog (and remove from active list).
Request:
{
"type": "shopping_list_manager/delete_product",
"list_id": "groceries", # optional, defaults to "groceries"
"key": "milk"
}
Response:
{
"success": true
}
"""
manager = hass.data[DOMAIN]["manager"]
list_id = msg.get("list_id", "groceries")
try:
await manager.async_delete_product(list_id=list_id, key=msg["key"])
connection.send_result(msg["id"], {"success": True})
except Exception as err:
_LOGGER.error("Error deleting product from list '%s': %s", list_id, err)
connection.send_error(msg["id"], "delete_product_failed", str(err))