# Listing metadata from the original file viewer: 238 lines, 5.4 KiB, Python.
import json
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

from services.base import BaseService


class iViewService(BaseService):
    """Service wrapper that discovers and downloads iView shows via yt-dlp.

    The class builds iView series URLs from a show title, asks the ``yt-dlp``
    command-line tool for their metadata, turns each playlist entry into a
    scene-style file name and finally runs ``yt-dlp`` to download it.
    """

    # Single-character rewrites applied by ``slugify``: "&" is spelled out,
    # quotes/brackets become hyphens, remaining punctuation is dropped.
    _SLUG_TABLE = str.maketrans({
        "&": "and",
        "'": "-",
        "(": "-",
        ")": "-",
        "?": None,
        "!": None,
        ",": None,
        ":": None,
    })

    # Single-character rewrites applied by ``sceneify``.  Path separators are
    # replaced as well: the result is used as a file/folder name, so a "/" or
    # "\" would otherwise split the output path into extra directories.
    _SCENE_TABLE = str.maketrans({
        "&": "and",
        ":": None,
        ",": None,
        "?": None,
        "!": None,
        "(": None,
        ")": None,
        "'": None,
        "/": "-",
        "\\": "-",
    })

    # "Series 1 Ep 2" / "Season 1 Episode 2" markers (case-insensitive).  The
    # leading \b keeps the pattern from matching inside a longer word.
    _EPISODE_MARKER_RE = re.compile(
        r"\b(?:series|season)\s*\d+\s*(?:ep|episode)\s*\d+",
        re.IGNORECASE,
    )

    # Whitespace and separator punctuation left dangling at either end of a
    # title once the episode marker has been removed.
    _EDGE_SEPARATORS_RE = re.compile(r"^[\s\-–—:|]+|[\s\-–—:|]+$")

    # -------------------------
    # Service name
    # -------------------------
    def name(self) -> str:
        """Return the display name of this service."""
        return "iView"

    # -------------------------
    # Slug generation
    # -------------------------
    def slugify(self, text: str) -> str:
        """Convert a show title into the slug used in iView show URLs.

        The text is lower-cased, "&" becomes "and", apostrophes and
        parentheses become hyphens, ``? ! , :`` are removed, whitespace runs
        become a single hyphen, repeated hyphens are collapsed and leading or
        trailing hyphens are trimmed.
        """
        slug = text.lower().translate(self._SLUG_TABLE)
        slug = re.sub(r"\s+", "-", slug)
        slug = re.sub(r"-{2,}", "-", slug)
        return slug.strip("-")

    # -------------------------
    # Scene-style naming
    # -------------------------
    def sceneify(self, text: str) -> str:
        """Convert a title into a dot-separated, scene-style name.

        "&" becomes "and", ``: , ? ! ( ) '`` are removed, path separators
        become hyphens, whitespace runs become a period, repeated periods are
        collapsed and leading or trailing periods are trimmed.
        """
        scene = text.translate(self._SCENE_TABLE)
        scene = re.sub(r"\s+", ".", scene)
        scene = re.sub(r"\.+", ".", scene)
        return scene.strip(".")

    # -------------------------
    # Episode cleanup
    # -------------------------
    def clean_episode_title(self, title: str) -> str:
        """Remove redundant 'Series X Ep Y' markers from iView titles.

        Handles both "Series 1 Ep 2" and "Season 1 Episode 2" spellings,
        case-insensitively.  When a marker was removed, separators that it
        leaves orphaned at the edges (for example the "- " in
        "Series 1 Ep 2 - Pilot") and doubled inner spaces are tidied up too.
        Titles without a marker are only stripped of surrounding whitespace.
        """
        cleaned, removed = self._EPISODE_MARKER_RE.subn("", title)
        if removed:
            cleaned = self._EDGE_SEPARATORS_RE.sub("", cleaned)
            cleaned = re.sub(r"\s{2,}", " ", cleaned)
        return cleaned.strip()

    # -------------------------
    # yt-dlp JSON helper
    # -------------------------
    def run_ytdlp_json(self, url: str) -> Optional[Dict[str, Any]]:
        """Run ``yt-dlp -J --no-flat-playlist`` on *url* and parse its output.

        Returns:
            The decoded JSON document written to stdout (expected to be a
            JSON object), or ``None`` when yt-dlp exits with a non-zero status
            or its output is not valid JSON.
        """
        result = subprocess.run(
            ["yt-dlp", "-J", "--no-flat-playlist", url],
            capture_output=True,
            text=True,
        )

        if result.returncode != 0:
            return None

        try:
            return json.loads(result.stdout)
        except ValueError:
            # json.JSONDecodeError is a ValueError.  Catching only that keeps
            # KeyboardInterrupt/SystemExit from being swallowed, which the
            # previous bare ``except:`` did.
            return None

    # -------------------------
    # Season discovery
    # -------------------------
    def discover_seasons(
        self, show_title: str, max_seasons: int = 19
    ) -> List[Dict[str, Any]]:
        """Probe consecutive iView series pages for *show_title*.

        Series numbers are tried in order starting at 1.  Probing stops at the
        first number for which yt-dlp returns no data or no entries, or after
        *max_seasons* series have been tried.

        Args:
            show_title: Human-readable show title; it is slugified to build
                the series URLs.
            max_seasons: Highest series number to probe.  Defaults to 19,
                which is the range the method has always covered.

        Returns:
            One dict per discovered season with the keys ``"season"`` (the
            series number), ``"url"`` and ``"data"`` (the yt-dlp metadata).
        """
        slug = self.slugify(show_title)
        seasons: List[Dict[str, Any]] = []

        print(f"\n🔎 Discovering seasons for: {show_title}")

        for number in range(1, max_seasons + 1):
            url = f"https://iview.abc.net.au/show/{slug}/series/{number}"
            data = self.run_ytdlp_json(url)

            # Missing data, a non-object document, a missing "entries" key
            # and an empty entry list all mean "no such season".
            entries = data.get("entries") if isinstance(data, dict) else None
            if not entries:
                break

            print(f" ✔ Season {number} found ({len(entries)} episodes)")
            seasons.append({"season": number, "url": url, "data": data})

        return seasons

    # -------------------------
    # Episode normalization
    # -------------------------
    def normalize_episode(
        self, show_title: str, entry: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build naming metadata for one yt-dlp playlist entry.

        Args:
            show_title: Human-readable show title.
            entry: Entry dict; the keys ``season_number``, ``episode_number``,
                ``id`` and ``title`` are read.  A missing or falsy season or
                episode number falls back to 1.

        Returns:
            A dict with ``"show"`` (scene-style show name), ``"season"``,
            ``"episode"``, ``"episode_id"`` and ``"filename"``.  The file name
            is ``Show.SxxEyy`` followed by ``.Title`` when a title remains
            after cleanup.
        """
        season = entry.get("season_number") or 1
        episode = entry.get("episode_number") or 1
        show_clean = self.sceneify(show_title)

        raw_title = entry.get("title") or ""
        title_clean = ""
        if raw_title:
            title_clean = self.sceneify(self.clean_episode_title(raw_title))

        filename = f"{show_clean}.S{season:02d}E{episode:02d}"
        # Decide on the *cleaned* title: a raw title that is nothing but
        # "Series 1 Ep 2" cleans down to "", and appending it used to leave a
        # dangling "." at the end of the file name.
        if title_clean:
            filename = f"{filename}.{title_clean}"

        return {
            "show": show_clean,
            "season": season,
            "episode": episode,
            "episode_id": entry.get("id"),
            "filename": filename,
        }

    # -------------------------
    # Download execution
    # -------------------------
    def download_episode(
        self, episode: Dict[str, Any], entry: Dict[str, Any], download_dir
    ) -> bool:
        """Download one episode with yt-dlp into ``download_dir/<show>/``.

        Args:
            episode: Dict as produced by ``normalize_episode``; the ``"show"``
                and ``"filename"`` keys are read.
            entry: yt-dlp entry; ``"webpage_url"`` is preferred over ``"url"``.
            download_dir: Base download directory (string or path).

        Returns:
            ``True`` when yt-dlp exits with status 0.  ``False`` when the
            entry carries no URL or yt-dlp reports a failure.
        """
        # The folder uses the show name with spaces instead of periods.
        show_folder = Path(download_dir) / episode["show"].replace(".", " ")
        show_folder.mkdir(parents=True, exist_ok=True)

        # yt-dlp interprets "%" in the -o template as the start of a field,
        # so literal percent signs in the path or title must be doubled.
        target = str(show_folder / episode["filename"]).replace("%", "%%")
        output_template = f"{target}.%(ext)s"

        url = entry.get("webpage_url") or entry.get("url")
        if not url:
            print("❌ No episode URL found")
            return False

        print(f"⬇️ Downloading: {episode['filename']}")

        result = subprocess.run([
            "yt-dlp",
            "--no-progress",
            "-o",
            output_template,
            url,
        ])

        if result.returncode == 0:
            print(f"✅ Download complete: {episode['filename']}")
            return True

        print(f"❌ Download failed: {episode['filename']}")
        return False