From 3829039a8af94bfc192e02dd9feb2aff9ce31a4a Mon Sep 17 00:00:00 2001 From: thekiwismarthome <134335563+thekiwismarthome@users.noreply.github.com> Date: Fri, 13 Feb 2026 13:54:38 +1300 Subject: [PATCH] Delete custom_components/shopping_list_manager/websocket_api.py --- .../shopping_list_manager/websocket_api.py | 325 ------------------ 1 file changed, 325 deletions(-) delete mode 100644 custom_components/shopping_list_manager/websocket_api.py diff --git a/custom_components/shopping_list_manager/websocket_api.py b/custom_components/shopping_list_manager/websocket_api.py deleted file mode 100644 index da53906..0000000 --- a/custom_components/shopping_list_manager/websocket_api.py +++ /dev/null @@ -1,325 +0,0 @@ -"""WebSocket API for Shopping List Manager.""" -import logging -import voluptuous as vol - -from homeassistant.components import websocket_api -from homeassistant.core import HomeAssistant, callback - -from .const import DOMAIN -from .models import InvariantError - -_LOGGER = logging.getLogger(__name__) - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/create_list", - vol.Required("list_id"): str, - vol.Required("catalogue"): str, - vol.Optional("visibility", default="shared"): vol.In(["shared", "private"]), -}) -@websocket_api.async_response -async def websocket_create_list( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - manager = hass.data[DOMAIN]["manager"] - - try: - await manager.async_create_list( - list_id=msg["list_id"], - catalogue=msg["catalogue"], - owner=connection.user.id, - visibility=msg.get("visibility", "shared"), - ) - - connection.send_result(msg["id"], {"success": True}) - - except Exception as err: - connection.send_error(msg["id"], "create_list_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/add_product", - vol.Optional("list_id", default="groceries"): str, - vol.Required("key"): str, - vol.Required("name"): str, - vol.Optional("category", default="other"): str, - vol.Optional("unit", default="pcs"): str, - vol.Optional("image", default=""): str, -}) -@websocket_api.async_response -async def websocket_add_product( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """ - Add or update a product in the catalog. - - Does NOT modify quantity - use set_qty for that. - - Request: - { - "type": "shopping_list_manager/add_product", - "list_id": "groceries", # optional, defaults to "groceries" - "key": "milk", - "name": "Milk", - "category": "dairy", - "unit": "pcs", - "image": "" - } - - Response: - { - "success": true, - "result": { - "key": "milk", - "name": "Milk", - "category": "dairy", - "unit": "pcs", - "image": "" - } - } - """ - manager = hass.data[DOMAIN]["manager"] - list_id = msg.get("list_id", "groceries") - - try: - product = await manager.async_add_product( - list_id=list_id, - key=msg["key"], - name=msg["name"], - category=msg.get("category", "other"), - unit=msg.get("unit", "pcs"), - image=msg.get("image", "") - ) - - connection.send_result(msg["id"], product.to_dict()) - - except Exception as err: - _LOGGER.error("Error adding product to list '%s': %s", list_id, err) - connection.send_error(msg["id"], "add_product_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/get_catalogues", -}) -@websocket_api.async_response -async def ws_get_catalogues( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """Return catalogue metadata (read-only).""" - manager = hass.data[DOMAIN]["manager"] - - try: - catalogues = manager.get_catalogues() - connection.send_result(msg["id"], catalogues) - except Exception as err: - _LOGGER.error("Error getting catalogues: %s", err) - connection.send_error(msg["id"], "get_catalogues_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/get_lists", -}) -@websocket_api.async_response -async def ws_get_lists( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """Return list → catalogue mapping (read-only).""" - manager = hass.data[DOMAIN]["manager"] - - try: - # Ensure lists are loaded - await manager._ensure_lists_loaded() - lists = manager._lists - connection.send_result(msg["id"], lists) - except Exception as err: - _LOGGER.error("Error getting lists: %s", err) - connection.send_error(msg["id"], "get_lists_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/set_qty", - vol.Optional("list_id", default="groceries"): str, - vol.Required("key"): str, - vol.Required("qty"): vol.All(int, vol.Range(min=0)), -}) -@websocket_api.async_response -async def websocket_set_qty( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """ - Set quantity for a product on the shopping list. - - Product MUST exist in catalog first. - qty = 0 removes from list. - qty > 0 adds/updates on list. - - Request: - { - "type": "shopping_list_manager/set_qty", - "list_id": "groceries", # optional, defaults to "groceries" - "key": "milk", - "qty": 2 - } - - Response: - { - "success": true - } - - Error (if product doesn't exist): - { - "success": false, - "error": { - "code": "invariant_violation", - "message": "Cannot set quantity for unknown product 'milk'..." - } - } - """ - manager = hass.data[DOMAIN]["manager"] - list_id = msg.get("list_id", "groceries") - - try: - await manager.async_set_qty( - list_id=list_id, - key=msg["key"], - qty=msg["qty"] - ) - - connection.send_result(msg["id"], {"success": True}) - - except InvariantError as err: - # This is expected if frontend tries to set qty for non-existent product - _LOGGER.warning("Invariant violation in set_qty (list '%s'): %s", list_id, err) - connection.send_error(msg["id"], "invariant_violation", str(err)) - - except Exception as err: - _LOGGER.error("Error setting quantity in list '%s': %s", list_id, err) - connection.send_error(msg["id"], "set_qty_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/get_products", - vol.Optional("list_id", default="groceries"): str, -}) -@websocket_api.async_response -async def websocket_get_products( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """ - Get all products in the catalog. - - Request: - { - "type": "shopping_list_manager/get_products", - "list_id": "groceries" # optional, defaults to "groceries" - } - - Response: - { - "milk": { - "key": "milk", - "name": "Milk", - "category": "dairy", - "unit": "pcs", - "image": "" - }, - ... - } - """ - manager = hass.data[DOMAIN]["manager"] - list_id = msg.get("list_id", "groceries") - - try: - products = await manager.async_get_products(list_id=list_id) - connection.send_result(msg["id"], products) - - except Exception as err: - _LOGGER.error("Error getting products for list '%s': %s", list_id, err) - connection.send_error(msg["id"], "get_products_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/get_active", - vol.Optional("list_id", default="groceries"): str, -}) -@websocket_api.async_response -async def websocket_get_active( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """ - Get active shopping list (quantities only). - - Request: - { - "type": "shopping_list_manager/get_active", - "list_id": "groceries" # optional, defaults to "groceries" - } - - Response: - { - "milk": {"qty": 2}, - "bread": {"qty": 1}, - ... - } - """ - manager = hass.data[DOMAIN]["manager"] - list_id = msg.get("list_id", "groceries") - - try: - active = await manager.async_get_active(list_id=list_id) - connection.send_result(msg["id"], active) - - except Exception as err: - _LOGGER.error("Error getting active list for '%s': %s", list_id, err) - connection.send_error(msg["id"], "get_active_failed", str(err)) - - -@websocket_api.websocket_command({ - vol.Required("type"): "shopping_list_manager/delete_product", - vol.Optional("list_id", default="groceries"): str, - vol.Required("key"): str, -}) -@websocket_api.async_response -async def websocket_delete_product( - hass: HomeAssistant, - connection: websocket_api.ActiveConnection, - msg: dict, -) -> None: - """ - Delete a product from catalog (and remove from active list). - - Request: - { - "type": "shopping_list_manager/delete_product", - "list_id": "groceries", # optional, defaults to "groceries" - "key": "milk" - } - - Response: - { - "success": true - } - """ - manager = hass.data[DOMAIN]["manager"] - list_id = msg.get("list_id", "groceries") - - try: - await manager.async_delete_product(list_id=list_id, key=msg["key"]) - connection.send_result(msg["id"], {"success": True}) - - except Exception as err: - _LOGGER.error("Error deleting product from list '%s': %s", list_id, err) - connection.send_error(msg["id"], "delete_product_failed", str(err))