Compare commits

...

22 Commits

Author SHA1 Message Date
thekiwismarthome d50bd39210 feat: expose installed version from manifest via get_integration_settings WS 2026-05-19 22:49:56 +12:00
thekiwismarthome 8652996b65 fix: add translations/en.json to resolve 500 on options flow 2026-05-18 20:29:48 +12:00
thekiwismarthome d5c43fe3b5 Merge pull request #12 from thekiwismarthome/v2.5.0---Improvements-and-Tweaks
V2.5.0   improvements and tweaks
2026-05-11 14:39:37 +12:00
thekiwismarthome 380bba0408 fix: proxy OpenFoodFacts API through HA backend to fix browser CORS errors; implement products/delete handler 2026-03-31 19:57:01 +13:00
thekiwismarthome cf3ab90d74 fix: implement products/delete websocket handler — deleted products were silently ignored 2026-03-25 08:56:25 +13:00
thekiwismarthome 4b7043d075 feat: product match review step when adding ingredients to SLM shopping list 2026-03-25 08:34:50 +13:00
thekiwismarthome e7306275e4 refactor: standardize image storage location and migrate existing files and URLs. 2026-03-03 22:59:44 +13:00
thekiwismarthome 9430811cda Merge pull request #6 from thekiwismarthome/2.3.0---Product-Barcode-Scanner-w/-OpenFoodFacts-Lookup
2.3.0   product barcode scanner w/ open food facts lookup
2026-03-03 17:52:21 +13:00
thekiwismarthome 2b11632253 feat: Improve image download robustness with a User-Agent header and extended timeout, and enhance WebP conversion by ensuring images are in RGB mode. 2026-03-01 22:24:14 +13:00
thekiwismarthome ae7717a8eb feat: Add WebSocket endpoint to download, process, and save product images locally as WebP. 2026-03-01 21:20:09 +13:00
thekiwismarthome 433c03035b feat: Add websocket command to search for products by barcode. 2026-03-01 20:15:29 +13:00
thekiwismarthome 8eb403ed8e docs: extensively update README with detailed features, requirements, and installation instructions. 2026-02-28 22:29:19 +13:00
thekiwismarthome 03fb9a9a67 Merge pull request #5 from thekiwismarthome/v2.1.0---Themes
V2.1.0   themes
2026-02-28 22:16:05 +13:00
thekiwismarthome ae133ae59b feat: Implement comprehensive access control for shopping list and item WebSocket handlers and refine country code validation. 2026-02-28 10:34:40 +13:00
thekiwismarthome 2a8b12a07e Add compiled Python bytecode files for the shopping list manager custom component. 2026-02-28 00:23:52 +13:00
thekiwismarthome a36933c4c6 Merge pull request #4 from thekiwismarthome/v2.0.5-Country-Specific-Product-Catalogues
V2.0.5 country specific product catalogues
2026-02-27 17:57:50 +13:00
thekiwismarthome 402881c687 feat: Add barcode_type field to the Card model and its WebSocket handlers for creation and updates. 2026-02-27 17:35:49 +13:00
thekiwismarthome d412764cba fix: Change the default value of the loyalty card 'private' field to true in the websocket handler. 2026-02-27 14:49:30 +13:00
thekiwismarthome 752f9e5622 feat: add loyalty card management with dedicated storage, data model, and websocket API endpoints. 2026-02-27 13:00:08 +13:00
thekiwismarthome 86896ba4af feat: Introduce private shopping lists with owner and member management, and add Home Assistant user lookup. 2026-02-26 14:41:43 +13:00
thekiwismarthome 36a8939ebc feat: Implement backup/restore functionality for user data via new WebSocket handlers and automatic config backups. 2026-02-26 09:53:05 +13:00
thekiwismarthome 57b6d52ddf feat: Add country selection for product catalogs, including new WebSocket handlers and catalog reloading logic. 2026-02-26 06:51:26 +13:00
20 changed files with 1244 additions and 54 deletions
+82 -17
View File
@@ -1,33 +1,98 @@
## 1. Installation (HACS) # Shopping List Manager Integration for Home Assistant
### Recommended The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant.
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration) [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
1. Click the button above. ---
2. Confirm adding the repository to HACS.
3. Install **Shopping List Manager** from **HACS → Integrations**. ## Features
4. Restart Home Assistant.
### 🛒 Multi-List Management
- Create and manage multiple shopping lists
- Private or shared lists with per-member access control
- Active list state shared across all connected devices and users
- List total price calculation
### 📦 Items
- Add, update, check, and delete items with quantity and unit
- Atomic quantity increment / decrement
- Bulk check and clear checked items
- Per-item pricing, notes, and category assignment
### 🔍 Product Catalog
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
- Fuzzy search with alias matching
- Recently-used product suggestions
- Custom product creation
- Allergen filtering and product substitute groups
- Product images (WebP, 200×200px, optimised)
### 🗂️ Categories
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
- Category colour coding and emoji icons
- Per-list category ordering
### 💳 Loyalty Cards
- Store loyalty and rewards card data
- Private or shared card access per user
### 🔄 Real-Time Events
- All changes fire events on the Home Assistant bus
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
--- ---
### Manual Repository URL ## Requirements
https://github.com/thekiwismarthome/shopping-list-manager | Component | Minimum Version |
|---|---|
Repository type: **Integration** | Home Assistant | 2024.1 |
| HACS | 2.x |
--- ---
## 2. Manual Installation (Optional) ## Installation
1. Copy the folder: ### Via HACS (Recommended)
custom_components/shopping_list_manager
2. Paste it into: [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
/config/custom_components/
3. Restart Home Assistant. 1. Click the button above
2. Confirm adding the repository to HACS
3. Install **Shopping List Manager** from **HACS → Integrations**
4. Restart Home Assistant
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
### Manual Installation
1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory
2. Restart Home Assistant
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
---
## Lovelace Card
Install the companion card to get the full shopping UI:
## 3. Shopping List Card to go with this Integration
[![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin) [![Open your Home Assistant instance and open this repository inside the Home Assistant Community Store.](https://my.home-assistant.io/badges/hacs_repository.svg)](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin)
---
## Documentation
Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki).
## Support & Feedback
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues)
- [Home Assistant Community Forum](https://community.home-assistant.io)
---
## License
MIT — see [LICENSE](LICENSE) for details.
@@ -27,27 +27,33 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Shopping List Manager from a config entry.""" """Set up Shopping List Manager from a config entry."""
_LOGGER.info("Setting up Shopping List Manager") _LOGGER.info("Setting up Shopping List Manager")
# Get component path for loading data files # Get component path for loading data files
component_path = os.path.dirname(__file__) component_path = os.path.dirname(__file__)
config_path = hass.config.path() config_path = hass.config.path()
# Get country from options (or fall back to data, or default to NZ) # Get country from options (or fall back to data, or default to NZ)
country = entry.options.get("country") or entry.data.get("country", "NZ") country = entry.options.get("country") or entry.data.get("country", "NZ")
_LOGGER.info("Using country: %s", country) _LOGGER.info("Using country: %s", country)
# Initialize storage with country # Initialize storage with country
storage = ShoppingListStorage(hass, component_path, country) storage = ShoppingListStorage(hass, component_path, country)
await storage.async_load() await storage.async_load()
# Initialize image handler # Initialize image handler
image_handler = ImageHandler(hass, config_path) image_handler = ImageHandler(hass, config_path)
# Read installed version from manifest
import json as _json
with open(os.path.join(component_path, "manifest.json")) as _f:
_manifest = _json.load(_f)
# Store instances in hass.data # Store instances in hass.data
hass.data.setdefault(DOMAIN, {}) hass.data.setdefault(DOMAIN, {})
hass.data[DOMAIN][DATA_STORAGE] = storage hass.data[DOMAIN][DATA_STORAGE] = storage
hass.data[DOMAIN]["image_handler"] = image_handler hass.data[DOMAIN]["image_handler"] = image_handler
hass.data[DOMAIN]["country"] = country hass.data[DOMAIN]["country"] = country
hass.data[DOMAIN]["version"] = _manifest.get("version", "unknown")
# Register update listener for options changes # Register update listener for options changes
entry.async_on_unload(entry.add_update_listener(update_listener)) entry.async_on_unload(entry.add_update_listener(update_listener))
@@ -171,6 +177,14 @@ async def _async_register_websocket_handlers(
) )
# Products handlers # Products handlers
websocket_api.async_register_command(
hass,
handlers.websocket_download_product_image,
)
websocket_api.async_register_command(
hass,
handlers.websocket_search_by_barcode,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_search_products, handlers.websocket_search_products,
@@ -192,17 +206,81 @@ async def _async_register_websocket_handlers(
hass, hass,
handlers.websocket_update_product, handlers.websocket_update_product,
) )
websocket_api.async_register_command(
hass,
handlers.websocket_delete_product,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_get_product_substitutes, handlers.websocket_get_product_substitutes,
) )
# OpenFoodFacts proxy
websocket_api.async_register_command(
hass,
handlers.websocket_off_fetch,
)
# Categories handlers # Categories handlers
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_get_categories, handlers.websocket_get_categories,
) )
# Integration settings handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_integration_settings,
)
websocket_api.async_register_command(
hass,
handlers.websocket_set_country,
)
# Backup / Restore handlers
websocket_api.async_register_command(
hass,
handlers.websocket_export_data,
)
websocket_api.async_register_command(
hass,
handlers.websocket_import_data,
)
# List members handler
websocket_api.async_register_command(
hass,
handlers.websocket_update_list_members,
)
# HA users handler
websocket_api.async_register_command(
hass,
handlers.websocket_get_ha_users,
)
# Loyalty card handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_loyalty_cards,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card_members,
)
_LOGGER.debug("WebSocket handlers registered") _LOGGER.debug("WebSocket handlers registered")
@@ -9,6 +9,7 @@ STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
STORAGE_KEY_ITEMS = f"{DOMAIN}.items" STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products" STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories" STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
# WebSocket Commands - Lists # WebSocket Commands - Lists
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all" WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
@@ -16,6 +17,10 @@ WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update" WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete" WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active" WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
# WebSocket Commands - Users
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
# WebSocket Commands - Items # WebSocket Commands - Items
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get" WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
@@ -39,6 +44,13 @@ WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all" WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder" WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
# WebSocket Commands - Loyalty Cards
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
# WebSocket Commands - Subscriptions # WebSocket Commands - Subscriptions
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe" WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe" WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
@@ -64,7 +76,10 @@ IMAGE_FORMAT = "webp"
IMAGE_SIZE = 200 # 200x200px IMAGE_SIZE = 200 # 200x200px
IMAGE_QUALITY = 85 IMAGE_QUALITY = 85
IMAGE_MAX_SIZE_KB = 15 IMAGE_MAX_SIZE_KB = 15
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images" IMAGES_LOCAL_DIR = "www/images/shopping_list_manager"
LEGACY_IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
LOCAL_IMAGE_URL_PREFIX = "/local/images/shopping_list_manager/"
LEGACY_IMAGE_URL_PREFIX = "/local/shopping_list_manager/images/"
# Placeholder image (inline SVG) # Placeholder image (inline SVG)
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E" PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
@@ -96,6 +96,28 @@ class Item:
self.estimated_total = self.quantity * self.price self.estimated_total = self.quantity * self.price
@dataclass
class LoyaltyCard:
"""Loyalty card model."""
id: str
name: str
number: str
barcode: str = ""
barcode_type: str = "barcode" # "barcode" or "qrcode"
logo: str = ""
notes: str = ""
color: str = "#9fa8da"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@dataclass @dataclass
class ShoppingList: class ShoppingList:
"""Shopping list model.""" """Shopping list model."""
@@ -107,6 +129,9 @@ class ShoppingList:
item_order: List[str] = field(default_factory=list) item_order: List[str] = field(default_factory=list)
category_order: List[str] = field(default_factory=list) category_order: List[str] = field(default_factory=list)
active: bool = False active: bool = False
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]: def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary.""" """Convert to dictionary."""
@@ -1,5 +1,10 @@
"""Storage management for Shopping List Manager.""" """Storage management for Shopping List Manager."""
import json
import logging import logging
import os
import shutil
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from .utils.search import ProductSearch from .utils.search import ProductSearch
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@@ -11,9 +16,14 @@ from .const import (
STORAGE_KEY_ITEMS, STORAGE_KEY_ITEMS,
STORAGE_KEY_PRODUCTS, STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES, STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS,
IMAGES_LOCAL_DIR,
LEGACY_IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
LEGACY_IMAGE_URL_PREFIX,
) )
from .data.catalog_loader import load_product_catalog from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, generate_id from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
from .data.category_loader import load_categories from .data.category_loader import load_categories
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -37,12 +47,16 @@ class ShoppingListStorage:
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS) self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS) self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES) self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
self._lists: Dict[str, ShoppingList] = {} self._lists: Dict[str, ShoppingList] = {}
self._items: Dict[str, List[Item]] = {} self._items: Dict[str, List[Item]] = {}
self._products: Dict[str, Product] = {} self._products: Dict[str, Product] = {}
self._categories: List[Category] = [] self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None self._search_engine: Optional[ProductSearch] = None
self._images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
async def async_load(self) -> None: async def async_load(self) -> None:
"""Load data from storage.""" """Load data from storage."""
@@ -83,6 +97,10 @@ class ShoppingListStorage:
for product_id, product_data in products_data.items() for product_id, product_data in products_data.items()
} }
_LOGGER.debug("Loaded %d products", len(self._products)) _LOGGER.debug("Loaded %d products", len(self._products))
# Ensure image directory exists and migrate legacy image paths/URLs.
self._images_dir.mkdir(parents=True, exist_ok=True)
await self._migrate_legacy_images_and_urls()
# Load categories # Load categories
categories_data = await self._store_categories.async_load() categories_data = await self._store_categories.async_load()
@@ -141,6 +159,15 @@ class ShoppingListStorage:
await self._save_products() await self._save_products()
_LOGGER.info("Successfully imported %d products from catalog", len(self._products)) _LOGGER.info("Successfully imported %d products from catalog", len(self._products))
# Load loyalty cards
loyalty_data = await self._store_loyalty_cards.async_load()
if loyalty_data:
self._loyalty_cards = {
card_id: LoyaltyCard(**card_data)
for card_id, card_data in loyalty_data.items()
}
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
# Initialize search engine after products are loaded # Initialize search engine after products are loaded
if self._products: if self._products:
products_dict = {pid: p.to_dict() for pid, p in self._products.items()} products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
@@ -156,9 +183,21 @@ class ShoppingListStorage:
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()} data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
await self._store_lists.async_save(data) await self._store_lists.async_save(data)
def get_lists(self) -> List[ShoppingList]: def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
"""Get all lists.""" """Get lists visible to the specified user.
return list(self._lists.values())
Global lists (owner_id=None) are visible to everyone.
Private lists are visible to their owner, anyone in allowed_users, and admins.
"""
all_lists = list(self._lists.values())
if is_admin or user_id is None:
return all_lists
return [
lst for lst in all_lists
if lst.owner_id is None
or lst.owner_id == user_id
or user_id in (lst.allowed_users or [])
]
def get_list(self, list_id: str) -> Optional[ShoppingList]: def get_list(self, list_id: str) -> Optional[ShoppingList]:
"""Get a specific list.""" """Get a specific list."""
@@ -171,17 +210,19 @@ class ShoppingListStorage:
return lst return lst
return None return None
async def create_list(self, name: str, icon: str = "mdi:cart") -> ShoppingList: async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
"""Create a new list.""" """Create a new list. Pass owner_id to make the list private to that user."""
new_list = ShoppingList( new_list = ShoppingList(
id=generate_id(), id=generate_id(),
name=name, name=name,
icon=icon, icon=icon,
category_order=[cat.id for cat in self._categories] category_order=[cat.id for cat in self._categories],
owner_id=owner_id,
) )
self._lists[new_list.id] = new_list self._lists[new_list.id] = new_list
self._items[new_list.id] = [] self._items[new_list.id] = []
await self._save_lists() await self._save_lists()
await self._write_config_backup()
_LOGGER.info("Created new list: %s", name) _LOGGER.info("Created new list: %s", name)
return new_list return new_list
@@ -202,6 +243,18 @@ class ShoppingListStorage:
_LOGGER.debug("Updated list: %s", list_id) _LOGGER.debug("Updated list: %s", list_id)
return lst return lst
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
"""Update the allowed_users for a private list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
lst.allowed_users = allowed_users
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated members for list: %s", list_id)
return lst
async def delete_list(self, list_id: str) -> bool: async def delete_list(self, list_id: str) -> bool:
"""Delete a list.""" """Delete a list."""
if list_id not in self._lists: if list_id not in self._lists:
@@ -378,6 +431,56 @@ class ShoppingListStorage:
"""Save products to storage.""" """Save products to storage."""
data = {product_id: product.to_dict() for product_id, product in self._products.items()} data = {product_id: product.to_dict() for product_id, product in self._products.items()}
await self._store_products.async_save(data) await self._store_products.async_save(data)
async def _migrate_legacy_images_and_urls(self) -> None:
"""Move old image files and rewrite stored URLs to the new path."""
moved_files = 0
updated_product_urls = 0
updated_item_urls = 0
if self._legacy_images_dir.exists() and self._legacy_images_dir != self._images_dir:
for src in self._legacy_images_dir.glob("*"):
if not src.is_file():
continue
dest = self._images_dir / src.name
if dest.exists():
continue
try:
shutil.move(str(src), str(dest))
moved_files += 1
except Exception as err:
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
for product in self._products.values():
image_url = product.image_url or ""
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
product.image_url = image_url.replace(
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
)
updated_product_urls += 1
for items in self._items.values():
for item in items:
image_url = item.image_url or ""
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
item.image_url = image_url.replace(
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
)
updated_item_urls += 1
if updated_product_urls:
await self._save_products()
if updated_item_urls:
await self._save_items()
if moved_files or updated_product_urls or updated_item_urls:
_LOGGER.info(
"Migrated shopping list images to %s (moved_files=%d, updated_product_urls=%d, updated_item_urls=%d)",
IMAGES_LOCAL_DIR,
moved_files,
updated_product_urls,
updated_item_urls,
)
def get_products(self) -> List[Product]: def get_products(self) -> List[Product]:
"""Get all products.""" """Get all products."""
@@ -460,25 +563,179 @@ class ShoppingListStorage:
) )
self._products[new_product.id] = new_product self._products[new_product.id] = new_product
await self._save_products() await self._save_products()
await self._write_config_backup()
# Rebuild search engine so the new product is immediately searchable # Rebuild search engine so the new product is immediately searchable
products_dict = {pid: p.to_dict() for pid, p in self._products.items()} products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict) self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Added product: %s", new_product.name) _LOGGER.debug("Added product: %s", new_product.name)
return new_product return new_product
async def reload_catalog(self, country_code: str) -> int:
"""Replace catalog-sourced products with those from a new country's catalog.
Products with source='user' are preserved."""
catalog_ids = [
pid for pid, p in self._products.items()
if getattr(p, 'source', 'user') == 'catalog'
]
for pid in catalog_ids:
del self._products[pid]
self._country = country_code
catalog_products = await load_product_catalog(self._component_path, country_code)
count = 0
for prod_data in catalog_products:
try:
product = Product(
id=prod_data.get("id", generate_id()),
name=prod_data["name"],
category_id=prod_data.get("category_id", "other"),
aliases=prod_data.get("aliases", []),
default_unit=prod_data.get("default_unit", "units"),
default_quantity=prod_data.get("default_quantity", 1),
price=prod_data.get("price") or prod_data.get("typical_price"),
currency=self.hass.config.currency,
barcode=prod_data.get("barcode"),
brands=prod_data.get("brands", []),
image_url=prod_data.get("image_url", ""),
custom=False,
source="catalog",
tags=prod_data.get("tags", []),
collections=prod_data.get("collections", []),
taxonomy=prod_data.get("taxonomy", {}),
allergens=prod_data.get("allergens", []),
substitution_group=prod_data.get("substitution_group", ""),
priority_level=prod_data.get("priority_level", 0),
image_hint=prod_data.get("image_hint", "")
)
self._products[product.id] = product
count += 1
except Exception as err:
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
return count
async def delete_product(self, product_id: str) -> bool:
"""Delete a product from the catalog."""
if product_id not in self._products:
return False
del self._products[product_id]
await self._save_products()
# Rebuild search engine so the product is no longer searchable
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
_LOGGER.debug("Deleted product: %s", product_id)
return True
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]: async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
"""Update a product.""" """Update a product."""
if product_id not in self._products: if product_id not in self._products:
return None return None
product = self._products[product_id] product = self._products[product_id]
for key, value in kwargs.items(): for key, value in kwargs.items():
if hasattr(product, key): if hasattr(product, key):
setattr(product, key, value) setattr(product, key, value)
await self._save_products() await self._save_products()
await self._write_config_backup()
_LOGGER.debug("Updated product: %s", product_id) _LOGGER.debug("Updated product: %s", product_id)
return product return product
# ---------------------------------------------------------------------------
# Backup / Restore
# ---------------------------------------------------------------------------
async def export_user_data(self) -> dict:
"""Return a serialisable snapshot of all user-created data."""
user_products = [
p.to_dict() for p in self._products.values()
if getattr(p, "source", "user") == "user"
]
lists = [lst.to_dict() for lst in self._lists.values()]
items = {
list_id: [item.to_dict() for item in items_list]
for list_id, items_list in self._items.items()
}
return {
"slm_backup_version": "1.0",
"exported_at": datetime.now(timezone.utc).isoformat(),
"country": self._country,
"user_products": user_products,
"lists": lists,
"items": items,
}
async def import_user_data(self, data: dict) -> dict:
"""Merge a backup into live storage. Skips anything already present by ID."""
imported_products = 0
imported_lists = 0
imported_items = 0
for prod_data in data.get("user_products", []):
prod_id = prod_data.get("id")
if prod_id and prod_id not in self._products:
try:
self._products[prod_id] = Product(**prod_data)
imported_products += 1
except Exception as err:
_LOGGER.warning("Skipped product during import: %s", err)
if imported_products:
await self._save_products()
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
self._search_engine = ProductSearch(products_dict)
for list_data in data.get("lists", []):
list_id = list_data.get("id")
if list_id and list_id not in self._lists:
try:
lst = ShoppingList(**list_data)
lst.active = False
self._lists[list_id] = lst
imported_lists += 1
except Exception as err:
_LOGGER.warning("Skipped list during import: %s", err)
backup_items = data.get("items", {})
for list_id, items_list in backup_items.items():
if list_id in self._lists and list_id not in self._items:
try:
self._items[list_id] = [Item(**d) for d in items_list]
imported_items += len(self._items[list_id])
except Exception as err:
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
if imported_lists or imported_items:
await self._save_lists()
await self._save_items()
_LOGGER.info(
"Import complete: %d products, %d lists, %d items",
imported_products, imported_lists, imported_items,
)
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
async def _write_config_backup(self) -> None:
"""Silently write a backup JSON to the HA config directory."""
try:
backup_path = os.path.join(
self.hass.config.config_dir,
"shopping_list_manager_backup.json",
)
data = await self.export_user_data()
def _write() -> None:
with open(backup_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
await self.hass.async_add_executor_job(_write)
_LOGGER.debug("Auto-backup written to %s", backup_path)
except Exception as err:
_LOGGER.warning("Failed to write config backup: %s", err)
# Categories methods # Categories methods
async def _save_categories(self) -> None: async def _save_categories(self) -> None:
@@ -489,3 +746,81 @@ class ShoppingListStorage:
def get_categories(self) -> List[Category]: def get_categories(self) -> List[Category]:
"""Get all categories.""" """Get all categories."""
return self._categories return self._categories
# Loyalty card methods
async def _save_loyalty_cards(self) -> None:
"""Save loyalty cards to storage."""
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
await self._store_loyalty_cards.async_save(data)
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
"""Get loyalty cards visible to the specified user.
Global cards (owner_id=None) are visible to everyone.
Private cards are visible to their owner, anyone in allowed_users, and admins.
"""
all_cards = list(self._loyalty_cards.values())
if is_admin or user_id is None:
return all_cards
return [
card for card in all_cards
if card.owner_id is None
or card.owner_id == user_id
or user_id in (card.allowed_users or [])
]
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
"""Get a specific loyalty card."""
return self._loyalty_cards.get(card_id)
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
"""Create a new loyalty card."""
from .models import current_timestamp
new_card = LoyaltyCard(
id=generate_id(),
owner_id=owner_id,
**kwargs
)
self._loyalty_cards[new_card.id] = new_card
await self._save_loyalty_cards()
_LOGGER.debug("Created loyalty card: %s", new_card.name)
return new_card
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
"""Update a loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
for key, value in kwargs.items():
if hasattr(card, key):
setattr(card, key, value)
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated loyalty card: %s", card_id)
return card
async def delete_loyalty_card(self, card_id: str) -> bool:
"""Delete a loyalty card."""
if card_id not in self._loyalty_cards:
return False
del self._loyalty_cards[card_id]
await self._save_loyalty_cards()
_LOGGER.debug("Deleted loyalty card: %s", card_id)
return True
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
"""Update the allowed_users for a private loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
card.allowed_users = allowed_users
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
return card
@@ -0,0 +1,33 @@
{
"config": {
"step": {
"user": {
"title": "Shopping List Manager",
"description": "Set up the Shopping List Manager integration. Country and other settings can be configured after setup via the Configure button."
}
},
"abort": {
"single_instance_allowed": "Only a single instance of Shopping List Manager is allowed."
}
},
"options": {
"step": {
"init": {
"title": "Shopping List Manager Options",
"description": "Changing country will reload the product catalog on next restart.",
"data": {
"country": "Country",
"enable_price_tracking": "Enable price tracking",
"enable_image_search": "Enable image search",
"metric_units_only": "Metric units only"
},
"data_description": {
"country": "Used to localise product catalog and pricing.",
"enable_price_tracking": "Track and display product prices.",
"enable_image_search": "Search for product images automatically.",
"metric_units_only": "Show only metric units (g, kg, ml, L)."
}
}
}
}
}
@@ -1,9 +1,15 @@
"""Image handling utilities for Shopping List Manager.""" """Image handling utilities for Shopping List Manager."""
import logging import logging
import os import shutil
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from ..const import (
IMAGES_LOCAL_DIR,
LEGACY_IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -18,11 +24,28 @@ class ImageHandler:
config_path: Path to HA config directory config_path: Path to HA config directory
""" """
self.hass = hass self.hass = hass
# Images stored in /config/www/shopping_list_manager/images/ # Images stored in /config/www/images/shopping_list_manager/
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images" self._local_images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
self._local_images_dir.mkdir(parents=True, exist_ok=True) self._local_images_dir.mkdir(parents=True, exist_ok=True)
self._migrate_legacy_files()
_LOGGER.info("Image directory: %s", self._local_images_dir) _LOGGER.info("Image directory: %s", self._local_images_dir)
def _migrate_legacy_files(self) -> None:
"""Move legacy image files to the new standardized directory."""
if not self._legacy_images_dir.exists() or self._legacy_images_dir == self._local_images_dir:
return
for src in self._legacy_images_dir.glob("*"):
if not src.is_file():
continue
dest = self._local_images_dir / src.name
if dest.exists():
continue
try:
shutil.move(str(src), str(dest))
except Exception as err:
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str: def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
"""Get image URL for a product. """Get image URL for a product.
@@ -73,11 +96,11 @@ class ImageHandler:
# Check exact match # Check exact match
image_file = self._local_images_dir / f"{normalized_name}{ext}" image_file = self._local_images_dir / f"{normalized_name}{ext}"
if image_file.exists(): if image_file.exists():
return f"/local/shopping_list_manager/images/{normalized_name}{ext}" return f"{LOCAL_IMAGE_URL_PREFIX}{normalized_name}{ext}"
# Check for files starting with the product name # Check for files starting with the product name
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"): for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
return f"/local/shopping_list_manager/images/{file.name}" return f"{LOCAL_IMAGE_URL_PREFIX}{file.name}"
return None return None
@@ -1,19 +1,31 @@
"""WebSocket API handlers for Shopping List Manager.""" """WebSocket API handlers for Shopping List Manager."""
import io
import logging import logging
import re
from pathlib import Path
from typing import Any, Dict from typing import Any, Dict
import voluptuous as vol import voluptuous as vol
from aiohttp import ClientTimeout
from PIL import Image
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from ..const import DOMAIN from ..const import DOMAIN
from ..const import ( from ..const import (
IMAGE_SIZE,
IMAGE_QUALITY,
IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
WS_TYPE_LISTS_GET_ALL, WS_TYPE_LISTS_GET_ALL,
WS_TYPE_LISTS_CREATE, WS_TYPE_LISTS_CREATE,
WS_TYPE_LISTS_UPDATE, WS_TYPE_LISTS_UPDATE,
WS_TYPE_LISTS_DELETE, WS_TYPE_LISTS_DELETE,
WS_TYPE_LISTS_SET_ACTIVE, WS_TYPE_LISTS_SET_ACTIVE,
WS_TYPE_LISTS_UPDATE_MEMBERS,
WS_TYPE_USERS_GET_ALL,
WS_TYPE_ITEMS_GET, WS_TYPE_ITEMS_GET,
WS_TYPE_ITEMS_ADD, WS_TYPE_ITEMS_ADD,
WS_TYPE_ITEMS_UPDATE, WS_TYPE_ITEMS_UPDATE,
@@ -27,7 +39,14 @@ from ..const import (
WS_TYPE_PRODUCTS_SUGGESTIONS, WS_TYPE_PRODUCTS_SUGGESTIONS,
WS_TYPE_PRODUCTS_ADD, WS_TYPE_PRODUCTS_ADD,
WS_TYPE_PRODUCTS_UPDATE, WS_TYPE_PRODUCTS_UPDATE,
WS_TYPE_PRODUCTS_DELETE,
WS_TYPE_OFF_FETCH,
WS_TYPE_CATEGORIES_GET_ALL, WS_TYPE_CATEGORIES_GET_ALL,
WS_TYPE_LOYALTY_GET_ALL,
WS_TYPE_LOYALTY_ADD,
WS_TYPE_LOYALTY_UPDATE,
WS_TYPE_LOYALTY_DELETE,
WS_TYPE_LOYALTY_UPDATE_MEMBERS,
WS_TYPE_SUBSCRIBE, WS_TYPE_SUBSCRIBE,
EVENT_ITEM_ADDED, EVENT_ITEM_ADDED,
EVENT_ITEM_UPDATED, EVENT_ITEM_UPDATED,
@@ -41,6 +60,62 @@ from .. import get_storage
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
# =============================================================================
# ACCESS-CHECK HELPERS
# =============================================================================
def _user_can_access_list(lst, user) -> bool:
"""Return True if the user may read or write to this list.
Global lists (owner_id=None) are accessible to everyone.
Private lists are accessible to their owner, anyone in allowed_users, and admins.
"""
if lst.owner_id is None:
return True
if user is None:
return False
if user.is_admin or user.id == lst.owner_id:
return True
return user.id in (lst.allowed_users or [])
def _check_list_access(storage, connection, msg, list_id, require_owner=False):
"""Verify the connected user may access list_id.
Sends the appropriate WebSocket error if access is denied.
Returns the ShoppingList object on success, or None if an error was sent.
Args:
require_owner: When True, only the list owner (or an admin) is allowed.
Use for destructive/administrative operations.
"""
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return None
user = connection.user
if require_owner:
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can perform this action")
return None
else:
if not _user_can_access_list(lst, user):
connection.send_error(msg["id"], "forbidden", "You do not have access to this list")
return None
return lst
def _find_item_list_id(storage, item_id):
"""Return the list_id that contains item_id, or None if not found."""
for list_id, items in storage._items.items():
for item in items:
if item.id == item_id:
return list_id
return None
# ============================================================================= # =============================================================================
# LIST HANDLERS # LIST HANDLERS
# ============================================================================= # =============================================================================
@@ -55,16 +130,28 @@ async def websocket_subscribe(
msg: dict, msg: dict,
) -> None: ) -> None:
"""Subscribe to shopping list manager events via WebSocket.""" """Subscribe to shopping list manager events via WebSocket."""
storage = get_storage(hass)
@callback @callback
def forward_event(event): def forward_event(event):
"""Forward HA bus event to WebSocket connection.""" """Forward HA bus event to WebSocket connection.
Events that reference a list_id are only forwarded if the connected
user has access to that list, preventing cross-user data leakage.
"""
data = event.data
list_id = data.get("list_id")
if list_id:
lst = storage.get_list(list_id)
if lst and not _user_can_access_list(lst, connection.user):
return # skip — user cannot see this list
connection.send_message( connection.send_message(
websocket_api.event_message( websocket_api.event_message(
msg["id"], msg["id"],
{ {
"event_type": event.event_type, "event_type": event.event_type,
"data": event.data, "data": data,
} }
) )
) )
@@ -171,8 +258,11 @@ def websocket_get_lists(
) -> None: ) -> None:
"""Handle get all lists command.""" """Handle get all lists command."""
storage = get_storage(hass) storage = get_storage(hass)
lists = storage.get_lists() user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
lists = storage.get_lists(user_id=user_id, is_admin=is_admin)
connection.send_result( connection.send_result(
msg["id"], msg["id"],
{ {
@@ -186,6 +276,7 @@ def websocket_get_lists(
vol.Required("type"): WS_TYPE_LISTS_CREATE, vol.Required("type"): WS_TYPE_LISTS_CREATE,
vol.Required("name"): str, vol.Required("name"): str,
vol.Optional("icon", default="mdi:cart"): str, vol.Optional("icon", default="mdi:cart"): str,
vol.Optional("private", default=True): bool,
} }
) )
@websocket_api.async_response @websocket_api.async_response
@@ -196,10 +287,15 @@ async def websocket_create_list(
) -> None: ) -> None:
"""Handle create list command.""" """Handle create list command."""
storage = get_storage(hass) storage = get_storage(hass)
# Private lists are owned by the creating user; global lists have no owner.
is_private = msg.get("private", True)
owner_id = connection.user.id if is_private and connection.user else None
new_list = await storage.create_list( new_list = await storage.create_list(
name=msg["name"], name=msg["name"],
icon=msg.get("icon", "mdi:cart") icon=msg.get("icon", "mdi:cart"),
owner_id=owner_id,
) )
# Fire event # Fire event
@@ -232,7 +328,10 @@ async def websocket_update_list(
"""Handle update list command.""" """Handle update list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id, require_owner=True) is None:
return
# Build update kwargs # Build update kwargs
update_data = {} update_data = {}
if "name" in msg: if "name" in msg:
@@ -275,9 +374,21 @@ async def websocket_delete_list(
"""Handle delete list command.""" """Handle delete list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may delete a private list
if lst.owner_id is not None:
user = connection.user
if not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can delete this list")
return
success = await storage.delete_list(list_id) success = await storage.delete_list(list_id)
if not success: if not success:
connection.send_error(msg["id"], "not_found", "List not found") connection.send_error(msg["id"], "not_found", "List not found")
return return
@@ -306,7 +417,10 @@ async def websocket_set_active_list(
"""Handle set active list command.""" """Handle set active list command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
success = await storage.set_active_list(list_id) success = await storage.set_active_list(list_id)
if not success: if not success:
@@ -341,7 +455,10 @@ def websocket_get_items(
"""Handle get items command.""" """Handle get items command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
items = storage.get_items(list_id) items = storage.get_items(list_id)
connection.send_result( connection.send_result(
@@ -357,7 +474,7 @@ def websocket_get_items(
vol.Required("type"): WS_TYPE_ITEMS_ADD, vol.Required("type"): WS_TYPE_ITEMS_ADD,
vol.Required("list_id"): str, vol.Required("list_id"): str,
vol.Required("name"): str, vol.Required("name"): str,
vol.Required("category_id"): str, vol.Optional("category_id", default="other"): str,
vol.Optional("product_id"): str, vol.Optional("product_id"): str,
vol.Optional("quantity", default=1): vol.Coerce(float), vol.Optional("quantity", default=1): vol.Coerce(float),
vol.Optional("unit", default="units"): str, vol.Optional("unit", default="units"): str,
@@ -376,7 +493,10 @@ async def websocket_add_item(
"""Handle add item command.""" """Handle add item command."""
storage = get_storage(hass) storage = get_storage(hass)
list_id = msg["list_id"] list_id = msg["list_id"]
if _check_list_access(storage, connection, msg, list_id) is None:
return
# Build item data # Build item data
item_data = { item_data = {
"name": msg["name"], "name": msg["name"],
@@ -435,7 +555,14 @@ async def websocket_update_item(
"""Handle update item command.""" """Handle update item command."""
storage = get_storage(hass) storage = get_storage(hass)
item_id = msg["item_id"] item_id = msg["item_id"]
list_id = _find_item_list_id(storage, item_id)
if list_id is None:
connection.send_error(msg["id"], "not_found", "Item not found")
return
if _check_list_access(storage, connection, msg, list_id) is None:
return
# Build update data # Build update data
update_data = {} update_data = {}
update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"] update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"]
@@ -520,7 +647,14 @@ async def websocket_delete_item(
"""Handle delete item command.""" """Handle delete item command."""
storage = get_storage(hass) storage = get_storage(hass)
item_id = msg["item_id"] item_id = msg["item_id"]
list_id = _find_item_list_id(storage, item_id)
if list_id is None:
connection.send_error(msg["id"], "not_found", "Item not found")
return
if _check_list_access(storage, connection, msg, list_id) is None:
return
success = await storage.delete_item(item_id) success = await storage.delete_item(item_id)
if not success: if not success:
@@ -660,6 +794,87 @@ def websocket_get_list_total(
# PRODUCT HANDLERS # PRODUCT HANDLERS
# ============================================================================= # =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/products/download_image",
vol.Required("image_url"): str,
vol.Required("product_name"): str,
}
)
@websocket_api.async_response
async def websocket_download_product_image(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Download a remote image and save it as WebP to the local images directory."""
raw_url: str = msg["image_url"]
product_name: str = msg["product_name"]
safe_stem = re.sub(r"[^a-z0-9_]", "", product_name.lower().replace(" ", "_")) or "product"
filename = f"{safe_stem}.webp"
images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
images_dir.mkdir(parents=True, exist_ok=True)
dest = images_dir / filename
try:
session = async_get_clientsession(hass)
headers = {"User-Agent": "Mozilla/5.0 (compatible; HomeAssistant/ShoppingListManager)"}
async with session.get(raw_url, timeout=ClientTimeout(total=15), headers=headers) as resp:
if resp.status != 200:
connection.send_error(msg["id"], "download_failed", f"HTTP {resp.status}")
return
raw = await resp.read()
except Exception as exc: # noqa: BLE001
connection.send_error(msg["id"], "download_failed", str(exc))
return
try:
img = Image.open(io.BytesIO(raw))
# Convert to RGB for reliable lossy WebP encoding
# (RGBA, palette, grayscale modes can fail or produce oversized files)
if img.mode == "RGBA":
bg = Image.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[3])
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
img.thumbnail((IMAGE_SIZE, IMAGE_SIZE), Image.LANCZOS)
out = io.BytesIO()
img.save(out, format="WEBP", quality=IMAGE_QUALITY)
dest.write_bytes(out.getvalue())
except Exception as exc: # noqa: BLE001
connection.send_error(msg["id"], "conversion_failed", str(exc))
return
connection.send_result(
msg["id"],
{"local_url": f"{LOCAL_IMAGE_URL_PREFIX}{filename}"},
)
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/products/search_by_barcode",
vol.Required("barcode"): str,
}
)
@callback
def websocket_search_by_barcode(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Find a single product by exact barcode match."""
storage = get_storage(hass)
barcode = msg["barcode"].strip()
match = next(
(p for p in storage._products.values() if p.barcode and p.barcode == barcode),
None,
)
connection.send_result(msg["id"], {"product": match.to_dict() if match else None})
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): "shopping_list_manager/products/substitutes", vol.Required("type"): "shopping_list_manager/products/substitutes",
@@ -844,6 +1059,27 @@ async def websocket_update_product(
) )
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_PRODUCTS_DELETE,
vol.Required("product_id"): str,
}
)
@websocket_api.async_response
async def websocket_delete_product(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Handle delete product command."""
storage = get_storage(hass)
deleted = await storage.delete_product(msg["product_id"])
if not deleted:
connection.send_error(msg["id"], "not_found", "Product not found")
return
connection.send_result(msg["id"], {"deleted": True})
# ============================================================================= # =============================================================================
# CATEGORY HANDLERS # CATEGORY HANDLERS
# ============================================================================= # =============================================================================
@@ -869,3 +1105,383 @@ def websocket_get_categories(
"categories": [cat.to_dict() for cat in categories] "categories": [cat.to_dict() for cat in categories]
} }
) )
# =============================================================================
# OPENFOODFACTS PROXY HANDLERS
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_OFF_FETCH,
vol.Optional("query"): str,
vol.Optional("barcode"): str,
vol.Optional("page_size", default=5): int,
}
)
@websocket_api.async_response
async def websocket_off_fetch(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Proxy OpenFoodFacts requests through HA to avoid browser CORS restrictions."""
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from aiohttp import ClientTimeout
session = async_get_clientsession(hass)
headers = {"User-Agent": "HomeAssistant/ShoppingListManager (contact@homeassistant.io)"}
try:
if msg.get("barcode"):
barcode = msg["barcode"]
fields = "product_name,categories_tags,image_front_thumb_url,image_front_url,image_url,price"
url = f"https://world.openfoodfacts.org/api/v2/product/{barcode}.json?fields={fields}"
async with session.get(url, timeout=ClientTimeout(total=10), headers=headers) as resp:
if not resp.ok:
connection.send_result(msg["id"], {"status": 0})
return
data = await resp.json(content_type=None)
connection.send_result(msg["id"], {
"status": data.get("status", 0),
"product": data.get("product"),
})
else:
query = msg.get("query", "")
page_size = msg.get("page_size", 5)
fields = "product_name,categories_tags,image_front_thumb_url,image_front_url,image_url,price"
url = (
f"https://world.openfoodfacts.org/api/v2/search"
f"?search_terms={query}&fields={fields}&page_size={page_size}"
)
async with session.get(url, timeout=ClientTimeout(total=10), headers=headers) as resp:
if not resp.ok:
connection.send_result(msg["id"], {"products": []})
return
data = await resp.json(content_type=None)
connection.send_result(msg["id"], {"products": data.get("products", [])})
except Exception as err:
_LOGGER.warning("OpenFoodFacts proxy request failed: %s", err)
connection.send_error(msg["id"], "fetch_failed", str(err))
# =============================================================================
# INTEGRATION SETTINGS HANDLERS
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/get_integration_settings",
}
)
@callback
def websocket_get_integration_settings(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return current country and available country options."""
country = hass.data[DOMAIN].get("country", "NZ")
version = hass.data[DOMAIN].get("version", "unknown")
connection.send_result(
msg["id"],
{
"country": country,
"version": version,
"available_countries": {
"NZ": "New Zealand",
"AU": "Australia",
"US": "United States",
"GB": "United Kingdom",
"CA": "Canada",
},
}
)
_VALID_COUNTRIES = ["NZ", "AU", "US", "GB", "CA"]
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/set_country",
vol.Required("country"): vol.In(_VALID_COUNTRIES),
}
)
@websocket_api.async_response
async def websocket_set_country(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Switch to a different country catalog. Preserves user-added products."""
country = msg["country"].upper()
storage = get_storage(hass)
count = await storage.reload_catalog(country)
# Persist to HA config entry so country survives restart
entries = hass.config_entries.async_entries(DOMAIN)
if entries:
entry = entries[0]
hass.config_entries.async_update_entry(entry, options={**entry.options, "country": country})
hass.data[DOMAIN]["country"] = country
connection.send_result(
msg["id"],
{"success": True, "country": country, "products_loaded": count}
)
# =============================================================================
# BACKUP / RESTORE HANDLERS
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/export_data",
}
)
@websocket_api.async_response
async def websocket_export_data(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Export all user-created data as a JSON-serialisable dict."""
storage = get_storage(hass)
data = await storage.export_user_data()
connection.send_result(msg["id"], data)
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/import_data",
vol.Required("data"): dict,
}
)
@websocket_api.async_response
async def websocket_import_data(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Import user data from a backup payload."""
storage = get_storage(hass)
counts = await storage.import_user_data(msg["data"])
connection.send_result(msg["id"], {"success": True, "imported": counts})
# =============================================================================
# LIST MEMBERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_LISTS_UPDATE_MEMBERS,
vol.Required("list_id"): str,
vol.Required("allowed_users"): [str],
}
)
@websocket_api.async_response
async def websocket_update_list_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private list."""
storage = get_storage(hass)
list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may manage members
user = connection.user
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can manage members")
return
updated = await storage.update_list_members(list_id, msg["allowed_users"])
hass.bus.async_fire(
EVENT_LIST_UPDATED,
{"list_id": list_id, "action": "members_updated"}
)
connection.send_result(msg["id"], {"list": updated.to_dict()})
# =============================================================================
# HA USERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_USERS_GET_ALL,
}
)
@websocket_api.async_response
async def websocket_get_ha_users(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all active, non-system HA users."""
users = await hass.auth.async_get_users()
result = [
{"id": u.id, "name": u.name}
for u in users
if not u.system_generated and u.is_active
]
connection.send_result(msg["id"], {"users": result})
# =============================================================================
# LOYALTY CARD HANDLERS
# =============================================================================
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_GET_ALL,
})
@websocket_api.async_response
async def websocket_get_loyalty_cards(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all loyalty cards visible to the current user."""
storage = get_storage(hass)
user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
cards = storage.get_loyalty_cards(user_id=user_id, is_admin=is_admin)
connection.send_result(msg["id"], {"cards": [c.to_dict() for c in cards]})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_ADD,
vol.Required("name"): str,
vol.Required("number"): str,
vol.Optional("barcode", default=""): str,
vol.Optional("barcode_type", default="barcode"): str,
vol.Optional("logo", default=""): str,
vol.Optional("notes", default=""): str,
vol.Optional("color", default="#9fa8da"): str,
vol.Optional("private", default=True): bool,
})
@websocket_api.async_response
async def websocket_add_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Add a new loyalty card."""
storage = get_storage(hass)
user = connection.user
owner_id = user.id if (user and msg.get("private")) else None
card = await storage.create_loyalty_card(
owner_id=owner_id,
name=msg["name"],
number=msg["number"],
barcode=msg.get("barcode", ""),
barcode_type=msg.get("barcode_type", "barcode"),
logo=msg.get("logo", ""),
notes=msg.get("notes", ""),
color=msg.get("color", "#9fa8da"),
)
connection.send_result(msg["id"], {"card": card.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE,
vol.Required("card_id"): str,
vol.Optional("name"): str,
vol.Optional("number"): str,
vol.Optional("barcode"): str,
vol.Optional("barcode_type"): str,
vol.Optional("logo"): str,
vol.Optional("notes"): str,
vol.Optional("color"): str,
})
@websocket_api.async_response
async def websocket_update_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update an existing loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can update it")
return
fields = {k: v for k, v in msg.items() if k not in ("type", "id", "card_id")}
updated = await storage.update_loyalty_card(card_id, **fields)
connection.send_result(msg["id"], {"card": updated.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_DELETE,
vol.Required("card_id"): str,
})
@websocket_api.async_response
async def websocket_delete_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Delete a loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can delete it")
return
await storage.delete_loyalty_card(card_id)
connection.send_result(msg["id"], {"success": True})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE_MEMBERS,
vol.Required("card_id"): str,
vol.Required("allowed_users"): [str],
})
@websocket_api.async_response
async def websocket_update_loyalty_card_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can manage members")
return
updated = await storage.update_loyalty_card_members(card_id, msg["allowed_users"])
connection.send_result(msg["id"], {"card": updated.to_dict()})