feat: add loyalty card management with dedicated storage, data model, and websocket API endpoints.

This commit is contained in:
thekiwismarthome
2026-02-27 13:00:08 +13:00
parent 86896ba4af
commit 752f9e5622
5 changed files with 295 additions and 2 deletions
@@ -235,6 +235,28 @@ async def _async_register_websocket_handlers(
handlers.websocket_get_ha_users,
)
# Loyalty card handlers
websocket_api.async_register_command(
hass,
handlers.websocket_get_loyalty_cards,
)
websocket_api.async_register_command(
hass,
handlers.websocket_add_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_delete_loyalty_card,
)
websocket_api.async_register_command(
hass,
handlers.websocket_update_loyalty_card_members,
)
_LOGGER.debug("WebSocket handlers registered")
@@ -9,6 +9,7 @@ STORAGE_KEY_LISTS = f"{DOMAIN}.lists"
STORAGE_KEY_ITEMS = f"{DOMAIN}.items"
STORAGE_KEY_PRODUCTS = f"{DOMAIN}.products"
STORAGE_KEY_CATEGORIES = f"{DOMAIN}.categories"
STORAGE_KEY_LOYALTY_CARDS = f"{DOMAIN}.loyalty_cards"
# WebSocket Commands - Lists
WS_TYPE_LISTS_GET_ALL = f"{DOMAIN}/lists/get_all"
@@ -43,6 +44,13 @@ WS_TYPE_PRODUCTS_DELETE = f"{DOMAIN}/products/delete"
WS_TYPE_CATEGORIES_GET_ALL = f"{DOMAIN}/categories/get_all"
WS_TYPE_CATEGORIES_REORDER = f"{DOMAIN}/categories/reorder"
# WebSocket Commands - Loyalty Cards
WS_TYPE_LOYALTY_GET_ALL = f"{DOMAIN}/loyalty/get_all"
WS_TYPE_LOYALTY_ADD = f"{DOMAIN}/loyalty/add"
WS_TYPE_LOYALTY_UPDATE = f"{DOMAIN}/loyalty/update"
WS_TYPE_LOYALTY_DELETE = f"{DOMAIN}/loyalty/delete"
WS_TYPE_LOYALTY_UPDATE_MEMBERS = f"{DOMAIN}/loyalty/update_members"
# WebSocket Commands - Subscriptions
WS_TYPE_SUBSCRIBE = f"{DOMAIN}/subscribe"
WS_TYPE_UNSUBSCRIBE = f"{DOMAIN}/unsubscribe"
@@ -96,6 +96,27 @@ class Item:
self.estimated_total = self.quantity * self.price
@dataclass
class LoyaltyCard:
"""Loyalty card model."""
id: str
name: str
number: str
barcode: str = ""
logo: str = ""
notes: str = ""
color: str = "#9fa8da"
created_at: str = field(default_factory=current_timestamp)
updated_at: str = field(default_factory=current_timestamp)
# Ownership: None = visible to all users; set = private to owner + allowed_users
owner_id: Optional[str] = None
allowed_users: List[str] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary."""
return asdict(self)
@dataclass
class ShoppingList:
"""Shopping list model."""
@@ -14,9 +14,10 @@ from .const import (
STORAGE_KEY_ITEMS,
STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS,
)
from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, generate_id
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
from .data.category_loader import load_categories
_LOGGER = logging.getLogger(__name__)
@@ -40,11 +41,13 @@ class ShoppingListStorage:
self._store_items = Store(hass, STORAGE_VERSION, STORAGE_KEY_ITEMS)
self._store_products = Store(hass, STORAGE_VERSION, STORAGE_KEY_PRODUCTS)
self._store_categories = Store(hass, STORAGE_VERSION, STORAGE_KEY_CATEGORIES)
self._store_loyalty_cards = Store(hass, STORAGE_VERSION, STORAGE_KEY_LOYALTY_CARDS)
self._lists: Dict[str, ShoppingList] = {}
self._items: Dict[str, List[Item]] = {}
self._products: Dict[str, Product] = {}
self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None
async def async_load(self) -> None:
@@ -144,6 +147,15 @@ class ShoppingListStorage:
await self._save_products()
_LOGGER.info("Successfully imported %d products from catalog", len(self._products))
# Load loyalty cards
loyalty_data = await self._store_loyalty_cards.async_load()
if loyalty_data:
self._loyalty_cards = {
card_id: LoyaltyCard(**card_data)
for card_id, card_data in loyalty_data.items()
}
_LOGGER.debug("Loaded %d loyalty cards", len(self._loyalty_cards))
# Initialize search engine after products are loaded
if self._products:
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
@@ -660,3 +672,81 @@ class ShoppingListStorage:
def get_categories(self) -> List[Category]:
"""Get all categories."""
return self._categories
# Loyalty card methods
async def _save_loyalty_cards(self) -> None:
"""Save loyalty cards to storage."""
data = {card_id: card.to_dict() for card_id, card in self._loyalty_cards.items()}
await self._store_loyalty_cards.async_save(data)
def get_loyalty_cards(self, user_id: str = None, is_admin: bool = False) -> List[LoyaltyCard]:
"""Get loyalty cards visible to the specified user.
Global cards (owner_id=None) are visible to everyone.
Private cards are visible to their owner, anyone in allowed_users, and admins.
"""
all_cards = list(self._loyalty_cards.values())
if is_admin or user_id is None:
return all_cards
return [
card for card in all_cards
if card.owner_id is None
or card.owner_id == user_id
or user_id in (card.allowed_users or [])
]
def get_loyalty_card(self, card_id: str) -> Optional[LoyaltyCard]:
"""Get a specific loyalty card."""
return self._loyalty_cards.get(card_id)
async def create_loyalty_card(self, owner_id: str = None, **kwargs) -> LoyaltyCard:
"""Create a new loyalty card."""
from .models import current_timestamp
new_card = LoyaltyCard(
id=generate_id(),
owner_id=owner_id,
**kwargs
)
self._loyalty_cards[new_card.id] = new_card
await self._save_loyalty_cards()
_LOGGER.debug("Created loyalty card: %s", new_card.name)
return new_card
async def update_loyalty_card(self, card_id: str, **kwargs) -> Optional[LoyaltyCard]:
"""Update a loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
for key, value in kwargs.items():
if hasattr(card, key):
setattr(card, key, value)
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated loyalty card: %s", card_id)
return card
async def delete_loyalty_card(self, card_id: str) -> bool:
"""Delete a loyalty card."""
if card_id not in self._loyalty_cards:
return False
del self._loyalty_cards[card_id]
await self._save_loyalty_cards()
_LOGGER.debug("Deleted loyalty card: %s", card_id)
return True
async def update_loyalty_card_members(self, card_id: str, allowed_users: List[str]) -> Optional[LoyaltyCard]:
"""Update the allowed_users for a private loyalty card."""
if card_id not in self._loyalty_cards:
return None
card = self._loyalty_cards[card_id]
card.allowed_users = allowed_users
from .models import current_timestamp
card.updated_at = current_timestamp()
await self._save_loyalty_cards()
_LOGGER.debug("Updated members for loyalty card: %s", card_id)
return card
@@ -30,6 +30,11 @@ from ..const import (
WS_TYPE_PRODUCTS_ADD,
WS_TYPE_PRODUCTS_UPDATE,
WS_TYPE_CATEGORIES_GET_ALL,
WS_TYPE_LOYALTY_GET_ALL,
WS_TYPE_LOYALTY_ADD,
WS_TYPE_LOYALTY_UPDATE,
WS_TYPE_LOYALTY_DELETE,
WS_TYPE_LOYALTY_UPDATE_MEMBERS,
WS_TYPE_SUBSCRIBE,
EVENT_ITEM_ADDED,
EVENT_ITEM_UPDATED,
@@ -1060,3 +1065,150 @@ async def websocket_get_ha_users(
if not u.system_generated and u.is_active
]
connection.send_result(msg["id"], {"users": result})
# =============================================================================
# LOYALTY CARD HANDLERS
# =============================================================================
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_GET_ALL,
})
@websocket_api.async_response
async def websocket_get_loyalty_cards(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Return all loyalty cards visible to the current user."""
storage = get_storage(hass)
user = connection.user
user_id = user.id if user else None
is_admin = user.is_admin if user else False
cards = storage.get_loyalty_cards(user_id=user_id, is_admin=is_admin)
connection.send_result(msg["id"], {"cards": [c.to_dict() for c in cards]})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_ADD,
vol.Required("name"): str,
vol.Required("number"): str,
vol.Optional("barcode", default=""): str,
vol.Optional("logo", default=""): str,
vol.Optional("notes", default=""): str,
vol.Optional("color", default="#9fa8da"): str,
vol.Optional("private", default=False): bool,
})
@websocket_api.async_response
async def websocket_add_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Add a new loyalty card."""
storage = get_storage(hass)
user = connection.user
owner_id = user.id if (user and msg.get("private")) else None
card = await storage.create_loyalty_card(
owner_id=owner_id,
name=msg["name"],
number=msg["number"],
barcode=msg.get("barcode", ""),
logo=msg.get("logo", ""),
notes=msg.get("notes", ""),
color=msg.get("color", "#9fa8da"),
)
connection.send_result(msg["id"], {"card": card.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE,
vol.Required("card_id"): str,
vol.Optional("name"): str,
vol.Optional("number"): str,
vol.Optional("barcode"): str,
vol.Optional("logo"): str,
vol.Optional("notes"): str,
vol.Optional("color"): str,
})
@websocket_api.async_response
async def websocket_update_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update an existing loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can update it")
return
fields = {k: v for k, v in msg.items() if k not in ("type", "id", "card_id")}
updated = await storage.update_loyalty_card(card_id, **fields)
connection.send_result(msg["id"], {"card": updated.to_dict()})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_DELETE,
vol.Required("card_id"): str,
})
@websocket_api.async_response
async def websocket_delete_loyalty_card(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Delete a loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can delete it")
return
await storage.delete_loyalty_card(card_id)
connection.send_result(msg["id"], {"success": True})
@websocket_api.websocket_command({
vol.Required("type"): WS_TYPE_LOYALTY_UPDATE_MEMBERS,
vol.Required("card_id"): str,
vol.Required("allowed_users"): [str],
})
@websocket_api.async_response
async def websocket_update_loyalty_card_members(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Update the allowed_users for a private loyalty card."""
storage = get_storage(hass)
card_id = msg["card_id"]
card = storage.get_loyalty_card(card_id)
if card is None:
connection.send_error(msg["id"], "not_found", "Loyalty card not found")
return
user = connection.user
if card.owner_id is not None and not (user and (user.is_admin or user.id == card.owner_id)):
connection.send_error(msg["id"], "forbidden", "Only the card owner can manage members")
return
updated = await storage.update_loyalty_card_members(card_id, msg["allowed_users"])
connection.send_result(msg["id"], {"card": updated.to_dict()})