From ac103339b21c9b34c88647f399b845fd6f063a0b Mon Sep 17 00:00:00 2001 From: Dion Date: Thu, 14 May 2026 21:24:46 +0930 Subject: [PATCH] Fix broken --download switch --- autograbber.py | 262 +++++++++++-------------------------------------- 1 file changed, 58 insertions(+), 204 deletions(-) diff --git a/autograbber.py b/autograbber.py index e9f9401..201f537 100755 --- a/autograbber.py +++ b/autograbber.py @@ -1,23 +1,10 @@ - #!/usr/bin/env python3 import argparse import sys - from pathlib import Path - from services.iview import iViewService - -# ------------------------- -# Metadata paths -# ------------------------- -DOWNLOAD_META = Path("./.config") - -HISTORY_FILE = DOWNLOAD_META / "history" -SERIES_FILE = DOWNLOAD_META / "series" - - # ------------------------- # Service registry # ------------------------- @@ -25,284 +12,151 @@ SERVICES = { "IVIEW": iViewService(), } - -# ------------------------- -# Core Autograbber -# ------------------------- class AutoGrabber: - - def __init__( - self, - download_dir, - dry_run=False, - mark_existing=False - ): - + def __init__(self, download_dir, config_dir, dry_run=False, mark_existing=False): + # The main media output directory (Required) self.download_dir = Path(download_dir) + + # The config directory (Defaults to PWD/.tvgrabber or CLI override) + self.config_dir = Path(config_dir) + + self.history_file = self.config_dir / "history" + self.series_file = self.config_dir / "series" + self.dry_run = dry_run self.mark_existing = mark_existing + # Initialize self.history = self.load_history() self.series_list = self.load_series() - # ------------------------- - # History handling - # ------------------------- def load_history(self): - history = {} + # Ensure .tvgrabber exists before we try to handle the history file + if not self.config_dir.exists(): + self.config_dir.mkdir(parents=True, exist_ok=True) - if not HISTORY_FILE.exists(): - HISTORY_FILE.touch() - - with open(HISTORY_FILE, "r") as f: + if not self.history_file.exists(): + self.history_file.touch() + with open(self.history_file, "r") as f: for line in f: - line = line.strip() - - if not line: + if not line or "|" not in line: + if line: history[line] = None continue - - if "|" in line: - - ep_id, filename = line.split("|", 1) - - history[ep_id.strip()] = filename.strip() - - else: - history[line.strip()] = None - + ep_id, filename = line.split("|", 1) + history[ep_id.strip()] = filename.strip() return history def save_history(self): - - with open(HISTORY_FILE, "w") as f: - + with open(self.history_file, "w") as f: for ep_id in sorted(self.history.keys()): - filename = self.history[ep_id] + f.write(f"{ep_id} | {filename}\n" if filename else f"{ep_id}\n") - if filename: - f.write(f"{ep_id} | {filename}\n") - else: - f.write(f"{ep_id}\n") - - # ------------------------- - # Series loading - # ------------------------- def load_series(self): - - if not SERIES_FILE.exists(): - - print( - "โŒ Missing required file: " - f"{SERIES_FILE}" - ) - + if not self.series_file.exists(): + print(f"โŒ Error: Series file not found at {self.series_file}") + print(f"Please ensure your series list is in {self.config_dir}/series") sys.exit(1) series = [] - - with open(SERIES_FILE, "r") as f: - + with open(self.series_file, "r") as f: for line in f: - line = line.strip() - - if not line: - continue - - if "/" not in line: - continue + if not line or "/" not in line: continue service_name, remainder = line.split("/", 1) - if "|" in remainder: source_title, output_title = remainder.split("|", 1) else: - source_title = remainder - output_title = remainder - - series.append(( - service_name.strip(), - source_title.strip(), - output_title.strip() - )) + source_title = output_title = remainder + series.append((service_name.strip(), source_title.strip(), output_title.strip())) return series - # ------------------------- - # Process a single show - # ------------------------- - def process_show( - self, - service_name, - source_title, - output_title - ): - + def process_show(self, service_name, source_title, output_title): service = SERVICES.get(service_name.upper()) - if not service: - print(f"โŒ Unknown service: {service_name}") return - print("\n==============================") - print(f"๐Ÿ“บ Show: {output_title}") - print(f"๐Ÿ“ก Service: {service_name}") - print("==============================") - + print(f"\n๐Ÿ“บ Show: {output_title} ({service_name})") seasons = service.discover_seasons(source_title) if not seasons: - print("โš ๏ธ No seasons found") return for season_data in seasons: - season_num = season_data["season"] - data = season_data["data"] - - print(f"\n๐Ÿ“ฆ Processing Season {season_num}") - - for entry in data["entries"]: - - episode = service.normalize_episode( - output_title, - entry - ) + print(f"๐Ÿ“ฆ Season {season_num}") + for entry in season_data["data"]["entries"]: + episode = service.normalize_episode(output_title, entry) ep_id = episode["episode_id"] - - if not ep_id: - continue - filename = episode["filename"] - # already downloaded if ep_id in self.history: - - print( - f"โฉ " - f"{self.history[ep_id]} " - f"(already downloaded)" - ) - + print(f" โฉ {filename} (Skipped)") continue - # dry-run mode if self.dry_run: - - print( - f"๐Ÿงช " - f"{filename} " - f"(would download)" - ) - + print(f" ๐Ÿงช {filename} (Dry Run)") continue - # mark-existing mode if self.mark_existing: - - print( - f"๐Ÿ“ " - f"{filename} " - f"(added to history)" - ) - + print(f" ๐Ÿ“ {filename} (Marked)") self.history[ep_id] = filename self.save_history() - continue - - # normal download mode - print(f"โœ… {filename} โ†’ downloading") - - success = service.download_episode( - episode, - entry, - self.download_dir - ) - - if success: - + print(f" โœ… {filename} (Downloading...)") + if service.download_episode(episode, entry, self.download_dir): self.history[ep_id] = filename self.save_history() - # ------------------------- - # Main execution - # ------------------------- def run(self): - - if not self.series_list: - - print("โš ๏ธ No shows found in .series") - return - - for ( - service_name, - source_title, - output_title - ) in self.series_list: - - self.process_show( - service_name, - source_title, - output_title - ) - - print("\nโœ… Autograbber run complete") - + for service, source, output in self.series_list: + self.process_show(service, source, output) + print("\nโœ… Done.") # ------------------------- # CLI # ------------------------- def parse_args(): + parser = argparse.ArgumentParser(description="Australian FTA TV AutoGrabber") - parser = argparse.ArgumentParser() - + # Required positional argument for the download directory parser.add_argument( - "--dry-run", - action="store_true", - help="Do not download anything" + "download_dir", + help="REQUIRED: Path to the directory where media will be saved" ) + # Optional config directory override parser.add_argument( - "-d", "--downloads", - dest="downloads", - default="./downloads", - help="Directory where videos will be saved." + "-c", "--config", + default="~/.tvgrabber", + help="Path to the config folder containing 'series' and 'history' (default: ~/.tvgrabber)" ) - parser.add_argument( - "--mark-existing", - action="store_true", - help="Add all available episodes to history without downloading" - ) + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--mark-existing", action="store_true") return parser.parse_args() - -# ------------------------- -# Entry point -# ------------------------- if __name__ == "__main__": - args = parse_args() - # Convert to Path object and ensure it exists - download_path = Path(args.downloads).resolve() - - if not args.dry_run and not download_path.exists(): - print(f"๐Ÿ“‚ Creating download directory: {download_path}") - download_path.mkdir(parents=True, exist_ok=True) + # Ensure the download directory exists + out_path = Path(args.download_dir).resolve() + if not out_path.exists() and not args.dry_run: + out_path.mkdir(parents=True, exist_ok=True) AutoGrabber( - download_dir=download_path, + download_dir=out_path, + config_dir=Path(args.config).resolve(), dry_run=args.dry_run, mark_existing=args.mark_existing ).run()