Add AnimeThemes Downloader

This commit is contained in:
2026-02-02 16:21:39 +07:00
parent 2f175cab74
commit 3c45932902

585
thememusic.py Normal file
View File

@@ -0,0 +1,585 @@
#!/usr/bin/env python3
"""
AnimeThemes Downloader with Rate Limiting and Progress Bars
Downloads OP1, OP2, ED1, ED2 videos + creates theme.mp3 from OP1.
Rate limited to 20 requests per minute.
Default behavior: Downloads only OP1, extracts 30s as theme.mp3, deletes video.
Use --download-video to keep videos and download all themes (OP1/2, ED1/2).
"""
import os
import re
import requests
import subprocess
import sys
import time
from pathlib import Path
from typing import Optional, List
from collections import deque
from tqdm import tqdm
# Configuration
# Base URL for AnimeThemes API queries and for direct video file downloads.
API_BASE_URL = "https://api.animethemes.moe"
VIDEO_BASE_URL = "https://animethemes.moe/video"
# Name of the audio file written into each anime folder.
THEME_FILENAME = "theme.mp3"
# Rate limiting config (20 requests per minute = 1 request every 3 seconds)
MAX_REQUESTS_PER_MINUTE = 20
RATE_LIMIT_WINDOW = 60 # seconds
# Audio specifications
TARGET_DURATION = 30 # seconds
TARGET_SAMPLE_RATE = 44100 # Hz (or higher)
TARGET_BITRATE = "192k" # kbps (range 128k-256k)
# Regex patterns for folder names
# Matches a tag such as "[anidb-123]" or "{anidb2-123}" (case-insensitive):
# group 1 is the optional digits directly after "anidb", group 2 is the ID.
ANIDB_PATTERN = re.compile(
    r'[{\[]\s*anidb(\d*)\s*-\s*(\d+)\s*[}\]]',
    re.IGNORECASE
)
# Video file extensions
# Compared against the lower-cased file suffix, so entries must be lower case.
VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.mov', '.webm'}
# Patterns to exclude from episode count (existing files, extras).
# Each entry is searched (not full-matched) against the file name.
# The short tokens (NC, OP, ED, PV, CM) are guarded with letter-boundary
# lookarounds so they only match as stand-alone tags; without the guards
# ordinary titles such as "Cowboy Bebop 01" (…OP 01) or "Princess" (…NC…)
# were wrongly treated as extras and the whole series skipped as a movie.
# Lookarounds are used instead of \b because "_" counts as a word character
# and is a common separator in release names.
EXCLUDED_PATTERNS = [
    r'(?i)(?<![a-z])NC[\s._-]*(?:OP|ED)',   # NCOP, NCED
    r'(?i)(?<![a-z])OP\s*\d+',              # OP1, OP2 (downloaded files)
    r'(?i)(?<![a-z])ED\s*\d+',              # ED1, ED2 (downloaded files)
    r'(?i)OPENING\s+SEQUENCE',              # Opening Sequence X
    r'(?i)ENDING\s+SEQUENCE',               # Ending Sequence X
    r'(?i)(?<![a-z])PV\s*\d*(?![a-z])',     # Trailers
    r'(?i)(?<![a-z])CM\s*\d*(?![a-z])',     # Commercials
    r'(?i)TRAILER',
    r'(?i)MENU',
    r'(?i)EXTRA',
    r'(?i)PREVIEW',
]
class RateLimiter:
    """Sliding window rate limiter to enforce a requests-per-minute limit.

    Two constraints are applied on every call to :meth:`wait_if_needed`:

    * at most ``max_requests`` calls inside any ``window_seconds`` window;
    * at least ``window_seconds / max_requests`` seconds between two
      consecutive calls.

    Timestamps come from ``time.monotonic()`` so that wall-clock adjustments
    (NTP, DST, manual changes) can neither shorten nor stretch the waits.
    """

    def __init__(self, max_requests: int, window_seconds: int):
        """Create a limiter.

        Args:
            max_requests: Maximum number of requests allowed per window.
            window_seconds: Length of the sliding window in seconds.

        Raises:
            ValueError: If either argument is not strictly positive.
        """
        if max_requests <= 0 or window_seconds <= 0:
            raise ValueError("max_requests and window_seconds must be positive")
        self.max_requests = max_requests
        self.window = window_seconds
        # Monotonic timestamps of the requests still inside the window.
        self.requests = deque()
        self.min_interval = window_seconds / max_requests
        # -inf means "no request made yet": the first call never sleeps,
        # whatever value the monotonic clock currently has.
        self.last_request_time = float('-inf')

    def wait_if_needed(self, desc=""):
        """Block until it's safe to make another request, then record it.

        Args:
            desc: Prefix inserted into the "[Rate Limit]" message printed
                when the window is full.
        """
        while True:
            now = time.monotonic()
            cutoff = now - self.window
            # Forget requests that have left the sliding window.
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()
            if len(self.requests) < self.max_requests:
                break
            # Window is full: sleep until the oldest entry expires (plus a
            # small margin) and re-evaluate.
            wait_time = self.requests[0] - cutoff + 0.1
            tqdm.write(f" [Rate Limit] {desc}Waiting {wait_time:.1f}s...")
            time.sleep(wait_time)

        time_since_last = now - self.last_request_time
        if time_since_last < self.min_interval:
            time.sleep(self.min_interval - time_since_last)

        # One timestamp for both records keeps them consistent.
        stamp = time.monotonic()
        self.last_request_time = stamp
        self.requests.append(stamp)
class AnimeThemesDownloader:
    """Walk a library of anime folders and fetch theme songs from AnimeThemes.

    Every direct sub-folder of ``root_dir`` whose name carries an AniDB tag
    (see ``ANIDB_PATTERN``) is looked up on the AnimeThemes API.

    * MP3-only mode (default): OP1 is downloaded, the first
      ``TARGET_DURATION`` seconds are encoded as ``theme.mp3`` with ffmpeg,
      and the temporary video is removed again.
    * Video mode (``video_mode=True``): OP1/OP2/ED1/ED2 videos are kept and
      ``theme.mp3`` is additionally produced from OP1.

    With ``dry_run=True`` nothing is downloaded, converted or deleted; the
    planned actions are only printed (the API is still queried).
    """

    # Characters stripped from anime names before they are used in file names.
    _INVALID_FILENAME_CHARS = str.maketrans('', '', '<>:"/\\|?*')

    def __init__(self, root_dir: str, video_mode: bool = False, dry_run: bool = False):
        """Set up the HTTP session, the rate limiter and the statistics.

        Args:
            root_dir: Directory whose sub-folders are the anime series.
            video_mode: Keep the videos and fetch OP1/OP2/ED1/ED2.
            dry_run: Only report what would be done.
        """
        self.root_dir = Path(root_dir).resolve()
        self.video_mode = video_mode  # Renamed from download_video to avoid conflict
        self.dry_run = dry_run
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'AnimeThemes-Downloader/1.0 (Personal Use)'
        })
        self.api_limiter = RateLimiter(MAX_REQUESTS_PER_MINUTE, RATE_LIMIT_WINDOW)
        self.stats = {
            'processed': 0,
            'success_mp3': 0,
            'success_video': 0,
            'skipped_no_id': 0,
            'skipped_exists_video': 0,
            'skipped_exists_mp3': 0,
            'skipped_movie': 0,
            'not_found': 0,
            'failed': 0,
            'cleaned_up': 0
        }

    @staticmethod
    def _theme_key(theme: dict) -> tuple:
        """Return ``(TYPE, sequence)`` for a theme.

        The type is upper-cased (a missing or null type becomes ``''``) and
        a missing sequence number counts as 1.
        """
        theme_type = (theme.get('type') or '').upper()
        sequence = theme.get('sequence')
        return theme_type, (1 if sequence is None else sequence)

    def extract_anidb_id(self, folder_name: str) -> Optional[int]:
        """Extract the AniDB ID from a folder name, or None if there is no tag."""
        match = ANIDB_PATTERN.search(folder_name)
        return int(match.group(2)) if match else None

    def is_excluded_file(self, filename: str) -> bool:
        """Check if file should be excluded from episode count."""
        return any(re.search(pattern, filename) for pattern in EXCLUDED_PATTERNS)

    def count_video_files(self, folder_path: Path) -> tuple[int, int]:
        """Count video files. Returns (total, episodes_only).

        A folder that cannot be read is reported and counted as far as the
        listing got, instead of being silently treated as empty.
        """
        total_count = 0
        episode_count = 0
        try:
            for item in folder_path.iterdir():
                if item.is_file() and item.suffix.lower() in VIDEO_EXTENSIONS:
                    total_count += 1
                    if not self.is_excluded_file(item.name):
                        episode_count += 1
        except (PermissionError, OSError) as e:
            tqdm.write(f" ⚠️ Could not read folder: {e}")
        return total_count, episode_count

    def sanitize_filename(self, name: str) -> str:
        """Remove characters that are invalid in file names and trim whitespace."""
        return name.translate(self._INVALID_FILENAME_CHARS).strip()

    def construct_video_filename(self, anime_name: str, theme_type: str, sequence: int, original_basename: str) -> str:
        """Construct filename: 'Series Name - OP1 - Opening Sequence 1.webm'.

        The extension is taken from ``original_basename`` and falls back to
        ``.webm``. Any ``theme_type`` other than ``"OP"`` is labelled
        "Ending".
        """
        safe_name = self.sanitize_filename(anime_name)
        type_str = f"{theme_type}{sequence}"
        full_type = "Opening" if theme_type == "OP" else "Ending"
        desc_str = f"{full_type} Sequence {sequence}"
        ext = Path(original_basename).suffix or '.webm'
        return f"{safe_name} - {type_str} - {desc_str}{ext}"

    def get_themes(self, anime_data: dict) -> List[dict]:
        """Get OP1, OP2, ED1, ED2 themes from anime data, in that order."""
        filtered = []
        for theme in anime_data.get('animethemes') or []:
            theme_type, sequence = self._theme_key(theme)
            # Only OP and ED, only 1 and 2
            if theme_type in ('OP', 'ED') and sequence in (1, 2):
                filtered.append(theme)

        def sort_key(theme):
            # Sort: OP1, OP2, ED1, ED2
            theme_type, sequence = self._theme_key(theme)
            return (0 if theme_type == 'OP' else 1, sequence)

        filtered.sort(key=sort_key)
        return filtered

    def get_op1_theme(self, anime_data: dict) -> Optional[dict]:
        """Get only the OP1 theme from anime data (None if there is none)."""
        for theme in anime_data.get('animethemes') or []:
            if self._theme_key(theme) == ('OP', 1):
                return theme
        return None

    def get_video_from_theme(self, theme: dict) -> Optional[dict]:
        """Get the highest-resolution video of a theme.

        The first entry that actually has videos is used, so an empty
        leading entry no longer hides usable later ones. The caller's
        ``videos`` list is left untouched (it used to be sorted in place).
        Among equal resolutions the first listed video wins.
        """
        for entry in theme.get('animethemeentries') or []:
            videos = entry.get('videos') or []
            if videos:
                return max(videos, key=lambda v: v.get('resolution') or 0)
        return None

    def download_video(self, video: dict, output_path: Path) -> bool:
        """Download video file with progress bar.

        The data is streamed into ``<output_path>.part`` and only renamed
        to ``output_path`` once complete, so an interrupted run (including
        Ctrl-C) never leaves a truncated file that a later run would
        mistake for an existing video.

        Returns:
            True on success (or in dry-run mode), False otherwise.
        """
        basename = video.get('basename') or video.get('filename')
        if not basename:
            return False
        video_url = f"{VIDEO_BASE_URL}/{basename}"
        if self.dry_run:
            tqdm.write(f" [DRY RUN] Would download: {output_path.name}")
            return True

        partial_path = output_path.with_name(output_path.name + '.part')
        try:
            self.api_limiter.wait_if_needed(desc="Video Download ")
            # The context manager releases the connection even on errors.
            with self.session.get(video_url, stream=True, timeout=120) as response:
                response.raise_for_status()
                total_size = int(response.headers.get('content-length', 0))
                with open(partial_path, 'wb') as f, tqdm(
                    desc=" Downloading",
                    total=total_size or None,  # unknown size -> open-ended bar
                    unit='B',
                    unit_scale=True,
                    unit_divisor=1024,
                    leave=False,
                    ncols=70
                ) as pbar:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
                            pbar.update(len(chunk))
            partial_path.replace(output_path)
            return True
        except Exception as e:
            # Best-effort boundary: report and let the caller count a failure.
            tqdm.write(f" Download failed: {e}")
            return False
        finally:
            # After a successful rename there is nothing left to remove.
            try:
                partial_path.unlink(missing_ok=True)
            except OSError:
                pass

    def convert_to_mp3(self, input_path: Path, output_path: Path) -> bool:
        """Convert video to MP3 (30s, 44.1kHz, 192kbps).

        Only ``-b:a`` is passed for rate control: combining it with
        ``-q:a`` makes libmp3lame switch to VBR and ignore the requested
        bitrate. The ffmpeg banner is suppressed so the reported stderr
        contains the actual error. A partial output file is removed when
        the conversion fails.

        Returns:
            True on success (or in dry-run mode), False if ffmpeg failed.
            Exits the program if ffmpeg is not installed.
        """
        if self.dry_run:
            tqdm.write(f" [DRY RUN] Would create: {output_path.name}")
            return True
        cmd = [
            'ffmpeg',
            '-hide_banner',
            '-loglevel', 'error',
            '-y',
            '-i', str(input_path),
            '-t', str(TARGET_DURATION),
            '-ar', str(TARGET_SAMPLE_RATE),
            '-b:a', TARGET_BITRATE,
            '-vn',
            '-acodec', 'libmp3lame',
            str(output_path)
        ]
        try:
            subprocess.run(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True
            )
            return True
        except subprocess.CalledProcessError as e:
            tqdm.write(f" FFmpeg error: {(e.stderr or '').strip()[-200:]}")
            # Do not leave a broken theme.mp3 that later runs would skip.
            try:
                output_path.unlink(missing_ok=True)
            except OSError:
                pass
            return False
        except FileNotFoundError:
            tqdm.write(" Error: ffmpeg not found!")
            sys.exit(1)

    def process_theme_video_mode(self, theme: dict, anime_name: str, folder_path: Path, is_op1: bool = False):
        """Process a single theme in video mode.

        Downloads the video unless it already exists and, when ``is_op1``
        is true, also creates ``theme.mp3`` from it if that is missing.

        Returns:
            False if no video is available or the download failed, True
            otherwise (a failed MP3 conversion is counted in the stats but
            still returns True).
        """
        theme_type, sequence = self._theme_key(theme)
        video = self.get_video_from_theme(theme)
        if not video:
            tqdm.write(f" ✗ No video available for {theme_type}{sequence}")
            return False
        filename = self.construct_video_filename(
            anime_name, theme_type, sequence, video.get('basename') or '')
        video_path = folder_path / filename
        theme_mp3_path = folder_path / THEME_FILENAME

        if video_path.exists():
            tqdm.write(f"{filename} already exists")
            self.stats['skipped_exists_video'] += 1
        else:
            tqdm.write(f" ↓ Downloading {filename}...")
            if not self.download_video(video, video_path):
                self.stats['failed'] += 1
                return False
            tqdm.write(f" ✓ Saved {filename}")
            self.stats['success_video'] += 1

        # If this is OP1, handle theme.mp3 creation
        if is_op1:
            if theme_mp3_path.exists():
                tqdm.write(" • theme.mp3 already exists")
                self.stats['skipped_exists_mp3'] += 1
            else:
                tqdm.write(" ♫ Converting to theme.mp3...")
                if self.convert_to_mp3(video_path, theme_mp3_path):
                    tqdm.write(" ✓ Created theme.mp3")
                    self.stats['success_mp3'] += 1
                else:
                    tqdm.write(" ✗ Failed to create theme.mp3")
                    self.stats['failed'] += 1
        return True

    def process_mp3_mode(self, anime_name: str, folder_path: Path, theme: dict):
        """Process OP1 in MP3-only mode: download, convert, cleanup.

        If the OP1 video is already present in the folder (for example
        kept by an earlier video-mode run) it is reused for the conversion
        and is NOT deleted afterwards; only a video downloaded by this call
        is treated as temporary.

        Returns:
            True if ``theme.mp3`` was created, False otherwise.
        """
        video = self.get_video_from_theme(theme)
        if not video:
            tqdm.write(" ✗ No video available for OP1")
            return False
        filename = self.construct_video_filename(
            anime_name, "OP", 1, video.get('basename') or '')
        video_path = folder_path / filename
        theme_mp3_path = folder_path / THEME_FILENAME

        keep_video = video_path.exists()
        if keep_video:
            tqdm.write(f" • Using existing {filename}")
        else:
            # Download video
            tqdm.write(" ↓ Downloading OP1 temporarily...")
            if not self.download_video(video, video_path):
                self.stats['failed'] += 1
                return False
            tqdm.write(" ✓ Downloaded temporary video")

        # Convert to MP3
        tqdm.write(" ♫ Converting to theme.mp3...")
        if not self.convert_to_mp3(video_path, theme_mp3_path):
            tqdm.write(" ✗ Failed to create theme.mp3")
            self.stats['failed'] += 1
            # Don't delete video if conversion failed, user might want to retry manually
            return False
        tqdm.write(" ✓ Created theme.mp3")
        self.stats['success_mp3'] += 1

        if keep_video:
            return True

        # Delete temporary video
        if self.dry_run:
            tqdm.write(f" [DRY RUN] Would delete: {filename}")
        else:
            try:
                video_path.unlink()
                tqdm.write(" 🗑️ Cleaned up temporary video")
                self.stats['cleaned_up'] += 1
            except OSError as e:
                tqdm.write(f" ⚠️ Failed to delete temporary video: {e}")
        return True

    def get_anime_by_anidb(self, anidb_id: int) -> Optional[dict]:
        """Query AnimeThemes API by AniDB ID.

        Returns:
            The first anime in the response, or None when the response has
            no anime or the request/decoding failed (the error is printed).
        """
        self.api_limiter.wait_if_needed(desc="API Query ")
        params = {
            'filter[has]': 'resources',
            'filter[site]': 'AniDB',
            'filter[external_id]': anidb_id,
            'include': 'animethemes.animethemeentries.videos',
            'fields[anime]': 'id,name,slug',
            'fields[animetheme]': 'id,type,sequence,slug',
            'fields[video]': 'id,basename,filename,resolution'
        }
        try:
            response = self.session.get(
                f"{API_BASE_URL}/anime",
                params=params,
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
            anime_list = data.get('anime', [])
            return anime_list[0] if anime_list else None
        except Exception as e:
            tqdm.write(f" API Error: {e}")
            return None

    def process_folder(self, folder_path: Path, pbar: "tqdm"):
        """Process a single anime folder.

        Skips folders without an AniDB tag, folders with at most one
        episode file (assumed to be movies) and, in MP3-only mode, folders
        that already contain ``theme.mp3``. Otherwise queries the API and
        dispatches to video mode or MP3-only mode. Outcomes are recorded in
        ``self.stats``.
        """
        folder_name = folder_path.name
        pbar.set_description(f"Processing: {folder_name[:40]:<40}")
        # Visible separator between folders (the original multiplied an
        # empty string, which printed nothing).
        tqdm.write("\n" + "-" * 60)
        tqdm.write(f"Processing: {folder_name}")

        # Check AniDB ID
        anidb_id = self.extract_anidb_id(folder_name)
        if not anidb_id:
            tqdm.write(" ✗ Skipped: No AniDB ID found")
            self.stats['skipped_no_id'] += 1
            return
        tqdm.write(f" AniDB ID: {anidb_id}")

        # Check video count (excluding existing OP/ED)
        _, episode_videos = self.count_video_files(folder_path)
        if episode_videos <= 1:
            tqdm.write(f" ✗ Skipped: Only {episode_videos} episode file(s) (movie?)")
            self.stats['skipped_movie'] += 1
            return
        tqdm.write(f" Episodes: {episode_videos}")

        # In MP3-only mode, check if theme.mp3 already exists and skip
        theme_mp3_path = folder_path / THEME_FILENAME
        if not self.video_mode and theme_mp3_path.exists():
            tqdm.write(" • Skipped: theme.mp3 already exists")
            self.stats['skipped_exists_mp3'] += 1
            return

        # Query API
        anime_data = self.get_anime_by_anidb(anidb_id)
        if not anime_data:
            tqdm.write(" ✗ Not found on AnimeThemes")
            self.stats['not_found'] += 1
            return
        anime_name = anime_data.get('name', folder_name)
        tqdm.write(f" Anime: {anime_name}")

        if not self.video_mode:
            # MP3-only mode: download only OP1, extract MP3, delete video
            op1 = self.get_op1_theme(anime_data)
            if not op1:
                tqdm.write(" ✗ No OP1 found")
                self.stats['not_found'] += 1
                return
            tqdm.write(" [MP3 Mode: OP1 only]")
            self.process_mp3_mode(anime_name, folder_path, op1)
            return

        # Video mode: download all themes (OP1/2, ED1/2)
        themes = self.get_themes(anime_data)
        if not themes:
            tqdm.write(" ✗ No OP1/OP2/ED1/ED2 found")
            self.stats['not_found'] += 1
            return
        theme_names = ["{}{}".format(*self._theme_key(t)) for t in themes]
        tqdm.write(f" Found: {', '.join(theme_names)}")

        # Separate OP1 from others (a later OP1 duplicate replaces an
        # earlier one, as before).
        op1 = None
        others = []
        for theme in themes:
            if self._theme_key(theme) == ('OP', 1):
                op1 = theme
            else:
                others.append(theme)

        # Process OP1 first (download + convert to MP3)
        if op1:
            tqdm.write("\n [OP1 - Video + MP3]")
            self.process_theme_video_mode(op1, anime_name, folder_path, is_op1=True)

        # Process others (OP2, ED1, ED2)
        for theme in others:
            t_type, seq = self._theme_key(theme)
            tqdm.write(f"\n [{t_type}{seq} - Video]")
            self.process_theme_video_mode(theme, anime_name, folder_path, is_op1=False)

    def scan_folders(self):
        """Scan all folders in root directory and print final statistics.

        Returns early with an error message when the root path does not
        exist or is not a directory.
        """
        if not self.root_dir.exists():
            print(f"Error: Root directory '{self.root_dir}' not found")
            return
        if not self.root_dir.is_dir():
            print(f"Error: '{self.root_dir}' is not a directory")
            return

        folders = sorted(item for item in self.root_dir.iterdir() if item.is_dir())
        mode_str = "Video+MP3" if self.video_mode else "MP3-only"
        print(f"Scanning: {self.root_dir}")
        print(f"Found: {len(folders)} folders")
        print(f"Mode: {mode_str}")
        print(f"Rate Limit: {MAX_REQUESTS_PER_MINUTE} req/min")
        if self.video_mode:
            print("Format: 'Series - OP1 - Opening Sequence 1.webm'")
        print("=" * 60)

        with tqdm(
            total=len(folders),
            desc="Overall Progress",
            unit="folder",
            ncols=80,
            bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}<{remaining}]"
        ) as pbar:
            for folder in folders:
                self.process_folder(folder, pbar)
                self.stats['processed'] += 1
                pbar.update(1)

        # Final stats
        print("\n" + "=" * 60)
        print("FINAL STATISTICS")
        print("=" * 60)
        print(f"Folders processed: {self.stats['processed']}")
        if self.video_mode:
            print(f"Videos downloaded: {self.stats['success_video']}")
            print(f"Videos skipped: {self.stats['skipped_exists_video']}")
        else:
            print(f"Videos cleaned up: {self.stats['cleaned_up']}")
        print(f"theme.mp3 created: {self.stats['success_mp3']}")
        print(f"theme.mp3 skipped: {self.stats['skipped_exists_mp3']}")
        print(f"Skipped (no ID): {self.stats['skipped_no_id']}")
        print(f"Skipped (movie): {self.stats['skipped_movie']}")
        print(f"Not found on API: {self.stats['not_found']}")
        print(f"Failed: {self.stats['failed']}")
        print("=" * 60)
def main():
    """Command-line entry point: parse the arguments and scan the library."""
    import argparse

    cli = argparse.ArgumentParser(
        description='Download OP/ED themes from AnimeThemes.moe. Default: MP3-only mode (downloads OP1, extracts 30s as theme.mp3, deletes video).',
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    # Positional: the library root whose sub-folders are the series.
    cli.add_argument(
        'directory',
        help='Root directory containing anime folders',
    )
    # Opt-in switch from MP3-only mode to keeping the videos.
    cli.add_argument(
        '--download-video',
        action='store_true',
        help='Download and keep OP1/OP2/ED1/ED2 videos (plus theme.mp3). Default is MP3-only mode.',
    )
    cli.add_argument(
        '--dry-run',
        action='store_true',
        help='Scan only, do not download',
    )
    options = cli.parse_args()

    # The CLI flag --download-video maps onto the constructor's video_mode.
    AnimeThemesDownloader(
        options.directory,
        video_mode=options.download_video,
        dry_run=options.dry_run,
    ).scan_folders()


if __name__ == '__main__':
    main()