From 752f9e56222dc412614dd7d62f88b3e6c77d2c00 Mon Sep 17 00:00:00 2001 From: thekiwismarthome Date: Fri, 27 Feb 2026 13:00:08 +1300 Subject: [PATCH] feat: add loyalty card management with dedicated storage, data model, and websocket API endpoints. --- .../shopping_list_manager/__init__.py | 22 +++ .../shopping_list_manager/const.py | 8 + .../shopping_list_manager/models.py | 21 +++ .../shopping_list_manager/storage.py | 94 ++++++++++- .../websocket/handlers.py | 152 ++++++++++++++++++ 5 files changed, 295 insertions(+), 2 deletions(-) diff --git a/custom_components/shopping_list_manager/__init__.py b/custom_components/shopping_list_manager/__init__.py index dc40cca..f1b66a5 100644 --- a/custom_components/shopping_list_manager/__init__.py +++ b/custom_components/shopping_list_manager/__init__.py @@ -235,6 +235,28 @@ async def _async_register_websocket_handlers( handlers.websocket_get_ha_users, ) + # Loyalty card handlers + websocket_api.async_register_command( + hass, + handlers.websocket_get_loyalty_cards, + ) + websocket_api.async_register_command( + hass, + handlers.websocket_add_loyalty_card, + ) + websocket_api.async_register_command( + hass, + handlers.websocket_update_loyalty_card, + ) + websocket_api.async_register_command( + hass, + handlers.websocket_delete_loyalty_card, + ) + websocket_api.async_register_command( + hass, + handlers.websocket_update_loyalty_card_members, + ) + _LOGGER.debug("WebSocket handlers registered") diff --git a/custom_components/shopping_list_manager/const.py b/custom_components/shopping_list_manager/const.py index 24eeeb6..5909e51 100644 --- a/custom_components/shopping_list_manager/const.py +++ b/custom_components/shopping_list_manager/const.py @@ -9,6 +9,7 @@ STORAGE_KEY_LISTS = f"{DOMAIN}.lists" STORAGE_KEY_ITEMS = f"{DOMAIN}.items" STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products" STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories" +STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards" # WebSocket Commands - Lists WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all" @@ -43,6 +44,13 @@ WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete" WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all" WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder" +# WebSocket Commands - Loyalty Cards +WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all" +WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add" +WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update" +WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete" +WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members" + # WebSocket Commands - Subscriptions WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe" WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe" diff --git a/custom_components/shopping_list_manager/models.py b/custom_components/shopping_list_manager/models.py index d83f584..e0f2fc4 100644 --- a/custom_components/shopping_list_manager/models.py +++ b/custom_components/shopping_list_manager/models.py @@ -96,6 +96,27 @@ class Item: self.estimated_total = self.quantity * self.price +@dataclass +class LoyaltyCard: + """Loyalty card model.""" + id: str + name: str + number: str + barcode: str = "" + logo: str = "" + notes: str = "" + color: str = "#9fa8da" + created_at: str = field(default_factory=current_timestamp) + updated_at: str = field(default_factory=current_timestamp) + # Ownership: None = visible to all users; set = private to owner + allowed_users + owner_id: Optional[str] = None + allowed_users: List[str] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Convert to dictionary.""" + return asdict(self) + + @dataclass class ShoppingList: """Shopping list model.""" diff --git a/custom_components/shopping_list_manager/storage.py b/custom_components/shopping_list_manager/storage.py index ea5ddb3..9718693 100644 --- a/custom_components/shopping_list_manager/storage.py +++ b/custom_components/shopping_list_manager/storage.py @@ -14,9 +14,10 @@ from .const import ( STORAGE_KEY_ITEMS, STORAGE_KEY_PRODUCTS, STORAGE_KEY_CATEGORIES, + STORAGE_KEY_LOYALTY_CARDS, ) from .data.catalog_loader import load_product_catalog -from .models import ShoppingList, Item, Product, Category, generate_id +from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id from .data.category_loader import load_categories _LOGGER = logging.getLogger(__name__) @@ -40,11 +41,13 @@ class ShoppingListStorage: self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS) self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS) self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES) - + self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS) + self._lists: Dict[str, ShoppingList] = {} self._items: Dict[str, List[Item]] = {} self._products: Dict[str, Product] = {} self._categories: List[Category] = [] + self._loyalty_cards: Dict[str, LoyaltyCard] = {} self._search_engine: Optional[ProductSearch] = None async def async_load(self) -> None: @@ -144,6 +147,15 @@ class ShoppingListStorage: await self._save_products() _LOGGER.info("Successfully imported %d products from catalog", len(self._products)) + # Load loyalty cards + loyalty_data = await self._store_loyalty_cards.async_load() + if loyalty_data: + self._loyalty_cards = { + card_id: LoyaltyCard(**card_data) + for card_id, card_data in loyalty_data.items() + } + _LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards)) + # Initialize search engine after products are loaded if self._products: products_dict = {pid: p.to_dict() for pid, p in self._products.items()} @@ -660,3 +672,81 @@ class ShoppingListStorage: def get_categories(self) -> List[Category]: """Get all categories.""" return self._categories + + # Loyalty card methods + async def _save_loyalty_cards(self) -> None: + """Save loyalty cards to storage.""" + data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()} + await self._store_loyalty_cards.async_save(data) + + def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]: + """Get loyalty cards visible to the specified user. + + Global cards (owner_id=None) are visible to everyone. + Private cards are visible to their owner, anyone in allowed_users, and admins. + """ + all_cards = list(self._loyalty_cards.values()) + if is_admin or user_id is None: + return all_cards + return [ + card for card in all_cards + if card.owner_id is None + or card.owner_id == user_id + or user_id in (card.allowed_users or []) + ] + + def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]: + """Get a specific loyalty card.""" + return self._loyalty_cards.get(card_id) + + async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard: + """Create a new loyalty card.""" + from .models import current_timestamp + new_card = LoyaltyCard( + id=generate_id(), + owner_id=owner_id, + **kwargs + ) + self._loyalty_cards[new_card.id] = new_card + await self._save_loyalty_cards() + _LOGGER.debug("Created loyalty card: %s", new_card.name) + return new_card + + async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]: + """Update a loyalty card.""" + if card_id not in self._loyalty_cards: + return None + + card = self._loyalty_cards[card_id] + for key, value in kwargs.items(): + if hasattr(card, key): + setattr(card, key, value) + + from .models import current_timestamp + card.updated_at = current_timestamp() + await self._save_loyalty_cards() + _LOGGER.debug("Updated loyalty card: %s", card_id) + return card + + async def delete_loyalty_card(self, card_id: str) -> bool: + """Delete a loyalty card.""" + if card_id not in self._loyalty_cards: + return False + + del self._loyalty_cards[card_id] + await self._save_loyalty_cards() + _LOGGER.debug("Deleted loyalty card: %s", card_id) + return True + + async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]: + """Update the allowed_users for a private loyalty card.""" + if card_id not in self._loyalty_cards: + return None + + card = self._loyalty_cards[card_id] + card.allowed_users = allowed_users + from .models import current_timestamp + card.updated_at = current_timestamp() + await self._save_loyalty_cards() + _LOGGER.debug("Updated members for loyalty card: %s", card_id) + return card diff --git a/custom_components/shopping_list_manager/websocket/handlers.py b/custom_components/shopping_list_manager/websocket/handlers.py index a19c58c..2f2d02f 100644 --- a/custom_components/shopping_list_manager/websocket/handlers.py +++ b/custom_components/shopping_list_manager/websocket/handlers.py @@ -30,6 +30,11 @@ from ..const import ( WS_TYPE_PRODUCTS_ADD, WS_TYPE_PRODUCTS_UPDATE, WS_TYPE_CATEGORIES_GET_ALL, + WS_TYPE_LOYALTY_GET_ALL, + WS_TYPE_LOYALTY_ADD, + WS_TYPE_LOYALTY_UPDATE, + WS_TYPE_LOYALTY_DELETE, + WS_TYPE_LOYALTY_UPDATE_MEMBERS, WS_TYPE_SUBSCRIBE, EVENT_ITEM_ADDED, EVENT_ITEM_UPDATED, @@ -1060,3 +1065,150 @@ async def websocket_get_ha_users( if not u.system_generated and u.is_active ] connection.send_result(msg["id"], {"users": result}) + + +# ============================================================================= +# LOYALTY CARD HANDLERS +# ============================================================================= + +@websocket_api.websocket_command({ + vol.Required("type"): WS_TYPE_LOYALTY_GET_ALL, +}) +@websocket_api.async_response +async def websocket_get_loyalty_cards( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Return all loyalty cards visible to the current user.""" + storage = get_storage(hass) + user = connection.user + user_id = user.id if user else None + is_admin = user.is_admin if user else False + cards = storage.get_loyalty_cards(user_id=user_id, is_admin=is_admin) + connection.send_result(msg["id"], {"cards": [c.to_dict() for c in cards]}) + + +@websocket_api.websocket_command({ + vol.Required("type"): WS_TYPE_LOYALTY_ADD, + vol.Required("name"): str, + vol.Required("number"): str, + vol.Optional("barcode", default=""): str, + vol.Optional("logo", default=""): str, + vol.Optional("notes", default=""): str, + vol.Optional("color", default="#9fa8da"): str, + vol.Optional("private", default=False): bool, +}) +@websocket_api.async_response +async def websocket_add_loyalty_card( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Add a new loyalty card.""" + storage = get_storage(hass) + user = connection.user + owner_id = user.id if (user and msg.get("private")) else None + + card = await storage.create_loyalty_card( + owner_id=owner_id, + name=msg["name"], + number=msg["number"], + barcode=msg.get("barcode", ""), + logo=msg.get("logo", ""), + notes=msg.get("notes", ""), + color=msg.get("color", "#9fa8da"), + ) + connection.send_result(msg["id"], {"card": card.to_dict()}) + + +@websocket_api.websocket_command({ + vol.Required("type"): WS_TYPE_LOYALTY_UPDATE, + vol.Required("card_id"): str, + vol.Optional("name"): str, + vol.Optional("number"): str, + vol.Optional("barcode"): str, + vol.Optional("logo"): str, + vol.Optional("notes"): str, + vol.Optional("color"): str, +}) +@websocket_api.async_response +async def websocket_update_loyalty_card( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Update an existing loyalty card.""" + storage = get_storage(hass) + card_id = msg["card_id"] + + card = storage.get_loyalty_card(card_id) + if card is None: + connection.send_error(msg["id"], "not_found", "Loyalty card not found") + return + + user = connection.user + if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)): + connection.send_error(msg["id"], "forbidden", "Only the card owner can update it") + return + + fields = {k: v for k, v in msg.items() if k not in ("type", "id", "card_id")} + updated = await storage.update_loyalty_card(card_id, **fields) + connection.send_result(msg["id"], {"card": updated.to_dict()}) + + +@websocket_api.websocket_command({ + vol.Required("type"): WS_TYPE_LOYALTY_DELETE, + vol.Required("card_id"): str, +}) +@websocket_api.async_response +async def websocket_delete_loyalty_card( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Delete a loyalty card.""" + storage = get_storage(hass) + card_id = msg["card_id"] + + card = storage.get_loyalty_card(card_id) + if card is None: + connection.send_error(msg["id"], "not_found", "Loyalty card not found") + return + + user = connection.user + if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)): + connection.send_error(msg["id"], "forbidden", "Only the card owner can delete it") + return + + await storage.delete_loyalty_card(card_id) + connection.send_result(msg["id"], {"success": True}) + + +@websocket_api.websocket_command({ + vol.Required("type"): WS_TYPE_LOYALTY_UPDATE_MEMBERS, + vol.Required("card_id"): str, + vol.Required("allowed_users"): [str], +}) +@websocket_api.async_response +async def websocket_update_loyalty_card_members( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Update the allowed_users for a private loyalty card.""" + storage = get_storage(hass) + card_id = msg["card_id"] + + card = storage.get_loyalty_card(card_id) + if card is None: + connection.send_error(msg["id"], "not_found", "Loyalty card not found") + return + + user = connection.user + if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)): + connection.send_error(msg["id"], "forbidden", "Only the card owner can manage members") + return + + updated = await storage.update_loyalty_card_members(card_id, msg["allowed_users"]) + connection.send_result(msg["id"], {"card": updated.to_dict()})