mirror of
https://github.com/thekiwismarthome/shopping-list-manager.git
synced 2026-06-30 21:46:30 +00:00
Compare commits
110 Commits
v1.3.0
...
v2.3.0-beta01
| Author | SHA1 | Date | |
|---|---|---|---|
| 433c03035b | |||
| 8eb403ed8e | |||
| 03fb9a9a67 | |||
| ae133ae59b | |||
| 2a8b12a07e | |||
| a36933c4c6 | |||
| 402881c687 | |||
| d412764cba | |||
| 752f9e5622 | |||
| 86896ba4af | |||
| 36a8939ebc | |||
| 57b6d52ddf | |||
| 03249df651 | |||
| 11180db0e3 | |||
| 9bdaea0b1b | |||
| ec0f44f109 | |||
| 88b3f2d435 | |||
| d93ea86d72 | |||
| 5b3dcb65b4 | |||
| 94cdede3b9 | |||
| e6117073be | |||
| e76fad5a92 | |||
| bcfde6df9a | |||
| 85e0e68af9 | |||
| 11eb698c20 | |||
| 0e3fcd56f5 | |||
| 9d8fd3f63e | |||
| c9c1d16f08 | |||
| 3b0cff3476 | |||
| 9c98972b40 | |||
| 575e59225f | |||
| 960319231f | |||
| 4dbae38bd9 | |||
| 06eae30773 | |||
| 0ad377114b | |||
| bc584c647a | |||
| 70dc3f1693 | |||
| b9ae304ba0 | |||
| 18040e5017 | |||
| 76d0eed1d6 | |||
| fd841270d8 | |||
| 30859a7d26 | |||
| 91615964c5 | |||
| 61a7678e24 | |||
| c7310d4213 | |||
| f8324ccf8f | |||
| 9a74fd9027 | |||
| 1b05185dfe | |||
| 94f771d0e3 | |||
| e8ba135d9d | |||
| 02cddeafc1 | |||
| 5378f79ac4 | |||
| d22b234f68 | |||
| b4ea6bc7f0 | |||
| e09f9004a6 | |||
| b349a3142a | |||
| 5fab64bd4d | |||
| d1573846cd | |||
| b2da5e62bd | |||
| f0c2bb6f29 | |||
| ab5bd34c07 | |||
| f80efe7582 | |||
| e21d136369 | |||
| 8689e8aa3f | |||
| 5f5066e6d8 | |||
| 0345c091be | |||
| 073218b851 | |||
| 0258bc8690 | |||
| 5a7b8203f6 | |||
| c7f91ac37c | |||
| 7dda89c5cb | |||
| 95b8a99304 | |||
| acce175d0a | |||
| 4a9a12ea7a | |||
| 980a99c7ff | |||
| 3829039a8a | |||
| 5af3e83fde | |||
| 4ce63938e9 | |||
| c67cc4b445 | |||
| bdff875fcb | |||
| 5c33f01e51 | |||
| 4f56c655ca | |||
| d0806aaf69 | |||
| 8bc67b5331 | |||
| ab59be6f1d | |||
| 296a8ad5fd | |||
| 16b594f11a | |||
| 3303776cd0 | |||
| 832d29b0f9 | |||
| b91d027d08 | |||
| a8ee630b7c | |||
| 4135231a7e | |||
| 903d2577c0 | |||
| d3b19e61c5 | |||
| 0ec00d38ec | |||
| b6bfa78eb5 | |||
| 9022ed0dc8 | |||
| 8b2197794a | |||
| 0eb06f240d | |||
| b6941a1499 | |||
| 78829aefb0 | |||
| c114645f88 | |||
| 5e043d45f2 | |||
| 17cb680f1f | |||
| 94450a4450 | |||
| ae11549469 | |||
| 5429b97605 | |||
| 0664f5331a | |||
| 30d8d8defd | |||
| 07329323bf |
@@ -1,135 +1,98 @@
|
||||
# Shopping List Manager
|
||||
# Shopping List Manager Integration for Home Assistant
|
||||
|
||||
A custom Home Assistant integration that provides an enhanced shopping list experience, including a companion Lovelace card for managing items directly from your dashboard.
|
||||
The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant.
|
||||
|
||||
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
|
||||
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
|
||||
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
- 📋 Manage shopping list items from Home Assistant
|
||||
- 🔌 WebSocket-based backend (no polling entities)
|
||||
- 🖥️ Custom Lovelace card
|
||||
- ⚙️ UI-based configuration (Config Flow)
|
||||
- 🚀 Compatible with Home Assistant **2024.8+**
|
||||
### 🛒 Multi-List Management
|
||||
- Create and manage multiple shopping lists
|
||||
- Private or shared lists with per-member access control
|
||||
- Active list state shared across all connected devices and users
|
||||
- List total price calculation
|
||||
|
||||
### 📦 Items
|
||||
- Add, update, check, and delete items with quantity and unit
|
||||
- Atomic quantity increment / decrement
|
||||
- Bulk check and clear checked items
|
||||
- Per-item pricing, notes, and category assignment
|
||||
|
||||
### 🔍 Product Catalog
|
||||
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
|
||||
- Fuzzy search with alias matching
|
||||
- Recently-used product suggestions
|
||||
- Custom product creation
|
||||
- Allergen filtering and product substitute groups
|
||||
- Product images (WebP, 200×200px, optimised)
|
||||
|
||||
### 🗂️ Categories
|
||||
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
|
||||
- Category colour coding and emoji icons
|
||||
- Per-list category ordering
|
||||
|
||||
### 💳 Loyalty Cards
|
||||
- Store loyalty and rewards card data
|
||||
- Private or shared card access per user
|
||||
|
||||
### 🔄 Real-Time Events
|
||||
- All changes fire events on the Home Assistant bus
|
||||
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
|
||||
|
||||
---
|
||||
|
||||
## 1. Installation (HACS)
|
||||
## Requirements
|
||||
|
||||
[](
|
||||
https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration
|
||||
)
|
||||
|
||||
Click the button above **or** follow the manual steps below.
|
||||
|
||||
1. Open **HACS**
|
||||
2. Go to **Integrations**
|
||||
3. Click **⋮ → Custom repositories**
|
||||
4. Add this repository:
|
||||
- **Repository:** `https://github.com/thekiwismarthome/shopping-list-manager`
|
||||
- **Category:** Integration
|
||||
5. Install **Shopping List Manager**
|
||||
6. **Restart Home Assistant**
|
||||
| Component | Minimum Version |
|
||||
|---|---|
|
||||
| Home Assistant | 2024.1 |
|
||||
| HACS | 2.x |
|
||||
|
||||
---
|
||||
|
||||
## 2. Install the Lovelace Card Resource (Required)
|
||||
## Installation
|
||||
|
||||
The Lovelace card JavaScript file is included with the integration, but **must be copied manually** to the `www` directory so Home Assistant can load it.
|
||||
### Via HACS (Recommended)
|
||||
|
||||
### Step 1: Copy the card file
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
|
||||
|
||||
Run the following command (via SSH, Terminal add-on, or container shell):
|
||||
1. Click the button above
|
||||
2. Confirm adding the repository to HACS
|
||||
3. Install **Shopping List Manager** from **HACS → Integrations**
|
||||
4. Restart Home Assistant
|
||||
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
|
||||
|
||||
```bash
|
||||
mkdir -p /config/www/community/shopping_list_card && \
|
||||
cp /config/custom_components/shopping_list_manager/frontend/shopping_list_card.js \
|
||||
/config/www/community/shopping_list_card/shopping_list_card.js
|
||||
```
|
||||
### Manual Installation
|
||||
|
||||
1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory
|
||||
2. Restart Home Assistant
|
||||
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
|
||||
|
||||
---
|
||||
|
||||
### Step 2: Add the resource to Home Assistant
|
||||
## Lovelace Card
|
||||
|
||||
1. Go to **Settings → Dashboards**
|
||||
2. Click **⋮ (top right) → Resources**
|
||||
3. Click **Add Resource**
|
||||
4. Enter:
|
||||
Install the companion card to get the full shopping UI:
|
||||
|
||||
```text
|
||||
URL: /local/community/shopping_list_card/shopping_list_card.js
|
||||
Type: JavaScript Module
|
||||
```
|
||||
|
||||
5. Click **Create**
|
||||
6. Refresh your browser (**Ctrl + F5**)
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin)
|
||||
|
||||
---
|
||||
|
||||
## 3. Add the Integration
|
||||
## Documentation
|
||||
|
||||
[](
|
||||
https://my.home-assistant.io/redirect/config_flow_start/?domain=shopping_list_manager
|
||||
)
|
||||
Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki).
|
||||
|
||||
Click the button above **or** add it manually:
|
||||
## Support & Feedback
|
||||
|
||||
1. Go to **Settings → Devices & Services**
|
||||
2. Click **Add Integration**
|
||||
3. Search for **Shopping List Manager**
|
||||
4. Follow the setup steps
|
||||
|
||||
No YAML configuration is required.
|
||||
|
||||
---
|
||||
|
||||
## 4. Add the Card to a Dashboard
|
||||
|
||||
Add a **Manual** card to your dashboard and use the following YAML:
|
||||
|
||||
```yaml
|
||||
type: custom:shopping-list-card
|
||||
title: Shopping List
|
||||
list_id: groceries
|
||||
```
|
||||
|
||||
Use the **⚙️ cog button** in the card to configure additional settings.
|
||||
|
||||
---
|
||||
|
||||
## Updating
|
||||
|
||||
- HACS updates will update the **integration**
|
||||
- If the Lovelace card JavaScript changes in a future release, you must **repeat the copy command** above
|
||||
|
||||
This is expected behavior for single-repository integrations.
|
||||
|
||||
---
|
||||
|
||||
## Compatibility Notes
|
||||
|
||||
- Designed for **Home Assistant 2024.8+**
|
||||
- Uses WebSocket APIs
|
||||
- Fully compatible with the **Services → Actions** change introduced in Home Assistant 2024.8
|
||||
|
||||
---
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
If the card does not load:
|
||||
|
||||
1. Ensure Home Assistant was restarted after installation
|
||||
2. Verify the file exists at:
|
||||
|
||||
```
|
||||
/config/www/community/shopping_list_card/shopping_list_card.js
|
||||
```
|
||||
|
||||
3. Confirm the resource URL is correct
|
||||
4. Perform a hard browser refresh (**Ctrl + F5**)
|
||||
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues)
|
||||
- [Home Assistant Community Forum](https://community.home-assistant.io)
|
||||
|
||||
---
|
||||
|
||||
## License
|
||||
|
||||
MIT License
|
||||
MIT — see [LICENSE](LICENSE) for details.
|
||||
|
||||
@@ -1,49 +1,279 @@
|
||||
"""
|
||||
Shopping List Manager - Home Assistant Custom Integration
|
||||
Clean-slate architecture with enforced invariants
|
||||
"""
|
||||
"""Shopping List Manager integration for Home Assistant."""
|
||||
import logging
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.components import websocket_api as ha_websocket
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import DOMAIN
|
||||
from .manager import ShoppingListManager
|
||||
# Import websocket handler functions directly
|
||||
from .websocket_api import (
|
||||
websocket_add_product,
|
||||
websocket_set_qty,
|
||||
websocket_get_products,
|
||||
websocket_get_active,
|
||||
websocket_delete_product,
|
||||
)
|
||||
from .const import DOMAIN, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, EVENT_ITEM_CHECKED, EVENT_ITEM_DELETED, EVENT_LIST_UPDATED, EVENT_LIST_DELETED
|
||||
from .storage import ShoppingListStorage
|
||||
from .utils.images import ImageHandler
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Track storage instance globally
|
||||
DATA_STORAGE = f"{DOMAIN}_storage"
|
||||
|
||||
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the Shopping List Manager component from yaml (not used)."""
|
||||
# This integration doesn't support YAML configuration
|
||||
# All setup is done via config entries (UI configuration)
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up Shopping List Manager from a config entry."""
|
||||
# Initialize the manager
|
||||
manager = ShoppingListManager(hass)
|
||||
await manager.async_load()
|
||||
_LOGGER.info("Setting up Shopping List Manager")
|
||||
|
||||
# Store manager in hass.data
|
||||
# Get component path for loading data files
|
||||
component_path = os.path.dirname(__file__)
|
||||
config_path = hass.config.path()
|
||||
|
||||
# Get country from options (or fall back to data, or default to NZ)
|
||||
country = entry.options.get("country") or entry.data.get("country", "NZ")
|
||||
_LOGGER.info("Using country: %s", country)
|
||||
|
||||
# Initialize storage with country
|
||||
storage = ShoppingListStorage(hass, component_path, country)
|
||||
await storage.async_load()
|
||||
|
||||
# Initialize image handler
|
||||
image_handler = ImageHandler(hass, config_path)
|
||||
|
||||
# Store instances in hass.data
|
||||
hass.data.setdefault(DOMAIN, {})
|
||||
hass.data[DOMAIN]["manager"] = manager
|
||||
hass.data[DOMAIN][DATA_STORAGE] = storage
|
||||
hass.data[DOMAIN]["image_handler"] = image_handler
|
||||
hass.data[DOMAIN]["country"] = country
|
||||
|
||||
# Register WebSocket commands using Home Assistant's websocket_api
|
||||
ha_websocket.async_register_command(hass, websocket_add_product)
|
||||
ha_websocket.async_register_command(hass, websocket_set_qty)
|
||||
ha_websocket.async_register_command(hass, websocket_get_products)
|
||||
ha_websocket.async_register_command(hass, websocket_get_active)
|
||||
ha_websocket.async_register_command(hass, websocket_delete_product)
|
||||
# Register update listener for options changes
|
||||
entry.async_on_unload(entry.add_update_listener(update_listener))
|
||||
|
||||
_LOGGER.info("Shopping List Manager setup complete - registered 5 WebSocket commands")
|
||||
# Register WebSocket commands
|
||||
await _async_register_websocket_handlers(hass, storage)
|
||||
|
||||
# Register frontend resources
|
||||
await _async_register_frontend(hass)
|
||||
|
||||
# CRITICAL: Register event listeners so non-admin users can subscribe
|
||||
# Without these, non-admin users cannot receive real-time updates
|
||||
def _dummy_listener(event):
|
||||
"""Dummy listener to enable event subscription for all users."""
|
||||
pass
|
||||
|
||||
hass.bus.async_listen(EVENT_ITEM_ADDED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_UPDATED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_CHECKED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_DELETED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_LIST_UPDATED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_LIST_DELETED, _dummy_listener)
|
||||
|
||||
_LOGGER.info("Shopping List Manager setup complete with event subscriptions enabled")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
|
||||
"""Handle options update."""
|
||||
# Reload the integration when options change
|
||||
await hass.config_entries.async_reload(entry.entry_id)
|
||||
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Unload Shopping List Manager."""
|
||||
hass.data[DOMAIN].pop("manager", None)
|
||||
"""Unload a config entry."""
|
||||
_LOGGER.info("Unloading Shopping List Manager")
|
||||
|
||||
# Clean up hass.data
|
||||
if DOMAIN in hass.data:
|
||||
hass.data[DOMAIN].pop(DATA_STORAGE, None)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def _async_register_websocket_handlers(
|
||||
hass: HomeAssistant,
|
||||
storage: ShoppingListStorage
|
||||
) -> None:
|
||||
"""Register WebSocket API handlers."""
|
||||
from homeassistant.components import websocket_api
|
||||
from .websocket import handlers
|
||||
|
||||
# Lists handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_subscribe,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_lists,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_create_list,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_list,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_list,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_set_active_list,
|
||||
)
|
||||
|
||||
# Items handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_items,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_add_item,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_item,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_check_item,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_increment_item,
|
||||
)
|
||||
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_item,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_reorder_items,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_bulk_check_items,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_clear_checked_items,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_list_total,
|
||||
)
|
||||
|
||||
# Products handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_search_by_barcode,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_search_products,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.ws_get_products_by_ids,
|
||||
)
|
||||
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_product_suggestions,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_add_product,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_product,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_product_substitutes,
|
||||
)
|
||||
|
||||
# Categories handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_categories,
|
||||
)
|
||||
|
||||
# Integration settings handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_integration_settings,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_set_country,
|
||||
)
|
||||
|
||||
# Backup / Restore handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_export_data,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_import_data,
|
||||
)
|
||||
|
||||
# List members handler
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_list_members,
|
||||
)
|
||||
|
||||
# HA users handler
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_ha_users,
|
||||
)
|
||||
|
||||
# Loyalty card handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_loyalty_cards,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_add_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_loyalty_card_members,
|
||||
)
|
||||
|
||||
_LOGGER.debug("WebSocket handlers registered")
|
||||
|
||||
|
||||
async def _async_register_frontend(hass: HomeAssistant) -> None:
|
||||
"""Register frontend resources."""
|
||||
# Since frontend is a separate HACS module, we don't need to register it here
|
||||
# The frontend card registers itself independently
|
||||
_LOGGER.debug("Frontend resources skipped (separate HACS module)")
|
||||
|
||||
|
||||
def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
|
||||
"""Get the storage instance from hass.data.
|
||||
|
||||
Helper function for WebSocket handlers to access storage.
|
||||
"""
|
||||
return hass.data[DOMAIN][DATA_STORAGE]
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,5 +1,7 @@
|
||||
"""Config flow for Shopping List Manager."""
|
||||
import voluptuous as vol
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.core import callback
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
@@ -16,10 +18,76 @@ class ShoppingListManagerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
return self.async_abort(reason="single_instance_allowed")
|
||||
|
||||
if user_input is not None:
|
||||
# Create entry with default country
|
||||
return self.async_create_entry(
|
||||
title="Shopping List Manager",
|
||||
data={}
|
||||
data={"country": "NZ"},
|
||||
options={
|
||||
"country": "NZ",
|
||||
"enable_price_tracking": True,
|
||||
"enable_image_search": True,
|
||||
"metric_units_only": True,
|
||||
}
|
||||
)
|
||||
|
||||
# Show simple form
|
||||
return self.async_show_form(step_id="user")
|
||||
# Show simple setup form
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
description_placeholders={
|
||||
"info": "Country and other settings can be configured after setup via the Configure button."
|
||||
}
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
def async_get_options_flow(config_entry):
|
||||
"""Get the options flow for this handler."""
|
||||
return OptionsFlowHandler(config_entry)
|
||||
|
||||
|
||||
class OptionsFlowHandler(config_entries.OptionsFlow):
|
||||
"""Handle options flow for Shopping List Manager."""
|
||||
|
||||
def __init__(self, config_entry):
|
||||
"""Initialize options flow."""
|
||||
self.config_entry = config_entry
|
||||
|
||||
async def async_step_init(self, user_input=None):
|
||||
"""Manage the options."""
|
||||
if user_input is not None:
|
||||
# Update options
|
||||
return self.async_create_entry(title="", data=user_input)
|
||||
|
||||
# Get current settings
|
||||
current_country = self.config_entry.options.get(
|
||||
"country",
|
||||
self.config_entry.data.get("country", "NZ")
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="init",
|
||||
data_schema=vol.Schema({
|
||||
vol.Required("country", default=current_country): vol.In({
|
||||
"NZ": "New Zealand",
|
||||
"AU": "Australia",
|
||||
"US": "United States",
|
||||
"GB": "United Kingdom",
|
||||
"CA": "Canada",
|
||||
}),
|
||||
vol.Optional(
|
||||
"enable_price_tracking",
|
||||
default=self.config_entry.options.get("enable_price_tracking", True)
|
||||
): bool,
|
||||
vol.Optional(
|
||||
"enable_image_search",
|
||||
default=self.config_entry.options.get("enable_image_search", True)
|
||||
): bool,
|
||||
vol.Optional(
|
||||
"metric_units_only",
|
||||
default=self.config_entry.options.get("metric_units_only", True)
|
||||
): bool,
|
||||
}),
|
||||
description_placeholders={
|
||||
"info": "Changing country will reload the product catalog on next restart."
|
||||
}
|
||||
)
|
||||
|
||||
@@ -1,11 +1,131 @@
|
||||
"""Constants for Shopping List Manager."""
|
||||
|
||||
# Domain
|
||||
DOMAIN = "shopping_list_manager"
|
||||
|
||||
# Storage keys
|
||||
STORAGE_VERSION = 1
|
||||
# Storage Keys
|
||||
STORAGE_VERSION = 2
|
||||
STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
|
||||
STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
|
||||
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
|
||||
STORAGE_KEY_ACTIVE = f"{DOMAIN}.active_list"
|
||||
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
|
||||
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
|
||||
|
||||
# WebSocket Commands - Lists
|
||||
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
|
||||
WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
|
||||
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
|
||||
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
|
||||
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
|
||||
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
|
||||
|
||||
# WebSocket Commands - Users
|
||||
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
|
||||
|
||||
# WebSocket Commands - Items
|
||||
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
|
||||
WS_TYPE_ITEMS_ADD = f"{DOMAIN}/items/add"
|
||||
WS_TYPE_ITEMS_UPDATE = f"{DOMAIN}/items/update"
|
||||
WS_TYPE_ITEMS_CHECK = f"{DOMAIN}/items/check"
|
||||
WS_TYPE_ITEMS_DELETE = f"{DOMAIN}/items/delete"
|
||||
WS_TYPE_ITEMS_REORDER = f"{DOMAIN}/items/reorder"
|
||||
WS_TYPE_ITEMS_BULK_CHECK = f"{DOMAIN}/items/bulk_check"
|
||||
WS_TYPE_ITEMS_CLEAR_CHECKED = f"{DOMAIN}/items/clear_checked"
|
||||
WS_TYPE_ITEMS_GET_TOTAL = f"{DOMAIN}/items/get_total"
|
||||
|
||||
# WebSocket Commands - Products
|
||||
WS_TYPE_PRODUCTS_SEARCH = f"{DOMAIN}/products/search"
|
||||
WS_TYPE_PRODUCTS_SUGGESTIONS = f"{DOMAIN}/products/suggestions"
|
||||
WS_TYPE_PRODUCTS_ADD = f"{DOMAIN}/products/add"
|
||||
WS_TYPE_PRODUCTS_UPDATE = f"{DOMAIN}/products/update"
|
||||
WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
|
||||
|
||||
# WebSocket Commands - Categories
|
||||
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
|
||||
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
|
||||
|
||||
# WebSocket Commands - Loyalty Cards
|
||||
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
|
||||
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
|
||||
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
|
||||
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
|
||||
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
|
||||
|
||||
# WebSocket Commands - Subscriptions
|
||||
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
|
||||
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
|
||||
|
||||
# WebSocket Commands - Barcode (Phase 5)
|
||||
WS_TYPE_BARCODE_SCAN = f"{DOMAIN}/barcode/scan"
|
||||
WS_TYPE_BARCODE_ADD = f"{DOMAIN}/barcode/add_to_list"
|
||||
|
||||
# WebSocket Commands - OpenFoodFacts (Phase 5)
|
||||
WS_TYPE_OFF_FETCH = f"{DOMAIN}/openfoodfacts/fetch"
|
||||
WS_TYPE_OFF_IMPORT = f"{DOMAIN}/openfoodfacts/import"
|
||||
|
||||
# Events
|
||||
EVENT_SHOPPING_LIST_UPDATED = f"{DOMAIN}_updated"
|
||||
EVENT_ITEM_ADDED = f"{DOMAIN}_item_added"
|
||||
EVENT_ITEM_UPDATED = f"{DOMAIN}_item_updated"
|
||||
EVENT_ITEM_CHECKED = f"{DOMAIN}_item_checked"
|
||||
EVENT_ITEM_DELETED = f"{DOMAIN}_item_deleted"
|
||||
EVENT_LIST_UPDATED = f"{DOMAIN}_list_updated"
|
||||
EVENT_LIST_DELETED = f"{DOMAIN}_list_deleted"
|
||||
|
||||
# Image Configuration
|
||||
IMAGE_FORMAT = "webp"
|
||||
IMAGE_SIZE = 200 # 200x200px
|
||||
IMAGE_QUALITY = 85
|
||||
IMAGE_MAX_SIZE_KB = 15
|
||||
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
|
||||
|
||||
# Placeholder image (inline SVG)
|
||||
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
|
||||
|
||||
# Metric Units (always metric, regardless of country)
|
||||
METRIC_UNITS = {
|
||||
"weight": ["kg", "g"],
|
||||
"volume": ["L", "mL"],
|
||||
"count": ["units", "pack", "loaf", "dozen", "ea", "pkt", "tray", "bottle", "can", "bunch", "pottle", "roll", "sachet", "tub", "bar"]
|
||||
}
|
||||
|
||||
# Default quantities for common products (NZ-focused, can be country-specific later)
|
||||
DEFAULT_QUANTITIES = {
|
||||
"milk": {"quantity": 2, "unit": "L"},
|
||||
"bread": {"quantity": 1, "unit": "loaf"},
|
||||
"butter": {"quantity": 500, "unit": "g"},
|
||||
"eggs": {"quantity": 12, "unit": "ea"},
|
||||
"cheese": {"quantity": 500, "unit": "g"},
|
||||
"yogurt": {"quantity": 1, "unit": "kg"},
|
||||
"flour": {"quantity": 1.5, "unit": "kg"},
|
||||
"sugar": {"quantity": 1.5, "unit": "kg"},
|
||||
"rice": {"quantity": 1, "unit": "kg"},
|
||||
"pasta": {"quantity": 500, "unit": "g"},
|
||||
"chicken breast": {"quantity": 1, "unit": "kg"},
|
||||
"beef mince": {"quantity": 500, "unit": "g"},
|
||||
"sausages": {"quantity": 500, "unit": "g"},
|
||||
"bacon": {"quantity": 500, "unit": "g"},
|
||||
"apples": {"quantity": 1, "unit": "kg"},
|
||||
"bananas": {"quantity": 1, "unit": "kg"},
|
||||
"potatoes": {"quantity": 2, "unit": "kg"},
|
||||
"onions": {"quantity": 1, "unit": "kg"},
|
||||
"carrots": {"quantity": 1, "unit": "kg"},
|
||||
"tomatoes": {"quantity": 500, "unit": "g"},
|
||||
"lettuce": {"quantity": 1, "unit": "ea"},
|
||||
"capsicum": {"quantity": 1, "unit": "ea"},
|
||||
"broccoli": {"quantity": 1, "unit": "ea"},
|
||||
"cereal": {"quantity": 1, "unit": "pack"},
|
||||
"baked beans": {"quantity": 1, "unit": "can"},
|
||||
"tuna": {"quantity": 1, "unit": "can"},
|
||||
"olive oil": {"quantity": 1, "unit": "L"},
|
||||
"coffee": {"quantity": 200, "unit": "g"},
|
||||
"tea bags": {"quantity": 100, "unit": "ea"},
|
||||
"toilet paper": {"quantity": 12, "unit": "roll"},
|
||||
"paper towels": {"quantity": 2, "unit": "roll"},
|
||||
"dishwashing liquid": {"quantity": 500, "unit": "mL"},
|
||||
"laundry powder": {"quantity": 2, "unit": "kg"}
|
||||
}
|
||||
|
||||
# Paths
|
||||
CATEGORIES_FILE = "categories.json"
|
||||
PRODUCTS_CATALOG_FILE = "products_catalog.json"
|
||||
IMAGES_PATH = "images/products"
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -0,0 +1,62 @@
|
||||
"""Product catalog loader for Shopping List Manager."""
|
||||
import json
|
||||
import logging
|
||||
from typing import List, Dict, Any
|
||||
import aiofiles
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def load_product_catalog(component_path: str, country_code: str = "NZ") -> List[Dict[str, Any]]:
|
||||
"""Load product catalog from JSON file asynchronously.
|
||||
|
||||
Args:
|
||||
component_path: Path to the component directory
|
||||
country_code: Country code (e.g., 'NZ', 'AU', 'US')
|
||||
|
||||
Returns:
|
||||
List of product dictionaries
|
||||
"""
|
||||
import os
|
||||
|
||||
# Try country-specific catalog first
|
||||
if country_code:
|
||||
catalog_file = os.path.join(
|
||||
component_path, "data", f"products_catalog_{country_code.lower()}.json"
|
||||
)
|
||||
if not os.path.exists(catalog_file):
|
||||
_LOGGER.warning(
|
||||
"No country-specific catalog found for %s at %s",
|
||||
country_code,
|
||||
catalog_file
|
||||
)
|
||||
return []
|
||||
else:
|
||||
return []
|
||||
|
||||
try:
|
||||
# Use aiofiles for async file reading
|
||||
async with aiofiles.open(catalog_file, "r", encoding="utf-8") as f:
|
||||
content = await f.read()
|
||||
data = json.loads(content)
|
||||
|
||||
_LOGGER.info(
|
||||
"Loaded product catalog version %s for region %s",
|
||||
data.get("version", "unknown"),
|
||||
data.get("region", "default")
|
||||
)
|
||||
|
||||
products = data.get("products", [])
|
||||
_LOGGER.info("Loaded %d products from catalog", len(products))
|
||||
|
||||
return products
|
||||
|
||||
except FileNotFoundError:
|
||||
_LOGGER.error("Product catalog file not found: %s", catalog_file)
|
||||
return []
|
||||
except json.JSONDecodeError as err:
|
||||
_LOGGER.error("Failed to parse product catalog file: %s", err)
|
||||
return []
|
||||
except Exception as err:
|
||||
_LOGGER.error("Unexpected error loading product catalog: %s", err)
|
||||
return []
|
||||
@@ -0,0 +1,110 @@
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"region": "NZ",
|
||||
"categories": [
|
||||
{
|
||||
"id": "produce",
|
||||
"name": "Fruit & Veg",
|
||||
"icon": "mdi:fruit-cherries",
|
||||
"color": "#4CAF50",
|
||||
"sort_order": 1,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "dairy",
|
||||
"name": "Dairy & Eggs",
|
||||
"icon": "mdi:cheese",
|
||||
"color": "#FFC107",
|
||||
"sort_order": 2,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "meat",
|
||||
"name": "Meat & Seafood",
|
||||
"icon": "mdi:food-steak",
|
||||
"color": "#F44336",
|
||||
"sort_order": 3,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "bakery",
|
||||
"name": "Bakery",
|
||||
"icon": "mdi:bread-slice",
|
||||
"color": "#FF9800",
|
||||
"sort_order": 4,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "frozen",
|
||||
"name": "Frozen Foods",
|
||||
"icon": "mdi:snowflake",
|
||||
"color": "#2196F3",
|
||||
"sort_order": 5,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "pantry",
|
||||
"name": "Pantry",
|
||||
"icon": "mdi:package-variant",
|
||||
"color": "#795548",
|
||||
"sort_order": 6,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "beverages",
|
||||
"name": "Drinks",
|
||||
"icon": "mdi:cup",
|
||||
"color": "#00BCD4",
|
||||
"sort_order": 7,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "snacks",
|
||||
"name": "Snacks & Biscuits",
|
||||
"icon": "mdi:food-apple",
|
||||
"color": "#E91E63",
|
||||
"sort_order": 8,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "household",
|
||||
"name": "Household",
|
||||
"icon": "mdi:spray-bottle",
|
||||
"color": "#9C27B0",
|
||||
"sort_order": 9,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "health",
|
||||
"name": "Health & Beauty",
|
||||
"icon": "mdi:heart-pulse",
|
||||
"color": "#009688",
|
||||
"sort_order": 10,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "pet",
|
||||
"name": "Pet Supplies",
|
||||
"icon": "mdi:paw",
|
||||
"color": "#FF5722",
|
||||
"sort_order": 11,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "baby",
|
||||
"name": "Baby",
|
||||
"icon": "mdi:baby-face",
|
||||
"color": "#F48FB1",
|
||||
"sort_order": 12,
|
||||
"system": true
|
||||
},
|
||||
{
|
||||
"id": "other",
|
||||
"name": "Other",
|
||||
"icon": "mdi:dots-horizontal",
|
||||
"color": "#9E9E9E",
|
||||
"sort_order": 99,
|
||||
"system": true
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -0,0 +1,98 @@
|
||||
"""Category loader utility."""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import List, Dict, Any
|
||||
import aiofiles
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def load_categories(component_path: str, country_code: str = None) -> List[Dict[str, Any]]:
|
||||
"""Load categories from JSON file asynchronously.
|
||||
|
||||
Args:
|
||||
component_path: Path to the component directory
|
||||
country_code: Country code from HA config (e.g., 'NZ', 'AU', 'US')
|
||||
If None, loads default categories.json
|
||||
|
||||
Returns:
|
||||
List of category dictionaries
|
||||
"""
|
||||
import os
|
||||
|
||||
# Try country-specific file first if country_code provided
|
||||
if country_code:
|
||||
country_file = os.path.join(
|
||||
component_path, "data", f"categories_{country_code.lower()}.json"
|
||||
)
|
||||
if os.path.exists(country_file):
|
||||
categories_file = country_file
|
||||
_LOGGER.debug("Using country-specific categories: %s", country_code)
|
||||
else:
|
||||
_LOGGER.debug(
|
||||
"No country-specific categories found for %s, using default",
|
||||
country_code
|
||||
)
|
||||
categories_file = os.path.join(component_path, "data", "categories.json")
|
||||
else:
|
||||
categories_file = os.path.join(component_path, "data", "categories.json")
|
||||
|
||||
try:
|
||||
async with aiofiles.open(categories_file, "r", encoding="utf-8") as f:
|
||||
content = await f.read()
|
||||
data = json.loads(content)
|
||||
|
||||
_LOGGER.info(
|
||||
"Loaded categories version %s for region %s",
|
||||
data.get("version", "unknown"),
|
||||
data.get("region", "default")
|
||||
)
|
||||
|
||||
return data.get("categories", [])
|
||||
|
||||
except FileNotFoundError:
|
||||
_LOGGER.error("Categories file not found: %s", categories_file)
|
||||
return _get_fallback_categories()
|
||||
except json.JSONDecodeError as err:
|
||||
_LOGGER.error("Failed to parse categories file: %s", err)
|
||||
return _get_fallback_categories()
|
||||
except Exception as err:
|
||||
_LOGGER.error("Unexpected error loading categories: %s", err)
|
||||
return _get_fallback_categories()
|
||||
|
||||
|
||||
def _get_fallback_categories() -> List[Dict[str, Any]]:
|
||||
"""Get minimal fallback categories if file loading fails.
|
||||
|
||||
Returns:
|
||||
List of basic category dictionaries
|
||||
"""
|
||||
_LOGGER.warning("Using fallback categories")
|
||||
|
||||
return [
|
||||
{
|
||||
"id": "produce",
|
||||
"name": "Produce",
|
||||
"icon": "mdi:fruit-cherries",
|
||||
"color": "#4CAF50",
|
||||
"sort_order": 1,
|
||||
"system": True
|
||||
},
|
||||
{
|
||||
"id": "dairy",
|
||||
"name": "Dairy",
|
||||
"icon": "mdi:cheese",
|
||||
"color": "#FFC107",
|
||||
"sort_order": 2,
|
||||
"system": True
|
||||
},
|
||||
{
|
||||
"id": "other",
|
||||
"name": "Other",
|
||||
"icon": "mdi:dots-horizontal",
|
||||
"color": "#9E9E9E",
|
||||
"sort_order": 99,
|
||||
"system": True
|
||||
}
|
||||
]
|
||||
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@@ -1,300 +0,0 @@
|
||||
"""Core Shopping List Manager with invariant enforcement."""
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Dict, Optional, List
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import storage
|
||||
|
||||
from .const import (
|
||||
DOMAIN,
|
||||
EVENT_SHOPPING_LIST_UPDATED,
|
||||
STORAGE_KEY_ACTIVE,
|
||||
STORAGE_KEY_PRODUCTS,
|
||||
STORAGE_VERSION,
|
||||
)
|
||||
from .models import Product, ActiveItem, InvariantError, validate_invariant
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ShoppingListManager:
|
||||
"""
|
||||
Manages multiple independent shopping lists.
|
||||
|
||||
Each list_id gets its own pair of storage files:
|
||||
- shopping_list_manager.{list_id}.products
|
||||
- shopping_list_manager.{list_id}.active_list
|
||||
|
||||
The default "groceries" list uses the original flat keys for backward compat:
|
||||
- shopping_list_manager.products → groceries products
|
||||
- shopping_list_manager.active_list → groceries active
|
||||
|
||||
Architecture principles:
|
||||
1. Products and active_list are separate concerns per list
|
||||
2. Products are authoritative, persistent data
|
||||
3. Active list is ephemeral state
|
||||
4. Invariant (active ⊆ products) enforced on every mutation
|
||||
5. Lock ensures atomic operations per list
|
||||
"""
|
||||
|
||||
def __init__(self, hass: HomeAssistant):
|
||||
"""Initialize the manager."""
|
||||
self.hass = hass
|
||||
# Per-list in-memory caches: list_id -> {key: Product}
|
||||
self._products: Dict[str, Dict[str, Product]] = {}
|
||||
self._active_list: Dict[str, Dict[str, ActiveItem]] = {}
|
||||
# Per-list locks
|
||||
self._locks: Dict[str, asyncio.Lock] = {}
|
||||
# Per-list storage Store instances (created lazily, except groceries)
|
||||
self._store_products: Dict[str, storage.Store] = {}
|
||||
self._store_active: Dict[str, storage.Store] = {}
|
||||
|
||||
# Pre-create stores for the default "groceries" list using the original flat keys
|
||||
# for backward compatibility — existing data just works
|
||||
self._store_products["groceries"] = storage.Store(
|
||||
hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS # "shopping_list_manager.products"
|
||||
)
|
||||
self._store_active["groceries"] = storage.Store(
|
||||
hass, STORAGE_VERSION, STORAGE_KEY_ACTIVE # "shopping_list_manager.active_list"
|
||||
)
|
||||
|
||||
def _lock_for(self, list_id: str) -> asyncio.Lock:
|
||||
"""Get or create lock for a list."""
|
||||
if list_id not in self._locks:
|
||||
self._locks[list_id] = asyncio.Lock()
|
||||
return self._locks[list_id]
|
||||
|
||||
def _store_products_for(self, list_id: str) -> storage.Store:
|
||||
"""Get or create products Store for a list."""
|
||||
if list_id not in self._store_products:
|
||||
# Non-groceries lists use namespaced keys
|
||||
key = f"{DOMAIN}.{list_id}.products"
|
||||
self._store_products[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
|
||||
return self._store_products[list_id]
|
||||
|
||||
def _store_active_for(self, list_id: str) -> storage.Store:
|
||||
"""Get or create active Store for a list."""
|
||||
if list_id not in self._store_active:
|
||||
key = f"{DOMAIN}.{list_id}.active_list"
|
||||
self._store_active[list_id] = storage.Store(self.hass, STORAGE_VERSION, key)
|
||||
return self._store_active[list_id]
|
||||
|
||||
async def _ensure_loaded(self, list_id: str) -> None:
|
||||
"""Lazily load a list from storage if not yet in memory."""
|
||||
if list_id in self._products:
|
||||
return # already loaded
|
||||
|
||||
products_data = await self._store_products_for(list_id).async_load()
|
||||
self._products[list_id] = {
|
||||
key: Product.from_dict(data) for key, data in (products_data or {}).items()
|
||||
}
|
||||
|
||||
active_data = await self._store_active_for(list_id).async_load()
|
||||
self._active_list[list_id] = {
|
||||
key: ActiveItem.from_dict(data) for key, data in (active_data or {}).items()
|
||||
}
|
||||
|
||||
# Repair any orphaned active items
|
||||
await self._async_repair_invariant(list_id)
|
||||
|
||||
_LOGGER.info(
|
||||
"Loaded list '%s': %d products, %d active",
|
||||
list_id, len(self._products[list_id]), len(self._active_list[list_id])
|
||||
)
|
||||
|
||||
async def async_load(self) -> None:
|
||||
"""Pre-load the default groceries list for backward compat."""
|
||||
async with self._lock_for("groceries"):
|
||||
await self._ensure_loaded("groceries")
|
||||
|
||||
async def _async_repair_invariant(self, list_id: str) -> None:
|
||||
"""Remove active items whose product no longer exists."""
|
||||
orphaned = [k for k in self._active_list[list_id] if k not in self._products[list_id]]
|
||||
if orphaned:
|
||||
_LOGGER.warning(
|
||||
"List '%s': removing %d orphaned active items: %s",
|
||||
list_id, len(orphaned), orphaned
|
||||
)
|
||||
for k in orphaned:
|
||||
del self._active_list[list_id][k]
|
||||
await self._async_save_active(list_id)
|
||||
|
||||
async def _async_save_products(self, list_id: str) -> None:
|
||||
"""Persist products to storage."""
|
||||
data = {key: p.to_dict() for key, p in self._products[list_id].items()}
|
||||
await self._store_products_for(list_id).async_save(data)
|
||||
|
||||
async def _async_save_active(self, list_id: str) -> None:
|
||||
"""Persist active list to storage."""
|
||||
data = {key: a.to_dict() for key, a in self._active_list[list_id].items()}
|
||||
await self._store_active_for(list_id).async_save(data)
|
||||
|
||||
def _fire_update_event(self) -> None:
|
||||
"""Fire event to notify listeners of changes."""
|
||||
self.hass.bus.async_fire(EVENT_SHOPPING_LIST_UPDATED)
|
||||
|
||||
# ========================================================================
|
||||
# PUBLIC API - All operations enforce invariants
|
||||
# ========================================================================
|
||||
|
||||
async def async_add_product(
|
||||
self,
|
||||
list_id: str,
|
||||
key: str,
|
||||
name: str,
|
||||
category: str = "other",
|
||||
unit: str = "pcs",
|
||||
image: str = ""
|
||||
) -> Product:
|
||||
"""
|
||||
Add or update a product in a list's catalog.
|
||||
|
||||
This operation:
|
||||
- Creates/updates product metadata
|
||||
- Does NOT modify quantities
|
||||
- Is idempotent
|
||||
- Persists to storage
|
||||
|
||||
Args:
|
||||
list_id: List identifier
|
||||
key: Unique product identifier
|
||||
name: Display name
|
||||
category: Product category
|
||||
unit: Unit of measurement
|
||||
image: Image URL
|
||||
|
||||
Returns:
|
||||
The created/updated Product
|
||||
"""
|
||||
async with self._lock_for(list_id):
|
||||
await self._ensure_loaded(list_id)
|
||||
|
||||
product = Product(
|
||||
key=key,
|
||||
name=name,
|
||||
category=category,
|
||||
unit=unit,
|
||||
image=image
|
||||
)
|
||||
|
||||
self._products[list_id][key] = product
|
||||
await self._async_save_products(list_id)
|
||||
|
||||
_LOGGER.debug("List '%s': added/updated product %s (%s)", list_id, name, key)
|
||||
self._fire_update_event()
|
||||
|
||||
return product
|
||||
|
||||
async def async_set_qty(self, list_id: str, key: str, qty: int) -> None:
|
||||
"""
|
||||
Set quantity for a product on the shopping list.
|
||||
|
||||
This operation:
|
||||
- REQUIRES product to exist (enforces invariant)
|
||||
- qty > 0: adds/updates active_list
|
||||
- qty == 0: removes from active_list
|
||||
- Persists state
|
||||
- Fires update event
|
||||
|
||||
Args:
|
||||
list_id: List identifier
|
||||
key: Product key (must exist in catalog)
|
||||
qty: New quantity (0 to remove, >0 to add/update)
|
||||
|
||||
Raises:
|
||||
InvariantError: If product doesn't exist
|
||||
ValueError: If qty is negative
|
||||
"""
|
||||
if qty < 0:
|
||||
raise ValueError(f"Quantity cannot be negative: {qty}")
|
||||
|
||||
async with self._lock_for(list_id):
|
||||
await self._ensure_loaded(list_id)
|
||||
|
||||
# INVARIANT ENFORCEMENT: Product must exist
|
||||
if key not in self._products[list_id]:
|
||||
raise InvariantError(
|
||||
f"Cannot set quantity for unknown product '{key}' in list '{list_id}'. "
|
||||
f"Product must be created first with add_product."
|
||||
)
|
||||
|
||||
# Update or remove from active list
|
||||
if qty > 0:
|
||||
self._active_list[list_id][key] = ActiveItem(qty=qty)
|
||||
_LOGGER.debug("List '%s': set qty for %s: %d", list_id, key, qty)
|
||||
else:
|
||||
# qty == 0: remove from list
|
||||
if key in self._active_list[list_id]:
|
||||
del self._active_list[list_id][key]
|
||||
_LOGGER.debug("List '%s': removed %s from active list", list_id, key)
|
||||
|
||||
await self._async_save_active(list_id)
|
||||
self._fire_update_event()
|
||||
|
||||
async def async_delete_product(self, list_id: str, key: str) -> None:
|
||||
"""
|
||||
Delete a product from the catalog.
|
||||
|
||||
This operation:
|
||||
- Removes product from catalog
|
||||
- Removes from active list (maintains invariant)
|
||||
- Persists both changes
|
||||
|
||||
Args:
|
||||
list_id: List identifier
|
||||
key: Product key to delete
|
||||
"""
|
||||
async with self._lock_for(list_id):
|
||||
await self._ensure_loaded(list_id)
|
||||
|
||||
if key not in self._products[list_id]:
|
||||
_LOGGER.warning("List '%s': attempted to delete non-existent product: %s", list_id, key)
|
||||
return
|
||||
|
||||
# Remove from catalog
|
||||
del self._products[list_id][key]
|
||||
|
||||
# Remove from active list (maintain invariant)
|
||||
if key in self._active_list[list_id]:
|
||||
del self._active_list[list_id][key]
|
||||
|
||||
await self._async_save_products(list_id)
|
||||
await self._async_save_active(list_id)
|
||||
|
||||
_LOGGER.debug("List '%s': deleted product: %s", list_id, key)
|
||||
self._fire_update_event()
|
||||
|
||||
async def async_get_products(self, list_id: str) -> Dict[str, dict]:
|
||||
"""
|
||||
Get all products in a list's catalog.
|
||||
|
||||
Args:
|
||||
list_id: List identifier
|
||||
|
||||
Returns:
|
||||
Dictionary of product key -> product data
|
||||
"""
|
||||
async with self._lock_for(list_id):
|
||||
await self._ensure_loaded(list_id)
|
||||
return {key: product.to_dict() for key, product in self._products[list_id].items()}
|
||||
|
||||
async def async_get_active(self, list_id: str) -> Dict[str, dict]:
|
||||
"""
|
||||
Get active shopping list (quantities only).
|
||||
|
||||
Args:
|
||||
list_id: List identifier
|
||||
|
||||
Returns:
|
||||
Dictionary of product key -> active item data (qty only)
|
||||
"""
|
||||
async with self._lock_for(list_id):
|
||||
await self._ensure_loaded(list_id)
|
||||
return {key: item.to_dict() for key, item in self._active_list[list_id].items()}
|
||||
|
||||
# NOTE: The following methods were removed as they're not used by the websocket API
|
||||
# and would need updating to support per-list structure:
|
||||
# - async_get_full_state()
|
||||
# - get_product()
|
||||
# - get_active_qty()
|
||||
@@ -1,12 +1,17 @@
|
||||
{
|
||||
"domain": "shopping_list_manager",
|
||||
"name": "Shopping List Manager",
|
||||
"version": "1.2.1",
|
||||
"version": "2.0.0",
|
||||
"documentation": "https://github.com/thekiwismarthome/shopping-list-manager",
|
||||
"issue_tracker": "https://github.com/thekiwismarthome/shopping-list-manager/issues",
|
||||
"requirements": [],
|
||||
"requirements": [
|
||||
"Pillow>=10.0.0",
|
||||
"aiofiles>=23.0.0",
|
||||
"rapidfuzz>=3.0.0"
|
||||
],
|
||||
"dependencies": [],
|
||||
"codeowners": ["@thekiwismarthome"],
|
||||
"config_flow": true,
|
||||
"iot_class": "local_push"
|
||||
"iot_class": "local_push",
|
||||
"integration_type": "service"
|
||||
}
|
||||
|
||||
@@ -1,104 +1,138 @@
|
||||
"""Data models for Shopping List Manager."""
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Dict
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from datetime import datetime
|
||||
from typing import Optional, List, Dict, Any
|
||||
import uuid
|
||||
|
||||
|
||||
def generate_id() -> str:
|
||||
"""Generate a unique ID."""
|
||||
return str(uuid.uuid4())
|
||||
|
||||
|
||||
def current_timestamp() -> str:
|
||||
"""Get current ISO timestamp."""
|
||||
return datetime.utcnow().isoformat() + "Z"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Category:
|
||||
"""Category model."""
|
||||
id: str
|
||||
name: str
|
||||
icon: str
|
||||
color: str
|
||||
sort_order: int
|
||||
system: bool = True
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Product:
|
||||
"""
|
||||
Product catalog entry - authoritative product definition.
|
||||
|
||||
Products exist independently of the shopping list.
|
||||
They define WHAT can be shopped, not HOW MUCH is needed.
|
||||
"""
|
||||
key: str
|
||||
"""Product model."""
|
||||
id: str
|
||||
name: str
|
||||
category: str = "other"
|
||||
unit: str = "pcs"
|
||||
image: str = ""
|
||||
category_id: str
|
||||
aliases: List[str] = field(default_factory=list)
|
||||
default_unit: str = "units"
|
||||
default_quantity: float = 1
|
||||
price: Optional[float] = None
|
||||
currency: Optional[str] = None
|
||||
price_per_unit: Optional[float] = None
|
||||
price_updated: Optional[str] = None
|
||||
image_url: Optional[str] = None
|
||||
image_source: Optional[str] = None
|
||||
barcode: Optional[str] = None
|
||||
brands: List[str] = field(default_factory=list)
|
||||
nutrition: Optional[Dict[str, Any]] = None
|
||||
user_frequency: int = 0
|
||||
last_used: Optional[str] = None
|
||||
custom: bool = False
|
||||
source: str = "user"
|
||||
tags: List[str] = field(default_factory=list)
|
||||
collections: List[str] = field(default_factory=list)
|
||||
taxonomy: Dict[str, Any] = field(default_factory=dict)
|
||||
allergens: List[str] = field(default_factory=list)
|
||||
substitution_group: str = ""
|
||||
priority_level: int = 0
|
||||
image_hint: str = ""
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert to dictionary for storage/transmission."""
|
||||
return {
|
||||
"key": self.key,
|
||||
"name": self.name,
|
||||
"category": self.category,
|
||||
"unit": self.unit,
|
||||
"image": self.image
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def from_dict(data: dict) -> 'Product':
|
||||
"""Create Product from dictionary."""
|
||||
return Product(
|
||||
key=data["key"],
|
||||
name=data["name"],
|
||||
category=data.get("category", "other"),
|
||||
unit=data.get("unit", "pcs"),
|
||||
image=data.get("image", "")
|
||||
)
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate product data."""
|
||||
if not self.key:
|
||||
raise ValueError("Product key cannot be empty")
|
||||
if not self.name:
|
||||
raise ValueError("Product name cannot be empty")
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActiveItem:
|
||||
"""
|
||||
Shopping list state - quantity only.
|
||||
class Item:
|
||||
"""Shopping list item model."""
|
||||
id: str
|
||||
list_id: str
|
||||
name: str
|
||||
category_id: str
|
||||
product_id: Optional[str] = None
|
||||
quantity: float = 1
|
||||
unit: str = "units"
|
||||
note: Optional[str] = None
|
||||
checked: bool = False
|
||||
checked_at: Optional[str] = None
|
||||
created_at: str = field(default_factory=current_timestamp)
|
||||
updated_at: str = field(default_factory=current_timestamp)
|
||||
image_url: Optional[str] = None
|
||||
order_index: int = 0
|
||||
price: Optional[float] = None
|
||||
estimated_total: Optional[float] = None
|
||||
barcode: Optional[str] = None
|
||||
|
||||
Contains NO product metadata, only references products by key.
|
||||
qty > 0 means "on the list"
|
||||
qty == 0 means "not on the list" (should be removed)
|
||||
"""
|
||||
qty: int
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert to dictionary for storage/transmission."""
|
||||
return {"qty": self.qty}
|
||||
|
||||
@staticmethod
|
||||
def from_dict(data: dict) -> 'ActiveItem':
|
||||
"""Create ActiveItem from dictionary."""
|
||||
return ActiveItem(qty=data["qty"])
|
||||
|
||||
def __post_init__(self):
|
||||
"""Validate quantity."""
|
||||
if self.qty < 0:
|
||||
raise ValueError("Quantity cannot be negative")
|
||||
def calculate_total(self) -> None:
|
||||
"""Calculate estimated total from quantity and price."""
|
||||
if self.price is not None:
|
||||
self.estimated_total = self.quantity * self.price
|
||||
|
||||
|
||||
class InvariantError(Exception):
|
||||
"""
|
||||
Raised when the core data model invariant is violated.
|
||||
@dataclass
|
||||
class LoyaltyCard:
|
||||
"""Loyalty card model."""
|
||||
id: str
|
||||
name: str
|
||||
number: str
|
||||
barcode: str = ""
|
||||
barcode_type: str = "barcode" # "barcode" or "qrcode"
|
||||
logo: str = ""
|
||||
notes: str = ""
|
||||
color: str = "#9fa8da"
|
||||
created_at: str = field(default_factory=current_timestamp)
|
||||
updated_at: str = field(default_factory=current_timestamp)
|
||||
# Ownership: None = visible to all users; set = private to owner + allowed_users
|
||||
owner_id: Optional[str] = None
|
||||
allowed_users: List[str] = field(default_factory=list)
|
||||
|
||||
Invariant: Every key in active_list MUST exist in products.
|
||||
|
||||
If this exception is raised, the system is in an inconsistent state
|
||||
and must be repaired before continuing.
|
||||
"""
|
||||
pass
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
def validate_invariant(products: Dict[str, Product],
|
||||
active_list: Dict[str, ActiveItem]) -> None:
|
||||
"""
|
||||
Validate the core data model invariant.
|
||||
@dataclass
|
||||
class ShoppingList:
|
||||
"""Shopping list model."""
|
||||
id: str
|
||||
name: str
|
||||
icon: str = "mdi:cart"
|
||||
created_at: str = field(default_factory=current_timestamp)
|
||||
updated_at: str = field(default_factory=current_timestamp)
|
||||
item_order: List[str] = field(default_factory=list)
|
||||
category_order: List[str] = field(default_factory=list)
|
||||
active: bool = False
|
||||
# Ownership: None = visible to all users; set = private to owner + allowed_users
|
||||
owner_id: Optional[str] = None
|
||||
allowed_users: List[str] = field(default_factory=list)
|
||||
|
||||
Args:
|
||||
products: Product catalog dictionary
|
||||
active_list: Active shopping list dictionary
|
||||
|
||||
Raises:
|
||||
InvariantError: If any key in active_list doesn't exist in products
|
||||
"""
|
||||
for key in active_list:
|
||||
if key not in products:
|
||||
raise InvariantError(
|
||||
f"Invariant violated: active_list contains unknown product key '{key}'. "
|
||||
f"This product must be added to the catalog first."
|
||||
)
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
@@ -0,0 +1,752 @@
|
||||
"""Storage management for Shopping List Manager."""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from datetime import datetime, timezone
|
||||
from typing import Dict, List, Optional, Any
|
||||
from .utils.search import ProductSearch
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.storage import Store
|
||||
|
||||
from .const import (
|
||||
STORAGE_VERSION,
|
||||
STORAGE_KEY_LISTS,
|
||||
STORAGE_KEY_ITEMS,
|
||||
STORAGE_KEY_PRODUCTS,
|
||||
STORAGE_KEY_CATEGORIES,
|
||||
STORAGE_KEY_LOYALTY_CARDS,
|
||||
)
|
||||
from .data.catalog_loader import load_product_catalog
|
||||
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
|
||||
from .data.category_loader import load_categories
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ShoppingListStorage:
|
||||
"""Handle storage for shopping lists."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant, component_path: str, country: str = "NZ") -> None:
|
||||
"""Initialize storage.
|
||||
|
||||
Args:
|
||||
hass: Home Assistant instance
|
||||
component_path: Path to the component directory
|
||||
country: Country code (NZ, AU, US, GB, CA, etc.)
|
||||
"""
|
||||
self.hass = hass
|
||||
self._component_path = component_path
|
||||
self._country = country # Store country
|
||||
self._store_lists = Store(hass, STORAGE_VERSION, STORAGE_KEY_LISTS)
|
||||
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
|
||||
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
|
||||
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
|
||||
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
|
||||
|
||||
self._lists: Dict[str, ShoppingList] = {}
|
||||
self._items: Dict[str, List[Item]] = {}
|
||||
self._products: Dict[str, Product] = {}
|
||||
self._categories: List[Category] = []
|
||||
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
|
||||
self._search_engine: Optional[ProductSearch] = None
|
||||
|
||||
async def async_load(self) -> None:
|
||||
"""Load data from storage."""
|
||||
# Load lists
|
||||
lists_data = await self._store_lists.async_load()
|
||||
if lists_data:
|
||||
self._lists = {
|
||||
list_id: ShoppingList(**list_data)
|
||||
for list_id, list_data in lists_data.items()
|
||||
}
|
||||
_LOGGER.debug("Loaded %d lists", len(self._lists))
|
||||
else:
|
||||
# Create default list if none exist
|
||||
default_list = ShoppingList(
|
||||
id=generate_id(),
|
||||
name="Shopping List",
|
||||
icon="mdi:cart",
|
||||
active=True
|
||||
)
|
||||
self._lists[default_list.id] = default_list
|
||||
await self._save_lists()
|
||||
_LOGGER.info("Created default shopping list")
|
||||
|
||||
# Load items
|
||||
items_data = await self._store_items.async_load()
|
||||
if items_data:
|
||||
self._items = {
|
||||
list_id: [Item(**item_data) for item_data in items]
|
||||
for list_id, items in items_data.items()
|
||||
}
|
||||
_LOGGER.debug("Loaded items for %d lists", len(self._items))
|
||||
|
||||
# Load products
|
||||
products_data = await self._store_products.async_load()
|
||||
if products_data:
|
||||
self._products = {
|
||||
product_id: Product(**product_data)
|
||||
for product_id, product_data in products_data.items()
|
||||
}
|
||||
_LOGGER.debug("Loaded %d products", len(self._products))
|
||||
|
||||
# Load categories
|
||||
categories_data = await self._store_categories.async_load()
|
||||
if categories_data:
|
||||
self._categories = [Category(**cat_data) for cat_data in categories_data]
|
||||
_LOGGER.debug("Loaded %d categories", len(self._categories))
|
||||
else:
|
||||
# Initialize with default categories from JSON file
|
||||
default_categories = await load_categories(self._component_path, self._country) # Use self._country
|
||||
self._categories = [Category(**cat) for cat in default_categories]
|
||||
await self._save_categories()
|
||||
_LOGGER.info(
|
||||
"Initialized %d default categories for country: %s",
|
||||
len(self._categories),
|
||||
self._country # Use self._country
|
||||
)
|
||||
|
||||
# Load product catalog if products are empty
|
||||
if not self._products:
|
||||
_LOGGER.info("Loading product catalog for country: %s", self._country)
|
||||
catalog_products = await load_product_catalog(self._component_path, self._country) # Use self._country
|
||||
|
||||
if catalog_products:
|
||||
_LOGGER.info("Importing %d products from catalog", len(catalog_products))
|
||||
# ... rest of import code ...
|
||||
for prod_data in catalog_products:
|
||||
try:
|
||||
# Create Product from catalog data
|
||||
product = Product(
|
||||
id=prod_data.get("id", generate_id()),
|
||||
name=prod_data["name"],
|
||||
category_id=prod_data.get("category_id", "other"),
|
||||
aliases=prod_data.get("aliases", []),
|
||||
default_unit=prod_data.get("default_unit", "units"),
|
||||
default_quantity=prod_data.get("default_quantity", 1),
|
||||
price=prod_data.get("price") or prod_data.get("typical_price"),
|
||||
currency=self.hass.config.currency,
|
||||
barcode=prod_data.get("barcode"),
|
||||
brands=prod_data.get("brands", []),
|
||||
image_url=prod_data.get("image_url", ""),
|
||||
custom=False,
|
||||
source="catalog",
|
||||
tags=prod_data.get("tags", []),
|
||||
collections=prod_data.get("collections", []),
|
||||
taxonomy=prod_data.get("taxonomy", {}),
|
||||
allergens=prod_data.get("allergens", []),
|
||||
substitution_group=prod_data.get("substitution_group", ""),
|
||||
priority_level=prod_data.get("priority_level", 0),
|
||||
image_hint=prod_data.get("image_hint", "")
|
||||
)
|
||||
self._products[product.id] = product
|
||||
except Exception as err:
|
||||
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
|
||||
continue
|
||||
|
||||
await self._save_products()
|
||||
_LOGGER.info("Successfully imported %d products from catalog", len(self._products))
|
||||
|
||||
# Load loyalty cards
|
||||
loyalty_data = await self._store_loyalty_cards.async_load()
|
||||
if loyalty_data:
|
||||
self._loyalty_cards = {
|
||||
card_id: LoyaltyCard(**card_data)
|
||||
for card_id, card_data in loyalty_data.items()
|
||||
}
|
||||
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
|
||||
|
||||
# Initialize search engine after products are loaded
|
||||
if self._products:
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.debug("Initialized product search engine with %d products", len(self._products))
|
||||
else:
|
||||
self._search_engine = None
|
||||
_LOGGER.warning("No products loaded, search engine not initialized")
|
||||
|
||||
# Lists methods
|
||||
async def _save_lists(self) -> None:
|
||||
"""Save lists to storage."""
|
||||
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
|
||||
await self._store_lists.async_save(data)
|
||||
|
||||
def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
|
||||
"""Get lists visible to the specified user.
|
||||
|
||||
Global lists (owner_id=None) are visible to everyone.
|
||||
Private lists are visible to their owner, anyone in allowed_users, and admins.
|
||||
"""
|
||||
all_lists = list(self._lists.values())
|
||||
if is_admin or user_id is None:
|
||||
return all_lists
|
||||
return [
|
||||
lst for lst in all_lists
|
||||
if lst.owner_id is None
|
||||
or lst.owner_id == user_id
|
||||
or user_id in (lst.allowed_users or [])
|
||||
]
|
||||
|
||||
def get_list(self, list_id: str) -> Optional[ShoppingList]:
|
||||
"""Get a specific list."""
|
||||
return self._lists.get(list_id)
|
||||
|
||||
def get_active_list(self) -> Optional[ShoppingList]:
|
||||
"""Get the active list."""
|
||||
for lst in self._lists.values():
|
||||
if lst.active:
|
||||
return lst
|
||||
return None
|
||||
|
||||
async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
|
||||
"""Create a new list. Pass owner_id to make the list private to that user."""
|
||||
new_list = ShoppingList(
|
||||
id=generate_id(),
|
||||
name=name,
|
||||
icon=icon,
|
||||
category_order=[cat.id for cat in self._categories],
|
||||
owner_id=owner_id,
|
||||
)
|
||||
self._lists[new_list.id] = new_list
|
||||
self._items[new_list.id] = []
|
||||
await self._save_lists()
|
||||
await self._write_config_backup()
|
||||
_LOGGER.info("Created new list: %s", name)
|
||||
return new_list
|
||||
|
||||
async def update_list(self, list_id: str, **kwargs) -> Optional[ShoppingList]:
|
||||
"""Update a list."""
|
||||
if list_id not in self._lists:
|
||||
return None
|
||||
|
||||
lst = self._lists[list_id]
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(lst, key):
|
||||
setattr(lst, key, value)
|
||||
|
||||
from .models import current_timestamp
|
||||
lst.updated_at = current_timestamp()
|
||||
|
||||
await self._save_lists()
|
||||
_LOGGER.debug("Updated list: %s", list_id)
|
||||
return lst
|
||||
|
||||
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
|
||||
"""Update the allowed_users for a private list."""
|
||||
if list_id not in self._lists:
|
||||
return None
|
||||
lst = self._lists[list_id]
|
||||
lst.allowed_users = allowed_users
|
||||
from .models import current_timestamp
|
||||
lst.updated_at = current_timestamp()
|
||||
await self._save_lists()
|
||||
_LOGGER.debug("Updated members for list: %s", list_id)
|
||||
return lst
|
||||
|
||||
async def delete_list(self, list_id: str) -> bool:
|
||||
"""Delete a list."""
|
||||
if list_id not in self._lists:
|
||||
return False
|
||||
|
||||
del self._lists[list_id]
|
||||
if list_id in self._items:
|
||||
del self._items[list_id]
|
||||
|
||||
await self._save_lists()
|
||||
await self._save_items()
|
||||
_LOGGER.info("Deleted list: %s", list_id)
|
||||
return True
|
||||
|
||||
async def set_active_list(self, list_id: str) -> bool:
|
||||
"""Set the active list."""
|
||||
if list_id not in self._lists:
|
||||
return False
|
||||
|
||||
# Deactivate all lists
|
||||
for lst in self._lists.values():
|
||||
lst.active = False
|
||||
|
||||
# Activate the specified list
|
||||
self._lists[list_id].active = True
|
||||
|
||||
await self._save_lists()
|
||||
_LOGGER.debug("Set active list: %s", list_id)
|
||||
return True
|
||||
|
||||
# Items methods
|
||||
async def _save_items(self) -> None:
|
||||
"""Save items to storage."""
|
||||
data = {
|
||||
list_id: [item.to_dict() for item in items]
|
||||
for list_id, items in self._items.items()
|
||||
}
|
||||
await self._store_items.async_save(data)
|
||||
|
||||
def get_items(self, list_id: str) -> List[Item]:
|
||||
"""Get items for a list."""
|
||||
return self._items.get(list_id, [])
|
||||
|
||||
async def add_item(self, list_id: str, **kwargs) -> Optional[Item]:
|
||||
"""Add an item to a list."""
|
||||
if list_id not in self._lists:
|
||||
return None
|
||||
|
||||
new_item = Item(
|
||||
id=generate_id(),
|
||||
list_id=list_id,
|
||||
**kwargs
|
||||
)
|
||||
new_item.calculate_total()
|
||||
|
||||
if list_id not in self._items:
|
||||
self._items[list_id] = []
|
||||
|
||||
self._items[list_id].append(new_item)
|
||||
|
||||
# Update product frequency if product_id provided
|
||||
if new_item.product_id and new_item.product_id in self._products:
|
||||
product = self._products[new_item.product_id]
|
||||
product.user_frequency += 1
|
||||
from .models import current_timestamp
|
||||
product.last_used = current_timestamp()
|
||||
await self._save_products()
|
||||
|
||||
await self._save_items()
|
||||
_LOGGER.debug("Added item to list %s: %s", list_id, new_item.name)
|
||||
return new_item
|
||||
|
||||
async def update_item(self, item_id: str, **kwargs) -> Optional[Item]:
|
||||
"""Update an item."""
|
||||
for list_id, items in self._items.items():
|
||||
for item in items:
|
||||
if item.id == item_id:
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(item, key):
|
||||
setattr(item, key, value)
|
||||
|
||||
from .models import current_timestamp
|
||||
item.updated_at = current_timestamp()
|
||||
item.calculate_total()
|
||||
|
||||
await self._save_items()
|
||||
_LOGGER.debug("Updated item: %s", item_id)
|
||||
return item
|
||||
|
||||
return None
|
||||
|
||||
async def check_item(self, item_id: str, checked: bool) -> Optional[Item]:
|
||||
"""Check or uncheck an item."""
|
||||
for items in self._items.values():
|
||||
for item in items:
|
||||
if item.id == item_id:
|
||||
item.checked = checked
|
||||
from .models import current_timestamp
|
||||
item.checked_at = current_timestamp() if checked else None
|
||||
item.updated_at = current_timestamp()
|
||||
|
||||
await self._save_items()
|
||||
_LOGGER.debug("Checked item: %s = %s", item_id, checked)
|
||||
return item
|
||||
|
||||
return None
|
||||
|
||||
async def delete_item(self, item_id: str) -> bool:
|
||||
"""Delete an item."""
|
||||
for list_id, items in self._items.items():
|
||||
for i, item in enumerate(items):
|
||||
if item.id == item_id:
|
||||
del self._items[list_id][i]
|
||||
await self._save_items()
|
||||
_LOGGER.debug("Deleted item: %s", item_id)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def bulk_check_items(self, item_ids: List[str], checked: bool) -> int:
|
||||
"""Bulk check/uncheck items."""
|
||||
count = 0
|
||||
from .models import current_timestamp
|
||||
timestamp = current_timestamp()
|
||||
|
||||
for items in self._items.values():
|
||||
for item in items:
|
||||
if item.id in item_ids:
|
||||
item.checked = checked
|
||||
item.checked_at = timestamp if checked else None
|
||||
item.updated_at = timestamp
|
||||
count += 1
|
||||
|
||||
if count > 0:
|
||||
await self._save_items()
|
||||
_LOGGER.debug("Bulk checked %d items", count)
|
||||
|
||||
return count
|
||||
|
||||
async def clear_checked_items(self, list_id: str) -> int:
|
||||
"""Clear all checked items from a list."""
|
||||
if list_id not in self._items:
|
||||
return 0
|
||||
|
||||
original_count = len(self._items[list_id])
|
||||
self._items[list_id] = [item for item in self._items[list_id] if not item.checked]
|
||||
removed_count = original_count - len(self._items[list_id])
|
||||
|
||||
if removed_count > 0:
|
||||
await self._save_items()
|
||||
_LOGGER.info("Cleared %d checked items from list %s", removed_count, list_id)
|
||||
|
||||
return removed_count
|
||||
|
||||
def get_list_total(self, list_id: str) -> Dict[str, Any]:
|
||||
"""Get total price for a list."""
|
||||
items = self.get_items(list_id)
|
||||
total = 0.0
|
||||
item_count = 0
|
||||
|
||||
for item in items:
|
||||
if not item.checked and item.price is not None:
|
||||
total += item.quantity * item.price
|
||||
item_count += 1
|
||||
|
||||
return {
|
||||
"total": round(total, 2),
|
||||
"currency": self.hass.config.currency,
|
||||
"item_count": item_count
|
||||
}
|
||||
|
||||
# Products methods
|
||||
async def _save_products(self) -> None:
|
||||
"""Save products to storage."""
|
||||
data = {product_id: product.to_dict() for product_id, product in self._products.items()}
|
||||
await self._store_products.async_save(data)
|
||||
|
||||
def get_products(self) -> List[Product]:
|
||||
"""Get all products."""
|
||||
return list(self._products.values())
|
||||
|
||||
def get_product(self, product_id: str) -> Optional[Product]:
|
||||
"""Get a specific product."""
|
||||
return self._products.get(product_id)
|
||||
|
||||
def search_products(
|
||||
self,
|
||||
query: str,
|
||||
limit: int = 10,
|
||||
exclude_allergens: Optional[List[str]] = None,
|
||||
include_tags: Optional[List[str]] = None,
|
||||
substitution_group: Optional[str] = None,
|
||||
) -> List[Product]:
|
||||
"""Search products with enhanced fuzzy matching and filters.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
limit: Maximum results
|
||||
exclude_allergens: Allergens to exclude
|
||||
include_tags: Tags to include
|
||||
substitution_group: Filter by substitution group
|
||||
|
||||
Returns:
|
||||
List of matching products
|
||||
"""
|
||||
if not self._search_engine:
|
||||
_LOGGER.warning("Search engine not initialized")
|
||||
return []
|
||||
|
||||
# Convert products dict to format search engine expects
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
search_engine = ProductSearch(products_dict)
|
||||
|
||||
results = search_engine.search(
|
||||
query=query,
|
||||
limit=limit,
|
||||
exclude_allergens=exclude_allergens,
|
||||
include_tags=include_tags,
|
||||
substitution_group=substitution_group,
|
||||
)
|
||||
|
||||
# Convert back to Product objects
|
||||
return [self._products[r["id"]] for r in results if r["id"] in self._products]
|
||||
|
||||
def find_product_substitutes(self, product_id: str, limit: int = 5) -> List[Product]:
|
||||
"""Find substitute products.
|
||||
|
||||
Args:
|
||||
product_id: Product to find substitutes for
|
||||
limit: Maximum substitutes
|
||||
|
||||
Returns:
|
||||
List of substitute products
|
||||
"""
|
||||
if not self._search_engine:
|
||||
return []
|
||||
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
search_engine = ProductSearch(products_dict)
|
||||
|
||||
results = search_engine.find_substitutes(product_id, limit)
|
||||
return [self._products[r["id"]] for r in results if r["id"] in self._products]
|
||||
|
||||
def get_product_suggestions(self, limit: int = 20) -> List[Product]:
|
||||
"""Get product suggestions based on usage frequency."""
|
||||
products = list(self._products.values())
|
||||
products.sort(key=lambda p: p.user_frequency, reverse=True)
|
||||
return products[:limit]
|
||||
|
||||
async def add_product(self, **kwargs) -> Product:
|
||||
"""Add a new product."""
|
||||
new_product = Product(
|
||||
id=generate_id(),
|
||||
currency=self.hass.config.currency,
|
||||
**kwargs
|
||||
)
|
||||
self._products[new_product.id] = new_product
|
||||
await self._save_products()
|
||||
await self._write_config_backup()
|
||||
# Rebuild search engine so the new product is immediately searchable
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.debug("Added product: %s", new_product.name)
|
||||
return new_product
|
||||
|
||||
async def reload_catalog(self, country_code: str) -> int:
|
||||
"""Replace catalog-sourced products with those from a new country's catalog.
|
||||
Products with source='user' are preserved."""
|
||||
catalog_ids = [
|
||||
pid for pid, p in self._products.items()
|
||||
if getattr(p, 'source', 'user') == 'catalog'
|
||||
]
|
||||
for pid in catalog_ids:
|
||||
del self._products[pid]
|
||||
|
||||
self._country = country_code
|
||||
catalog_products = await load_product_catalog(self._component_path, country_code)
|
||||
count = 0
|
||||
for prod_data in catalog_products:
|
||||
try:
|
||||
product = Product(
|
||||
id=prod_data.get("id", generate_id()),
|
||||
name=prod_data["name"],
|
||||
category_id=prod_data.get("category_id", "other"),
|
||||
aliases=prod_data.get("aliases", []),
|
||||
default_unit=prod_data.get("default_unit", "units"),
|
||||
default_quantity=prod_data.get("default_quantity", 1),
|
||||
price=prod_data.get("price") or prod_data.get("typical_price"),
|
||||
currency=self.hass.config.currency,
|
||||
barcode=prod_data.get("barcode"),
|
||||
brands=prod_data.get("brands", []),
|
||||
image_url=prod_data.get("image_url", ""),
|
||||
custom=False,
|
||||
source="catalog",
|
||||
tags=prod_data.get("tags", []),
|
||||
collections=prod_data.get("collections", []),
|
||||
taxonomy=prod_data.get("taxonomy", {}),
|
||||
allergens=prod_data.get("allergens", []),
|
||||
substitution_group=prod_data.get("substitution_group", ""),
|
||||
priority_level=prod_data.get("priority_level", 0),
|
||||
image_hint=prod_data.get("image_hint", "")
|
||||
)
|
||||
self._products[product.id] = product
|
||||
count += 1
|
||||
except Exception as err:
|
||||
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
|
||||
|
||||
await self._save_products()
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
|
||||
return count
|
||||
|
||||
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
|
||||
"""Update a product."""
|
||||
if product_id not in self._products:
|
||||
return None
|
||||
|
||||
product = self._products[product_id]
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(product, key):
|
||||
setattr(product, key, value)
|
||||
|
||||
await self._save_products()
|
||||
await self._write_config_backup()
|
||||
_LOGGER.debug("Updated product: %s", product_id)
|
||||
return product
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backup / Restore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def export_user_data(self) -> dict:
|
||||
"""Return a serialisable snapshot of all user-created data."""
|
||||
user_products = [
|
||||
p.to_dict() for p in self._products.values()
|
||||
if getattr(p, "source", "user") == "user"
|
||||
]
|
||||
lists = [lst.to_dict() for lst in self._lists.values()]
|
||||
items = {
|
||||
list_id: [item.to_dict() for item in items_list]
|
||||
for list_id, items_list in self._items.items()
|
||||
}
|
||||
return {
|
||||
"slm_backup_version": "1.0",
|
||||
"exported_at": datetime.now(timezone.utc).isoformat(),
|
||||
"country": self._country,
|
||||
"user_products": user_products,
|
||||
"lists": lists,
|
||||
"items": items,
|
||||
}
|
||||
|
||||
async def import_user_data(self, data: dict) -> dict:
|
||||
"""Merge a backup into live storage. Skips anything already present by ID."""
|
||||
imported_products = 0
|
||||
imported_lists = 0
|
||||
imported_items = 0
|
||||
|
||||
for prod_data in data.get("user_products", []):
|
||||
prod_id = prod_data.get("id")
|
||||
if prod_id and prod_id not in self._products:
|
||||
try:
|
||||
self._products[prod_id] = Product(**prod_data)
|
||||
imported_products += 1
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped product during import: %s", err)
|
||||
|
||||
if imported_products:
|
||||
await self._save_products()
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
|
||||
for list_data in data.get("lists", []):
|
||||
list_id = list_data.get("id")
|
||||
if list_id and list_id not in self._lists:
|
||||
try:
|
||||
lst = ShoppingList(**list_data)
|
||||
lst.active = False
|
||||
self._lists[list_id] = lst
|
||||
imported_lists += 1
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped list during import: %s", err)
|
||||
|
||||
backup_items = data.get("items", {})
|
||||
for list_id, items_list in backup_items.items():
|
||||
if list_id in self._lists and list_id not in self._items:
|
||||
try:
|
||||
self._items[list_id] = [Item(**d) for d in items_list]
|
||||
imported_items += len(self._items[list_id])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
|
||||
|
||||
if imported_lists or imported_items:
|
||||
await self._save_lists()
|
||||
await self._save_items()
|
||||
|
||||
_LOGGER.info(
|
||||
"Import complete: %d products, %d lists, %d items",
|
||||
imported_products, imported_lists, imported_items,
|
||||
)
|
||||
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
|
||||
|
||||
async def _write_config_backup(self) -> None:
|
||||
"""Silently write a backup JSON to the HA config directory."""
|
||||
try:
|
||||
backup_path = os.path.join(
|
||||
self.hass.config.config_dir,
|
||||
"shopping_list_manager_backup.json",
|
||||
)
|
||||
data = await self.export_user_data()
|
||||
|
||||
def _write() -> None:
|
||||
with open(backup_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
await self.hass.async_add_executor_job(_write)
|
||||
_LOGGER.debug("Auto-backup written to %s", backup_path)
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to write config backup: %s", err)
|
||||
|
||||
# Categories methods
|
||||
async def _save_categories(self) -> None:
|
||||
"""Save categories to storage."""
|
||||
data = [cat.to_dict() for cat in self._categories]
|
||||
await self._store_categories.async_save(data)
|
||||
|
||||
def get_categories(self) -> List[Category]:
|
||||
"""Get all categories."""
|
||||
return self._categories
|
||||
|
||||
# Loyalty card methods
|
||||
async def _save_loyalty_cards(self) -> None:
|
||||
"""Save loyalty cards to storage."""
|
||||
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
|
||||
await self._store_loyalty_cards.async_save(data)
|
||||
|
||||
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
|
||||
"""Get loyalty cards visible to the specified user.
|
||||
|
||||
Global cards (owner_id=None) are visible to everyone.
|
||||
Private cards are visible to their owner, anyone in allowed_users, and admins.
|
||||
"""
|
||||
all_cards = list(self._loyalty_cards.values())
|
||||
if is_admin or user_id is None:
|
||||
return all_cards
|
||||
return [
|
||||
card for card in all_cards
|
||||
if card.owner_id is None
|
||||
or card.owner_id == user_id
|
||||
or user_id in (card.allowed_users or [])
|
||||
]
|
||||
|
||||
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
|
||||
"""Get a specific loyalty card."""
|
||||
return self._loyalty_cards.get(card_id)
|
||||
|
||||
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
|
||||
"""Create a new loyalty card."""
|
||||
from .models import current_timestamp
|
||||
new_card = LoyaltyCard(
|
||||
id=generate_id(),
|
||||
owner_id=owner_id,
|
||||
**kwargs
|
||||
)
|
||||
self._loyalty_cards[new_card.id] = new_card
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Created loyalty card: %s", new_card.name)
|
||||
return new_card
|
||||
|
||||
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
|
||||
"""Update a loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return None
|
||||
|
||||
card = self._loyalty_cards[card_id]
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(card, key):
|
||||
setattr(card, key, value)
|
||||
|
||||
from .models import current_timestamp
|
||||
card.updated_at = current_timestamp()
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Updated loyalty card: %s", card_id)
|
||||
return card
|
||||
|
||||
async def delete_loyalty_card(self, card_id: str) -> bool:
|
||||
"""Delete a loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return False
|
||||
|
||||
del self._loyalty_cards[card_id]
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Deleted loyalty card: %s", card_id)
|
||||
return True
|
||||
|
||||
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
|
||||
"""Update the allowed_users for a private loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return None
|
||||
|
||||
card = self._loyalty_cards[card_id]
|
||||
card.allowed_users = allowed_users
|
||||
from .models import current_timestamp
|
||||
card.updated_at = current_timestamp()
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
|
||||
return card
|
||||
@@ -0,0 +1 @@
|
||||
"""Utilities for Shopping List Manager."""
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,109 @@
|
||||
"""Image handling utilities for Shopping List Manager."""
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ImageHandler:
|
||||
"""Handle product images with URL and local file support."""
|
||||
|
||||
def __init__(self, hass, config_path: str):
|
||||
"""Initialize image handler.
|
||||
|
||||
Args:
|
||||
hass: Home Assistant instance
|
||||
config_path: Path to HA config directory
|
||||
"""
|
||||
self.hass = hass
|
||||
# Images stored in /config/www/shopping_list_manager/images/
|
||||
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images"
|
||||
self._local_images_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
_LOGGER.info("Image directory: %s", self._local_images_dir)
|
||||
|
||||
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
|
||||
"""Get image URL for a product.
|
||||
|
||||
Priority:
|
||||
1. External URL (if provided)
|
||||
2. Local file match
|
||||
3. Placeholder
|
||||
|
||||
Args:
|
||||
product_name: Name of product to find image for
|
||||
external_url: Optional external image URL
|
||||
|
||||
Returns:
|
||||
Image URL (external, local, or placeholder)
|
||||
"""
|
||||
# Priority 1: Use external URL if provided
|
||||
if external_url:
|
||||
return external_url
|
||||
|
||||
# Priority 2: Look for local file
|
||||
local_url = self._find_local_image(product_name)
|
||||
if local_url:
|
||||
return local_url
|
||||
|
||||
# Priority 3: Placeholder
|
||||
return self._get_placeholder_url()
|
||||
|
||||
def _find_local_image(self, product_name: str) -> Optional[str]:
|
||||
"""Find local image file for product.
|
||||
|
||||
Searches for files matching product name (case-insensitive).
|
||||
Supports: .webp, .jpg, .jpeg, .png
|
||||
|
||||
Args:
|
||||
product_name: Product name to search for
|
||||
|
||||
Returns:
|
||||
Local URL if found, None otherwise
|
||||
"""
|
||||
# Normalize product name for filename matching
|
||||
normalized_name = product_name.lower().replace(" ", "_")
|
||||
|
||||
# Supported extensions
|
||||
extensions = [".webp", ".jpg", ".jpeg", ".png"]
|
||||
|
||||
for ext in extensions:
|
||||
# Check exact match
|
||||
image_file = self._local_images_dir / f"{normalized_name}{ext}"
|
||||
if image_file.exists():
|
||||
return f"/local/shopping_list_manager/images/{normalized_name}{ext}"
|
||||
|
||||
# Check for files starting with the product name
|
||||
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
|
||||
return f"/local/shopping_list_manager/images/{file.name}"
|
||||
|
||||
return None
|
||||
|
||||
def _get_placeholder_url(self) -> str:
|
||||
"""Get placeholder image URL.
|
||||
|
||||
Returns:
|
||||
URL to placeholder image
|
||||
"""
|
||||
# Use a simple colored placeholder
|
||||
# You can replace this with a real placeholder image later
|
||||
return "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
|
||||
|
||||
def list_available_images(self) -> list:
|
||||
"""List all available local images.
|
||||
|
||||
Returns:
|
||||
List of (filename, product_name_guess) tuples
|
||||
"""
|
||||
images = []
|
||||
extensions = [".webp", ".jpg", ".jpeg", ".png"]
|
||||
|
||||
for ext in extensions:
|
||||
for image_file in self._local_images_dir.glob(f"*{ext}"):
|
||||
# Guess product name from filename
|
||||
product_name = image_file.stem.replace("_", " ").title()
|
||||
images.append((image_file.name, product_name))
|
||||
|
||||
return sorted(images)
|
||||
@@ -0,0 +1,192 @@
|
||||
"""Enhanced product search utilities."""
|
||||
import logging
|
||||
from typing import List, Dict, Any, Optional
|
||||
from rapidfuzz import fuzz, process
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ProductSearch:
|
||||
"""Advanced product search with fuzzy matching and filtering."""
|
||||
|
||||
def __init__(self, products: Dict[str, Any]):
|
||||
"""Initialize search with product catalog.
|
||||
|
||||
Args:
|
||||
products: Dictionary of product_id -> Product objects
|
||||
"""
|
||||
self.products = products
|
||||
|
||||
def search(
|
||||
self,
|
||||
query: str,
|
||||
limit: int = 10,
|
||||
exclude_allergens: Optional[List[str]] = None,
|
||||
include_tags: Optional[List[str]] = None,
|
||||
substitution_group: Optional[str] = None,
|
||||
taxonomy_filters: Optional[Dict[str, Any]] = None,
|
||||
min_score: int = 60,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Advanced product search with multiple filters.
|
||||
|
||||
Args:
|
||||
query: Search query string
|
||||
limit: Maximum results to return
|
||||
exclude_allergens: List of allergens to exclude (e.g., ["milk", "gluten"])
|
||||
include_tags: Only include products with these tags
|
||||
substitution_group: Filter by substitution group
|
||||
taxonomy_filters: Filter by taxonomy (e.g., {"dietary": ["vegan"]})
|
||||
min_score: Minimum fuzzy match score (0-100)
|
||||
|
||||
Returns:
|
||||
List of matching products with scores
|
||||
"""
|
||||
query_lower = query.lower().strip()
|
||||
|
||||
if not query_lower:
|
||||
return []
|
||||
|
||||
candidates = []
|
||||
|
||||
for product_id, product in self.products.items():
|
||||
# Apply allergen filter
|
||||
if exclude_allergens:
|
||||
if any(
|
||||
allergen in product.get("allergens", [])
|
||||
for allergen in exclude_allergens
|
||||
):
|
||||
continue
|
||||
|
||||
# Apply tag filter
|
||||
if include_tags:
|
||||
if not any(
|
||||
tag in product.get("tags", [])
|
||||
for tag in include_tags
|
||||
):
|
||||
continue
|
||||
|
||||
# Apply substitution group filter
|
||||
if substitution_group:
|
||||
if product.get("substitution_group") != substitution_group:
|
||||
continue
|
||||
|
||||
# Apply taxonomy filters
|
||||
if taxonomy_filters:
|
||||
product_taxonomy = product.get("taxonomy", {})
|
||||
matches_taxonomy = True
|
||||
|
||||
for key, values in taxonomy_filters.items():
|
||||
if key not in product_taxonomy:
|
||||
matches_taxonomy = False
|
||||
break
|
||||
|
||||
product_values = product_taxonomy[key]
|
||||
if isinstance(product_values, list):
|
||||
if not any(v in product_values for v in values):
|
||||
matches_taxonomy = False
|
||||
break
|
||||
else:
|
||||
if product_values not in values:
|
||||
matches_taxonomy = False
|
||||
break
|
||||
|
||||
if not matches_taxonomy:
|
||||
continue
|
||||
|
||||
# Calculate match score
|
||||
score = self._calculate_score(query_lower, product)
|
||||
|
||||
if score >= min_score:
|
||||
candidates.append({
|
||||
"product": product,
|
||||
"score": score,
|
||||
})
|
||||
|
||||
# Sort by score (descending), then by user frequency, then by priority
|
||||
candidates.sort(
|
||||
key=lambda x: (
|
||||
x["score"],
|
||||
x["product"].get("user_frequency", 0),
|
||||
x["product"].get("priority_level", 0),
|
||||
),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
# Return top results
|
||||
return [c["product"] for c in candidates[:limit]]
|
||||
|
||||
def _calculate_score(self, query: str, product: Dict[str, Any]) -> int:
|
||||
"""Calculate fuzzy match score for a product.
|
||||
|
||||
Args:
|
||||
query: Search query
|
||||
product: Product dictionary
|
||||
|
||||
Returns:
|
||||
Score from 0-100
|
||||
"""
|
||||
product_name = product.get("name", "").lower()
|
||||
aliases = [a.lower() for a in product.get("aliases", [])]
|
||||
|
||||
# Exact match gets highest score
|
||||
if query == product_name:
|
||||
return 100
|
||||
|
||||
# Check aliases for exact match
|
||||
if query in aliases:
|
||||
return 95
|
||||
|
||||
# Check if query is substring of product name
|
||||
if query in product_name:
|
||||
return 90
|
||||
|
||||
# Check if query is substring of any alias
|
||||
for alias in aliases:
|
||||
if query in alias:
|
||||
return 85
|
||||
|
||||
# Fuzzy match on product name
|
||||
name_score = fuzz.WRatio(query, product_name)
|
||||
|
||||
# Fuzzy match on aliases
|
||||
alias_scores = [fuzz.WRatio(query, alias) for alias in aliases]
|
||||
best_alias_score = max(alias_scores) if alias_scores else 0
|
||||
|
||||
# Return best score
|
||||
return max(name_score, best_alias_score)
|
||||
|
||||
def find_substitutes(self, product_id: str, limit: int = 5) -> List[Dict[str, Any]]:
|
||||
"""Find substitute products for a given product.
|
||||
|
||||
Args:
|
||||
product_id: ID of product to find substitutes for
|
||||
limit: Maximum substitutes to return
|
||||
|
||||
Returns:
|
||||
List of substitute products
|
||||
"""
|
||||
if product_id not in self.products:
|
||||
return []
|
||||
|
||||
product = self.products[product_id]
|
||||
substitution_group = product.get("substitution_group")
|
||||
|
||||
if not substitution_group:
|
||||
return []
|
||||
|
||||
# Find all products in the same substitution group
|
||||
substitutes = []
|
||||
for pid, p in self.products.items():
|
||||
if pid != product_id and p.get("substitution_group") == substitution_group:
|
||||
substitutes.append(p)
|
||||
|
||||
# Sort by priority and frequency
|
||||
substitutes.sort(
|
||||
key=lambda x: (
|
||||
x.get("priority_level", 0),
|
||||
x.get("user_frequency", 0),
|
||||
),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
return substitutes[:limit]
|
||||
@@ -0,0 +1 @@
|
||||
"""WebSocket API handlers for Shopping List Manager."""
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
File diff suppressed because it is too large
Load Diff
@@ -1,256 +0,0 @@
|
||||
"""WebSocket API for Shopping List Manager."""
|
||||
import logging
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import websocket_api
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
|
||||
from .const import DOMAIN
|
||||
from .models import InvariantError
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/add_product",
|
||||
vol.Optional("list_id", default="groceries"): str,
|
||||
vol.Required("key"): str,
|
||||
vol.Required("name"): str,
|
||||
vol.Optional("category", default="other"): str,
|
||||
vol.Optional("unit", default="pcs"): str,
|
||||
vol.Optional("image", default=""): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_add_product(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""
|
||||
Add or update a product in the catalog.
|
||||
|
||||
Does NOT modify quantity - use set_qty for that.
|
||||
|
||||
Request:
|
||||
{
|
||||
"type": "shopping_list_manager/add_product",
|
||||
"list_id": "groceries", # optional, defaults to "groceries"
|
||||
"key": "milk",
|
||||
"name": "Milk",
|
||||
"category": "dairy",
|
||||
"unit": "pcs",
|
||||
"image": ""
|
||||
}
|
||||
|
||||
Response:
|
||||
{
|
||||
"success": true,
|
||||
"result": {
|
||||
"key": "milk",
|
||||
"name": "Milk",
|
||||
"category": "dairy",
|
||||
"unit": "pcs",
|
||||
"image": ""
|
||||
}
|
||||
}
|
||||
"""
|
||||
manager = hass.data[DOMAIN]["manager"]
|
||||
list_id = msg.get("list_id", "groceries")
|
||||
|
||||
try:
|
||||
product = await manager.async_add_product(
|
||||
list_id=list_id,
|
||||
key=msg["key"],
|
||||
name=msg["name"],
|
||||
category=msg.get("category", "other"),
|
||||
unit=msg.get("unit", "pcs"),
|
||||
image=msg.get("image", "")
|
||||
)
|
||||
|
||||
connection.send_result(msg["id"], product.to_dict())
|
||||
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error adding product to list '%s': %s", list_id, err)
|
||||
connection.send_error(msg["id"], "add_product_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/set_qty",
|
||||
vol.Optional("list_id", default="groceries"): str,
|
||||
vol.Required("key"): str,
|
||||
vol.Required("qty"): vol.All(int, vol.Range(min=0)),
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_set_qty(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""
|
||||
Set quantity for a product on the shopping list.
|
||||
|
||||
Product MUST exist in catalog first.
|
||||
qty = 0 removes from list.
|
||||
qty > 0 adds/updates on list.
|
||||
|
||||
Request:
|
||||
{
|
||||
"type": "shopping_list_manager/set_qty",
|
||||
"list_id": "groceries", # optional, defaults to "groceries"
|
||||
"key": "milk",
|
||||
"qty": 2
|
||||
}
|
||||
|
||||
Response:
|
||||
{
|
||||
"success": true
|
||||
}
|
||||
|
||||
Error (if product doesn't exist):
|
||||
{
|
||||
"success": false,
|
||||
"error": {
|
||||
"code": "invariant_violation",
|
||||
"message": "Cannot set quantity for unknown product 'milk'..."
|
||||
}
|
||||
}
|
||||
"""
|
||||
manager = hass.data[DOMAIN]["manager"]
|
||||
list_id = msg.get("list_id", "groceries")
|
||||
|
||||
try:
|
||||
await manager.async_set_qty(
|
||||
list_id=list_id,
|
||||
key=msg["key"],
|
||||
qty=msg["qty"]
|
||||
)
|
||||
|
||||
connection.send_result(msg["id"], {"success": True})
|
||||
|
||||
except InvariantError as err:
|
||||
# This is expected if frontend tries to set qty for non-existent product
|
||||
_LOGGER.warning("Invariant violation in set_qty (list '%s'): %s", list_id, err)
|
||||
connection.send_error(msg["id"], "invariant_violation", str(err))
|
||||
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error setting quantity in list '%s': %s", list_id, err)
|
||||
connection.send_error(msg["id"], "set_qty_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/get_products",
|
||||
vol.Optional("list_id", default="groceries"): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_get_products(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""
|
||||
Get all products in the catalog.
|
||||
|
||||
Request:
|
||||
{
|
||||
"type": "shopping_list_manager/get_products",
|
||||
"list_id": "groceries" # optional, defaults to "groceries"
|
||||
}
|
||||
|
||||
Response:
|
||||
{
|
||||
"milk": {
|
||||
"key": "milk",
|
||||
"name": "Milk",
|
||||
"category": "dairy",
|
||||
"unit": "pcs",
|
||||
"image": ""
|
||||
},
|
||||
...
|
||||
}
|
||||
"""
|
||||
manager = hass.data[DOMAIN]["manager"]
|
||||
list_id = msg.get("list_id", "groceries")
|
||||
|
||||
try:
|
||||
products = await manager.async_get_products(list_id=list_id)
|
||||
connection.send_result(msg["id"], products)
|
||||
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error getting products for list '%s': %s", list_id, err)
|
||||
connection.send_error(msg["id"], "get_products_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/get_active",
|
||||
vol.Optional("list_id", default="groceries"): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_get_active(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""
|
||||
Get active shopping list (quantities only).
|
||||
|
||||
Request:
|
||||
{
|
||||
"type": "shopping_list_manager/get_active",
|
||||
"list_id": "groceries" # optional, defaults to "groceries"
|
||||
}
|
||||
|
||||
Response:
|
||||
{
|
||||
"milk": {"qty": 2},
|
||||
"bread": {"qty": 1},
|
||||
...
|
||||
}
|
||||
"""
|
||||
manager = hass.data[DOMAIN]["manager"]
|
||||
list_id = msg.get("list_id", "groceries")
|
||||
|
||||
try:
|
||||
active = await manager.async_get_active(list_id=list_id)
|
||||
connection.send_result(msg["id"], active)
|
||||
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error getting active list for '%s': %s", list_id, err)
|
||||
connection.send_error(msg["id"], "get_active_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/delete_product",
|
||||
vol.Optional("list_id", default="groceries"): str,
|
||||
vol.Required("key"): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_delete_product(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""
|
||||
Delete a product from catalog (and remove from active list).
|
||||
|
||||
Request:
|
||||
{
|
||||
"type": "shopping_list_manager/delete_product",
|
||||
"list_id": "groceries", # optional, defaults to "groceries"
|
||||
"key": "milk"
|
||||
}
|
||||
|
||||
Response:
|
||||
{
|
||||
"success": true
|
||||
}
|
||||
"""
|
||||
manager = hass.data[DOMAIN]["manager"]
|
||||
list_id = msg.get("list_id", "groceries")
|
||||
|
||||
try:
|
||||
await manager.async_delete_product(list_id=list_id, key=msg["key"])
|
||||
connection.send_result(msg["id"], {"success": True})
|
||||
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error deleting product from list '%s': %s", list_id, err)
|
||||
connection.send_error(msg["id"], "delete_product_failed", str(err))
|
||||
Reference in New Issue
Block a user