Compare commits

...

4 Commits

5 changed files with 167 additions and 6 deletions
@@ -171,6 +171,10 @@ async def _async_register_websocket_handlers(
) )
# Products handlers # Products handlers
websocket_api.async_register_command(
hass,
handlers.websocket_download_product_image,
)
websocket_api.async_register_command( websocket_api.async_register_command(
hass, hass,
handlers.websocket_search_by_barcode, handlers.websocket_search_by_barcode,
@@ -76,7 +76,10 @@ IMAGE_FORMAT = "webp"
IMAGE_SIZE = 200 # 200x200px IMAGE_SIZE = 200 # 200x200px
IMAGE_QUALITY = 85 IMAGE_QUALITY = 85
IMAGE_MAX_SIZE_KB = 15 IMAGE_MAX_SIZE_KB = 15
IMAGES_LOCAL_DIR = "www/shopping_list_manager/images" IMAGES_LOCAL_DIR = "www/images/shopping_list_manager"
LEGACY_IMAGES_LOCAL_DIR = "www/shopping_list_manager/images"
LOCAL_IMAGE_URL_PREFIX = "/local/images/shopping_list_manager/"
LEGACY_IMAGE_URL_PREFIX = "/local/shopping_list_manager/images/"
# Placeholder image (inline SVG) # Placeholder image (inline SVG)
PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E" PLACEHOLDER_IMAGE = "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='200' height='200'%3E%3Crect width='200' height='200' fill='%23f0f0f0'/%3E%3Ctext x='50%25' y='50%25' dominant-baseline='middle' text-anchor='middle' font-family='Arial' font-size='16' fill='%23999'%3ENo Image%3C/text%3E%3C/svg%3E"
@@ -2,7 +2,9 @@
import json import json
import logging import logging
import os import os
import shutil
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any from typing import Dict, List, Optional, Any
from .utils.search import ProductSearch from .utils.search import ProductSearch
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant
@@ -15,6 +17,10 @@ from .const import (
STORAGE_KEY_PRODUCTS, STORAGE_KEY_PRODUCTS,
STORAGE_KEY_CATEGORIES, STORAGE_KEY_CATEGORIES,
STORAGE_KEY_LOYALTY_CARDS, STORAGE_KEY_LOYALTY_CARDS,
IMAGES_LOCAL_DIR,
LEGACY_IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
LEGACY_IMAGE_URL_PREFIX,
) )
from .data.catalog_loader import load_product_catalog from .data.catalog_loader import load_product_catalog
from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id from .models import ShoppingList, Item, Product, Category, LoyaltyCard, generate_id
@@ -49,6 +55,8 @@ class ShoppingListStorage:
self._categories: List[Category] = [] self._categories: List[Category] = []
self._loyalty_cards: Dict[str, LoyaltyCard] = {} self._loyalty_cards: Dict[str, LoyaltyCard] = {}
self._search_engine: Optional[ProductSearch] = None self._search_engine: Optional[ProductSearch] = None
self._images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
async def async_load(self) -> None: async def async_load(self) -> None:
"""Load data from storage.""" """Load data from storage."""
@@ -89,6 +97,10 @@ class ShoppingListStorage:
for product_id, product_data in products_data.items() for product_id, product_data in products_data.items()
} }
_LOGGER.debug("Loaded %d products", len(self._products)) _LOGGER.debug("Loaded %d products", len(self._products))
# Ensure image directory exists and migrate legacy image paths/URLs.
self._images_dir.mkdir(parents=True, exist_ok=True)
await self._migrate_legacy_images_and_urls()
# Load categories # Load categories
categories_data = await self._store_categories.async_load() categories_data = await self._store_categories.async_load()
@@ -419,6 +431,56 @@ class ShoppingListStorage:
"""Save products to storage.""" """Save products to storage."""
data = {product_id: product.to_dict() for product_id, product in self._products.items()} data = {product_id: product.to_dict() for product_id, product in self._products.items()}
await self._store_products.async_save(data) await self._store_products.async_save(data)
async def _migrate_legacy_images_and_urls(self) -> None:
"""Move old image files and rewrite stored URLs to the new path."""
moved_files = 0
updated_product_urls = 0
updated_item_urls = 0
if self._legacy_images_dir.exists() and self._legacy_images_dir != self._images_dir:
for src in self._legacy_images_dir.glob("*"):
if not src.is_file():
continue
dest = self._images_dir / src.name
if dest.exists():
continue
try:
shutil.move(str(src), str(dest))
moved_files += 1
except Exception as err:
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
for product in self._products.values():
image_url = product.image_url or ""
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
product.image_url = image_url.replace(
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
)
updated_product_urls += 1
for items in self._items.values():
for item in items:
image_url = item.image_url or ""
if image_url.startswith(LEGACY_IMAGE_URL_PREFIX):
item.image_url = image_url.replace(
LEGACY_IMAGE_URL_PREFIX, LOCAL_IMAGE_URL_PREFIX, 1
)
updated_item_urls += 1
if updated_product_urls:
await self._save_products()
if updated_item_urls:
await self._save_items()
if moved_files or updated_product_urls or updated_item_urls:
_LOGGER.info(
"Migrated shopping list images to %s (moved_files=%d, updated_product_urls=%d, updated_item_urls=%d)",
IMAGES_LOCAL_DIR,
moved_files,
updated_product_urls,
updated_item_urls,
)
def get_products(self) -> List[Product]: def get_products(self) -> List[Product]:
"""Get all products.""" """Get all products."""
@@ -1,9 +1,15 @@
"""Image handling utilities for Shopping List Manager.""" """Image handling utilities for Shopping List Manager."""
import logging import logging
import os import shutil
from pathlib import Path from pathlib import Path
from typing import Optional from typing import Optional
from ..const import (
IMAGES_LOCAL_DIR,
LEGACY_IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
)
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@@ -18,11 +24,28 @@ class ImageHandler:
config_path: Path to HA config directory config_path: Path to HA config directory
""" """
self.hass = hass self.hass = hass
# Images stored in /config/www/shopping_list_manager/images/ # Images stored in /config/www/images/shopping_list_manager/
self._local_images_dir = Path(config_path) / "www" / "shopping_list_manager" / "images" self._local_images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
self._legacy_images_dir = Path(hass.config.path(LEGACY_IMAGES_LOCAL_DIR))
self._local_images_dir.mkdir(parents=True, exist_ok=True) self._local_images_dir.mkdir(parents=True, exist_ok=True)
self._migrate_legacy_files()
_LOGGER.info("Image directory: %s", self._local_images_dir) _LOGGER.info("Image directory: %s", self._local_images_dir)
def _migrate_legacy_files(self) -> None:
"""Move legacy image files to the new standardized directory."""
if not self._legacy_images_dir.exists() or self._legacy_images_dir == self._local_images_dir:
return
for src in self._legacy_images_dir.glob("*"):
if not src.is_file():
continue
dest = self._local_images_dir / src.name
if dest.exists():
continue
try:
shutil.move(str(src), str(dest))
except Exception as err:
_LOGGER.debug("Could not move legacy image %s: %s", src, err)
def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str: def get_image_url(self, product_name: str, external_url: Optional[str] = None) -> str:
"""Get image URL for a product. """Get image URL for a product.
@@ -73,11 +96,11 @@ class ImageHandler:
# Check exact match # Check exact match
image_file = self._local_images_dir / f"{normalized_name}{ext}" image_file = self._local_images_dir / f"{normalized_name}{ext}"
if image_file.exists(): if image_file.exists():
return f"/local/shopping_list_manager/images/{normalized_name}{ext}" return f"{LOCAL_IMAGE_URL_PREFIX}{normalized_name}{ext}"
# Check for files starting with the product name # Check for files starting with the product name
for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"): for file in self._local_images_dir.glob(f"{normalized_name}*{ext}"):
return f"/local/shopping_list_manager/images/{file.name}" return f"{LOCAL_IMAGE_URL_PREFIX}{file.name}"
return None return None
@@ -1,14 +1,24 @@
"""WebSocket API handlers for Shopping List Manager.""" """WebSocket API handlers for Shopping List Manager."""
import io
import logging import logging
import re
from pathlib import Path
from typing import Any, Dict from typing import Any, Dict
import voluptuous as vol import voluptuous as vol
from aiohttp import ClientTimeout
from PIL import Image
from homeassistant.components import websocket_api from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant, callback from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from ..const import DOMAIN from ..const import DOMAIN
from ..const import ( from ..const import (
IMAGE_SIZE,
IMAGE_QUALITY,
IMAGES_LOCAL_DIR,
LOCAL_IMAGE_URL_PREFIX,
WS_TYPE_LISTS_GET_ALL, WS_TYPE_LISTS_GET_ALL,
WS_TYPE_LISTS_CREATE, WS_TYPE_LISTS_CREATE,
WS_TYPE_LISTS_UPDATE, WS_TYPE_LISTS_UPDATE,
@@ -782,6 +792,65 @@ def websocket_get_list_total(
# PRODUCT HANDLERS # PRODUCT HANDLERS
# ============================================================================= # =============================================================================
@websocket_api.websocket_command(
{
vol.Required("type"): "shopping_list_manager/products/download_image",
vol.Required("image_url"): str,
vol.Required("product_name"): str,
}
)
@websocket_api.async_response
async def websocket_download_product_image(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: Dict[str, Any],
) -> None:
"""Download a remote image and save it as WebP to the local images directory."""
raw_url: str = msg["image_url"]
product_name: str = msg["product_name"]
safe_stem = re.sub(r"[^a-z0-9_]", "", product_name.lower().replace(" ", "_")) or "product"
filename = f"{safe_stem}.webp"
images_dir = Path(hass.config.path(IMAGES_LOCAL_DIR))
images_dir.mkdir(parents=True, exist_ok=True)
dest = images_dir / filename
try:
session = async_get_clientsession(hass)
headers = {"User-Agent": "Mozilla/5.0 (compatible; HomeAssistant/ShoppingListManager)"}
async with session.get(raw_url, timeout=ClientTimeout(total=15), headers=headers) as resp:
if resp.status != 200:
connection.send_error(msg["id"], "download_failed", f"HTTP {resp.status}")
return
raw = await resp.read()
except Exception as exc: # noqa: BLE001
connection.send_error(msg["id"], "download_failed", str(exc))
return
try:
img = Image.open(io.BytesIO(raw))
# Convert to RGB for reliable lossy WebP encoding
# (RGBA, palette, grayscale modes can fail or produce oversized files)
if img.mode == "RGBA":
bg = Image.new("RGB", img.size, (255, 255, 255))
bg.paste(img, mask=img.split()[3])
img = bg
elif img.mode != "RGB":
img = img.convert("RGB")
img.thumbnail((IMAGE_SIZE, IMAGE_SIZE), Image.LANCZOS)
out = io.BytesIO()
img.save(out, format="WEBP", quality=IMAGE_QUALITY)
dest.write_bytes(out.getvalue())
except Exception as exc: # noqa: BLE001
connection.send_error(msg["id"], "conversion_failed", str(exc))
return
connection.send_result(
msg["id"],
{"local_url": f"{LOCAL_IMAGE_URL_PREFIX}{filename}"},
)
@websocket_api.websocket_command( @websocket_api.websocket_command(
{ {
vol.Required("type"): "shopping_list_manager/products/search_by_barcode", vol.Required("type"): "shopping_list_manager/products/search_by_barcode",