diff --git a/autograbber.py b/autograbber.py
new file mode 100755
index 0000000..53b8235
--- /dev/null
+++ b/autograbber.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python3
+"""Download new episodes of the shows listed in ``.download/.series``.
+
+Each usable line of the series file has the form ``SERVICE/Show Title``.
+Episodes that download successfully are recorded in ``.download/.history``
+(one ``episode_id | filename`` per line) so that later runs skip them.
+"""
+
+from pathlib import Path
+
+from services.iview import iViewService
+
+
+# -------------------------
+# Metadata files
+# -------------------------
+DOWNLOAD_META = Path("./.download")
+
+HISTORY_FILE = DOWNLOAD_META / ".history"
+SERIES_FILE = DOWNLOAD_META / ".series"
+
+# Root folder handed to the services; each show gets its own sub-folder.
+# NOTE(review): the working directory is an assumption -- confirm the
+# intended download location.
+DOWNLOAD_DIR = Path(".")
+
+DOWNLOAD_META.mkdir(exist_ok=True)
+
+HISTORY_FILE.touch(exist_ok=True)
+SERIES_FILE.touch(exist_ok=True)
+
+
+# -------------------------
+# Service registry
+# -------------------------
+# Keys are compared against the upper-cased service name from .series.
+SERVICES = {
+    "ABC": iViewService(),
+}
+
+
+# -------------------------
+# Core Autograbber
+# -------------------------
+class AutoGrabber:
+    """Walk the configured shows and download episodes not yet in history."""
+
+    def __init__(self):
+        self.history = self.load_history()
+        self.series_list = self.load_series()
+
+    # -------------------------
+    # History handling
+    # -------------------------
+    def load_history(self):
+        """Return ``{episode_id: filename or None}`` read from HISTORY_FILE."""
+        history = {}
+
+        with open(HISTORY_FILE, "r", encoding="utf-8") as f:
+            for line in f:
+                line = line.strip()
+                if not line:
+                    continue
+
+                if "|" in line:
+                    ep_id, filename = line.split("|", 1)
+                    history[ep_id.strip()] = filename.strip()
+                else:
+                    # backwards compatibility: bare episode id, no filename
+                    history[line] = None
+
+        return history
+
+    def save_history(self):
+        """Rewrite HISTORY_FILE from ``self.history``, sorted by episode id."""
+        with open(HISTORY_FILE, "w", encoding="utf-8") as f:
+            for ep_id in sorted(self.history):
+                filename = self.history[ep_id]
+                if filename:
+                    f.write(f"{ep_id} | {filename}\n")
+                else:
+                    f.write(f"{ep_id}\n")
+
+    # -------------------------
+    # Series loading
+    # -------------------------
+    def load_series(self):
+        """Return ``(service_name, show_title)`` pairs read from SERIES_FILE.
+
+        Lines without a ``/`` separator (including blank lines) are skipped.
+        """
+        series = []
+
+        with open(SERIES_FILE, "r", encoding="utf-8") as f:
+            for line in f:
+                line = line.strip()
+                if "/" not in line:
+                    continue
+
+                service_name, show_title = line.split("/", 1)
+                series.append((service_name.strip(), show_title.strip()))
+
+        return series
+
+    # -------------------------
+    # Process a single show
+    # -------------------------
+    def process_show(self, service_name, show_title):
+        """Download every episode of *show_title* that is not in history."""
+        service = SERVICES.get(service_name.upper())
+
+        if not service:
+            print(f"❌ Unknown service: {service_name}")
+            return
+
+        print("\n==============================")
+        print(f"📺 Show: {show_title}")
+        print(f"📡 Service: {service_name}")
+        print("==============================")
+
+        seasons = service.discover_seasons(show_title)
+
+        if not seasons:
+            print("⚠️ No seasons found")
+            return
+
+        for season_data in seasons:
+            season_num = season_data["season"]
+            data = season_data["data"]
+
+            print(f"\n📦 Processing Season {season_num}")
+
+            for entry in data["entries"]:
+                # guard against null/empty entries in the playlist JSON
+                if not entry:
+                    continue
+
+                episode = service.normalize_episode(show_title, entry)
+
+                ep_id = episode["episode_id"]
+                if not ep_id:
+                    continue
+
+                filename = episode["filename"]
+
+                # already downloaded
+                if ep_id in self.history:
+                    # legacy history lines carry no filename (None)
+                    known_name = self.history[ep_id] or filename
+                    print(f"⏩ {known_name} (already downloaded)")
+                    continue
+
+                print(f"✅ {filename} → queued")
+
+                success = service.download_episode(
+                    episode,
+                    entry,
+                    DOWNLOAD_DIR,
+                )
+
+                if success:
+                    self.history[ep_id] = filename
+                    # saved after every episode so an interrupted run
+                    # keeps the downloads it already finished
+                    self.save_history()
+
+    # -------------------------
+    # Main execution
+    # -------------------------
+    def run(self):
+        """Process every configured show in file order."""
+        if not self.series_list:
+            print("⚠️ No shows found in .series")
+            return
+
+        for service_name, show_title in self.series_list:
+            self.process_show(service_name, show_title)
+
+        print("\n✅ Autograbber run complete")
+
+
+# -------------------------
+# Entry point
+# -------------------------
+if __name__ == "__main__":
+    AutoGrabber().run()
diff --git a/services/base.py b/services/base.py
new file mode 100644
index 0000000..55de9f6
--- /dev/null
+++ b/services/base.py
@@ -0,0 +1,25 @@
+from abc import ABC, abstractmethod
+
+
+class BaseService(ABC):
+    """Interface that every streaming-service backend implements."""
+
+    @abstractmethod
+    def name(self):
+        """Return the display name of the service."""
+
+    @abstractmethod
+    def slugify(self, text):
+        """Return *text* converted to the service's URL slug form."""
+
+    @abstractmethod
+    def discover_seasons(self, show_title):
+        """Return the list of season dicts found for *show_title*."""
+
+    @abstractmethod
+    def normalize_episode(self, show_title, entry):
+        """Return the naming metadata dict for one playlist *entry*."""
+
+    @abstractmethod
+    def download_episode(self, episode, entry, download_dir):
+        """Download one episode under *download_dir*; return True on success."""
diff --git a/services/iview.py b/services/iview.py
new file mode 100644
index 0000000..62664e7
--- /dev/null
+++ b/services/iview.py
@@ -0,0 +1,213 @@
+import json
+import re
+import subprocess
+from pathlib import Path
+
+from services.base import BaseService
+
+
+class iViewService(BaseService):
+    """ABC iView backend; metadata and downloads both go through yt-dlp."""
+
+    # -------------------------
+    # Service name
+    # -------------------------
+    def name(self):
+        return "iView"
+
+    # -------------------------
+    # Slug generation
+    # -------------------------
+    def slugify(self, text):
+        """Return *text* as the lower-case, hyphenated slug used in show URLs."""
+        text = text.lower()
+
+        # replacements
+        text = text.replace("&", "and")
+        for char in "'()":
+            text = text.replace(char, "-")
+
+        # removals
+        for char in "?!,:":
+            text = text.replace(char, "")
+
+        # whitespace -> hyphen
+        text = re.sub(r"\s+", "-", text)
+
+        # collapse duplicate hyphens
+        text = re.sub(r"-{2,}", "-", text)
+
+        # trim
+        return text.strip("-")
+
+    # -------------------------
+    # Scene-style naming
+    # -------------------------
+    def sceneify(self, text):
+        """Return *text* as a period-separated token safe for file names."""
+        replacements = {
+            "&": "and",
+            ":": "",
+            ",": "",
+            "?": "",
+            "!": "",
+            "(": "",
+            ")": "",
+            "'": "",
+            # a path separator would split the file name into folders
+            "/": "-",
+        }
+
+        for old, new in replacements.items():
+            text = text.replace(old, new)
+
+        # whitespace -> periods
+        text = re.sub(r"\s+", ".", text)
+
+        # collapse duplicate periods
+        text = re.sub(r"\.+", ".", text)
+
+        # trim
+        return text.strip(".")
+
+    # -------------------------
+    # Episode cleanup
+    # -------------------------
+    def clean_episode_title(self, title: str) -> str:
+        """Remove redundant 'Series X Ep Y' prefixes from iView titles.
+
+        The "Season X Episode Y" spelling is removed too; matching ignores
+        case.
+        """
+        title = re.sub(
+            r"(series|season)\s*\d+\s*(ep|episode)\s*\d+",
+            "",
+            title,
+            flags=re.IGNORECASE,
+        )
+
+        # drop a separator left dangling once the prefix is gone
+        return title.strip().strip("-:").strip()
+
+    # -------------------------
+    # yt-dlp JSON helper
+    # -------------------------
+    def run_ytdlp_json(self, url):
+        """Return yt-dlp's ``-J`` metadata for *url*, or None on failure.
+
+        Failure means a non-zero exit status or output that is not JSON.
+        """
+        result = subprocess.run(
+            ["yt-dlp", "-J", "--no-flat-playlist", url],
+            capture_output=True,
+            text=True,
+        )
+
+        if result.returncode != 0:
+            return None
+
+        try:
+            return json.loads(result.stdout)
+        except json.JSONDecodeError:
+            return None
+
+    # -------------------------
+    # Season discovery
+    # -------------------------
+    def discover_seasons(self, show_title, max_seasons=19):
+        """Probe series pages 1..max_seasons and return those with episodes.
+
+        Probing stops at the first series number that yields no entries.
+        Each result is ``{"season": int, "url": str, "data": dict}``.
+        """
+        slug = self.slugify(show_title)
+        seasons = []
+
+        print(f"\n🔎 Discovering seasons for: {show_title}")
+
+        for i in range(1, max_seasons + 1):
+            url = f"https://iview.abc.net.au/show/{slug}/series/{i}"
+            data = self.run_ytdlp_json(url)
+
+            # no JSON, no playlist, or an empty playlist ends the probe
+            if not data or "entries" not in data or not data["entries"]:
+                break
+
+            print(f" ✔ Season {i} found ({len(data['entries'])} episodes)")
+
+            seasons.append({"season": i, "url": url, "data": data})
+
+        return seasons
+
+    # -------------------------
+    # Episode normalization
+    # -------------------------
+    def normalize_episode(self, show_title, entry):
+        """Return the naming metadata for one yt-dlp playlist *entry*.
+
+        The dict has the keys ``show``, ``season``, ``episode``,
+        ``episode_id`` and ``filename`` (without extension). A missing
+        season or episode number defaults to 1.
+        """
+        season = entry.get("season_number") or 1
+        episode = entry.get("episode_number") or 1
+        episode_id = entry.get("id")
+
+        show_clean = self.sceneify(show_title)
+        filename = f"{show_clean}.S{season:02d}E{episode:02d}"
+
+        title = entry.get("title") or ""
+        title_clean = self.sceneify(self.clean_episode_title(title))
+
+        # A title that is only the "Series X Ep Y" prefix cleans down to
+        # nothing; skip it rather than leave a trailing period.
+        if title_clean:
+            filename = f"{filename}.{title_clean}"
+
+        return {
+            "show": show_clean,
+            "season": season,
+            "episode": episode,
+            "episode_id": episode_id,
+            "filename": filename,
+        }
+
+    # -------------------------
+    # Download execution
+    # -------------------------
+    def download_episode(self, episode, entry, download_dir):
+        """Download one episode with yt-dlp; return True on exit status 0.
+
+        The output goes to ``<download_dir>/<show>/<filename>.<ext>``,
+        where ``<show>`` is ``episode["show"]`` with periods turned into
+        spaces. Returns False when *entry* has no URL or yt-dlp fails.
+        """
+        url = entry.get("webpage_url") or entry.get("url")
+
+        if not url:
+            print("❌ No episode URL found")
+            return False
+
+        show_folder = Path(download_dir) / episode["show"].replace(".", " ")
+        show_folder.mkdir(parents=True, exist_ok=True)
+
+        # "%" starts a yt-dlp template field, so literal ones are doubled
+        output_stem = str(show_folder / episode["filename"])
+        output_template = output_stem.replace("%", "%%") + ".%(ext)s"
+
+        print(f"⬇️ Downloading: {episode['filename']}")
+
+        result = subprocess.run([
+            "yt-dlp",
+            "--no-progress",
+            "-o",
+            output_template,
+            url,
+        ])
+
+        if result.returncode == 0:
+            print(f"✅ Download complete: {episode['filename']}")
+            return True
+
+        print(f"❌ Download failed: {episode['filename']}")
+        return False
diff --git a/services/ten.py b/services/ten.py
new file mode 100644
index 0000000..e69de29