From 3303776cd08440f7a42d21bd0200b26794d8fd63 Mon Sep 17 00:00:00 2001 From: thekiwismarthome <134335563+thekiwismarthome@users.noreply.github.com> Date: Fri, 13 Feb 2026 10:48:10 +1300 Subject: [PATCH] Update handlers.py --- .../websocket/handlers.py | 718 ++++++++++++++++++ 1 file changed, 718 insertions(+) diff --git a/custom_components/shopping_list_manager/websocket/handlers.py b/custom_components/shopping_list_manager/websocket/handlers.py index 8b13789..8adaa6f 100644 --- a/custom_components/shopping_list_manager/websocket/handlers.py +++ b/custom_components/shopping_list_manager/websocket/handlers.py @@ -1 +1,719 @@ +"""WebSocket API handlers for Shopping List Manager.""" +import logging +from typing import Any, Dict +import voluptuous as vol + +from homeassistant.components import websocket_api +from homeassistant.core import HomeAssistant, callback + +from ..const import ( + WS_TYPE_LISTS_GET_ALL, + WS_TYPE_LISTS_CREATE, + WS_TYPE_LISTS_UPDATE, + WS_TYPE_LISTS_DELETE, + WS_TYPE_LISTS_SET_ACTIVE, + WS_TYPE_ITEMS_GET, + WS_TYPE_ITEMS_ADD, + WS_TYPE_ITEMS_UPDATE, + WS_TYPE_ITEMS_CHECK, + WS_TYPE_ITEMS_DELETE, + WS_TYPE_ITEMS_REORDER, + WS_TYPE_ITEMS_BULK_CHECK, + WS_TYPE_ITEMS_CLEAR_CHECKED, + WS_TYPE_ITEMS_GET_TOTAL, + WS_TYPE_PRODUCTS_SEARCH, + WS_TYPE_PRODUCTS_SUGGESTIONS, + WS_TYPE_PRODUCTS_ADD, + WS_TYPE_PRODUCTS_UPDATE, + WS_TYPE_CATEGORIES_GET_ALL, + EVENT_ITEM_ADDED, + EVENT_ITEM_UPDATED, + EVENT_ITEM_CHECKED, + EVENT_ITEM_DELETED, + EVENT_LIST_UPDATED, + EVENT_LIST_DELETED, +) +from .. import get_storage + +_LOGGER = logging.getLogger(__name__) + + +# ============================================================================= +# LIST HANDLERS +# ============================================================================= + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_LISTS_GET_ALL, + } +) +@callback +def websocket_get_lists( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle get all lists command.""" + storage = get_storage(hass) + lists = storage.get_lists() + + connection.send_result( + msg["id"], + { + "lists": [lst.to_dict() for lst in lists] + } + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_LISTS_CREATE, + vol.Required("name"): str, + vol.Optional("icon", default="mdi:cart"): str, + } +) +@websocket_api.async_response +async def websocket_create_list( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle create list command.""" + storage = get_storage(hass) + + new_list = await storage.create_list( + name=msg["name"], + icon=msg.get("icon", "mdi:cart") + ) + + # Fire event + hass.bus.async_fire( + EVENT_LIST_UPDATED, + {"list_id": new_list.id, "action": "created"} + ) + + connection.send_result( + msg["id"], + {"list": new_list.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_LISTS_UPDATE, + vol.Required("list_id"): str, + vol.Optional("name"): str, + vol.Optional("icon"): str, + vol.Optional("category_order"): [str], + } +) +@websocket_api.async_response +async def websocket_update_list( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle update list command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + # Build update kwargs + update_data = {} + if "name" in msg: + update_data["name"] = msg["name"] + if "icon" in msg: + update_data["icon"] = msg["icon"] + if "category_order" in msg: + update_data["category_order"] = msg["category_order"] + + updated_list = await storage.update_list(list_id, **update_data) + + if updated_list is None: + connection.send_error(msg["id"], "not_found", "List not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_LIST_UPDATED, + {"list_id": list_id, "action": "updated"} + ) + + connection.send_result( + msg["id"], + {"list": updated_list.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_LISTS_DELETE, + vol.Required("list_id"): str, + } +) +@websocket_api.async_response +async def websocket_delete_list( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle delete list command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + success = await storage.delete_list(list_id) + + if not success: + connection.send_error(msg["id"], "not_found", "List not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_LIST_DELETED, + {"list_id": list_id} + ) + + connection.send_result(msg["id"], {"success": True}) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_LISTS_SET_ACTIVE, + vol.Required("list_id"): str, + } +) +@websocket_api.async_response +async def websocket_set_active_list( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle set active list command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + success = await storage.set_active_list(list_id) + + if not success: + connection.send_error(msg["id"], "not_found", "List not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_LIST_UPDATED, + {"list_id": list_id, "action": "set_active"} + ) + + connection.send_result(msg["id"], {"success": True}) + + +# ============================================================================= +# ITEM HANDLERS +# ============================================================================= + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_GET, + vol.Required("list_id"): str, + } +) +@callback +def websocket_get_items( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle get items command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + items = storage.get_items(list_id) + + connection.send_result( + msg["id"], + { + "items": [item.to_dict() for item in items] + } + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_ADD, + vol.Required("list_id"): str, + vol.Required("name"): str, + vol.Required("category_id"): str, + vol.Optional("product_id"): str, + vol.Optional("quantity", default=1): vol.Coerce(float), + vol.Optional("unit", default="units"): str, + vol.Optional("note"): str, + vol.Optional("price"): vol.Coerce(float), + vol.Optional("image_url"): str, + vol.Optional("barcode"): str, + } +) +@websocket_api.async_response +async def websocket_add_item( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle add item command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + # Build item data + item_data = { + "name": msg["name"], + "category_id": msg["category_id"], + "quantity": msg.get("quantity", 1), + "unit": msg.get("unit", "units"), + } + + # Optional fields + optional_fields = ["product_id", "note", "price", "image_url", "barcode"] + for field in optional_fields: + if field in msg: + item_data[field] = msg[field] + + new_item = await storage.add_item(list_id, **item_data) + + if new_item is None: + connection.send_error(msg["id"], "not_found", "List not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_ADDED, + { + "list_id": list_id, + "item_id": new_item.id, + "item": new_item.to_dict() + } + ) + + connection.send_result( + msg["id"], + {"item": new_item.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_UPDATE, + vol.Required("item_id"): str, + vol.Optional("name"): str, + vol.Optional("quantity"): vol.Coerce(float), + vol.Optional("unit"): str, + vol.Optional("note"): str, + vol.Optional("price"): vol.Coerce(float), + vol.Optional("category_id"): str, + vol.Optional("image_url"): str, + } +) +@websocket_api.async_response +async def websocket_update_item( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle update item command.""" + storage = get_storage(hass) + item_id = msg["item_id"] + + # Build update data + update_data = {} + update_fields = ["name", "quantity", "unit", "note", "price", "category_id", "image_url"] + for field in update_fields: + if field in msg: + update_data[field] = msg[field] + + updated_item = await storage.update_item(item_id, **update_data) + + if updated_item is None: + connection.send_error(msg["id"], "not_found", "Item not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_UPDATED, + { + "list_id": updated_item.list_id, + "item_id": item_id, + "item": updated_item.to_dict() + } + ) + + connection.send_result( + msg["id"], + {"item": updated_item.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_CHECK, + vol.Required("item_id"): str, + vol.Required("checked"): bool, + } +) +@websocket_api.async_response +async def websocket_check_item( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle check/uncheck item command.""" + storage = get_storage(hass) + item_id = msg["item_id"] + checked = msg["checked"] + + updated_item = await storage.check_item(item_id, checked) + + if updated_item is None: + connection.send_error(msg["id"], "not_found", "Item not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_CHECKED, + { + "list_id": updated_item.list_id, + "item_id": item_id, + "checked": checked + } + ) + + connection.send_result( + msg["id"], + {"item": updated_item.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_DELETE, + vol.Required("item_id"): str, + } +) +@websocket_api.async_response +async def websocket_delete_item( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle delete item command.""" + storage = get_storage(hass) + item_id = msg["item_id"] + + success = await storage.delete_item(item_id) + + if not success: + connection.send_error(msg["id"], "not_found", "Item not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_DELETED, + {"item_id": item_id} + ) + + connection.send_result(msg["id"], {"success": True}) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_REORDER, + vol.Required("list_id"): str, + vol.Required("item_order"): [str], + } +) +@websocket_api.async_response +async def websocket_reorder_items( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle reorder items command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + item_order = msg["item_order"] + + updated_list = await storage.update_list(list_id, item_order=item_order) + + if updated_list is None: + connection.send_error(msg["id"], "not_found", "List not found") + return + + # Fire event + hass.bus.async_fire( + EVENT_LIST_UPDATED, + {"list_id": list_id, "action": "reordered"} + ) + + connection.send_result(msg["id"], {"success": True}) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_BULK_CHECK, + vol.Required("item_ids"): [str], + vol.Required("checked"): bool, + } +) +@websocket_api.async_response +async def websocket_bulk_check_items( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle bulk check/uncheck items command.""" + storage = get_storage(hass) + item_ids = msg["item_ids"] + checked = msg["checked"] + + count = await storage.bulk_check_items(item_ids, checked) + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_CHECKED, + { + "item_ids": item_ids, + "checked": checked, + "count": count + } + ) + + connection.send_result( + msg["id"], + {"success": True, "count": count} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_CLEAR_CHECKED, + vol.Required("list_id"): str, + } +) +@websocket_api.async_response +async def websocket_clear_checked_items( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle clear checked items command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + count = await storage.clear_checked_items(list_id) + + # Fire event + hass.bus.async_fire( + EVENT_ITEM_DELETED, + {"list_id": list_id, "count": count, "action": "cleared_checked"} + ) + + connection.send_result( + msg["id"], + {"success": True, "count": count} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_ITEMS_GET_TOTAL, + vol.Required("list_id"): str, + } +) +@callback +def websocket_get_list_total( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle get list total command.""" + storage = get_storage(hass) + list_id = msg["list_id"] + + total_data = storage.get_list_total(list_id) + + connection.send_result(msg["id"], total_data) + + +# ============================================================================= +# PRODUCT HANDLERS +# ============================================================================= + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_PRODUCTS_SEARCH, + vol.Required("query"): str, + vol.Optional("limit", default=10): int, + } +) +@callback +def websocket_search_products( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle search products command.""" + storage = get_storage(hass) + query = msg["query"] + limit = msg.get("limit", 10) + + results = storage.search_products(query, limit) + + connection.send_result( + msg["id"], + { + "products": [product.to_dict() for product in results] + } + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_PRODUCTS_SUGGESTIONS, + vol.Optional("limit", default=20): int, + } +) +@callback +def websocket_get_product_suggestions( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle get product suggestions command.""" + storage = get_storage(hass) + limit = msg.get("limit", 20) + + suggestions = storage.get_product_suggestions(limit) + + connection.send_result( + msg["id"], + { + "products": [product.to_dict() for product in suggestions] + } + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_PRODUCTS_ADD, + vol.Required("name"): str, + vol.Required("category_id"): str, + vol.Optional("aliases"): [str], + vol.Optional("default_unit", default="units"): str, + vol.Optional("default_quantity", default=1): vol.Coerce(float), + vol.Optional("price"): vol.Coerce(float), + vol.Optional("barcode"): str, + vol.Optional("image_url"): str, + } +) +@websocket_api.async_response +async def websocket_add_product( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle add product command.""" + storage = get_storage(hass) + + # Build product data + product_data = { + "name": msg["name"], + "category_id": msg["category_id"], + "default_unit": msg.get("default_unit", "units"), + "default_quantity": msg.get("default_quantity", 1), + "custom": True, + "source": "user" + } + + # Optional fields + optional_fields = ["aliases", "price", "barcode", "image_url"] + for field in optional_fields: + if field in msg: + product_data[field] = msg[field] + + new_product = await storage.add_product(**product_data) + + connection.send_result( + msg["id"], + {"product": new_product.to_dict()} + ) + + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_PRODUCTS_UPDATE, + vol.Required("product_id"): str, + vol.Optional("name"): str, + vol.Optional("category_id"): str, + vol.Optional("price"): vol.Coerce(float), + vol.Optional("default_unit"): str, + vol.Optional("default_quantity"): vol.Coerce(float), + vol.Optional("aliases"): [str], + vol.Optional("image_url"): str, + } +) +@websocket_api.async_response +async def websocket_update_product( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle update product command.""" + storage = get_storage(hass) + product_id = msg["product_id"] + + # Build update data + update_data = {} + update_fields = ["name", "category_id", "price", "default_unit", "default_quantity", "aliases", "image_url"] + for field in update_fields: + if field in msg: + update_data[field] = msg[field] + + # Add price_updated timestamp if price changed + if "price" in update_data: + from ..models import current_timestamp + update_data["price_updated"] = current_timestamp() + + updated_product = await storage.update_product(product_id, **update_data) + + if updated_product is None: + connection.send_error(msg["id"], "not_found", "Product not found") + return + + connection.send_result( + msg["id"], + {"product": updated_product.to_dict()} + ) + + +# ============================================================================= +# CATEGORY HANDLERS +# ============================================================================= + +@websocket_api.websocket_command( + { + vol.Required("type"): WS_TYPE_CATEGORIES_GET_ALL, + } +) +@callback +def websocket_get_categories( + hass: HomeAssistant, + connection: websocket_api.ActiveConnection, + msg: Dict[str, Any], +) -> None: + """Handle get all categories command.""" + storage = get_storage(hass) + categories = storage.get_categories() + + connection.send_result( + msg["id"], + { + "categories": [cat.to_dict() for cat in categories] + } + )