mirror of
https://github.com/thekiwismarthome/shopping-list-manager.git
synced 2026-06-30 21:46:30 +00:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cf3ab90d74 | |||
| 4b7043d075 | |||
| e7306275e4 | |||
| 9430811cda | |||
| 2b11632253 |
@@ -200,6 +200,10 @@ async def _async_register_websocket_handlers(
|
||||
hass,
|
||||
handlers.websocket_update_product,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_delete_product,
|
||||
)
|
||||
websocket_api.async_register_command(
|
||||
hass,
|
||||
handlers.websocket_get_product_substitutes,
|
||||
|
||||
@@ -76,7 +76,10 @@ IMAGE_FORMAT = "webp"
|
||||
IMAGE_SIZE = 200 # 200x200px
|
||||
IMAGE_QUALITY = 85
|
||||
IMAGE_MAX_SIZE_KB = 15
|
||||
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
|
||||
IMAGES_LOCAL_DIR = "www/images/shopping_list_manager"
|
||||
LEGACY_IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
|
||||
LOCAL_IMAGE_URL_PREFIX = "/local/images/shopping_list_manager/"
|
||||
LEGACY_IMAGE_URL_PREFIX = "/local/shopping_list_manager/images/"
|
||||
|
||||
# Placeholder image (inline SVG)
|
||||
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
|
||||
|
||||
@@ -2,7 +2,9 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional, Any
|
||||
from .utils.search import ProductSearch
|
||||
from homeassistant.core import HomeAssistant
|
||||
@@ -15,6 +17,10 @@ from .const import (
|
||||
STORAGE_KEY_PRODUCTS,
|
||||
STORAGE_KEY_CATEGORIES,
|
||||
STORAGE_KEY_LOYALTY_CARDS,
|
||||
IMAGES_LOCAL_DIR,
|
||||
LEGACY_IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
LEGACY_IMAGE_URL_PREFIX,
|
||||
)
|
||||
from .data.catalog_loader import load_product_catalog
|
||||
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
|
||||
@@ -49,6 +55,8 @@ class ShoppingListStorage:
|
||||
self._categories: List[Category] = []
|
||||
self._loyalty_cards: Dict[str, LoyaltyCard] = {}
|
||||
self._search_engine: Optional[ProductSearch] = None
|
||||
self._images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
|
||||
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
|
||||
|
||||
async def async_load(self) -> None:
|
||||
"""Load data from storage."""
|
||||
@@ -90,6 +98,10 @@ class ShoppingListStorage:
|
||||
}
|
||||
_LOGGER.debug("Loaded %d products", len(self._products))
|
||||
|
||||
# Ensure image directory exists and migrate legacy image paths/URLs.
|
||||
self._images_dir.mkdir(parents=True, exist_ok=True)
|
||||
await self._migrate_legacy_images_and_urls()
|
||||
|
||||
# Load categories
|
||||
categories_data = await self._store_categories.async_load()
|
||||
if categories_data:
|
||||
@@ -420,6 +432,56 @@ class ShoppingListStorage:
|
||||
data = {product_id: product.to_dict() for product_id, product in self._products.items()}
|
||||
await self._store_products.async_save(data)
|
||||
|
||||
async def _migrate_legacy_images_and_urls(self) -> None:
|
||||
"""Move old image files and rewrite stored URLs to the new path."""
|
||||
moved_files = 0
|
||||
updated_product_urls = 0
|
||||
updated_item_urls = 0
|
||||
|
||||
if self._legacy_images_dir.exists() and self._legacy_images_dir != self._images_dir:
|
||||
for src in self._legacy_images_dir.glob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
dest = self._images_dir / src.name
|
||||
if dest.exists():
|
||||
continue
|
||||
try:
|
||||
shutil.move(str(src), str(dest))
|
||||
moved_files += 1
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
|
||||
|
||||
for product in self._products.values():
|
||||
image_url = product.image_url or ""
|
||||
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
|
||||
product.image_url = image_url.replace(
|
||||
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
|
||||
)
|
||||
updated_product_urls += 1
|
||||
|
||||
for items in self._items.values():
|
||||
for item in items:
|
||||
image_url = item.image_url or ""
|
||||
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
|
||||
item.image_url = image_url.replace(
|
||||
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
|
||||
)
|
||||
updated_item_urls += 1
|
||||
|
||||
if updated_product_urls:
|
||||
await self._save_products()
|
||||
if updated_item_urls:
|
||||
await self._save_items()
|
||||
|
||||
if moved_files or updated_product_urls or updated_item_urls:
|
||||
_LOGGER.info(
|
||||
"Migrated shopping list images to %s (moved_files=%d, updated_product_urls=%d, updated_item_urls=%d)",
|
||||
IMAGES_LOCAL_DIR,
|
||||
moved_files,
|
||||
updated_product_urls,
|
||||
updated_item_urls,
|
||||
)
|
||||
|
||||
def get_products(self) -> List[Product]:
|
||||
"""Get all products."""
|
||||
return list(self._products.values())
|
||||
@@ -556,6 +618,18 @@ class ShoppingListStorage:
|
||||
_LOGGER.info("Reloaded catalog for %s: %d products imported", country_code, count)
|
||||
return count
|
||||
|
||||
async def delete_product(self, product_id: str) -> bool:
|
||||
"""Delete a product from the catalog."""
|
||||
if product_id not in self._products:
|
||||
return False
|
||||
del self._products[product_id]
|
||||
await self._save_products()
|
||||
# Rebuild search engine so the product is no longer searchable
|
||||
products_dict = {pid: p.to_dict() for pid, p in self._products.items()}
|
||||
self._search_engine = ProductSearch(products_dict)
|
||||
_LOGGER.debug("Deleted product: %s", product_id)
|
||||
return True
|
||||
|
||||
async def update_product(self, product_id: str, **kwargs) -> Optional[Product]:
|
||||
"""Update a product."""
|
||||
if product_id not in self._products:
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
"""Image handling utilities for Shopping List Manager."""
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from ..const import (
|
||||
IMAGES_LOCAL_DIR,
|
||||
LEGACY_IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -18,12 +24,29 @@ class ImageHandler:
|
||||
config_path: Path to HA config directory
|
||||
"""
|
||||
self.hass = hass
|
||||
# Images stored in /config/www/shopping_list_manager/images/
|
||||
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images"
|
||||
# Images stored in /config/www/images/shopping_list_manager/
|
||||
self._local_images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
|
||||
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
|
||||
self._local_images_dir.mkdir(parents=True, exist_ok=True)
|
||||
self._migrate_legacy_files()
|
||||
|
||||
_LOGGER.info("Image directory: %s", self._local_images_dir)
|
||||
|
||||
def _migrate_legacy_files(self) -> None:
|
||||
"""Move legacy image files to the new standardized directory."""
|
||||
if not self._legacy_images_dir.exists() or self._legacy_images_dir == self._local_images_dir:
|
||||
return
|
||||
for src in self._legacy_images_dir.glob("*"):
|
||||
if not src.is_file():
|
||||
continue
|
||||
dest = self._local_images_dir / src.name
|
||||
if dest.exists():
|
||||
continue
|
||||
try:
|
||||
shutil.move(str(src), str(dest))
|
||||
except Exception as err:
|
||||
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
|
||||
|
||||
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
|
||||
"""Get image URL for a product.
|
||||
|
||||
@@ -73,11 +96,11 @@ class ImageHandler:
|
||||
# Check exact match
|
||||
image_file = self._local_images_dir / f"{normalized_name}{ext}"
|
||||
if image_file.exists():
|
||||
return f"/local/shopping_list_manager/images/{normalized_name}{ext}"
|
||||
return f"{LOCAL_IMAGE_URL_PREFIX}{normalized_name}{ext}"
|
||||
|
||||
# Check for files starting with the product name
|
||||
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
|
||||
return f"/local/shopping_list_manager/images/{file.name}"
|
||||
return f"{LOCAL_IMAGE_URL_PREFIX}{file.name}"
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ from ..const import (
|
||||
IMAGE_SIZE,
|
||||
IMAGE_QUALITY,
|
||||
IMAGES_LOCAL_DIR,
|
||||
LOCAL_IMAGE_URL_PREFIX,
|
||||
WS_TYPE_LISTS_GET_ALL,
|
||||
WS_TYPE_LISTS_CREATE,
|
||||
WS_TYPE_LISTS_UPDATE,
|
||||
@@ -38,6 +39,7 @@ from ..const import (
|
||||
WS_TYPE_PRODUCTS_SUGGESTIONS,
|
||||
WS_TYPE_PRODUCTS_ADD,
|
||||
WS_TYPE_PRODUCTS_UPDATE,
|
||||
WS_TYPE_PRODUCTS_DELETE,
|
||||
WS_TYPE_CATEGORIES_GET_ALL,
|
||||
WS_TYPE_LOYALTY_GET_ALL,
|
||||
WS_TYPE_LOYALTY_ADD,
|
||||
@@ -471,7 +473,7 @@ def websocket_get_items(
|
||||
vol.Required("type"): WS_TYPE_ITEMS_ADD,
|
||||
vol.Required("list_id"): str,
|
||||
vol.Required("name"): str,
|
||||
vol.Required("category_id"): str,
|
||||
vol.Optional("category_id", default="other"): str,
|
||||
vol.Optional("product_id"): str,
|
||||
vol.Optional("quantity", default=1): vol.Coerce(float),
|
||||
vol.Optional("unit", default="units"): str,
|
||||
@@ -816,7 +818,8 @@ async def websocket_download_product_image(
|
||||
|
||||
try:
|
||||
session = async_get_clientsession(hass)
|
||||
async with session.get(raw_url, timeout=ClientTimeout(total=10)) as resp:
|
||||
headers = {"User-Agent": "Mozilla/5.0 (compatible; HomeAssistant/ShoppingListManager)"}
|
||||
async with session.get(raw_url, timeout=ClientTimeout(total=15), headers=headers) as resp:
|
||||
if resp.status != 200:
|
||||
connection.send_error(msg["id"], "download_failed", f"HTTP {resp.status}")
|
||||
return
|
||||
@@ -827,8 +830,14 @@ async def websocket_download_product_image(
|
||||
|
||||
try:
|
||||
img = Image.open(io.BytesIO(raw))
|
||||
if img.mode not in ("RGB", "RGBA"):
|
||||
img = img.convert("RGBA")
|
||||
# Convert to RGB for reliable lossy WebP encoding
|
||||
# (RGBA, palette, grayscale modes can fail or produce oversized files)
|
||||
if img.mode == "RGBA":
|
||||
bg = Image.new("RGB", img.size, (255, 255, 255))
|
||||
bg.paste(img, mask=img.split()[3])
|
||||
img = bg
|
||||
elif img.mode != "RGB":
|
||||
img = img.convert("RGB")
|
||||
img.thumbnail((IMAGE_SIZE, IMAGE_SIZE), Image.LANCZOS)
|
||||
out = io.BytesIO()
|
||||
img.save(out, format="WEBP", quality=IMAGE_QUALITY)
|
||||
@@ -839,7 +848,7 @@ async def websocket_download_product_image(
|
||||
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{"local_url": f"/local/shopping_list_manager/images/{filename}"},
|
||||
{"local_url": f"{LOCAL_IMAGE_URL_PREFIX}{filename}"},
|
||||
)
|
||||
|
||||
|
||||
@@ -1049,6 +1058,27 @@ async def websocket_update_product(
|
||||
)
|
||||
|
||||
|
||||
@websocket_api.websocket_command(
|
||||
{
|
||||
vol.Required("type"): WS_TYPE_PRODUCTS_DELETE,
|
||||
vol.Required("product_id"): str,
|
||||
}
|
||||
)
|
||||
@websocket_api.async_response
|
||||
async def websocket_delete_product(
|
||||
hass: HomeAssistant,
|
||||
connection: websocket_api.ActiveConnection,
|
||||
msg: Dict[str, Any],
|
||||
) -> None:
|
||||
"""Handle delete product command."""
|
||||
storage = get_storage(hass)
|
||||
deleted = await storage.delete_product(msg["product_id"])
|
||||
if not deleted:
|
||||
connection.send_error(msg["id"], "not_found", "Product not found")
|
||||
return
|
||||
connection.send_result(msg["id"], {"deleted": True})
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# CATEGORY HANDLERS
|
||||
# =============================================================================
|
||||
|
||||
Reference in New Issue
Block a user