First code

This commit is contained in:
2026-05-14 18:02:27 +09:30
parent 07163cdd03
commit ed4c54d051
4 changed files with 462 additions and 0 deletions
Executable
+202
View File
@@ -0,0 +1,202 @@
#!/usr/bin/env python3
from pathlib import Path
from services.iview import ABCService
# -------------------------
# Metadata files
# -------------------------
DOWNLOAD_META = Path("./.download")
HISTORY_FILE = DOWNLOAD_META / ".history"
SERIES_FILE = DOWNLOAD_META / ".series"
DOWNLOAD_META.mkdir(exist_ok=True)
HISTORY_FILE.touch(exist_ok=True)
SERIES_FILE.touch(exist_ok=True)
# -------------------------
# Service registry
# -------------------------
SERVICES = {
"ABC": ABCService(),
}
# -------------------------
# Core Autograbber
# -------------------------
class AutoGrabber:
def __init__(self):
self.history = self.load_history()
self.series_list = self.load_series()
# -------------------------
# History handling
# -------------------------
def load_history(self):
history = {}
with open(HISTORY_FILE, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
if "|" in line:
ep_id, filename = line.split("|", 1)
history[ep_id.strip()] = filename.strip()
else:
# backwards compatibility
history[line.strip()] = None
return history
def save_history(self):
with open(HISTORY_FILE, "w") as f:
for ep_id in sorted(self.history.keys()):
filename = self.history[ep_id]
if filename:
f.write(f"{ep_id} | {filename}\n")
else:
f.write(f"{ep_id}\n")
# -------------------------
# Series loading
# -------------------------
def load_series(self):
series = []
with open(SERIES_FILE, "r") as f:
for line in f:
line = line.strip()
if not line:
continue
if "/" not in line:
continue
service_name, show_title = line.split("/", 1)
series.append((
service_name.strip(),
show_title.strip()
))
return series
# -------------------------
# Process a single show
# -------------------------
def process_show(self, service_name, show_title):
service = SERVICES.get(service_name.upper())
if not service:
print(f"❌ Unknown service: {service_name}")
return
print("\n==============================")
print(f"📺 Show: {show_title}")
print(f"📡 Service: {service_name}")
print("==============================")
seasons = service.discover_seasons(show_title)
if not seasons:
print("⚠️ No seasons found")
return
for season_data in seasons:
season_num = season_data["season"]
data = season_data["data"]
print(f"\n📦 Processing Season {season_num}")
for entry in data["entries"]:
episode = service.normalize_episode(
show_title,
entry
)
ep_id = episode["episode_id"]
if not ep_id:
continue
filename = episode["filename"]
# already downloaded
if ep_id in self.history:
print(
f""
f"{self.history[ep_id]} "
f"(already downloaded)"
)
continue
print(f"{filename} → queued")
success = service.download_episode(
episode,
entry
)
if success:
self.history[ep_id] = filename
self.save_history()
# -------------------------
# Main execution
# -------------------------
def run(self):
if not self.series_list:
print("⚠️ No shows found in .series")
return
for service_name, show_title in self.series_list:
self.process_show(
service_name,
show_title
)
print("\n✅ Autograbber run complete")
# -------------------------
# Entry point
# -------------------------
if __name__ == "__main__":
AutoGrabber().run()
+23
View File
@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
class BaseService(ABC):
@abstractmethod
def name(self):
pass
@abstractmethod
def slugify(self, text):
pass
@abstractmethod
def discover_seasons(self, show_title):
pass
@abstractmethod
def normalize_episode(self, source_title, output_title, entry):
pass
@abstractmethod
def download_episode(self, episode, entry, download_dir):
pass
+237
View File
@@ -0,0 +1,237 @@
import json
import re
import subprocess
from pathlib import Path
from services.base import BaseService
class iViewService(BaseService):
# -------------------------
# Service name
# -------------------------
def name(self):
return "iView"
# -------------------------
# Slug generation
# -------------------------
def slugify(self, text):
text = text.lower()
# replacements
text = text.replace("&", "and")
text = text.replace("'", "-")
text = text.replace("(", "-")
text = text.replace(")", "-")
# removals
text = text.replace("?", "")
text = text.replace("!", "")
text = text.replace(",", "")
text = text.replace(":", "")
# whitespace -> hyphen
text = re.sub(r"\s+", "-", text)
# collapse duplicate hyphens
text = re.sub(r"-{2,}", "-", text)
# trim
text = text.strip("-")
return text
# -------------------------
# Scene-style naming
# -------------------------
def sceneify(self, text):
replacements = {
"&": "and",
":": "",
",": "",
"?": "",
"!": "",
"(": "",
")": "",
"'": ""
}
for old, new in replacements.items():
text = text.replace(old, new)
# whitespace -> periods
text = re.sub(r"\s+", ".", text)
# collapse duplicate periods
text = re.sub(r"\.+", ".", text)
# trim
text = text.strip(".")
return text
# -------------------------
# Episode cleanup
# -------------------------
def clean_episode_title(self, title: str) -> str:
"""
Remove redundant 'Series X Ep Y' prefixes from iView titles.
"""
import re
# Remove patterns like:
# "Series 1 Ep 2"
# "Season 1 Episode 2"
title = re.sub(
r"(series|season)\s*\d+\s*(ep|episode)\s*\d+",
"",
title,
flags=re.IGNORECASE
)
return title.strip()
# -------------------------
# yt-dlp JSON helper
# -------------------------
def run_ytdlp_json(self, url):
result = subprocess.run(
[
"yt-dlp",
"-J",
"--no-flat-playlist",
url
],
capture_output=True,
text=True
)
if result.returncode != 0:
return None
try:
return json.loads(result.stdout)
except:
return None
# -------------------------
# Season discovery
# -------------------------
def discover_seasons(self, show_title):
slug = self.slugify(show_title)
seasons = []
print(f"\n🔎 Discovering seasons for: {show_title}")
for i in range(1, 20):
url = f"https://iview.abc.net.au/show/{slug}/series/{i}"
data = self.run_ytdlp_json(url)
if not data:
break
if "entries" not in data:
break
if not data["entries"]:
break
print(f" ✔ Season {i} found ({len(data['entries'])} episodes)")
seasons.append({
"season": i,
"url": url,
"data": data
})
return seasons
# -------------------------
# Episode normalization
# -------------------------
def normalize_episode(self, show_title, entry):
season = entry.get("season_number") or 1
episode = entry.get("episode_number") or 1
episode_id = entry.get("id")
title = entry.get("title") or ""
show_clean = self.sceneify(show_title)
if title:
title = self.clean_episode_title(title)
title_clean = self.sceneify(title)
filename = (
f"{show_clean}."
f"S{season:02d}E{episode:02d}."
f"{title_clean}"
)
else:
filename = (
f"{show_clean}."
f"S{season:02d}E{episode:02d}"
)
return {
"show": show_clean,
"season": season,
"episode": episode,
"episode_id": episode_id,
"filename": filename
}
# -------------------------
# Download execution
# -------------------------
def download_episode(self, episode, entry, download_dir):
show_folder = (
Path(download_dir)
/ episode["show"].replace(".", " ")
)
show_folder.mkdir(parents=True, exist_ok=True)
output_template = str(
show_folder / f"{episode['filename']}.%(ext)s"
)
url = (
entry.get("webpage_url")
or entry.get("url")
)
if not url:
print("❌ No episode URL found")
return False
print(f"⬇️ Downloading: {episode['filename']}")
result = subprocess.run([
"yt-dlp",
"--no-progress",
"-o",
output_template,
url
])
if result.returncode == 0:
print(f"✅ Download complete: {episode['filename']}")
return True
print(f"❌ Download failed: {episode['filename']}")
return False
View File