# Media_Scripts/checktranscode.py
# (pasted-file metadata: 362 lines, 11 KiB, Python)
import os
import subprocess
import argparse
import csv
import json
import re
import tempfile
import shutil
from tqdm import tqdm
from pathlib import Path
# Define column indices for easy reference
COL_ORIGINAL = 0       # path of the source video
COL_TRANSCODED = 1     # path of the AV1 transcode
COL_ORIG_SIZE = 2      # original size, MB
COL_TRANS_SIZE = 3     # transcoded size, MB
COL_SIZE_SAVED = 4     # original - transcoded, MB
COL_SIZE_RATIO = 5     # transcoded / original
COL_SSIM = 6           # SSIM "All" score
COL_VMAF = 7           # pooled mean VMAF (filled in the VMAF phase)
COL_AUDIO = 8          # "O"/"X"/"E": audio streams match?
COL_SUBS = 9           # "O"/"X"/"E": subtitle streams match?
COL_STREAM_SUMMARY = 10  # human-readable stream counts
COL_ACCEPTED = 11      # manual verdict: 'o'/'x'/'r'
# Full header with all columns
FULL_HEADER = [
    "original_file",
    "transcoded_file",
    "original_size (MB)",
    "transcoded_size (MB)",
    "size_saved (MB)",
    "size_ratio",
    "ssim_score",
    "vmaf_score",
    "audio_match",
    "subs_match",
    "stream_summary",
    "accepted",
]
def find_video_pairs(root_dir):
    """Locate (original, transcoded) video pairs under *root_dir*.

    A transcode is recognized by the naming scheme
    ``<original stem>_av1_tdar.mkv`` living in the same folder as its
    original.  Returns a list of (original_path, transcoded_path) tuples.
    """
    video_exts = (".mkv", ".mp4", ".avi")
    found = []
    for folder, _, names in tqdm(os.walk(root_dir)):
        for name in names:
            # Skip non-videos and files that are themselves transcodes.
            if not name.lower().endswith(video_exts) or "_av1_tdar" in name:
                continue
            stem, _ext = os.path.splitext(name)
            candidate = os.path.join(folder, f"{stem}_av1_tdar.mkv")
            if os.path.exists(candidate):
                found.append((os.path.join(folder, name), candidate))
    return found
def get_video_duration(path):
    """Return the container duration of *path* in seconds, or None on failure.

    Reads the format-level duration via ffprobe.  Any failure (missing
    ffprobe, unreadable file, non-numeric output) yields None instead of
    raising.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v", "error",
                # -select_streams dropped: duration is read from the
                # format section, so stream selection had no effect.
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            # CREATE_NO_WINDOW exists only on Windows; the bare attribute
            # access raised AttributeError on POSIX, making this function
            # always return None there.  getattr keeps it portable.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        return float(result.stdout.strip())
    except Exception:
        # Deliberate best-effort: callers treat None as "unknown duration".
        return None
def get_ssim(original, transcoded, max_duration_minutes=15):
    """Compute the SSIM score between two videos with FFmpeg.

    Decoding is CUDA-accelerated; the comparison is capped at
    *max_duration_minutes* of video to bound runtime.  Returns a float on
    success or an "ERROR: ..." string on failure.
    """
    duration = get_video_duration(original)
    if duration is None:
        return "ERROR: Could not determine video duration"
    max_duration_sec = min(duration, max_duration_minutes * 60)
    cmd = [
        "ffmpeg",
        "-hwaccel", "cuda",  # CUDA-assisted decode of the original
        "-i", original,
        "-hwaccel", "cuda",  # ...and of the transcode
        "-i", transcoded,
        "-t", str(max_duration_sec),  # limit how much video is compared
        "-filter_complex", "ssim",
        "-f", "null",
        "-",
    ]
    try:
        result = subprocess.run(
            cmd,
            stderr=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            text=True,
            # CREATE_NO_WINDOW is Windows-only; getattr keeps POSIX working
            # instead of raising AttributeError into the except below.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        # FFmpeg's ssim filter reports the aggregate as "All:<value>" on stderr.
        match = re.search(r"All:(\d+\.\d+)", result.stderr)
        if match:
            return float(match.group(1))
        return "ERROR: SSIM not found"
    except Exception as e:
        return f"ERROR: {str(e)}"
def get_vmaf(original, transcoded):
    """Compute the mean VMAF score of *transcoded* against *original*.

    Writes the libvmaf JSON log into a private temp directory, parses the
    pooled mean, and always removes the temp directory.  Returns a rounded
    float on success or an "ERROR: ..." string on failure.
    """
    temp_dir = tempfile.mkdtemp()
    log_path = os.path.join(temp_dir, "vmaf.json")
    # FFmpeg filter arguments expect forward slashes, even on Windows.
    log_path_ffmpeg = log_path.replace("\\", "/")
    try:
        filter_complex = f"[0:v][1:v]libvmaf=log_fmt=json:log_path='{log_path_ffmpeg}'"
        cmd = [
            "ffmpeg",
            "-hide_banner",
            "-i", original,
            "-i", transcoded,
            "-filter_complex", filter_complex,
            "-f", "null",
            "-",
        ]
        proc = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            # CREATE_NO_WINDOW is Windows-only; getattr keeps POSIX working
            # instead of raising AttributeError into the except below.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        if proc.returncode != 0:
            return f"ERROR: ffmpeg failed:\n{proc.stderr.strip()}"
        if os.path.exists(log_path):
            with open(log_path, "r") as f:
                data = json.load(f)
            return round(data["pooled_metrics"]["vmaf"]["mean"], 2)
        return "ERROR: VMAF log not found"
    except Exception as e:
        return f"ERROR: {str(e)}"
    finally:
        # Best-effort cleanup regardless of success or failure.
        shutil.rmtree(temp_dir, ignore_errors=True)
def get_stream_info(file_path):
    """Probe *file_path* with ffprobe and return stream metadata.

    Requests index, codec type/name, and language tag for every stream.
    Returns the parsed JSON dict on success or an "ERROR: ..." string on
    failure (missing ffprobe, unparsable output, ...).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "stream=index,codec_type,codec_name,tags:stream_tags=language",
        "-of", "json",
        file_path,
    ]
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # CREATE_NO_WINDOW is Windows-only; getattr keeps POSIX working
            # instead of raising AttributeError into the except below.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        return json.loads(result.stdout)
    except Exception as e:
        return f"ERROR: {str(e)}"
def compare_streams(orig_info, trans_info):
    """Compare audio/subtitle stream layouts of an original and its transcode.

    Returns ``(audio_match, subs_match, summary)`` where each match flag is
    "O" (stream counts and language multisets agree), "X" (they differ), or
    "E" (a probe failed), and *summary* is a short human-readable count line.
    """
    # A string result from get_stream_info signals a probe failure.
    if isinstance(orig_info, str) or isinstance(trans_info, str):
        return "E", "E", "Error during probing"

    def languages(info, kind):
        """Sorted language tags of every stream of the given codec type."""
        tags = []
        for stream in info.get("streams", []):
            if stream.get("codec_type") == kind:
                tags.append(stream.get("tags", {}).get("language", "und"))
        return sorted(tags)

    orig_audio = languages(orig_info, "audio")
    trans_audio = languages(trans_info, "audio")
    orig_subs = languages(orig_info, "subtitle")
    trans_subs = languages(trans_info, "subtitle")

    # Equal sorted language lists imply equal counts, so one comparison
    # covers both the count check and the language check.
    audio_match = "O" if orig_audio == trans_audio else "X"
    subs_match = "O" if orig_subs == trans_subs else "X"

    summary = (
        f"Audio: {len(orig_audio)}=>{len(trans_audio)} "
        f"Subs: {len(orig_subs)}=>{len(trans_subs)}"
    )
    return audio_match, subs_match, summary
def load_existing_results(csv_path):
    """Read a previously written report CSV into a lookup dict.

    Keys are (original_file, transcoded_file) tuples; values are the raw
    CSV rows.  Returns an empty dict when the file does not exist yet.
    """
    results = {}
    if not os.path.exists(csv_path):
        return results
    with open(csv_path, "r", encoding="utf-8") as f:
        reader = csv.reader(f)
        next(reader, None)  # discard the header row
        for row in reader:
            # Skip blank or truncated lines.
            if len(row) > 1:
                results[(row[COL_ORIGINAL], row[COL_TRANSCODED])] = row
    return results
def create_new_row(orig, trans):
    """Build a full report row for one (original, transcoded) pair.

    Computes sizes, SSIM, and stream comparison; the VMAF and 'accepted'
    columns are left blank for later phases.  Size columns degrade to an
    "ERROR: ..." string when a file cannot be stat'ed.
    """
    # Get file sizes in MB
    try:
        orig_size = os.path.getsize(orig) / (1024 * 1024)
        trans_size = os.path.getsize(trans) / (1024 * 1024)
        size_diff = orig_size - trans_size
        # Guard the ratio: a zero-byte original previously raised an
        # uncaught ZeroDivisionError (only OSError was handled).
        size_ratio = trans_size / orig_size if orig_size else float("inf")
    except OSError as e:
        orig_size = trans_size = size_diff = size_ratio = f"ERROR: {str(e)}"
    # Calculate SSIM
    ssim_score = get_ssim(orig, trans)
    # Compare audio and subtitle streams
    orig_info = get_stream_info(orig)
    trans_info = get_stream_info(trans)
    audio_match, subs_match, stream_summary = compare_streams(
        orig_info, trans_info)
    return [
        orig,
        trans,
        # Error strings pass through unformatted; floats get fixed precision.
        f"{orig_size:.2f}" if isinstance(orig_size, float) else orig_size,
        f"{trans_size:.2f}" if isinstance(trans_size, float) else trans_size,
        f"{size_diff:.2f}" if isinstance(size_diff, float) else size_diff,
        f"{size_ratio:.3f}" if isinstance(size_ratio, float) else size_ratio,
        ssim_score,
        "",  # VMAF score filled during the review phase
        audio_match,
        subs_match,
        stream_summary,
        "",  # 'accepted' verdict filled in manually
    ]
def main():
    """Entry point: scan a directory, score transcodes, write a CSV report."""
    parser = argparse.ArgumentParser(
        description="Compare AV1 transcodes with originals"
    )
    parser.add_argument(
        "directory", help="Root directory containing video files")
    parser.add_argument(
        "--output", default="av1_quality_report.csv", help="Output CSV filename"
    )
    args = parser.parse_args()
    # Previously written rows are reloaded so re-runs only score new pairs.
    existing_results = load_existing_results(args.output)
    all_pairs = find_video_pairs(args.directory)
    # Ensure CSV file exists with header
    if not os.path.exists(args.output):
        with open(args.output, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(FULL_HEADER)
    # === SSIM Phase ===
    # Each new pair is scored and appended immediately so partial progress
    # survives an interrupted run.
    for orig, trans in (pbar := tqdm(all_pairs, desc="Calculating SSIM")):
        key = (orig, trans)
        pbar.set_postfix_str(Path(orig).stem)
        if key not in existing_results:
            new_row = create_new_row(orig, trans)
            existing_results[key] = new_row
            # Write immediately
            with open(args.output, "a", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(new_row)
                f.flush()
    # === VMAF Phase ===
    # NOTE(review): the queue-building logic below is commented out, so the
    # VMAF phase currently processes nothing -- confirm whether disabling the
    # 'r' (needs review) workflow was intentional.
    review_queue = []
    # for key, row in existing_results.items():
    #     if len(row) <= COL_ACCEPTED:
    #         continue
    #     if row[COL_ACCEPTED].lower() == 'r' and (not row[COL_VMAF] or "ERROR" in row[COL_VMAF]):
    #         review_queue.append(key)
    for key in (pbar := tqdm(review_queue, desc="Processing VMAF")):
        orig, trans = key
        pbar.set_postfix_str(Path(orig).stem)
        vmaf_score = get_vmaf(orig, trans)
        existing_results[key][COL_VMAF] = vmaf_score
        # Overwrite CSV row-by-row after each VMAF update
        with open(args.output, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(FULL_HEADER)
            for row in existing_results.values():
                # Pad short rows so every line carries the full column set.
                if len(row) < len(FULL_HEADER):
                    row.extend([""] * (len(FULL_HEADER) - len(row)))
                writer.writerow(row)
            f.flush()
    print(f"\nProcessing complete! Results saved to {args.output}")
    print("Review the 'accepted' column and mark:")
    print(" 'o' - Good quality")
    print(" 'x' - Poor quality")
    print(" 'r' - Needs VMAF review")
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()