mirror of
https://github.com/thekiwismarthome/shopping-list-manager.git
synced 2026-06-30 21:46:30 +00:00
Compare commits
33 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e7306275e4 | |||
| 9430811cda | |||
| 2b11632253 | |||
| ae7717a8eb | |||
| 433c03035b | |||
| 8eb403ed8e | |||
| 03fb9a9a67 | |||
| ae133ae59b | |||
| 2a8b12a07e | |||
| a36933c4c6 | |||
| 402881c687 | |||
| d412764cba | |||
| 752f9e5622 | |||
| 86896ba4af | |||
| 36a8939ebc | |||
| 57b6d52ddf | |||
| 03249df651 | |||
| 11180db0e3 | |||
| 9bdaea0b1b | |||
| ec0f44f109 | |||
| 88b3f2d435 | |||
| d93ea86d72 | |||
| 5b3dcb65b4 | |||
| 94cdede3b9 | |||
| e6117073be | |||
| e76fad5a92 | |||
| bcfde6df9a | |||
| 85e0e68af9 | |||
| 11eb698c20 | |||
| 0e3fcd56f5 | |||
| 9d8fd3f63e | |||
| c9c1d16f08 | |||
| 3b0cff3476 |
@@ -1,33 +1,98 @@
|
||||
## 1. Installation (HACS)
|
||||
# Shopping List Manager Integration for Home Assistant
|
||||
|
||||
### Recommended
|
||||
The backend integration that powers the Shopping List Manager. Provides persistent multi-list storage, a 500+ product catalog, real-time WebSocket events, and a full API for the Lovelace card — all running natively inside Home Assistant.
|
||||
|
||||
> **Pair with the [Shopping List Manager Card](https://github.com/thekiwismarthome/shopping-list-manager-card)** for the full UI experience.
|
||||
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
|
||||
|
||||
1. Click the button above.
|
||||
2. Confirm adding the repository to HACS.
|
||||
3. Install **Shopping List Manager** from **HACS → Integrations**.
|
||||
4. Restart Home Assistant.
|
||||
---
|
||||
|
||||
## Features
|
||||
|
||||
### 🛒 Multi-List Management
|
||||
- Create and manage multiple shopping lists
|
||||
- Private or shared lists with per-member access control
|
||||
- Active list state shared across all connected devices and users
|
||||
- List total price calculation
|
||||
|
||||
### 📦 Items
|
||||
- Add, update, check, and delete items with quantity and unit
|
||||
- Atomic quantity increment / decrement
|
||||
- Bulk check and clear checked items
|
||||
- Per-item pricing, notes, and category assignment
|
||||
|
||||
### 🔍 Product Catalog
|
||||
- **500+ products** (NZ-focused, extensible to AU, US, GB, CA)
|
||||
- Fuzzy search with alias matching
|
||||
- Recently-used product suggestions
|
||||
- Custom product creation
|
||||
- Allergen filtering and product substitute groups
|
||||
- Product images (WebP, 200×200px, optimised)
|
||||
|
||||
### 🗂️ Categories
|
||||
- 13 default categories — Produce, Dairy, Meat, Bakery, Pantry, Frozen, Beverages, Snacks, Household, Health, Pet, Baby, Other
|
||||
- Category colour coding and emoji icons
|
||||
- Per-list category ordering
|
||||
|
||||
### 💳 Loyalty Cards
|
||||
- Store loyalty and rewards card data
|
||||
- Private or shared card access per user
|
||||
|
||||
### 🔄 Real-Time Events
|
||||
- All changes fire events on the Home Assistant bus
|
||||
- Custom WebSocket subscription proxy so **non-admin users** receive live updates without requiring HA admin privileges
|
||||
|
||||
---
|
||||
|
||||
### Manual Repository URL
|
||||
## Requirements
|
||||
|
||||
https://github.com/thekiwismarthome/shopping-list-manager
|
||||
|
||||
Repository type: **Integration**
|
||||
| Component | Minimum Version |
|
||||
|---|---|
|
||||
| Home Assistant | 2024.1 |
|
||||
| HACS | 2.x |
|
||||
|
||||
---
|
||||
|
||||
## 2. Manual Installation (Optional)
|
||||
## Installation
|
||||
|
||||
1. Copy the folder:
|
||||
custom_components/shopping_list_manager
|
||||
### Via HACS (Recommended)
|
||||
|
||||
2. Paste it into:
|
||||
/config/custom_components/
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager&category=integration)
|
||||
|
||||
3. Restart Home Assistant.
|
||||
1. Click the button above
|
||||
2. Confirm adding the repository to HACS
|
||||
3. Install **Shopping List Manager** from **HACS → Integrations**
|
||||
4. Restart Home Assistant
|
||||
5. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
|
||||
|
||||
### Manual Installation
|
||||
|
||||
1. Copy the `custom_components/shopping_list_manager/` folder into your HA `/config/custom_components/` directory
|
||||
2. Restart Home Assistant
|
||||
3. Go to **Settings → Devices & Services → Add Integration** and search for **Shopping List Manager**
|
||||
|
||||
---
|
||||
|
||||
## Lovelace Card
|
||||
|
||||
Install the companion card to get the full shopping UI:
|
||||
|
||||
## 3. Shopping List Card to go with this Integration
|
||||
[](https://my.home-assistant.io/redirect/hacs_repository/?owner=thekiwismarthome&repository=shopping-list-manager-card&category=plugin)
|
||||
|
||||
---
|
||||
|
||||
## Documentation
|
||||
|
||||
Full documentation is available in the [Wiki](https://github.com/thekiwismarthome/shopping-list-manager/wiki).
|
||||
|
||||
## Support & Feedback
|
||||
|
||||
- [Open an Issue](https://github.com/thekiwismarthome/shopping-list-manager/issues)
|
||||
- [Home Assistant Community Forum](https://community.home-assistant.io)
|
||||
|
||||
---
|
||||
|
||||
## License
|
||||
|
||||
MIT — see [LICENSE](LICENSE) for details.
|
||||
|
||||
@@ -2,11 +2,12 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import DOMAIN, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, EVENT_ITEM_CHECKED, EVENT_ITEM_DELETED, EVENT_LIST_UPDATED, EVENT_LIST_DELETED
|
||||
from .storage import ShoppingListStorage
|
||||
from .utils.images import ImageHandler
|
||||
|
||||
@@ -23,7 +24,6 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
# In async_setup_entry function, after storage initialization:
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up Shopping List Manager from a config entry."""
|
||||
_LOGGER.info("Setting up Shopping List Manager")
|
||||
@@ -58,7 +58,21 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
# Register frontend resources
|
||||
await _async_register_frontend(hass)
|
||||
|
||||
_LOGGER.info("Shopping List Manager setup complete")
|
||||
# CRITICAL: Register event listeners so non-admin users can subscribe
|
||||
# Without these, non-admin users cannot receive real-time updates
|
||||
def _dummy_listener(event):
|
||||
"""Dummy listener to enable event subscription for all users."""
|
||||
pass
|
||||
|
||||
hass.bus.async_listen(EVENT_ITEM_ADDED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_UPDATED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_CHECKED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_ITEM_DELETED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_LIST_UPDATED, _dummy_listener)
|
||||
hass.bus.async_listen(EVENT_LIST_DELETED, _dummy_listener)
|
||||
|
||||
_LOGGER.info("Shopping List Manager setup complete with event subscriptions enabled")
|
||||
|
||||
return True
|
||||
|
||||
|
||||
@@ -88,6 +102,10 @@ async def _async_register_websocket_handlers(
|
||||
from .websocket import handlers
|
||||
|
||||
# Lists handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_subscribe,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_lists,
|
||||
@@ -126,6 +144,11 @@ async def _async_register_websocket_handlers(
|
||||
hass,
|
||||
handlers.websocket_check_item,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_increment_item,
|
||||
)
|
||||
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_item,
|
||||
@@ -148,10 +171,23 @@ async def _async_register_websocket_handlers(
|
||||
)
|
||||
|
||||
# Products handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_download_product_image,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_search_by_barcode,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_search_products,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.ws_get_products_by_ids,
|
||||
)
|
||||
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_product_suggestions,
|
||||
@@ -175,6 +211,60 @@ async def _async_register_websocket_handlers(
|
||||
handlers.websocket_get_categories,
|
||||
)
|
||||
|
||||
# Integration settings handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_integration_settings,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_set_country,
|
||||
)
|
||||
|
||||
# Backup / Restore handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_export_data,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_import_data,
|
||||
)
|
||||
|
||||
# List members handler
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_list_members,
|
||||
)
|
||||
|
||||
# HA users handler
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_ha_users,
|
||||
)
|
||||
|
||||
# Loyalty card handlers
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_loyalty_cards,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_add_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_loyalty_card,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_update_loyalty_card_members,
|
||||
)
|
||||
|
||||
_LOGGER.debug("WebSocket handlers registered")
|
||||
|
||||
|
||||
@@ -184,8 +274,6 @@ async def _async_register_frontend(hass: HomeAssistant) -> None:
|
||||
# The frontend card registers itself independently
|
||||
_LOGGER.debug("Frontend resources skipped (separate HACS module)")
|
||||
|
||||
_LOGGER.debug("Frontend resources registered")
|
||||
|
||||
|
||||
def get_storage(hass: HomeAssistant) -> ShoppingListStorage:
|
||||
"""Get the storage instance from hass.data.
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -9,6 +9,7 @@ STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
|
||||
STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
|
||||
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
|
||||
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
|
||||
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
|
||||
|
||||
# WebSocket Commands - Lists
|
||||
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
|
||||
@@ -16,6 +17,10 @@ WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
|
||||
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
|
||||
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
|
||||
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
|
||||
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
|
||||
|
||||
# WebSocket Commands - Users
|
||||
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
|
||||
|
||||
# WebSocket Commands - Items
|
||||
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
|
||||
@@ -39,6 +44,13 @@ WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
|
||||
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
|
||||
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
|
||||
|
||||
# WebSocket Commands - Loyalty Cards
|
||||
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
|
||||
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
|
||||
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
|
||||
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
|
||||
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
|
||||
|
||||
# WebSocket Commands - Subscriptions
|
||||
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
|
||||
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
|
||||
@@ -64,7 +76,10 @@ IMAGE_FORMAT = "webp"
|
||||
IMAGE_SIZE = 200 # 200x200px
|
||||
IMAGE_QUALITY = 85
|
||||
IMAGE_MAX_SIZE_KB = 15
|
||||
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
|
||||
IMAGES_LOCAL_DIR = "www/images/shopping_list_manager"
|
||||
LEGACY_IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
|
||||
LOCAL_IMAGE_URL_PREFIX = "/local/images/shopping_list_manager/"
|
||||
LEGACY_IMAGE_URL_PREFIX = "/local/shopping_list_manager/images/"
|
||||
|
||||
# Placeholder image (inline SVG)
|
||||
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -78,7 +78,7 @@
|
||||
"id": "health",
|
||||
"name": "Health & Beauty",
|
||||
"icon": "mdi:heart-pulse",
|
||||
"color": "#E91E63",
|
||||
"color": "#009688",
|
||||
"sort_order": 10,
|
||||
"system": true
|
||||
},
|
||||
@@ -94,7 +94,7 @@
|
||||
"id": "baby",
|
||||
"name": "Baby",
|
||||
"icon": "mdi:baby-face",
|
||||
"color": "#FFEB3B",
|
||||
"color": "#F48FB1",
|
||||
"sort_order": 12,
|
||||
"system": true
|
||||
},
|
||||
|
||||
@@ -96,6 +96,28 @@ class Item:
|
||||
self.estimated_total = self.quantity * self.price
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoyaltyCard:
|
||||
"""Loyalty card model."""
|
||||
id: str
|
||||
name: str
|
||||
number: str
|
||||
barcode: str = ""
|
||||
barcode_type: str = "barcode" # "barcode" or "qrcode"
|
||||
logo: str = ""
|
||||
notes: str = ""
|
||||
color: str = "#9fa8da"
|
||||
created_at: str = field(default_factory=current_timestamp)
|
||||
updated_at: str = field(default_factory=current_timestamp)
|
||||
# Ownership: None = visible to all users; set = private to owner + allowed_users
|
||||
owner_id: Optional[str] = None
|
||||
allowed_users: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ShoppingList:
|
||||
"""Shopping list model."""
|
||||
@@ -107,6 +129,9 @@ class ShoppingList:
|
||||
item_order: List[str] = field(default_factory=list)
|
||||
category_order: List[str] = field(default_factory=list)
|
||||
active: bool = False
|
||||
# Ownership: None = visible to all users; set = private to owner + allowed_users
|
||||
owner_id: Optional[str] = None
|
||||
allowed_users: List[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert to dictionary."""
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
"""Storage management for Shopping List Manager."""
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from .utils.search import ProductSearch
|
||||
from homeassistant.core import HomeAssistant
|
||||
@@ -11,9 +16,14 @@ from .const import (
|
||||
STORAGE_KEY_ITEMS,
|
||||
STORAGE_KEY_PRODUCTS,
|
||||
STORAGE_KEY_CATEGORIES,
|
||||
STORAGE_KEY_LOYALTY_CARDS,
|
||||
IMAGES_LOCAL_DIR,
|
||||
LEGACY_IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
LEGACY_IMAGE_URL_PREFIX,
|
||||
)
|
||||
from .data.catalog_loader import load_product_catalog
|
||||
from .models import ShoppingList, Item, Product, Category, generate_id
|
||||
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
|
||||
from .data.category_loader import load_categories
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -37,12 +47,16 @@ class ShoppingListStorage:
|
||||
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
|
||||
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
|
||||
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
|
||||
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
|
||||
|
||||
self._lists: Dict[str, ShoppingList] = {}
|
||||
self._items: Dict[str, List[Item]] = {}
|
||||
self._products: Dict[str, Product] = {}
|
||||
self._categories: List[Category] = []
|
||||
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
|
||||
self._search_engine: Optional[ProductSearch] = None
|
||||
self._images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
|
||||
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
|
||||
|
||||
async def async_load(self) -> None:
|
||||
"""Load data from storage."""
|
||||
@@ -84,6 +98,10 @@ class ShoppingListStorage:
|
||||
}
|
||||
_LOGGER.debug("Loaded %d products", len(self._products))
|
||||
|
||||
# Ensure image directory exists and migrate legacy image paths/URLs.
|
||||
self._images_dir.mkdir(parents=True, exist_ok=True)
|
||||
await self._migrate_legacy_images_and_urls()
|
||||
|
||||
# Load categories
|
||||
categories_data = await self._store_categories.async_load()
|
||||
if categories_data:
|
||||
@@ -141,6 +159,15 @@ class ShoppingListStorage:
|
||||
await self._save_products()
|
||||
_LOGGER.info("Successfully imported %d products from catalog", len(self._products))
|
||||
|
||||
# Load loyalty cards
|
||||
loyalty_data = await self._store_loyalty_cards.async_load()
|
||||
if loyalty_data:
|
||||
self._loyalty_cards = {
|
||||
card_id: LoyaltyCard(**card_data)
|
||||
for card_id, card_data in loyalty_data.items()
|
||||
}
|
||||
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
|
||||
|
||||
# Initialize search engine after products are loaded
|
||||
if self._products:
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
@@ -156,9 +183,21 @@ class ShoppingListStorage:
|
||||
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
|
||||
await self._store_lists.async_save(data)
|
||||
|
||||
def get_lists(self) -> List[ShoppingList]:
|
||||
"""Get all lists."""
|
||||
return list(self._lists.values())
|
||||
def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
|
||||
"""Get lists visible to the specified user.
|
||||
|
||||
Global lists (owner_id=None) are visible to everyone.
|
||||
Private lists are visible to their owner, anyone in allowed_users, and admins.
|
||||
"""
|
||||
all_lists = list(self._lists.values())
|
||||
if is_admin or user_id is None:
|
||||
return all_lists
|
||||
return [
|
||||
lst for lst in all_lists
|
||||
if lst.owner_id is None
|
||||
or lst.owner_id == user_id
|
||||
or user_id in (lst.allowed_users or [])
|
||||
]
|
||||
|
||||
def get_list(self, list_id: str) -> Optional[ShoppingList]:
|
||||
"""Get a specific list."""
|
||||
@@ -171,17 +210,19 @@ class ShoppingListStorage:
|
||||
return lst
|
||||
return None
|
||||
|
||||
async def create_list(self, name: str, icon: str = "mdi:cart") -> ShoppingList:
|
||||
"""Create a new list."""
|
||||
async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
|
||||
"""Create a new list. Pass owner_id to make the list private to that user."""
|
||||
new_list = ShoppingList(
|
||||
id=generate_id(),
|
||||
name=name,
|
||||
icon=icon,
|
||||
category_order=[cat.id for cat in self._categories]
|
||||
category_order=[cat.id for cat in self._categories],
|
||||
owner_id=owner_id,
|
||||
)
|
||||
self._lists[new_list.id] = new_list
|
||||
self._items[new_list.id] = []
|
||||
await self._save_lists()
|
||||
await self._write_config_backup()
|
||||
_LOGGER.info("Created new list: %s", name)
|
||||
return new_list
|
||||
|
||||
@@ -202,6 +243,18 @@ class ShoppingListStorage:
|
||||
_LOGGER.debug("Updated list: %s", list_id)
|
||||
return lst
|
||||
|
||||
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
|
||||
"""Update the allowed_users for a private list."""
|
||||
if list_id not in self._lists:
|
||||
return None
|
||||
lst = self._lists[list_id]
|
||||
lst.allowed_users = allowed_users
|
||||
from .models import current_timestamp
|
||||
lst.updated_at = current_timestamp()
|
||||
await self._save_lists()
|
||||
_LOGGER.debug("Updated members for list: %s", list_id)
|
||||
return lst
|
||||
|
||||
async def delete_list(self, list_id: str) -> bool:
|
||||
"""Delete a list."""
|
||||
if list_id not in self._lists:
|
||||
@@ -379,6 +432,56 @@ class ShoppingListStorage:
|
||||
data = {product_id: product.to_dict() for product_id, product in self._products.items()}
|
||||
await self._store_products.async_save(data)
|
||||
|
||||
async def _migrate_legacy_images_and_urls(self) -> None:
|
||||
"""Move old image files and rewrite stored URLs to the new path."""
|
||||
moved_files = 0
|
||||
updated_product_urls = 0
|
||||
updated_item_urls = 0
|
||||
|
||||
if self._legacy_images_dir.exists() and self._legacy_images_dir != self._images_dir:
|
||||
for src in self._legacy_images_dir.glob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
dest = self._images_dir / src.name
|
||||
if dest.exists():
|
||||
continue
|
||||
try:
|
||||
shutil.move(str(src), str(dest))
|
||||
moved_files += 1
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
|
||||
|
||||
for product in self._products.values():
|
||||
image_url = product.image_url or ""
|
||||
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
|
||||
product.image_url = image_url.replace(
|
||||
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
|
||||
)
|
||||
updated_product_urls += 1
|
||||
|
||||
for items in self._items.values():
|
||||
for item in items:
|
||||
image_url = item.image_url or ""
|
||||
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
|
||||
item.image_url = image_url.replace(
|
||||
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
|
||||
)
|
||||
updated_item_urls += 1
|
||||
|
||||
if updated_product_urls:
|
||||
await self._save_products()
|
||||
if updated_item_urls:
|
||||
await self._save_items()
|
||||
|
||||
if moved_files or updated_product_urls or updated_item_urls:
|
||||
_LOGGER.info(
|
||||
"Migrated shopping list images to %s (moved_files=%d, updated_product_urls=%d, updated_item_urls=%d)",
|
||||
IMAGES_LOCAL_DIR,
|
||||
moved_files,
|
||||
updated_product_urls,
|
||||
updated_item_urls,
|
||||
)
|
||||
|
||||
def get_products(self) -> List[Product]:
|
||||
"""Get all products."""
|
||||
return list(self._products.values())
|
||||
@@ -460,9 +563,61 @@ class ShoppingListStorage:
|
||||
)
|
||||
self._products[new_product.id] = new_product
|
||||
await self._save_products()
|
||||
await self._write_config_backup()
|
||||
# Rebuild search engine so the new product is immediately searchable
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.debug("Added product: %s", new_product.name)
|
||||
return new_product
|
||||
|
||||
async def reload_catalog(self, country_code: str) -> int:
|
||||
"""Replace catalog-sourced products with those from a new country's catalog.
|
||||
Products with source='user' are preserved."""
|
||||
catalog_ids = [
|
||||
pid for pid, p in self._products.items()
|
||||
if getattr(p, 'source', 'user') == 'catalog'
|
||||
]
|
||||
for pid in catalog_ids:
|
||||
del self._products[pid]
|
||||
|
||||
self._country = country_code
|
||||
catalog_products = await load_product_catalog(self._component_path, country_code)
|
||||
count = 0
|
||||
for prod_data in catalog_products:
|
||||
try:
|
||||
product = Product(
|
||||
id=prod_data.get("id", generate_id()),
|
||||
name=prod_data["name"],
|
||||
category_id=prod_data.get("category_id", "other"),
|
||||
aliases=prod_data.get("aliases", []),
|
||||
default_unit=prod_data.get("default_unit", "units"),
|
||||
default_quantity=prod_data.get("default_quantity", 1),
|
||||
price=prod_data.get("price") or prod_data.get("typical_price"),
|
||||
currency=self.hass.config.currency,
|
||||
barcode=prod_data.get("barcode"),
|
||||
brands=prod_data.get("brands", []),
|
||||
image_url=prod_data.get("image_url", ""),
|
||||
custom=False,
|
||||
source="catalog",
|
||||
tags=prod_data.get("tags", []),
|
||||
collections=prod_data.get("collections", []),
|
||||
taxonomy=prod_data.get("taxonomy", {}),
|
||||
allergens=prod_data.get("allergens", []),
|
||||
substitution_group=prod_data.get("substitution_group", ""),
|
||||
priority_level=prod_data.get("priority_level", 0),
|
||||
image_hint=prod_data.get("image_hint", "")
|
||||
)
|
||||
self._products[product.id] = product
|
||||
count += 1
|
||||
except Exception as err:
|
||||
_LOGGER.error("Failed to import product %s: %s", prod_data.get("name"), err)
|
||||
|
||||
await self._save_products()
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
|
||||
return count
|
||||
|
||||
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
|
||||
"""Update a product."""
|
||||
if product_id not in self._products:
|
||||
@@ -474,9 +629,102 @@ class ShoppingListStorage:
|
||||
setattr(product, key, value)
|
||||
|
||||
await self._save_products()
|
||||
await self._write_config_backup()
|
||||
_LOGGER.debug("Updated product: %s", product_id)
|
||||
return product
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Backup / Restore
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def export_user_data(self) -> dict:
|
||||
"""Return a serialisable snapshot of all user-created data."""
|
||||
user_products = [
|
||||
p.to_dict() for p in self._products.values()
|
||||
if getattr(p, "source", "user") == "user"
|
||||
]
|
||||
lists = [lst.to_dict() for lst in self._lists.values()]
|
||||
items = {
|
||||
list_id: [item.to_dict() for item in items_list]
|
||||
for list_id, items_list in self._items.items()
|
||||
}
|
||||
return {
|
||||
"slm_backup_version": "1.0",
|
||||
"exported_at": datetime.now(timezone.utc).isoformat(),
|
||||
"country": self._country,
|
||||
"user_products": user_products,
|
||||
"lists": lists,
|
||||
"items": items,
|
||||
}
|
||||
|
||||
async def import_user_data(self, data: dict) -> dict:
|
||||
"""Merge a backup into live storage. Skips anything already present by ID."""
|
||||
imported_products = 0
|
||||
imported_lists = 0
|
||||
imported_items = 0
|
||||
|
||||
for prod_data in data.get("user_products", []):
|
||||
prod_id = prod_data.get("id")
|
||||
if prod_id and prod_id not in self._products:
|
||||
try:
|
||||
self._products[prod_id] = Product(**prod_data)
|
||||
imported_products += 1
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped product during import: %s", err)
|
||||
|
||||
if imported_products:
|
||||
await self._save_products()
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
|
||||
for list_data in data.get("lists", []):
|
||||
list_id = list_data.get("id")
|
||||
if list_id and list_id not in self._lists:
|
||||
try:
|
||||
lst = ShoppingList(**list_data)
|
||||
lst.active = False
|
||||
self._lists[list_id] = lst
|
||||
imported_lists += 1
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped list during import: %s", err)
|
||||
|
||||
backup_items = data.get("items", {})
|
||||
for list_id, items_list in backup_items.items():
|
||||
if list_id in self._lists and list_id not in self._items:
|
||||
try:
|
||||
self._items[list_id] = [Item(**d) for d in items_list]
|
||||
imported_items += len(self._items[list_id])
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Skipped items for list %s: %s", list_id, err)
|
||||
|
||||
if imported_lists or imported_items:
|
||||
await self._save_lists()
|
||||
await self._save_items()
|
||||
|
||||
_LOGGER.info(
|
||||
"Import complete: %d products, %d lists, %d items",
|
||||
imported_products, imported_lists, imported_items,
|
||||
)
|
||||
return {"products": imported_products, "lists": imported_lists, "items": imported_items}
|
||||
|
||||
async def _write_config_backup(self) -> None:
|
||||
"""Silently write a backup JSON to the HA config directory."""
|
||||
try:
|
||||
backup_path = os.path.join(
|
||||
self.hass.config.config_dir,
|
||||
"shopping_list_manager_backup.json",
|
||||
)
|
||||
data = await self.export_user_data()
|
||||
|
||||
def _write() -> None:
|
||||
with open(backup_path, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, indent=2, ensure_ascii=False)
|
||||
|
||||
await self.hass.async_add_executor_job(_write)
|
||||
_LOGGER.debug("Auto-backup written to %s", backup_path)
|
||||
except Exception as err:
|
||||
_LOGGER.warning("Failed to write config backup: %s", err)
|
||||
|
||||
# Categories methods
|
||||
async def _save_categories(self) -> None:
|
||||
"""Save categories to storage."""
|
||||
@@ -486,3 +734,81 @@ class ShoppingListStorage:
|
||||
def get_categories(self) -> List[Category]:
|
||||
"""Get all categories."""
|
||||
return self._categories
|
||||
|
||||
# Loyalty card methods
|
||||
async def _save_loyalty_cards(self) -> None:
|
||||
"""Save loyalty cards to storage."""
|
||||
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
|
||||
await self._store_loyalty_cards.async_save(data)
|
||||
|
||||
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
|
||||
"""Get loyalty cards visible to the specified user.
|
||||
|
||||
Global cards (owner_id=None) are visible to everyone.
|
||||
Private cards are visible to their owner, anyone in allowed_users, and admins.
|
||||
"""
|
||||
all_cards = list(self._loyalty_cards.values())
|
||||
if is_admin or user_id is None:
|
||||
return all_cards
|
||||
return [
|
||||
card for card in all_cards
|
||||
if card.owner_id is None
|
||||
or card.owner_id == user_id
|
||||
or user_id in (card.allowed_users or [])
|
||||
]
|
||||
|
||||
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
|
||||
"""Get a specific loyalty card."""
|
||||
return self._loyalty_cards.get(card_id)
|
||||
|
||||
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
|
||||
"""Create a new loyalty card."""
|
||||
from .models import current_timestamp
|
||||
new_card = LoyaltyCard(
|
||||
id=generate_id(),
|
||||
owner_id=owner_id,
|
||||
**kwargs
|
||||
)
|
||||
self._loyalty_cards[new_card.id] = new_card
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Created loyalty card: %s", new_card.name)
|
||||
return new_card
|
||||
|
||||
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
|
||||
"""Update a loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return None
|
||||
|
||||
card = self._loyalty_cards[card_id]
|
||||
for key, value in kwargs.items():
|
||||
if hasattr(card, key):
|
||||
setattr(card, key, value)
|
||||
|
||||
from .models import current_timestamp
|
||||
card.updated_at = current_timestamp()
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Updated loyalty card: %s", card_id)
|
||||
return card
|
||||
|
||||
async def delete_loyalty_card(self, card_id: str) -> bool:
|
||||
"""Delete a loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return False
|
||||
|
||||
del self._loyalty_cards[card_id]
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Deleted loyalty card: %s", card_id)
|
||||
return True
|
||||
|
||||
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
|
||||
"""Update the allowed_users for a private loyalty card."""
|
||||
if card_id not in self._loyalty_cards:
|
||||
return None
|
||||
|
||||
card = self._loyalty_cards[card_id]
|
||||
card.allowed_users = allowed_users
|
||||
from .models import current_timestamp
|
||||
card.updated_at = current_timestamp()
|
||||
await self._save_loyalty_cards()
|
||||
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
|
||||
return card
|
||||
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,9 +1,15 @@
|
||||
"""Image handling utilities for Shopping List Manager."""
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..const import (
|
||||
IMAGES_LOCAL_DIR,
|
||||
LEGACY_IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -18,12 +24,29 @@ class ImageHandler:
|
||||
config_path: Path to HA config directory
|
||||
"""
|
||||
self.hass = hass
|
||||
# Images stored in /config/www/shopping_list_manager/images/
|
||||
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images"
|
||||
# Images stored in /config/www/images/shopping_list_manager/
|
||||
self._local_images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
|
||||
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
|
||||
self._local_images_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._migrate_legacy_files()
|
||||
|
||||
_LOGGER.info("Image directory: %s", self._local_images_dir)
|
||||
|
||||
def _migrate_legacy_files(self) -> None:
|
||||
"""Move legacy image files to the new standardized directory."""
|
||||
if not self._legacy_images_dir.exists() or self._legacy_images_dir == self._local_images_dir:
|
||||
return
|
||||
for src in self._legacy_images_dir.glob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
dest = self._local_images_dir / src.name
|
||||
if dest.exists():
|
||||
continue
|
||||
try:
|
||||
shutil.move(str(src), str(dest))
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
|
||||
|
||||
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
|
||||
"""Get image URL for a product.
|
||||
|
||||
@@ -73,11 +96,11 @@ class ImageHandler:
|
||||
# Check exact match
|
||||
image_file = self._local_images_dir / f"{normalized_name}{ext}"
|
||||
if image_file.exists():
|
||||
return f"/local/shopping_list_manager/images/{normalized_name}{ext}"
|
||||
return f"{LOCAL_IMAGE_URL_PREFIX}{normalized_name}{ext}"
|
||||
|
||||
# Check for files starting with the product name
|
||||
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
|
||||
return f"/local/shopping_list_manager/images/{file.name}"
|
||||
return f"{LOCAL_IMAGE_URL_PREFIX}{file.name}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
BIN
Binary file not shown.
BIN
Binary file not shown.
@@ -1,18 +1,31 @@
|
||||
"""WebSocket API handlers for Shopping List Manager."""
|
||||
import io
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
import voluptuous as vol
|
||||
from aiohttp import ClientTimeout
|
||||
from PIL import Image
|
||||
|
||||
from homeassistant.components import websocket_api
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from ..const import DOMAIN
|
||||
|
||||
from ..const import (
|
||||
IMAGE_SIZE,
|
||||
IMAGE_QUALITY,
|
||||
IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
WS_TYPE_LISTS_GET_ALL,
|
||||
WS_TYPE_LISTS_CREATE,
|
||||
WS_TYPE_LISTS_UPDATE,
|
||||
WS_TYPE_LISTS_DELETE,
|
||||
WS_TYPE_LISTS_SET_ACTIVE,
|
||||
WS_TYPE_LISTS_UPDATE_MEMBERS,
|
||||
WS_TYPE_USERS_GET_ALL,
|
||||
WS_TYPE_ITEMS_GET,
|
||||
WS_TYPE_ITEMS_ADD,
|
||||
WS_TYPE_ITEMS_UPDATE,
|
||||
@@ -27,6 +40,12 @@ from ..const import (
|
||||
WS_TYPE_PRODUCTS_ADD,
|
||||
WS_TYPE_PRODUCTS_UPDATE,
|
||||
WS_TYPE_CATEGORIES_GET_ALL,
|
||||
WS_TYPE_LOYALTY_GET_ALL,
|
||||
WS_TYPE_LOYALTY_ADD,
|
||||
WS_TYPE_LOYALTY_UPDATE,
|
||||
WS_TYPE_LOYALTY_DELETE,
|
||||
WS_TYPE_LOYALTY_UPDATE_MEMBERS,
|
||||
WS_TYPE_SUBSCRIBE,
|
||||
EVENT_ITEM_ADDED,
|
||||
EVENT_ITEM_UPDATED,
|
||||
EVENT_ITEM_CHECKED,
|
||||
@@ -39,10 +58,191 @@ from .. import get_storage
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# ACCESS-CHECK HELPERS
|
||||
# =============================================================================
|
||||
|
||||
def _user_can_access_list(lst, user) -> bool:
|
||||
"""Return True if the user may read or write to this list.
|
||||
|
||||
Global lists (owner_id=None) are accessible to everyone.
|
||||
Private lists are accessible to their owner, anyone in allowed_users, and admins.
|
||||
"""
|
||||
if lst.owner_id is None:
|
||||
return True
|
||||
if user is None:
|
||||
return False
|
||||
if user.is_admin or user.id == lst.owner_id:
|
||||
return True
|
||||
return user.id in (lst.allowed_users or [])
|
||||
|
||||
|
||||
def _check_list_access(storage, connection, msg, list_id, require_owner=False):
|
||||
"""Verify the connected user may access list_id.
|
||||
|
||||
Sends the appropriate WebSocket error if access is denied.
|
||||
Returns the ShoppingList object on success, or None if an error was sent.
|
||||
|
||||
Args:
|
||||
require_owner: When True, only the list owner (or an admin) is allowed.
|
||||
Use for destructive/administrative operations.
|
||||
"""
|
||||
lst = storage.get_list(list_id)
|
||||
if lst is None:
|
||||
connection.send_error(msg["id"], "not_found", "List not found")
|
||||
return None
|
||||
|
||||
user = connection.user
|
||||
if require_owner:
|
||||
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the list owner can perform this action")
|
||||
return None
|
||||
else:
|
||||
if not _user_can_access_list(lst, user):
|
||||
connection.send_error(msg["id"], "forbidden", "You do not have access to this list")
|
||||
return None
|
||||
|
||||
return lst
|
||||
|
||||
|
||||
def _find_item_list_id(storage, item_id):
|
||||
"""Return the list_id that contains item_id, or None if not found."""
|
||||
for list_id, items in storage._items.items():
|
||||
for item in items:
|
||||
if item.id == item_id:
|
||||
return list_id
|
||||
return None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# LIST HANDLERS
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_SUBSCRIBE,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_subscribe(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: dict,
|
||||
) -> None:
|
||||
"""Subscribe to shopping list manager events via WebSocket."""
|
||||
storage = get_storage(hass)
|
||||
|
||||
@callback
|
||||
def forward_event(event):
|
||||
"""Forward HA bus event to WebSocket connection.
|
||||
|
||||
Events that reference a list_id are only forwarded if the connected
|
||||
user has access to that list, preventing cross-user data leakage.
|
||||
"""
|
||||
data = event.data
|
||||
list_id = data.get("list_id")
|
||||
if list_id:
|
||||
lst = storage.get_list(list_id)
|
||||
if lst and not _user_can_access_list(lst, connection.user):
|
||||
return # skip — user cannot see this list
|
||||
|
||||
connection.send_message(
|
||||
websocket_api.event_message(
|
||||
msg["id"],
|
||||
{
|
||||
"event_type": event.event_type,
|
||||
"data": data,
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
# Subscribe to all SLM events on the HA bus (backend has permission)
|
||||
unsubs = []
|
||||
unsubs.append(hass.bus.async_listen(EVENT_ITEM_ADDED, forward_event))
|
||||
unsubs.append(hass.bus.async_listen(EVENT_ITEM_UPDATED, forward_event))
|
||||
unsubs.append(hass.bus.async_listen(EVENT_ITEM_CHECKED, forward_event))
|
||||
unsubs.append(hass.bus.async_listen(EVENT_ITEM_DELETED, forward_event))
|
||||
unsubs.append(hass.bus.async_listen(EVENT_LIST_UPDATED, forward_event))
|
||||
unsubs.append(hass.bus.async_listen(EVENT_LIST_DELETED, forward_event))
|
||||
|
||||
# Clean up when connection closes
|
||||
connection.subscriptions[msg["id"]] = lambda: [unsub() for unsub in unsubs]
|
||||
|
||||
connection.send_message(websocket_api.result_message(msg["id"]))
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/items/increment",
|
||||
vol.Required("item_id"): str,
|
||||
vol.Required("amount"): vol.Coerce(float),
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_increment_item(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Increment item quantity atomically."""
|
||||
|
||||
storage = get_storage(hass)
|
||||
item_id = msg["item_id"]
|
||||
amount = msg["amount"]
|
||||
|
||||
# First get current item
|
||||
for list_id, items in storage._items.items():
|
||||
for item in items:
|
||||
if item.id == item_id:
|
||||
new_quantity = item.quantity + amount
|
||||
|
||||
if new_quantity < 1:
|
||||
new_quantity = 1
|
||||
|
||||
updated_item = await storage.update_item(
|
||||
item_id,
|
||||
quantity=new_quantity
|
||||
)
|
||||
if updated_item:
|
||||
hass.bus.async_fire(
|
||||
EVENT_ITEM_UPDATED,
|
||||
{
|
||||
"list_id": updated_item.list_id,
|
||||
"item_id": item_id,
|
||||
"item": updated_item.to_dict()
|
||||
}
|
||||
)
|
||||
connection.send_result(msg["id"], {
|
||||
"item": updated_item.to_dict()
|
||||
})
|
||||
return
|
||||
|
||||
connection.send_error(msg["id"], "not_found", "Item not found")
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): "shopping_list_manager/products/get_by_ids",
|
||||
vol.Required("product_ids"): [str],
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def ws_get_products_by_ids(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Return products matching given product IDs."""
|
||||
|
||||
storage = get_storage(hass)
|
||||
product_ids = set(msg["product_ids"])
|
||||
|
||||
# Get all products from storage
|
||||
all_products = storage.get_products()
|
||||
|
||||
products = [
|
||||
product.to_dict()
|
||||
for product in all_products
|
||||
if product.id in product_ids
|
||||
]
|
||||
|
||||
connection.send_result(msg["id"], {"products": products})
|
||||
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): WS_TYPE_LISTS_GET_ALL,
|
||||
@@ -56,7 +256,10 @@ def websocket_get_lists(
|
||||
) -> None:
|
||||
"""Handle get all lists command."""
|
||||
storage = get_storage(hass)
|
||||
lists = storage.get_lists()
|
||||
user = connection.user
|
||||
user_id = user.id if user else None
|
||||
is_admin = user.is_admin if user else False
|
||||
lists = storage.get_lists(user_id=user_id, is_admin=is_admin)
|
||||
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
@@ -71,6 +274,7 @@ def websocket_get_lists(
|
||||
vol.Required("type"): WS_TYPE_LISTS_CREATE,
|
||||
vol.Required("name"): str,
|
||||
vol.Optional("icon", default="mdi:cart"): str,
|
||||
vol.Optional("private", default=True): bool,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
@@ -82,9 +286,14 @@ async def websocket_create_list(
|
||||
"""Handle create list command."""
|
||||
storage = get_storage(hass)
|
||||
|
||||
# Private lists are owned by the creating user; global lists have no owner.
|
||||
is_private = msg.get("private", True)
|
||||
owner_id = connection.user.id if is_private and connection.user else None
|
||||
|
||||
new_list = await storage.create_list(
|
||||
name=msg["name"],
|
||||
icon=msg.get("icon", "mdi:cart")
|
||||
icon=msg.get("icon", "mdi:cart"),
|
||||
owner_id=owner_id,
|
||||
)
|
||||
|
||||
# Fire event
|
||||
@@ -118,6 +327,9 @@ async def websocket_update_list(
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
if _check_list_access(storage, connection, msg, list_id, require_owner=True) is None:
|
||||
return
|
||||
|
||||
# Build update kwargs
|
||||
update_data = {}
|
||||
if "name" in msg:
|
||||
@@ -161,6 +373,18 @@ async def websocket_delete_list(
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
lst = storage.get_list(list_id)
|
||||
if lst is None:
|
||||
connection.send_error(msg["id"], "not_found", "List not found")
|
||||
return
|
||||
|
||||
# Only the owner or an admin may delete a private list
|
||||
if lst.owner_id is not None:
|
||||
user = connection.user
|
||||
if not (user and (user.is_admin or user.id == lst.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the list owner can delete this list")
|
||||
return
|
||||
|
||||
success = await storage.delete_list(list_id)
|
||||
|
||||
if not success:
|
||||
@@ -192,6 +416,9 @@ async def websocket_set_active_list(
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
if _check_list_access(storage, connection, msg, list_id) is None:
|
||||
return
|
||||
|
||||
success = await storage.set_active_list(list_id)
|
||||
|
||||
if not success:
|
||||
@@ -227,6 +454,9 @@ def websocket_get_items(
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
if _check_list_access(storage, connection, msg, list_id) is None:
|
||||
return
|
||||
|
||||
items = storage.get_items(list_id)
|
||||
|
||||
connection.send_result(
|
||||
@@ -262,6 +492,9 @@ async def websocket_add_item(
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
if _check_list_access(storage, connection, msg, list_id) is None:
|
||||
return
|
||||
|
||||
# Build item data
|
||||
item_data = {
|
||||
"name": msg["name"],
|
||||
@@ -321,6 +554,13 @@ async def websocket_update_item(
|
||||
storage = get_storage(hass)
|
||||
item_id = msg["item_id"]
|
||||
|
||||
list_id = _find_item_list_id(storage, item_id)
|
||||
if list_id is None:
|
||||
connection.send_error(msg["id"], "not_found", "Item not found")
|
||||
return
|
||||
if _check_list_access(storage, connection, msg, list_id) is None:
|
||||
return
|
||||
|
||||
# Build update data
|
||||
update_data = {}
|
||||
update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"]
|
||||
@@ -406,6 +646,13 @@ async def websocket_delete_item(
|
||||
storage = get_storage(hass)
|
||||
item_id = msg["item_id"]
|
||||
|
||||
list_id = _find_item_list_id(storage, item_id)
|
||||
if list_id is None:
|
||||
connection.send_error(msg["id"], "not_found", "Item not found")
|
||||
return
|
||||
if _check_list_access(storage, connection, msg, list_id) is None:
|
||||
return
|
||||
|
||||
success = await storage.delete_item(item_id)
|
||||
|
||||
if not success:
|
||||
@@ -547,39 +794,83 @@ def websocket_get_list_total(
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): WS_TYPE_PRODUCTS_SEARCH,
|
||||
vol.Required("query"): str,
|
||||
vol.Optional("limit", default=10): int,
|
||||
vol.Optional("exclude_allergens", default=None): vol.Any(None, [str]),
|
||||
vol.Optional("include_tags", default=None): vol.Any(None, [str]),
|
||||
vol.Optional("substitution_group", default=None): vol.Any(None, str),
|
||||
vol.Required("type"): "shopping_list_manager/products/download_image",
|
||||
vol.Required("image_url"): str,
|
||||
vol.Required("product_name"): str,
|
||||
}
|
||||
)
|
||||
@callback
|
||||
def websocket_search_products(
|
||||
@websocket_api.async_response
|
||||
async def websocket_download_product_image(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Handle search products command with enhanced filters."""
|
||||
storage = get_storage(hass)
|
||||
"""Download a remote image and save it as WebP to the local images directory."""
|
||||
raw_url: str = msg["image_url"]
|
||||
product_name: str = msg["product_name"]
|
||||
|
||||
safe_stem = re.sub(r"[^a-z0-9_]", "", product_name.lower().replace(" ", "_")) or "product"
|
||||
filename = f"{safe_stem}.webp"
|
||||
images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
|
||||
images_dir.mkdir(parents=True, exist_ok=True)
|
||||
dest = images_dir / filename
|
||||
|
||||
try:
|
||||
results = storage.search_products(
|
||||
query=msg["query"],
|
||||
limit=msg.get("limit", 10),
|
||||
exclude_allergens=msg.get("exclude_allergens"),
|
||||
include_tags=msg.get("include_tags"),
|
||||
substitution_group=msg.get("substitution_group"),
|
||||
)
|
||||
session = async_get_clientsession(hass)
|
||||
headers = {"User-Agent": "Mozilla/5.0 (compatible; HomeAssistant/ShoppingListManager)"}
|
||||
async with session.get(raw_url, timeout=ClientTimeout(total=15), headers=headers) as resp:
|
||||
if resp.status != 200:
|
||||
connection.send_error(msg["id"], "download_failed", f"HTTP {resp.status}")
|
||||
return
|
||||
raw = await resp.read()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
connection.send_error(msg["id"], "download_failed", str(exc))
|
||||
return
|
||||
|
||||
try:
|
||||
img = Image.open(io.BytesIO(raw))
|
||||
# Convert to RGB for reliable lossy WebP encoding
|
||||
# (RGBA, palette, grayscale modes can fail or produce oversized files)
|
||||
if img.mode == "RGBA":
|
||||
bg = Image.new("RGB", img.size, (255, 255, 255))
|
||||
bg.paste(img, mask=img.split()[3])
|
||||
img = bg
|
||||
elif img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
img.thumbnail((IMAGE_SIZE, IMAGE_SIZE), Image.LANCZOS)
|
||||
out = io.BytesIO()
|
||||
img.save(out, format="WEBP", quality=IMAGE_QUALITY)
|
||||
dest.write_bytes(out.getvalue())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
connection.send_error(msg["id"], "conversion_failed", str(exc))
|
||||
return
|
||||
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{"products": [product.to_dict() for product in results]}
|
||||
{"local_url": f"{LOCAL_IMAGE_URL_PREFIX}{filename}"},
|
||||
)
|
||||
except Exception as err:
|
||||
_LOGGER.error("Error searching products: %s", err)
|
||||
connection.send_error(msg["id"], "search_failed", str(err))
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "shopping_list_manager/products/search_by_barcode",
|
||||
vol.Required("barcode"): str,
|
||||
}
|
||||
)
|
||||
@callback
|
||||
def websocket_search_by_barcode(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Find a single product by exact barcode match."""
|
||||
storage = get_storage(hass)
|
||||
barcode = msg["barcode"].strip()
|
||||
match = next(
|
||||
(p for p in storage._products.values() if p.barcode and p.barcode == barcode),
|
||||
None,
|
||||
)
|
||||
connection.send_result(msg["id"], {"product": match.to_dict() if match else None})
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
@@ -791,3 +1082,323 @@ def websocket_get_categories(
|
||||
"categories": [cat.to_dict() for cat in categories]
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# INTEGRATION SETTINGS HANDLERS
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "shopping_list_manager/get_integration_settings",
|
||||
}
|
||||
)
|
||||
@callback
|
||||
def websocket_get_integration_settings(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Return current country and available country options."""
|
||||
country = hass.data[DOMAIN].get("country", "NZ")
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{
|
||||
"country": country,
|
||||
"available_countries": {
|
||||
"NZ": "New Zealand",
|
||||
"AU": "Australia",
|
||||
"US": "United States",
|
||||
"GB": "United Kingdom",
|
||||
"CA": "Canada",
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
_VALID_COUNTRIES = ["NZ", "AU", "US", "GB", "CA"]
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "shopping_list_manager/set_country",
|
||||
vol.Required("country"): vol.In(_VALID_COUNTRIES),
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_set_country(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Switch to a different country catalog. Preserves user-added products."""
|
||||
country = msg["country"].upper()
|
||||
storage = get_storage(hass)
|
||||
|
||||
count = await storage.reload_catalog(country)
|
||||
|
||||
# Persist to HA config entry so country survives restart
|
||||
entries = hass.config_entries.async_entries(DOMAIN)
|
||||
if entries:
|
||||
entry = entries[0]
|
||||
hass.config_entries.async_update_entry(entry, options={**entry.options, "country": country})
|
||||
|
||||
hass.data[DOMAIN]["country"] = country
|
||||
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{"success": True, "country": country, "products_loaded": count}
|
||||
)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# BACKUP / RESTORE HANDLERS
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "shopping_list_manager/export_data",
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_export_data(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Export all user-created data as a JSON-serialisable dict."""
|
||||
storage = get_storage(hass)
|
||||
data = await storage.export_user_data()
|
||||
connection.send_result(msg["id"], data)
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): "shopping_list_manager/import_data",
|
||||
vol.Required("data"): dict,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_import_data(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Import user data from a backup payload."""
|
||||
storage = get_storage(hass)
|
||||
counts = await storage.import_user_data(msg["data"])
|
||||
connection.send_result(msg["id"], {"success": True, "imported": counts})
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# LIST MEMBERS HANDLER
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): WS_TYPE_LISTS_UPDATE_MEMBERS,
|
||||
vol.Required("list_id"): str,
|
||||
vol.Required("allowed_users"): [str],
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_update_list_members(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Update the allowed_users for a private list."""
|
||||
storage = get_storage(hass)
|
||||
list_id = msg["list_id"]
|
||||
|
||||
lst = storage.get_list(list_id)
|
||||
if lst is None:
|
||||
connection.send_error(msg["id"], "not_found", "List not found")
|
||||
return
|
||||
|
||||
# Only the owner or an admin may manage members
|
||||
user = connection.user
|
||||
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the list owner can manage members")
|
||||
return
|
||||
|
||||
updated = await storage.update_list_members(list_id, msg["allowed_users"])
|
||||
hass.bus.async_fire(
|
||||
EVENT_LIST_UPDATED,
|
||||
{"list_id": list_id, "action": "members_updated"}
|
||||
)
|
||||
connection.send_result(msg["id"], {"list": updated.to_dict()})
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# HA USERS HANDLER
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): WS_TYPE_USERS_GET_ALL,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_get_ha_users(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Return all active, non-system HA users."""
|
||||
users = await hass.auth.async_get_users()
|
||||
result = [
|
||||
{"id": u.id, "name": u.name}
|
||||
for u in users
|
||||
if not u.system_generated and u.is_active
|
||||
]
|
||||
connection.send_result(msg["id"], {"users": result})
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# LOYALTY CARD HANDLERS
|
||||
# =============================================================================
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_LOYALTY_GET_ALL,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_get_loyalty_cards(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Return all loyalty cards visible to the current user."""
|
||||
storage = get_storage(hass)
|
||||
user = connection.user
|
||||
user_id = user.id if user else None
|
||||
is_admin = user.is_admin if user else False
|
||||
cards = storage.get_loyalty_cards(user_id=user_id, is_admin=is_admin)
|
||||
connection.send_result(msg["id"], {"cards": [c.to_dict() for c in cards]})
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_LOYALTY_ADD,
|
||||
vol.Required("name"): str,
|
||||
vol.Required("number"): str,
|
||||
vol.Optional("barcode", default=""): str,
|
||||
vol.Optional("barcode_type", default="barcode"): str,
|
||||
vol.Optional("logo", default=""): str,
|
||||
vol.Optional("notes", default=""): str,
|
||||
vol.Optional("color", default="#9fa8da"): str,
|
||||
vol.Optional("private", default=True): bool,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_add_loyalty_card(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Add a new loyalty card."""
|
||||
storage = get_storage(hass)
|
||||
user = connection.user
|
||||
owner_id = user.id if (user and msg.get("private")) else None
|
||||
|
||||
card = await storage.create_loyalty_card(
|
||||
owner_id=owner_id,
|
||||
name=msg["name"],
|
||||
number=msg["number"],
|
||||
barcode=msg.get("barcode", ""),
|
||||
barcode_type=msg.get("barcode_type", "barcode"),
|
||||
logo=msg.get("logo", ""),
|
||||
notes=msg.get("notes", ""),
|
||||
color=msg.get("color", "#9fa8da"),
|
||||
)
|
||||
connection.send_result(msg["id"], {"card": card.to_dict()})
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE,
|
||||
vol.Required("card_id"): str,
|
||||
vol.Optional("name"): str,
|
||||
vol.Optional("number"): str,
|
||||
vol.Optional("barcode"): str,
|
||||
vol.Optional("barcode_type"): str,
|
||||
vol.Optional("logo"): str,
|
||||
vol.Optional("notes"): str,
|
||||
vol.Optional("color"): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_update_loyalty_card(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Update an existing loyalty card."""
|
||||
storage = get_storage(hass)
|
||||
card_id = msg["card_id"]
|
||||
|
||||
card = storage.get_loyalty_card(card_id)
|
||||
if card is None:
|
||||
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
|
||||
return
|
||||
|
||||
user = connection.user
|
||||
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the card owner can update it")
|
||||
return
|
||||
|
||||
fields = {k: v for k, v in msg.items() if k not in ("type", "id", "card_id")}
|
||||
updated = await storage.update_loyalty_card(card_id, **fields)
|
||||
connection.send_result(msg["id"], {"card": updated.to_dict()})
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_LOYALTY_DELETE,
|
||||
vol.Required("card_id"): str,
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_delete_loyalty_card(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Delete a loyalty card."""
|
||||
storage = get_storage(hass)
|
||||
card_id = msg["card_id"]
|
||||
|
||||
card = storage.get_loyalty_card(card_id)
|
||||
if card is None:
|
||||
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
|
||||
return
|
||||
|
||||
user = connection.user
|
||||
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the card owner can delete it")
|
||||
return
|
||||
|
||||
await storage.delete_loyalty_card(card_id)
|
||||
connection.send_result(msg["id"], {"success": True})
|
||||
|
||||
|
||||
@websocket_api.websocket_command({
|
||||
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE_MEMBERS,
|
||||
vol.Required("card_id"): str,
|
||||
vol.Required("allowed_users"): [str],
|
||||
})
|
||||
@websocket_api.async_response
|
||||
async def websocket_update_loyalty_card_members(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Update the allowed_users for a private loyalty card."""
|
||||
storage = get_storage(hass)
|
||||
card_id = msg["card_id"]
|
||||
|
||||
card = storage.get_loyalty_card(card_id)
|
||||
if card is None:
|
||||
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
|
||||
return
|
||||
|
||||
user = connection.user
|
||||
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
|
||||
connection.send_error(msg["id"], "forbidden", "Only the card owner can manage members")
|
||||
return
|
||||
|
||||
updated = await storage.update_loyalty_card_members(card_id, msg["allowed_users"])
|
||||
connection.send_result(msg["id"], {"card": updated.to_dict()})
|
||||
|
||||
Reference in New Issue
Block a user