Compare commits

..

29 Commits

Author SHA1 Message Date
thekiwismarthome 433c03035b feat: Add websocket command to search for products by barcode. 2026-03-01 20:15:29 +13:00
thekiwismarthome 8eb403ed8e docs: extensively update README with detailed features, requirements, and installation instructions. 2026-02-28 22:29:19 +13:00
thekiwismarthome 03fb9a9a67 Merge pull request #5 from thekiwismarthome/v2.1.0---Themes
V2.1.0   themes
2026-02-28 22:16:05 +13:00
thekiwismarthome ae133ae59b feat: Implement comprehensive access control for shopping list and item WebSocket handlers and refine country code validation. 2026-02-28 10:34:40 +13:00
thekiwismarthome 2a8b12a07e Add compiled Python bytecode files for the shopping list manager custom component. 2026-02-28 00:23:52 +13:00
thekiwismarthome a36933c4c6 Merge pull request #4 from thekiwismarthome/v2.0.5-Country-Specific-Product-Catalogues
V2.0.5 country specific product catalogues
2026-02-27 17:57:50 +13:00
thekiwismarthome 402881c687 feat: Add barcode_type field to the Card model and its WebSocket handlers for creation and updates. 2026-02-27 17:35:49 +13:00
thekiwismarthome d412764cba fix: Change the default value of the loyalty card 'private' field to true in the websocket handler. 2026-02-27 14:49:30 +13:00
thekiwismarthome 752f9e5622 feat: add loyalty card management with dedicated storage, data model, and websocket API endpoints. 2026-02-27 13:00:08 +13:00
thekiwismarthome 86896ba4af feat: Introduce private shopping lists with owner and member management, and add Home Assistant user lookup. 2026-02-26 14:41:43 +13:00
thekiwismarthome 36a8939ebc feat: Implement backup/restore functionality for user data via new WebSocket handlers and automatic config backups. 2026-02-26 09:53:05 +13:00
thekiwismarthome 57b6d52ddf feat: Add country selection for product catalogs, including new WebSocket handlers and catalog reloading logic. 2026-02-26 06:51:26 +13:00
thekiwismarthome 03249df651 Merge pull request #3 from thekiwismarthome/v2.0.4---Lots-of-Enhancements
V2.0.4 - Bug Fixes and New Features
2026-02-25 22:40:58 +13:00
thekiwismarthome 11180db0e3 feat: rebuild product search engine immediately after adding a new product 2026-02-25 21:54:33 +13:00
thekiwismarthome 9bdaea0b1b refactor: Replace storage.get_all_products() with storage.get_products() for product retrieval. 2026-02-25 08:19:29 +13:00
thekiwismarthome ec0f44f109 style: update color codes for Health & Beauty and Baby categories. 2026-02-19 11:32:39 +13:00
thekiwismarthome 88b3f2d435 Update handlers.py 2026-02-17 20:58:37 +13:00
thekiwismarthome d93ea86d72 duplicate websocket_search_products in handlers.py 2026-02-17 20:13:17 +13:00
thekiwismarthome 5b3dcb65b4 another fix 2026-02-17 19:14:23 +13:00
thekiwismarthome 94cdede3b9 non-admin event handler 2026-02-17 18:57:40 +13:00
thekiwismarthome e6117073be Sync Bug 2026-02-17 14:53:06 +13:00
thekiwismarthome e76fad5a92 Correct Increment Handler 2026-02-16 22:17:16 +13:00
thekiwismarthome bcfde6df9a Fix the Broken Increment Handler 2026-02-16 22:00:20 +13:00
thekiwismarthome 85e0e68af9 spelling error 2026-02-16 21:39:31 +13:00
thekiwismarthome 11eb698c20 Product Items VACA Sync Bug Fix 2026-02-16 21:27:23 +13:00
thekiwismarthome 0e3fcd56f5 Update __init__.py 2026-02-16 20:14:23 +13:00
thekiwismarthome 9d8fd3f63e Update handlers.py 2026-02-16 20:09:54 +13:00
thekiwismarthome c9c1d16f08 Update handlers.py 2026-02-16 20:05:22 +13:00
thekiwismarthome 3b0cff3476 Update handlers.py 2026-02-16 20:00:58 +13:00
19 changed files with 1065 additions and 73 deletions
+82 -17
View File
@@ -1,33 +1,98 @@
## 1. Installation (HACS) # Shopping List Manager Integration for Home Assistant
### Recommended The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant.
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration) [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
1. Click the button above. ---
2. Confirm adding the repository to HACS.
3. Install **Shopping List Manager** from **HACS → Integrations**. ## Features
4. Restart Home Assistant.
### 🛒 Multi-List Management
- Create and manage multiple shopping lists
- Private or shared lists with per-member access control
- Active list state shared across all connected devices and users
- List total price calculation
### 📦 Items
- Add, update, check, and delete items with quantity and unit
- Atomic quantity increment / decrement
- Bulk check and clear checked items
- Per-item pricing, notes, and category assignment
### 🔍 Product Catalog
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
- Fuzzy search with alias matching
- Recently-used product suggestions
- Custom product creation
- Allergen filtering and product substitute groups
- Product images (WebP, 200×200px, optimised)
### 🗂️ Categories
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
- Category colour coding and emoji icons
- Per-list category ordering
### 💳 Loyalty Cards
- Store loyalty and rewards card data
- Private or shared card access per user
### 🔄 Real-Time Events
- All changes fire events on the Home Assistant bus
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
--- ---
### Manual Repository URL ## Requirements
https://github.com/thekiwismarthome/shopping-list-manager | Component | Minimum Version |
|---|---|
Repository type: **Integration** | Home Assistant | 2024.1 |
| HACS | 2.x |
--- ---
## 2. Manual Installation (Optional) ## Installation
1. Copy the folder: ### Via HACS (Recommended)
custom_components/shopping_list_manager
2. Paste it into: [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
/config/custom_components/
3. Restart Home Assistant. 1. Click the button above
2. Confirm adding the repository to HACS
3. Install **Shopping List Manager** from **HACS → Integrations**
4. Restart Home Assistant
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
### Manual Installation
1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory
2. Restart Home Assistant
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
---
## Lovelace Card
Install the companion card to get the full shopping UI:
## 3. Shopping List Card to go with this Integration
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin) [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin)
---
## Documentation
Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki).
## Support & Feedback
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues)
- [Home Assistant Community Forum](https://community.home-assistant.io)
---
## License
MIT — see [LICENSE](LICENSE) for details.
@@ -2,11 +2,12 @@
import logging import logging
import os import os
from pathlib import Path
from homeassistant.config_entries import ConfigEntry from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN from .const import DOMAIN, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, EVENT_ITEM_CHECKED, EVENT_ITEM_DELETED, EVENT_LIST_UPDATED, EVENT_LIST_DELETED
from .storage import ShoppingListStorage from .storage import ShoppingListStorage
from .utils.images import ImageHandler from .utils.images import ImageHandler
@@ -23,7 +24,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True return True
# In async_setup_entry function, after storage initialization:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Shopping List Manager from a config entry.""" """Set up Shopping List Manager from a config entry."""
_LOGGER.info("Setting up Shopping List Manager") _LOGGER.info("Setting up Shopping List Manager")
@@ -58,7 +58,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
# Register frontend resources # Register frontend resources
await _async_register_frontend(hass) await _async_register_frontend(hass)
_LOGGER.info("Shopping List Manager setup complete") # CRITICAL: Register event listeners so non-admin users can subscribe
# Without these, non-admin users cannot receive real-time updates
def _dummy_listener(event):
"""Dummy listener to enable event subscription for all users."""
pass
hass.bus.async_listen(EVENT_ITEM_ADDED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_CHECKED, _dummy_listener)
hass.bus.async_listen(EVENT_ITEM_DELETED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_UPDATED, _dummy_listener)
hass.bus.async_listen(EVENT_LIST_DELETED, _dummy_listener)
_LOGGER.info("Shopping List Manager setup complete with event subscriptions enabled")
return True return True
@@ -88,6 +102,10 @@ async def _async_register_websocket_handlers(
from .websocket import handlers from .websocket import handlers
# Lists handlers # Lists handlers
websocket_api.async_register_command(
hass,
handlers.websocket_subscribe,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_get_lists, handlers.websocket_get_lists,
@@ -126,6 +144,11 @@ async def _async_register_websocket_handlers(
hass, hass,
handlers.websocket_check_item, handlers.websocket_check_item,
) )
websocket_api.async_register_command(
hass,
handlers.websocket_increment_item,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_delete_item, handlers.websocket_delete_item,
@@ -148,10 +171,19 @@ async def _async_register_websocket_handlers(
) )
# Products handlers # Products handlers
websocket_api.async_register_command(
hass,
handlers.websocket_search_by_barcode,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_search_products, handlers.websocket_search_products,
) )
websocket_api.async_register_command(
hass,
handlers.ws_get_products_by_ids,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_get_product_suggestions, handlers.websocket_get_product_suggestions,
@@ -174,7 +206,61 @@ async def _async_register_websocket_handlers(
hass, hass,
handlers.websocket_get_categories, handlers.websocket_get_categories,
) )
# Integration settings handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_integration_settings,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_country,
)
# Backup / Restore handlers
websocket_api.async_register_command(
hass,
handlers.websocket_export_data,
)
websocket_api.async_register_command(
hass,
handlers.websocket_import_data,
)
# List members handler
websocket_api.async_register_command(
hass,
handlers.websocket_update_list_members,
)
# HA users handler
websocket_api.async_register_command(
hass,
handlers.websocket_get_ha_users,
)
# Loyalty card handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_loyalty_cards,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card_members,
)
_LOGGER.debug("WebSocket handlers registered") _LOGGER.debug("WebSocket handlers registered")
@@ -183,8 +269,6 @@ async def _async_register_frontend(hass: HomeAssistant) -> None:
# Since frontend is a separate HACS module, we don't need to register it here # Since frontend is a separate HACS module, we don't need to register it here
# The frontend card registers itself independently # The frontend card registers itself independently
_LOGGER.debug("Frontend resources skipped (separate HACS module)") _LOGGER.debug("Frontend resources skipped (separate HACS module)")
_LOGGER.debug("Frontend resources registered")
def get_storage(hass: HomeAssistant) -> ShoppingListStorage: def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
@@ -192,4 +276,4 @@ def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
Helper function for WebSocket handlers to access storage. Helper function for WebSocket handlers to access storage.
""" """
return hass.data[DOMAIN][DATA_STORAGE] return hass.data[DOMAIN][DATA_STORAGE]
@@ -9,6 +9,7 @@ STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
STORAGE_KEY_ITEMS = f"{DOMAIN}.items" STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products" STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories" STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
# WebSocket Commands - Lists # WebSocket Commands - Lists
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all" WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
@@ -16,6 +17,10 @@ WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update" WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete" WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active" WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
# WebSocket Commands - Users
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
# WebSocket Commands - Items # WebSocket Commands - Items
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get" WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
@@ -39,6 +44,13 @@ WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all" WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder" WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
# WebSocket Commands - Loyalty Cards
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
# WebSocket Commands - Subscriptions # WebSocket Commands - Subscriptions
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe" WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe" WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
@@ -78,7 +78,7 @@
"id": "health", "id": "health",
"name": "Health & Beauty", "name": "Health & Beauty",
"icon": "mdi:heart-pulse", "icon": "mdi:heart-pulse",
"color": "#E91E63", "color": "#009688",
"sort_order": 10, "sort_order": 10,
"system": true "system": true
}, },
@@ -94,7 +94,7 @@
"id": "baby", "id": "baby",
"name": "Baby", "name": "Baby",
"icon": "mdi:baby-face", "icon": "mdi:baby-face",
"color": "#FFEB3B", "color": "#F48FB1",
"sort_order": 12, "sort_order": 12,
"system": true "system": true
}, },
@@ -96,6 +96,28 @@ class Item:
self.estimated_total = self.quantity * self.price self.estimated_total = self.quantity * self.price
@dataclass
class LoyaltyCard:
"""Loyalty card model."""
id: str
name: str
number: str
barcode: str = ""
barcode_type: str = "barcode" # "barcode" or "qrcode"
logo: str = ""
notes: str = ""
color: str = "#9fa8da"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@dataclass @dataclass
class ShoppingList: class ShoppingList:
"""Shopping list model.""" """Shopping list model."""
@@ -107,6 +129,9 @@ class ShoppingList:
item_order: List[str] = field(default_factory=list) item_order: List[str] = field(default_factory=list)
category_order: List[str] = field(default_factory=list) category_order: List[str] = field(default_factory=list)
active: bool = False active: bool = False
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary.""" """Convert to dictionary."""
@@ -1,5 +1,8 @@
"""Storage management for Shopping List Manager.""" """Storage management for Shopping List Manager."""
import json
import logging import logging
import os
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from .utils.search import ProductSearch from .utils.search import ProductSearch
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@@ -11,9 +14,10 @@ from .const import (
STORAGE_KEY_ITEMS, STORAGE_KEY_ITEMS,
STORAGE_KEY_PRODUCTS, STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES, STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS,
) )
from .data.catalog_loader import load_product_catalog from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, generate_id from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
from .data.category_loader import load_categories from .data.category_loader import load_categories
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -37,11 +41,13 @@ class ShoppingListStorage:
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS) self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS) self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES) self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
self._lists: Dict[str, ShoppingList] = {} self._lists: Dict[str, ShoppingList] = {}
self._items: Dict[str, List[Item]] = {} self._items: Dict[str, List[Item]] = {}
self._products: Dict[str, Product] = {} self._products: Dict[str, Product] = {}
self._categories: List[Category] = [] self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None self._search_engine: Optional[ProductSearch] = None
async def async_load(self) -> None: async def async_load(self) -> None:
@@ -141,6 +147,15 @@ class ShoppingListStorage:
await self._save_products() await self._save_products()
_LOGGER.info("Successfully imported %d products from catalog", len(self._products)) _LOGGER.info("Successfully imported %d products from catalog", len(self._products))
# Load loyalty cards
loyalty_data = await self._store_loyalty_cards.async_load()
if loyalty_data:
self._loyalty_cards = {
card_id: LoyaltyCard(**card_data)
for card_id, card_data in loyalty_data.items()
}
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
# Initialize search engine after products are loaded # Initialize search engine after products are loaded
if self._products: if self._products:
products_dict = {pid: p.to_dict() for pid, p in self._products.items()} products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
@@ -156,9 +171,21 @@ class ShoppingListStorage:
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()} data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
await self._store_lists.async_save(data) await self._store_lists.async_save(data)
def get_lists(self) -> List[ShoppingList]: def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
"""Get all lists.""" """Get lists visible to the specified user.
return list(self._lists.values())
Global lists (owner_id=None) are visible to everyone.
Private lists are visible to their owner, anyone in allowed_users, and admins.
"""
all_lists = list(self._lists.values())
if is_admin or user_id is None:
return all_lists
return [
lst for lst in all_lists
if lst.owner_id is None
or lst.owner_id == user_id
or user_id in (lst.allowed_users or [])
]
def get_list(self, list_id: str) -> Optional[ShoppingList]: def get_list(self, list_id: str) -> Optional[ShoppingList]:
"""Get a specific list.""" """Get a specific list."""
@@ -171,17 +198,19 @@ class ShoppingListStorage:
return lst return lst
return None return None
async def create_list(self, name: str, icon: str = "mdi:cart") -> ShoppingList: async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
"""Create a new list.""" """Create a new list. Pass owner_id to make the list private to that user."""
new_list = ShoppingList( new_list = ShoppingList(
id=generate_id(), id=generate_id(),
name=name, name=name,
icon=icon, icon=icon,
category_order=[cat.id for cat in self._categories] category_order=[cat.id for cat in self._categories],
owner_id=owner_id,
) )
self._lists[new_list.id] = new_list self._lists[new_list.id] = new_list
self._items[new_list.id] = [] self._items[new_list.id] = []
await self._save_lists() await self._save_lists()
await self._write_config_backup()
_LOGGER.info("Created new list: %s", name) _LOGGER.info("Created new list: %s", name)
return new_list return new_list
@@ -202,6 +231,18 @@ class ShoppingListStorage:
_LOGGER.debug("Updated list: %s", list_id) _LOGGER.debug("Updated list: %s", list_id)
return lst return lst
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
"""Update the allowed_users for a private list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
lst.allowed_users = allowed_users
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated members for list: %s", list_id)
return lst
async def delete_list(self, list_id: str) -> bool: async def delete_list(self, list_id: str) -> bool:
"""Delete a list.""" """Delete a list."""
if list_id not in self._lists: if list_id not in self._lists:
@@ -460,22 +501,167 @@ class ShoppingListStorage:
) )
self._products[new_product.id] = new_product self._products[new_product.id] = new_product
await self._save_products() await self._save_products()
await self._write_config_backup()
# Rebuild search engine so the new product is immediately searchable
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Added product: %s", new_product.name) _LOGGER.debug("Added product: %s", new_product.name)
return new_product return new_product
async def reload_catalog(self, country_code: str) -> int:
"""Replace catalog-sourced products with those from a new country's catalog.
Products with source='user' are preserved."""
catalog_ids = [
pid for pid, p in self._products.items()
if getattr(p, 'source', 'user') == 'catalog'
]
for pid in catalog_ids:
del self._products[pid]
self._country = country_code
catalog_products = await load_product_catalog(self._component_path, country_code)
count = 0
for prod_data in catalog_products:
try:
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
count += 1
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
return count
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]: async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
"""Update a product.""" """Update a product."""
if product_id not in self._products: if product_id not in self._products:
return None return None
product = self._products[product_id] product = self._products[product_id]
for key, value in kwargs.items(): for key, value in kwargs.items():
if hasattr(product, key): if hasattr(product, key):
setattr(product, key, value) setattr(product, key, value)
await self._save_products() await self._save_products()
await self._write_config_backup()
_LOGGER.debug("Updated product: %s", product_id) _LOGGER.debug("Updated product: %s", product_id)
return product return product
# ---------------------------------------------------------------------------
# Backup / Restore
# ---------------------------------------------------------------------------
async def export_user_data(self) -> dict:
"""Return a serialisable snapshot of all user-created data."""
user_products = [
p.to_dict() for p in self._products.values()
if getattr(p, "source", "user") == "user"
]
lists = [lst.to_dict() for lst in self._lists.values()]
items = {
list_id: [item.to_dict() for item in items_list]
for list_id, items_list in self._items.items()
}
return {
"slm_backup_version": "1.0",
"exported_at": datetime.now(timezone.utc).isoformat(),
"country": self._country,
"user_products": user_products,
"lists": lists,
"items": items,
}
async def import_user_data(self, data: dict) -> dict:
"""Merge a backup into live storage. Skips anything already present by ID."""
imported_products = 0
imported_lists = 0
imported_items = 0
for prod_data in data.get("user_products", []):
prod_id = prod_data.get("id")
if prod_id and prod_id not in self._products:
try:
self._products[prod_id] = Product(**prod_data)
imported_products += 1
except Exception as err:
_LOGGER.warning("Skipped product during import: %s", err)
if imported_products:
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
for list_data in data.get("lists", []):
list_id = list_data.get("id")
if list_id and list_id not in self._lists:
try:
lst = ShoppingList(**list_data)
lst.active = False
self._lists[list_id] = lst
imported_lists += 1
except Exception as err:
_LOGGER.warning("Skipped list during import: %s", err)
backup_items = data.get("items", {})
for list_id, items_list in backup_items.items():
if list_id in self._lists and list_id not in self._items:
try:
self._items[list_id] = [Item(**d) for d in items_list]
imported_items += len(self._items[list_id])
except Exception as err:
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
if imported_lists or imported_items:
await self._save_lists()
await self._save_items()
_LOGGER.info(
"Import complete: %d products, %d lists, %d items",
imported_products, imported_lists, imported_items,
)
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
async def _write_config_backup(self) -> None:
"""Silently write a backup JSON to the HA config directory."""
try:
backup_path = os.path.join(
self.hass.config.config_dir,
"shopping_list_manager_backup.json",
)
data = await self.export_user_data()
def _write() -> None:
with open(backup_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
await self.hass.async_add_executor_job(_write)
_LOGGER.debug("Auto-backup written to %s", backup_path)
except Exception as err:
_LOGGER.warning("Failed to write config backup: %s", err)
# Categories methods # Categories methods
async def _save_categories(self) -> None: async def _save_categories(self) -> None:
@@ -486,3 +672,81 @@ class ShoppingListStorage:
def get_categories(self) -> List[Category]: def get_categories(self) -> List[Category]:
"""Get all categories.""" """Get all categories."""
return self._categories return self._categories
# Loyalty card methods
async def _save_loyalty_cards(self) -> None:
"""Save loyalty cards to storage."""
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
await self._store_loyalty_cards.async_save(data)
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
"""Get loyalty cards visible to the specified user.
Global cards (owner_id=None) are visible to everyone.
Private cards are visible to their owner, anyone in allowed_users, and admins.
"""
all_cards = list(self._loyalty_cards.values())
if is_admin or user_id is None:
return all_cards
return [
card for card in all_cards
if card.owner_id is None
or card.owner_id == user_id
or user_id in (card.allowed_users or [])
]
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
"""Get a specific loyalty card."""
return self._loyalty_cards.get(card_id)
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
"""Create a new loyalty card."""
from .models import current_timestamp
new_card = LoyaltyCard(
id=generate_id(),
owner_id=owner_id,
**kwargs
)
self._loyalty_cards[new_card.id] = new_card
await self._save_loyalty_cards()
_LOGGER.debug("Created loyalty card: %s", new_card.name)
return new_card
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
"""Update a loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
for key, value in kwargs.items():
if hasattr(card, key):
setattr(card, key, value)
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated loyalty card: %s", card_id)
return card
async def delete_loyalty_card(self, card_id: str) -> bool:
"""Delete a loyalty card."""
if card_id not in self._loyalty_cards:
return False
del self._loyalty_cards[card_id]
await self._save_loyalty_cards()
_LOGGER.debug("Deleted loyalty card: %s", card_id)
return True
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
"""Update the allowed_users for a private loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
card.allowed_users = allowed_users
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
return card
@@ -6,6 +6,7 @@ import voluptuous as vol
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from ..const import DOMAIN
from ..const import ( from ..const import (
WS_TYPE_LISTS_GET_ALL, WS_TYPE_LISTS_GET_ALL,
@@ -13,6 +14,8 @@ from ..const import (
WS_TYPE_LISTS_UPDATE, WS_TYPE_LISTS_UPDATE,
WS_TYPE_LISTS_DELETE, WS_TYPE_LISTS_DELETE,
WS_TYPE_LISTS_SET_ACTIVE, WS_TYPE_LISTS_SET_ACTIVE,
WS_TYPE_LISTS_UPDATE_MEMBERS,
WS_TYPE_USERS_GET_ALL,
WS_TYPE_ITEMS_GET, WS_TYPE_ITEMS_GET,
WS_TYPE_ITEMS_ADD, WS_TYPE_ITEMS_ADD,
WS_TYPE_ITEMS_UPDATE, WS_TYPE_ITEMS_UPDATE,
@@ -27,6 +30,12 @@ from ..const import (
WS_TYPE_PRODUCTS_ADD, WS_TYPE_PRODUCTS_ADD,
WS_TYPE_PRODUCTS_UPDATE, WS_TYPE_PRODUCTS_UPDATE,
WS_TYPE_CATEGORIES_GET_ALL, WS_TYPE_CATEGORIES_GET_ALL,
WS_TYPE_LOYALTY_GET_ALL,
WS_TYPE_LOYALTY_ADD,
WS_TYPE_LOYALTY_UPDATE,
WS_TYPE_LOYALTY_DELETE,
WS_TYPE_LOYALTY_UPDATE_MEMBERS,
WS_TYPE_SUBSCRIBE,
EVENT_ITEM_ADDED, EVENT_ITEM_ADDED,
EVENT_ITEM_UPDATED, EVENT_ITEM_UPDATED,
EVENT_ITEM_CHECKED, EVENT_ITEM_CHECKED,
@@ -39,10 +48,191 @@ from .. import get_storage
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# =============================================================================
# ACCESS-CHECK HELPERS
# =============================================================================
def _user_can_access_list(lst, user) -> bool:
"""Return True if the user may read or write to this list.
Global lists (owner_id=None) are accessible to everyone.
Private lists are accessible to their owner, anyone in allowed_users, and admins.
"""
if lst.owner_id is None:
return True
if user is None:
return False
if user.is_admin or user.id == lst.owner_id:
return True
return user.id in (lst.allowed_users or [])
def _check_list_access(storage, connection, msg, list_id, require_owner=False):
"""Verify the connected user may access list_id.
Sends the appropriate WebSocket error if access is denied.
Returns the ShoppingList object on success, or None if an error was sent.
Args:
require_owner: When True, only the list owner (or an admin) is allowed.
Use for destructive/administrative operations.
"""
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return None
user = connection.user
if require_owner:
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can perform this action")
return None
else:
if not _user_can_access_list(lst, user):
connection.send_error(msg["id"], "forbidden", "You do not have access to this list")
return None
return lst
def _find_item_list_id(storage, item_id):
"""Return the list_id that contains item_id, or None if not found."""
for list_id, items in storage._items.items():
for item in items:
if item.id == item_id:
return list_id
return None
# ============================================================================= # =============================================================================
# LIST HANDLERS # LIST HANDLERS
# ============================================================================= # =============================================================================
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_SUBSCRIBE,
})
@websocket_api.async_response
async def websocket_subscribe(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict,
) -> None:
"""Subscribe to shopping list manager events via WebSocket."""
storage = get_storage(hass)
@callback
def forward_event(event):
"""Forward HA bus event to WebSocket connection.
Events that reference a list_id are only forwarded if the connected
user has access to that list, preventing cross-user data leakage.
"""
data = event.data
list_id = data.get("list_id")
if list_id:
lst = storage.get_list(list_id)
if lst and not _user_can_access_list(lst, connection.user):
return # skip — user cannot see this list
connection.send_message(
websocket_api.event_message(
msg["id"],
{
"event_type": event.event_type,
"data": data,
}
)
)
# Subscribe to all SLM events on the HA bus (backend has permission)
unsubs = []
unsubs.append(hass.bus.async_listen(EVENT_ITEM_ADDED, forward_event))
unsubs.append(hass.bus.async_listen(EVENT_ITEM_UPDATED, forward_event))
unsubs.append(hass.bus.async_listen(EVENT_ITEM_CHECKED, forward_event))
unsubs.append(hass.bus.async_listen(EVENT_ITEM_DELETED, forward_event))
unsubs.append(hass.bus.async_listen(EVENT_LIST_UPDATED, forward_event))
unsubs.append(hass.bus.async_listen(EVENT_LIST_DELETED, forward_event))
# Clean up when connection closes
connection.subscriptions[msg["id"]] = lambda: [unsub() for unsub in unsubs]
connection.send_message(websocket_api.result_message(msg["id"]))
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/items/increment",
vol.Required("item_id"): str,
vol.Required("amount"): vol.Coerce(float),
})
@websocket_api.async_response
async def websocket_increment_item(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Increment item quantity atomically."""
storage = get_storage(hass)
item_id = msg["item_id"]
amount = msg["amount"]
# First get current item
for list_id, items in storage._items.items():
for item in items:
if item.id == item_id:
new_quantity = item.quantity + amount
if new_quantity < 1:
new_quantity = 1
updated_item = await storage.update_item(
item_id,
quantity=new_quantity
)
if updated_item:
hass.bus.async_fire(
EVENT_ITEM_UPDATED,
{
"list_id": updated_item.list_id,
"item_id": item_id,
"item": updated_item.to_dict()
}
)
connection.send_result(msg["id"], {
"item": updated_item.to_dict()
})
return
connection.send_error(msg["id"], "not_found", "Item not found")
@websocket_api.websocket_command({
vol.Required("type"): "shopping_list_manager/products/get_by_ids",
vol.Required("product_ids"): [str],
})
@websocket_api.async_response
async def ws_get_products_by_ids(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return products matching given product IDs."""
storage = get_storage(hass)
product_ids = set(msg["product_ids"])
# Get all products from storage
all_products = storage.get_products()
products = [
product.to_dict()
for product in all_products
if product.id in product_ids
]
connection.send_result(msg["id"], {"products": products})
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): WS_TYPE_LISTS_GET_ALL, vol.Required("type"): WS_TYPE_LISTS_GET_ALL,
@@ -56,8 +246,11 @@ def websocket_get_lists(
) -> None: ) -> None:
"""Handle get all lists command.""" """Handle get all lists command."""
storage = get_storage(hass) storage = get_storage(hass)
lists = storage.get_lists() user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
lists = storage.get_lists(user_id=user_id, is_admin=is_admin)
connection.send_result( connection.send_result(
msg["id"], msg["id"],
{ {
@@ -71,6 +264,7 @@ def websocket_get_lists(
vol.Required("type"): WS_TYPE_LISTS_CREATE, vol.Required("type"): WS_TYPE_LISTS_CREATE,
vol.Required("name"): str, vol.Required("name"): str,
vol.Optional("icon", default="mdi:cart"): str, vol.Optional("icon", default="mdi:cart"): str,
vol.Optional("private", default=True): bool,
} }
) )
@websocket_api.async_response @websocket_api.async_response
@@ -81,10 +275,15 @@ async def websocket_create_list(
) -> None: ) -> None:
"""Handle create list command.""" """Handle create list command."""
storage = get_storage(hass) storage = get_storage(hass)
# Private lists are owned by the creating user; global lists have no owner.
is_private = msg.get("private", True)
owner_id = connection.user.id if is_private and connection.user else None
new_list = await storage.create_list( new_list = await storage.create_list(
name=msg["name"], name=msg["name"],
icon=msg.get("icon", "mdi:cart") icon=msg.get("icon", "mdi:cart"),
owner_id=owner_id,
) )
# Fire event # Fire event
@@ -117,7 +316,10 @@ async def websocket_update_list(
"""Handle update list command.""" """Handle update list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id, require_owner=True) is None:
return
# Build update kwargs # Build update kwargs
update_data = {} update_data = {}
if "name" in msg: if "name" in msg:
@@ -160,9 +362,21 @@ async def websocket_delete_list(
"""Handle delete list command.""" """Handle delete list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may delete a private list
if lst.owner_id is not None:
user = connection.user
if not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can delete this list")
return
success = await storage.delete_list(list_id) success = await storage.delete_list(list_id)
if not success: if not success:
connection.send_error(msg["id"], "not_found", "List not found") connection.send_error(msg["id"], "not_found", "List not found")
return return
@@ -191,7 +405,10 @@ async def websocket_set_active_list(
"""Handle set active list command.""" """Handle set active list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
success = await storage.set_active_list(list_id) success = await storage.set_active_list(list_id)
if not success: if not success:
@@ -226,7 +443,10 @@ def websocket_get_items(
"""Handle get items command.""" """Handle get items command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
items = storage.get_items(list_id) items = storage.get_items(list_id)
connection.send_result( connection.send_result(
@@ -261,7 +481,10 @@ async def websocket_add_item(
"""Handle add item command.""" """Handle add item command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
# Build item data # Build item data
item_data = { item_data = {
"name": msg["name"], "name": msg["name"],
@@ -320,7 +543,14 @@ async def websocket_update_item(
"""Handle update item command.""" """Handle update item command."""
storage = get_storage(hass) storage = get_storage(hass)
item_id = msg["item_id"] item_id = msg["item_id"]
list_id = _find_item_list_id(storage, item_id)
if list_id is None:
connection.send_error(msg["id"], "not_found", "Item not found")
return
if _check_list_access(storage, connection, msg, list_id) is None:
return
# Build update data # Build update data
update_data = {} update_data = {}
update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"] update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"]
@@ -405,7 +635,14 @@ async def websocket_delete_item(
"""Handle delete item command.""" """Handle delete item command."""
storage = get_storage(hass) storage = get_storage(hass)
item_id = msg["item_id"] item_id = msg["item_id"]
list_id = _find_item_list_id(storage, item_id)
if list_id is None:
connection.send_error(msg["id"], "not_found", "Item not found")
return
if _check_list_access(storage, connection, msg, list_id) is None:
return
success = await storage.delete_item(item_id) success = await storage.delete_item(item_id)
if not success: if not success:
@@ -547,39 +784,24 @@ def websocket_get_list_total(
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): WS_TYPE_PRODUCTS_SEARCH, vol.Required("type"): "shopping_list_manager/products/search_by_barcode",
vol.Required("query"): str, vol.Required("barcode"): str,
vol.Optional("limit", default=10): int,
vol.Optional("exclude_allergens", default=None): vol.Any(None, [str]),
vol.Optional("include_tags", default=None): vol.Any(None, [str]),
vol.Optional("substitution_group", default=None): vol.Any(None, str),
} }
) )
@callback @callback
def websocket_search_products( def websocket_search_by_barcode(
hass: HomeAssistant, hass: HomeAssistant,
connection: websocket_api.ActiveConnection, connection: websocket_api.ActiveConnection,
msg: Dict[str, Any], msg: Dict[str, Any],
) -> None: ) -> None:
"""Handle search products command with enhanced filters.""" """Find a single product by exact barcode match."""
storage = get_storage(hass) storage = get_storage(hass)
barcode = msg["barcode"].strip()
try: match = next(
results = storage.search_products( (p for p in storage._products.values() if p.barcode and p.barcode == barcode),
query=msg["query"], None,
limit=msg.get("limit", 10), )
exclude_allergens=msg.get("exclude_allergens"), connection.send_result(msg["id"], {"product": match.to_dict() if match else None})
include_tags=msg.get("include_tags"),
substitution_group=msg.get("substitution_group"),
)
connection.send_result(
msg["id"],
{"products": [product.to_dict() for product in results]}
)
except Exception as err:
_LOGGER.error("Error searching products: %s", err)
connection.send_error(msg["id"], "search_failed", str(err))
@websocket_api.websocket_command( @websocket_api.websocket_command(
@@ -791,3 +1013,323 @@ def websocket_get_categories(
"categories": [cat.to_dict() for cat in categories] "categories": [cat.to_dict() for cat in categories]
} }
) )
# =============================================================================
# INTEGRATION SETTINGS HANDLERS
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/get_integration_settings",
}
)
@callback
def websocket_get_integration_settings(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return current country and available country options."""
country = hass.data[DOMAIN].get("country", "NZ")
connection.send_result(
msg["id"],
{
"country": country,
"available_countries": {
"NZ": "New Zealand",
"AU": "Australia",
"US": "United States",
"GB": "United Kingdom",
"CA": "Canada",
},
}
)
_VALID_COUNTRIES = ["NZ", "AU", "US", "GB", "CA"]
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/set_country",
vol.Required("country"): vol.In(_VALID_COUNTRIES),
}
)
@websocket_api.async_response
async def websocket_set_country(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Switch to a different country catalog. Preserves user-added products."""
country = msg["country"].upper()
storage = get_storage(hass)
count = await storage.reload_catalog(country)
# Persist to HA config entry so country survives restart
entries = hass.config_entries.async_entries(DOMAIN)
if entries:
entry = entries[0]
hass.config_entries.async_update_entry(entry, options={**entry.options, "country": country})
hass.data[DOMAIN]["country"] = country
connection.send_result(
msg["id"],
{"success": True, "country": country, "products_loaded": count}
)
# =============================================================================
# BACKUP / RESTORE HANDLERS
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/export_data",
}
)
@websocket_api.async_response
async def websocket_export_data(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Export all user-created data as a JSON-serialisable dict."""
storage = get_storage(hass)
data = await storage.export_user_data()
connection.send_result(msg["id"], data)
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/import_data",
vol.Required("data"): dict,
}
)
@websocket_api.async_response
async def websocket_import_data(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Import user data from a backup payload."""
storage = get_storage(hass)
counts = await storage.import_user_data(msg["data"])
connection.send_result(msg["id"], {"success": True, "imported": counts})
# =============================================================================
# LIST MEMBERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_LISTS_UPDATE_MEMBERS,
vol.Required("list_id"): str,
vol.Required("allowed_users"): [str],
}
)
@websocket_api.async_response
async def websocket_update_list_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private list."""
storage = get_storage(hass)
list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may manage members
user = connection.user
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can manage members")
return
updated = await storage.update_list_members(list_id, msg["allowed_users"])
hass.bus.async_fire(
EVENT_LIST_UPDATED,
{"list_id": list_id, "action": "members_updated"}
)
connection.send_result(msg["id"], {"list": updated.to_dict()})
# =============================================================================
# HA USERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_USERS_GET_ALL,
}
)
@websocket_api.async_response
async def websocket_get_ha_users(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all active, non-system HA users."""
users = await hass.auth.async_get_users()
result = [
{"id": u.id, "name": u.name}
for u in users
if not u.system_generated and u.is_active
]
connection.send_result(msg["id"], {"users": result})
# =============================================================================
# LOYALTY CARD HANDLERS
# =============================================================================
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_GET_ALL,
})
@websocket_api.async_response
async def websocket_get_loyalty_cards(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all loyalty cards visible to the current user."""
storage = get_storage(hass)
user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
cards = storage.get_loyalty_cards(user_id=user_id, is_admin=is_admin)
connection.send_result(msg["id"], {"cards": [c.to_dict() for c in cards]})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_ADD,
vol.Required("name"): str,
vol.Required("number"): str,
vol.Optional("barcode", default=""): str,
vol.Optional("barcode_type", default="barcode"): str,
vol.Optional("logo", default=""): str,
vol.Optional("notes", default=""): str,
vol.Optional("color", default="#9fa8da"): str,
vol.Optional("private", default=True): bool,
})
@websocket_api.async_response
async def websocket_add_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Add a new loyalty card."""
storage = get_storage(hass)
user = connection.user
owner_id = user.id if (user and msg.get("private")) else None
card = await storage.create_loyalty_card(
owner_id=owner_id,
name=msg["name"],
number=msg["number"],
barcode=msg.get("barcode", ""),
barcode_type=msg.get("barcode_type", "barcode"),
logo=msg.get("logo", ""),
notes=msg.get("notes", ""),
color=msg.get("color", "#9fa8da"),
)
connection.send_result(msg["id"], {"card": card.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE,
vol.Required("card_id"): str,
vol.Optional("name"): str,
vol.Optional("number"): str,
vol.Optional("barcode"): str,
vol.Optional("barcode_type"): str,
vol.Optional("logo"): str,
vol.Optional("notes"): str,
vol.Optional("color"): str,
})
@websocket_api.async_response
async def websocket_update_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update an existing loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can update it")
return
fields = {k: v for k, v in msg.items() if k not in ("type", "id", "card_id")}
updated = await storage.update_loyalty_card(card_id, **fields)
connection.send_result(msg["id"], {"card": updated.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_DELETE,
vol.Required("card_id"): str,
})
@websocket_api.async_response
async def websocket_delete_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Delete a loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can delete it")
return
await storage.delete_loyalty_card(card_id)
connection.send_result(msg["id"], {"success": True})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE_MEMBERS,
vol.Required("card_id"): str,
vol.Required("allowed_users"): [str],
})
@websocket_api.async_response
async def websocket_update_loyalty_card_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can manage members")
return
updated = await storage.update_loyalty_card_members(card_id, msg["allowed_users"])
connection.send_result(msg["id"], {"card": updated.to_dict()})