Enhance processing logic: add duplicate episode handling, update notifications, and improve file processing flow
All checks were successful
Docker Build / docker (push) Successful in 16s

This commit is contained in:
2026-02-03 00:32:36 +07:00
parent 1eca920269
commit ffcf3927f3
7 changed files with 176 additions and 54 deletions

2
.gitignore vendored
View File

@@ -65,3 +65,5 @@ tmp/
.DS_Store
Thumbs.db
*.tmp
config.toml.example
.roo/memory

View File

@@ -14,7 +14,7 @@ default_flags = ["--execute"]
enabled = true
bot_token = "YOUR_BOT_TOKEN"
chat_id = "YOUR_CHAT_ID"
notify_on = ["no_match", "copy_error", "rename_error", "unhandled_exception"]
notify_on = ["no_match", "copy_error", "rename_error", "unhandled_exception", "duplicate_skipped"]
[database]
path = "/data/anime_manager.db"

View File

@@ -19,7 +19,7 @@ DEFAULT_CONFIG = {
'enabled': False,
'bot_token': 'YOUR_BOT_TOKEN_HERE',
'chat_id': 'YOUR_CHAT_ID_HERE',
'notify_on': ['no_match', 'copy_error', 'rename_error', 'unhandled_exception']
'notify_on': ['no_match', 'copy_error', 'rename_error', 'unhandled_exception', 'duplicate_skipped']
},
'database': {
'path': '/data/anime_manager.db'

View File

@@ -83,6 +83,17 @@ class Database:
)
''')
# Track rule+episode so we don't re-copy same episode (e.g. when duplicates exist or files re-downloaded)
conn.execute('''
CREATE TABLE IF NOT EXISTS processed_rule_episodes (
rule_id INTEGER NOT NULL,
episode TEXT NOT NULL,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (rule_id, episode),
FOREIGN KEY (rule_id) REFERENCES rules(id)
)
''')
conn.commit()
def add_rule(self, anime_folder, match_regex, rename_regex, **kwargs):
@@ -178,4 +189,28 @@ class Database:
paths
)
conn.commit()
return cursor.rowcount
return cursor.rowcount
def is_rule_episode_processed(self, rule_id: int, episode: str) -> bool:
"""Check if this rule+episode has already been copied (to avoid re-processing duplicates)."""
with sqlite3.connect(self.db_path) as conn:
result = conn.execute(
"SELECT 1 FROM processed_rule_episodes WHERE rule_id = ? AND episode = ?",
(rule_id, str(episode))
).fetchone()
return result is not None
def add_processed_rule_episode(self, rule_id: int, episode: str):
"""Record that we've copied this episode for this rule."""
with sqlite3.connect(self.db_path) as conn:
conn.execute(
"INSERT OR REPLACE INTO processed_rule_episodes (rule_id, episode, processed_at) VALUES (?, ?, CURRENT_TIMESTAMP)",
(rule_id, str(episode))
)
conn.commit()
def clear_processed_rule_episodes(self):
"""Clear all processed_rule_episodes (e.g. for flush_and_reprocess)."""
with sqlite3.connect(self.db_path) as conn:
conn.execute("DELETE FROM processed_rule_episodes")
conn.commit()

View File

@@ -28,14 +28,19 @@ class TelegramNotifier:
'no_match': '',
'copy_error': '📁',
'rename_error': '📝',
'unhandled_exception': '💥'
'unhandled_exception': '💥',
'duplicate_skipped': '📋'
}
emoji = emoji_map.get(event_type, '')
text = f"{emoji} *{event_type.replace('_', ' ').title()}*\n\n{message}"
if details:
details_text = '\n'.join([f"• *{k}*: `{v}`" for k, v in details.items()])
def fmt(v):
if isinstance(v, list):
return ', '.join(str(x) for x in v) if len(v) <= 3 else f"{v[0]}, ... (+{len(v)-1} more)"
return str(v)
details_text = '\n'.join([f"• *{k}*: `{fmt(v)}`" for k, v in details.items()])
text += f"\n\n*Details:*\n{details_text}"
try:

View File

@@ -4,7 +4,8 @@ import hashlib
import shutil
import subprocess
from pathlib import Path
from typing import Optional, List, Dict
from typing import Optional, List, Dict, Tuple
from collections import defaultdict
class FileProcessor:
def __init__(self, config, db, notifier):
@@ -51,36 +52,116 @@ class FileProcessor:
if re.search(rule['match_regex'], filename, re.IGNORECASE):
return rule
return None
def process_file(self, filepath: Path):
"""Main processing pipeline"""
filename = filepath.name
# Wait for file to be fully written
if not self.is_file_stable(filepath):
self.db.log_activity('WARNING', f'File not stable, skipping: {filename}')
return
# Check if already processed
file_hash = self.calculate_hash(filepath)
if self.db.is_file_processed(file_hash):
print(f"Skipping already processed: {filename}")
return
self.db.add_processed_file(file_hash, filename, str(filepath), 'processing')
self.db.log_activity('INFO', f'Started processing: {filename}')
def extract_episode(self, filename: str, rule: Dict) -> Optional[str]:
"""Extract episode number from filename using rule's match_regex and episode_group (1-indexed)."""
try:
# Match rule
rule = self.match_rule(filename)
match = re.search(rule['match_regex'], filename, re.IGNORECASE)
if not match:
return None
group_idx = rule.get('episode_group', 2) - 1 # 1-indexed -> 0-indexed
if group_idx < 0 or group_idx >= len(match.groups()):
return None
return match.group(group_idx + 1) # group(0) is full match
except (IndexError, re.error):
return None
def process_drop_folder(self, drop_path: Path) -> int:
"""
Batch process all videos in drop folder. Groups by (rule_id, episode), picks largest
per group, processes that one. Skips/records duplicates. Returns count of files processed.
"""
videos = self.find_video_files(drop_path)
if not videos:
return 0
# Build (filepath, rule, episode, size) for each matched file, excluding already-processed
# Also handle no-match files
candidates: List[Tuple[Path, Dict, str, int]] = []
for filepath in videos:
if not self.is_file_stable(filepath):
self.db.log_activity('WARNING', f'File not stable, skipping: {filepath.name}')
continue
file_hash = self.calculate_hash(filepath)
if self.db.is_file_processed(file_hash):
continue
rule = self.match_rule(filepath.name)
if not rule:
msg = f"No matching rule for: {filename}"
self.db.log_activity('ERROR', msg, {'filename': filename})
self.db.update_file_status(file_hash, 'no_match', error_msg=msg)
self.notifier.notify('no_match', msg, {'file': filename})
return
# Prepare paths
msg = f"No matching rule for: {filepath.name}"
self.db.add_processed_file(file_hash, filepath.name, str(filepath), 'no_match', error_msg=msg)
self.db.log_activity('ERROR', msg, {'filename': filepath.name})
self.notifier.notify('no_match', msg, {'file': filepath.name})
continue
episode = self.extract_episode(filepath.name, rule)
if episode is None:
self.db.log_activity('WARNING', f'Could not extract episode from: {filepath.name}')
continue
try:
size = filepath.stat().st_size
except OSError:
continue
candidates.append((filepath, rule, episode, size))
# Group by (rule_id, episode)
groups: Dict[Tuple[int, str], List[Tuple[Path, Dict, str, int]]] = defaultdict(list)
for item in candidates:
key = (item[1]['id'], item[2])
groups[key].append(item)
processed_count = 0
for (rule_id, episode), items in groups.items():
rule = items[0][1] # same rule for all in group
if self.db.is_rule_episode_processed(rule_id, episode):
# Already copied this episode before (e.g. from prior run); skip all
for filepath, _, _, _ in items:
file_hash = self.calculate_hash(filepath)
self.db.add_processed_file(file_hash, filepath.name, str(filepath), 'duplicate_skipped', rule_id=rule_id)
self.db.log_activity('INFO', f'Skipped (episode already copied): {filepath.name}')
self.notifier.notify('duplicate_skipped', f'Episode {episode} already copied for rule #{rule_id}; skipped {len(items)} file(s)', {
'rule_id': rule_id,
'episode': episode,
'skipped_count': len(items),
'files': [p.name for p, _, _, _ in items]
})
continue
# Pick largest file
items_sorted = sorted(items, key=lambda x: x[3], reverse=True)
to_process = items_sorted[0]
skipped = items_sorted[1:]
if skipped:
# Multiple files for same episode - notify and mark skipped as processed
for filepath, _, _, _ in skipped:
file_hash = self.calculate_hash(filepath)
self.db.add_processed_file(file_hash, filepath.name, str(filepath), 'duplicate_skipped', rule_id=rule_id)
self.db.log_activity('INFO', f'Skipped duplicate episode (kept larger): {filepath.name}')
self.notifier.notify('duplicate_skipped', f'Multiple files for episode {episode}; kept largest ({to_process[0].name}), skipped {len(skipped)} other(s)', {
'rule_id': rule_id,
'episode': episode,
'kept': to_process[0].name,
'kept_size_mb': round(to_process[3] / (1024 * 1024), 1),
'skipped_count': len(skipped),
'skipped': [p.name for p, _, _, _ in skipped]
})
# Process the chosen file
self._process_single_file(to_process[0], rule)
self.db.add_processed_rule_episode(rule_id, episode)
processed_count += 1
return processed_count
def _process_single_file(self, filepath: Path, rule: Dict):
"""Process a single file (already matched to rule). Used by process_drop_folder."""
filename = filepath.name
file_hash = self.calculate_hash(filepath)
self.db.add_processed_file(file_hash, filename, str(filepath), 'processing', rule_id=rule['id'])
self.db.log_activity('INFO', f'Started processing: {filename}')
try:
dest_folder = Path(rule['anime_folder'])
if not dest_folder.exists():
msg = f"Destination folder does not exist: {dest_folder}"
@@ -88,9 +169,9 @@ class FileProcessor:
self.db.update_file_status(file_hash, 'copy_error', error_msg=msg)
self.notifier.notify('copy_error', msg, {'folder': str(dest_folder)})
return
dest_path = dest_folder / filename
# Handle filename collision
counter = 1
original_dest = dest_path
@@ -99,20 +180,17 @@ class FileProcessor:
suffix = original_dest.suffix
dest_path = original_dest.with_name(f"{stem}_{counter}{suffix}")
counter += 1
# Copy file
self.db.log_activity('INFO', f'Copying to: {dest_path}')
shutil.copy2(filepath, dest_path)
self.db.update_file_status(file_hash, 'copied', str(dest_path))
# Run renamer
self._run_renamer(dest_folder, rule, file_hash)
# Success - optionally remove from drop folder (configurable)
# os.remove(filepath) # Uncomment to enable deletion after processing
self.db.log_activity('SUCCESS', f'Completed processing: {filename}')
except Exception as e:
msg = f"Unhandled exception processing {filename}: {str(e)}"
self.db.log_activity('ERROR', msg, {'exception': str(e)})
@@ -121,6 +199,14 @@ class FileProcessor:
'file': filename,
'error': str(e)
})
def process_file(self, filepath: Path):
"""
Process file(s). Delegates to process_drop_folder to handle duplicate episodes
(picks largest per rule+episode, skips rest, notifies).
"""
drop_path = Path(self.config['general']['drop_folder'])
self.process_drop_folder(drop_path)
def _run_renamer(self, folder: Path, rule: Dict, file_hash: str):
"""Execute the rename.py script"""

View File

@@ -127,13 +127,9 @@ def logs():
@app.route('/api/process_now', methods=['POST'])
def process_now():
"""Manual trigger to scan drop folder"""
"""Manual trigger to scan drop folder (batch processes, picks largest per rule+episode)"""
path = Path(config['general']['drop_folder'])
videos = processor.find_video_files(path)
count = 0
for video in videos:
processor.process_file(video)
count += 1
count = processor.process_drop_folder(path)
return jsonify({'processed': count})
@@ -155,12 +151,10 @@ def flush_and_reprocess():
pass
removed = db.delete_processed_files_by_source_paths(list(paths_to_delete))
db.log_activity('INFO', f'Flush: removed {removed} drop-folder records, reprocessing {len(videos)} files')
db.clear_processed_rule_episodes()
db.log_activity('INFO', f'Flush: removed {removed} drop-folder records, cleared rule-episode cache, reprocessing')
processed = 0
for video in videos:
processor.process_file(video)
processed += 1
processed = processor.process_drop_folder(drop_path)
return jsonify({'removed': removed, 'processed': processed})