import json
import re
import subprocess
from pathlib import Path

from services.base import BaseService


# Character tables applied in a single pass by str.translate.
# A value of None deletes the character outright.
_SLUG_TABLE = str.maketrans({
    "&": "and",
    "'": "-",
    "(": "-",
    ")": "-",
    "?": None,
    "!": None,
    ",": None,
    ":": None,
})

_SCENE_TABLE = str.maketrans({
    "&": "and",
    ":": None,
    ",": None,
    "?": None,
    "!": None,
    "(": None,
    ")": None,
    "'": None,
    # Path separators must never reach a filename component; treat them as
    # word breaks so "AC/DC" becomes "AC.DC" instead of a nested directory.
    "/": " ",
    "\\": " ",
})

# "Series 1 Ep 2" / "Season 1 Episode 2" markers embedded in episode titles.
_EPISODE_MARKER_RE = re.compile(
    r"(series|season)\s*\d+\s*(ep|episode)\s*\d+",
    re.IGNORECASE,
)

# Whitespace and separator punctuation left dangling at either end of a title
# once the episode marker has been removed (e.g. "- Pilot").
_EDGE_SEPARATORS_RE = re.compile(r"^[\s\-–—:|]+|[\s\-–—:|]+$")


class iViewService(BaseService):
    """Discover and download iView (iview.abc.net.au) shows by shelling out to yt-dlp."""

    # -------------------------
    # Service name
    # -------------------------

    def name(self):
        """Return the display name of this service."""
        return "iView"

    # -------------------------
    # Slug generation
    # -------------------------

    def slugify(self, text):
        """Convert a show title into the hyphenated, lower-case slug used in iView URLs.

        "&" becomes "and"; apostrophes and parentheses become hyphens;
        "?", "!", "," and ":" are dropped; whitespace runs become a single
        hyphen; leading/trailing hyphens are trimmed.
        """
        slug = text.lower().translate(_SLUG_TABLE)
        # whitespace -> hyphen
        slug = re.sub(r"\s+", "-", slug)
        # collapse duplicate hyphens
        slug = re.sub(r"-{2,}", "-", slug)
        return slug.strip("-")

    # -------------------------
    # Scene-style naming
    # -------------------------

    def sceneify(self, text):
        """Convert free text into a period-separated, filename-safe "scene" name.

        "&" becomes "and"; common punctuation is removed; path separators
        and whitespace become periods; duplicate periods are collapsed and
        leading/trailing periods are trimmed. Case is preserved.
        """
        scene = text.translate(_SCENE_TABLE)
        # whitespace -> periods
        scene = re.sub(r"\s+", ".", scene)
        # collapse duplicate periods
        scene = re.sub(r"\.+", ".", scene)
        return scene.strip(".")

    # -------------------------
    # Episode cleanup
    # -------------------------

    def clean_episode_title(self, title: str) -> str:
        """Remove redundant 'Series X Ep Y' markers from iView titles.

        Matches patterns such as "Series 1 Ep 2" and "Season 1 Episode 2"
        (case-insensitive). Separators left dangling at either end after
        the removal (" - ", ":", "|") are trimmed as well, so
        "Series 1 Ep 2 - Pilot" yields "Pilot" rather than "- Pilot".

        Returns an empty string when nothing but the marker was present.
        """
        without_marker = _EPISODE_MARKER_RE.sub("", title)
        return _EDGE_SEPARATORS_RE.sub("", without_marker)

    # -------------------------
    # yt-dlp JSON helper
    # -------------------------

    def run_ytdlp_json(self, url):
        """Run ``yt-dlp -J`` on *url* and return the parsed JSON document.

        Returns None when yt-dlp exits with a non-zero status or when its
        stdout is not valid JSON. Errors from launching the process itself
        (e.g. yt-dlp not installed) are not caught here and propagate.
        """
        result = subprocess.run(
            ["yt-dlp", "-J", "--no-flat-playlist", url],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            return None

        try:
            return json.loads(result.stdout)
        except json.JSONDecodeError:
            # Only malformed output is treated as "no data"; anything else
            # (KeyboardInterrupt, SystemExit, ...) must not be swallowed.
            return None

    # -------------------------
    # Season discovery
    # -------------------------

    def discover_seasons(self, show_title, max_seasons=19):
        """Probe iView for consecutive seasons of *show_title*.

        Season URLs are tried in order starting at series 1. Probing stops
        at the first season that yields no data or no entries, or after
        *max_seasons* seasons (default 19, the original hard-coded limit).

        Returns a list of dicts with keys "season" (int), "url" (str) and
        "data" (the parsed yt-dlp JSON for that season).
        """
        slug = self.slugify(show_title)
        seasons = []

        print(f"\n🔎 Discovering seasons for: {show_title}")

        for number in range(1, max_seasons + 1):
            url = f"https://iview.abc.net.au/show/{slug}/series/{number}"
            data = self.run_ytdlp_json(url)

            # A failed lookup, a document without "entries", or an empty
            # playlist all mean there is nothing further to discover.
            if not data or "entries" not in data or not data["entries"]:
                break

            print(f" ✔ Season {number} found ({len(data['entries'])} episodes)")

            seasons.append({
                "season": number,
                "url": url,
                "data": data,
            })

        return seasons

    # -------------------------
    # Episode normalization
    # -------------------------

    def normalize_episode(self, show_title, entry):
        """Build normalized naming metadata for one yt-dlp playlist entry.

        Reads "season_number", "episode_number", "id" and "title" from
        *entry*; a missing or falsy season/episode number defaults to 1.

        Returns a dict with keys "show", "season", "episode", "episode_id"
        and "filename". The filename is ``Show.SxxEyy[.Title]``; the title
        part is omitted when the title is missing or is empty after
        cleanup, so the filename never ends in a stray period.
        """
        season = entry.get("season_number") or 1
        episode = entry.get("episode_number") or 1
        episode_id = entry.get("id")

        show_clean = self.sceneify(show_title)

        raw_title = entry.get("title") or ""
        title_clean = self.sceneify(self.clean_episode_title(raw_title))

        filename = f"{show_clean}.S{season:02d}E{episode:02d}"
        # Decide on the *cleaned* title: a raw title consisting only of
        # "Series 1 Ep 2" is non-empty but contributes nothing.
        if title_clean:
            filename = f"{filename}.{title_clean}"

        return {
            "show": show_clean,
            "season": season,
            "episode": episode,
            "episode_id": episode_id,
            "filename": filename,
        }

    # -------------------------
    # Download execution
    # -------------------------

    def download_episode(self, episode, entry, download_dir):
        """Download one episode with yt-dlp into ``download_dir/<Show Name>/``.

        *episode* is a dict as returned by ``normalize_episode``; *entry*
        supplies the source URL ("webpage_url", falling back to "url").
        The show folder is created if it does not exist.

        Returns True when yt-dlp exits with status 0, False when no URL is
        available or the download fails.
        """
        show_folder = Path(download_dir) / episode["show"].replace(".", " ")
        show_folder.mkdir(parents=True, exist_ok=True)

        # yt-dlp parses "%" in the -o argument as the start of a template
        # field; literal percent signs in the path or title must be doubled.
        target_base = str(show_folder / episode["filename"]).replace("%", "%%")
        output_template = f"{target_base}.%(ext)s"

        url = entry.get("webpage_url") or entry.get("url")
        if not url:
            print("❌ No episode URL found")
            return False

        print(f"⬇️ Downloading: {episode['filename']}")

        result = subprocess.run([
            "yt-dlp",
            "--no-progress",
            "-o",
            output_template,
            url,
        ])

        if result.returncode == 0:
            print(f"✅ Download complete: {episode['filename']}")
            return True

        print(f"❌ Download failed: {episode['filename']}")
        return False