feat: Introduce private shopping lists with owner and member management, and add Home Assistant user lookup.

This commit is contained in:
thekiwismarthome
2026-02-26 14:41:43 +13:00
parent 36a8939ebc
commit 86896ba4af
5 changed files with 144 additions and 12 deletions
@@ -223,6 +223,18 @@ async def _async_register_websocket_handlers(
handlers.websocket_import_data,
)
# List members handler
websocket_api.async_register_command(
hass,
handlers.websocket_update_list_members,
)
# HA users handler
websocket_api.async_register_command(
hass,
handlers.websocket_get_ha_users,
)
_LOGGER.debug("WebSocket handlers registered")
@@ -16,6 +16,10 @@ WS_TYPE_LISTS_CREATE = f"{DOMAIN}/lists/create"
WS_TYPE_LISTS_UPDATE = f"{DOMAIN}/lists/update"
WS_TYPE_LISTS_DELETE = f"{DOMAIN}/lists/delete"
WS_TYPE_LISTS_SET_ACTIVE = f"{DOMAIN}/lists/set_active"
WS_TYPE_LISTS_UPDATE_MEMBERS = f"{DOMAIN}/lists/update_members"
# WebSocket Commands - Users
WS_TYPE_USERS_GET_ALL = f"{DOMAIN}/users/get_all"
# WebSocket Commands - Items
WS_TYPE_ITEMS_GET = f"{DOMAIN}/items/get"
@@ -107,6 +107,9 @@ class ShoppingList:
item_order: List[str] = field(default_factory=list)
category_order: List[str] = field(default_factory=list)
active: bool = False
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
@@ -159,9 +159,21 @@ class ShoppingListStorage:
data = {list_id: lst.to_dict() for list_id, lst in self._lists.items()}
await self._store_lists.async_save(data)
def get_lists(self) -> List[ShoppingList]:
"""Get all lists."""
return list(self._lists.values())
def get_lists(self, user_id: str = None, is_admin: bool = False) -> List[ShoppingList]:
"""Get lists visible to the specified user.
Global lists (owner_id=None) are visible to everyone.
Private lists are visible to their owner, anyone in allowed_users, and admins.
"""
all_lists = list(self._lists.values())
if is_admin or user_id is None:
return all_lists
return [
lst for lst in all_lists
if lst.owner_id is None
or lst.owner_id == user_id
or user_id in (lst.allowed_users or [])
]
def get_list(self, list_id: str) -> Optional[ShoppingList]:
"""Get a specific list."""
@@ -174,13 +186,14 @@ class ShoppingListStorage:
return lst
return None
async def create_list(self, name: str, icon: str = "mdi:cart") -> ShoppingList:
"""Create a new list."""
async def create_list(self, name: str, icon: str = "mdi:cart", owner_id: str = None) -> ShoppingList:
"""Create a new list. Pass owner_id to make the list private to that user."""
new_list = ShoppingList(
id=generate_id(),
name=name,
icon=icon,
category_order=[cat.id for cat in self._categories]
category_order=[cat.id for cat in self._categories],
owner_id=owner_id,
)
self._lists[new_list.id] = new_list
self._items[new_list.id] = []
@@ -206,6 +219,18 @@ class ShoppingListStorage:
_LOGGER.debug("Updated list: %s", list_id)
return lst
async def update_list_members(self, list_id: str, allowed_users: List[str]) -> Optional[ShoppingList]:
"""Update the allowed_users for a private list."""
if list_id not in self._lists:
return None
lst = self._lists[list_id]
lst.allowed_users = allowed_users
from .models import current_timestamp
lst.updated_at = current_timestamp()
await self._save_lists()
_LOGGER.debug("Updated members for list: %s", list_id)
return lst
async def delete_list(self, list_id: str) -> bool:
"""Delete a list."""
if list_id not in self._lists:
@@ -14,6 +14,8 @@ from ..const import (
WS_TYPE_LISTS_UPDATE,
WS_TYPE_LISTS_DELETE,
WS_TYPE_LISTS_SET_ACTIVE,
WS_TYPE_LISTS_UPDATE_MEMBERS,
WS_TYPE_USERS_GET_ALL,
WS_TYPE_ITEMS_GET,
WS_TYPE_ITEMS_ADD,
WS_TYPE_ITEMS_UPDATE,
@@ -171,8 +173,11 @@ def websocket_get_lists(
) -> None:
"""Handle get all lists command."""
storage = get_storage(hass)
lists = storage.get_lists()
user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
lists = storage.get_lists(user_id=user_id, is_admin=is_admin)
connection.send_result(
msg["id"],
{
@@ -186,6 +191,7 @@ def websocket_get_lists(
vol.Required("type"): WS_TYPE_LISTS_CREATE,
vol.Required("name"): str,
vol.Optional("icon", default="mdi:cart"): str,
vol.Optional("private", default=True): bool,
}
)
@websocket_api.async_response
@@ -196,10 +202,15 @@ async def websocket_create_list(
) -> None:
"""Handle create list command."""
storage = get_storage(hass)
# Private lists are owned by the creating user; global lists have no owner.
is_private = msg.get("private", True)
owner_id = connection.user.id if is_private and connection.user else None
new_list = await storage.create_list(
name=msg["name"],
icon=msg.get("icon", "mdi:cart")
icon=msg.get("icon", "mdi:cart"),
owner_id=owner_id,
)
# Fire event
@@ -275,9 +286,21 @@ async def websocket_delete_list(
"""Handle delete list command."""
storage = get_storage(hass)
list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may delete a private list
if lst.owner_id is not None:
user = connection.user
if not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can delete this list")
return
success = await storage.delete_list(list_id)
if not success:
connection.send_error(msg["id"], "not_found", "List not found")
return
@@ -972,3 +995,68 @@ async def websocket_import_data(
storage = get_storage(hass)
counts = await storage.import_user_data(msg["data"])
connection.send_result(msg["id"], {"success": True, "imported": counts})
# =============================================================================
# LIST MEMBERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_LISTS_UPDATE_MEMBERS,
vol.Required("list_id"): str,
vol.Required("allowed_users"): [str],
}
)
@websocket_api.async_response
async def websocket_update_list_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private list."""
storage = get_storage(hass)
list_id = msg["list_id"]
lst = storage.get_list(list_id)
if lst is None:
connection.send_error(msg["id"], "not_found", "List not found")
return
# Only the owner or an admin may manage members
user = connection.user
if lst.owner_id is not None and not (user and (user.is_admin or user.id == lst.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the list owner can manage members")
return
updated = await storage.update_list_members(list_id, msg["allowed_users"])
hass.bus.async_fire(
EVENT_LIST_UPDATED,
{"list_id": list_id, "action": "members_updated"}
)
connection.send_result(msg["id"], {"list": updated.to_dict()})
# =============================================================================
# HA USERS HANDLER
# =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): WS_TYPE_USERS_GET_ALL,
}
)
@websocket_api.async_response
async def websocket_get_ha_users(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all active, non-system HA users."""
users = await hass.auth.async_get_users()
result = [
{"id": u.id, "name": u.name}
for u in users
if not u.system_generated and u.is_active
]
connection.send_result(msg["id"], {"users": result})