"""Compare AV1 transcodes against their originals and write a CSV quality report."""
import argparse
import csv
import json
import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path

from tqdm import tqdm
|
# Column indices into a report row; keep these in sync with FULL_HEADER below.
COL_ORIGINAL = 0
COL_TRANSCODED = 1
COL_ORIG_SIZE = 2
COL_TRANS_SIZE = 3
COL_SIZE_SAVED = 4
COL_SIZE_RATIO = 5
COL_SSIM = 6
COL_VMAF = 7
COL_AUDIO = 8
COL_SUBS = 9
COL_STREAM_SUMMARY = 10
COL_ACCEPTED = 11

# CSV header row written at the top of the report file.
# "accepted" is filled in manually during review ('o'/'x'/'r' — see main()).
FULL_HEADER = [
    "original_file",
    "transcoded_file",
    "original_size (MB)",
    "transcoded_size (MB)",
    "size_saved (MB)",
    "size_ratio",
    "ssim_score",
    "vmaf_score",
    "audio_match",
    "subs_match",
    "stream_summary",
    "accepted",
]
def find_video_pairs(root_dir):
    """Locate (original, transcoded) video pairs under *root_dir*.

    A transcode is recognised by the naming convention
    "<original basename>_av1_tdar.mkv" sitting in the same folder as its
    source file. Returns a list of (original_path, transcoded_path) tuples.
    """
    extensions = (".mkv", ".mp4", ".avi")
    found = []

    for dirpath, _, names in tqdm(os.walk(root_dir)):
        for name in names:
            # Skip non-video files and files that are themselves transcodes.
            if not name.lower().endswith(extensions) or "_av1_tdar" in name:
                continue

            stem, _ = os.path.splitext(name)
            candidate = os.path.join(dirpath, f"{stem}_av1_tdar.mkv")
            if os.path.exists(candidate):
                found.append((os.path.join(dirpath, name), candidate))

    return found
|
def get_video_duration(path):
    """Return the container duration of *path* in seconds, or None on failure.

    Uses FFprobe. Any error — missing binary, unreadable file, unparsable
    output — is swallowed and reported as None so callers can degrade
    gracefully.
    """
    try:
        result = subprocess.run(
            [
                "ffprobe",
                "-v", "error",
                # format=duration queries the container, so no stream
                # selection is needed.
                "-show_entries", "format=duration",
                "-of", "default=noprint_wrappers=1:nokey=1",
                path,
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
            # CREATE_NO_WINDOW exists only on Windows; fall back to 0 so
            # this also runs on POSIX instead of raising AttributeError
            # (which previously made every call return None off-Windows).
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        return float(result.stdout.strip())
    except Exception:
        return None
|
def get_ssim(original, transcoded, max_duration_minutes=15):
    """Compute the global SSIM between two videos via FFmpeg.

    Comparison is capped at *max_duration_minutes* to bound runtime on long
    files. Returns the SSIM as a float, or an "ERROR: ..." string describing
    the failure.
    """
    duration = get_video_duration(original)
    if duration is None:
        return "ERROR: Could not determine video duration"
    max_duration_sec = min(duration, max_duration_minutes * 60)

    cmd = [
        "ffmpeg",
        # CUDA accelerates decoding only; without -hwaccel_output_format the
        # frames are downloaded to system memory, so the ssim filter works.
        "-hwaccel", "cuda",
        "-i", original,
        "-hwaccel", "cuda",
        "-i", transcoded,
        "-t", str(max_duration_sec),
        "-filter_complex", "ssim",
        "-f", "null",
        "-",
    ]

    try:
        result = subprocess.run(
            cmd,
            stderr=subprocess.PIPE,
            stdout=subprocess.DEVNULL,
            text=True,
            # Windows-only flag; 0 elsewhere so the call is cross-platform
            # instead of raising AttributeError on POSIX.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )

        # FFmpeg reports e.g. "SSIM ... All:0.987654 (...)" on stderr.
        match = re.search(r"All:(\d+\.\d+)", result.stderr)
        if match:
            return float(match.group(1))
        return "ERROR: SSIM not found"
    except Exception as e:
        return f"ERROR: {str(e)}"
|
def get_vmaf(original, transcoded):
    """Compute the mean VMAF score of *transcoded* against *original*.

    Runs FFmpeg's libvmaf filter with a JSON log written to a temp dir and
    returns the pooled mean rounded to 2 decimals, or an "ERROR: ..." string.
    The temp dir is always cleaned up.
    """
    temp_dir = tempfile.mkdtemp()
    log_path = os.path.join(temp_dir, "vmaf.json")
    # libvmaf's log_path option wants forward slashes, even on Windows.
    log_path_ffmpeg = log_path.replace("\\", "/")
    try:
        filter_complex = f"[0:v][1:v]libvmaf=log_fmt=json:log_path='{log_path_ffmpeg}'"
        cmd = [
            "ffmpeg",
            "-hide_banner",
            "-i", original,
            "-i", transcoded,
            "-filter_complex", filter_complex,
            "-f", "null",
            "-",
        ]

        proc = subprocess.run(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            # Windows-only flag; 0 elsewhere so the call is cross-platform
            # instead of raising AttributeError on POSIX.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        if proc.returncode != 0:
            return f"ERROR: ffmpeg failed:\n{proc.stderr.strip()}"
        if os.path.exists(log_path):
            with open(log_path, "r") as f:
                data = json.load(f)
            return round(data["pooled_metrics"]["vmaf"]["mean"], 2)
        return "ERROR: VMAF log not found"
    except Exception as e:
        return f"ERROR: {str(e)}"
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
|
def get_stream_info(file_path):
    """Probe *file_path* and return its stream metadata as a dict.

    Asks FFprobe for index, codec type/name and language tag of every
    stream, parsed from JSON. On any failure an "ERROR: ..." string is
    returned instead (callers must check the type — see compare_streams).
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "stream=index,codec_type,codec_name,tags:stream_tags=language",
        "-of", "json",
        file_path,
    ]

    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            # Windows-only flag; 0 elsewhere so the call is cross-platform
            # instead of raising AttributeError on POSIX.
            creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
        )
        return json.loads(result.stdout)
    except Exception as e:
        return f"ERROR: {str(e)}"
|
def compare_streams(orig_info, trans_info):
    """Report whether audio/subtitle stream sets match between two probes.

    Returns (audio_match, subs_match, summary). Each match flag is "O" when
    the stream count and the multiset of languages agree, "X" on a mismatch,
    and "E" when either probe failed (get_stream_info returns an error
    string in that case).
    """
    # Probe failures arrive as "ERROR: ..." strings instead of dicts.
    if isinstance(orig_info, str) or isinstance(trans_info, str):
        return "E", "E", "Error during probing"

    def pick(info, kind):
        # Reduce each stream of the requested codec_type to the fields
        # the comparison cares about.
        return [
            {
                "type": kind,
                "language": s.get("tags", {}).get("language", "und"),
                "codec": s.get("codec_name", "unknown"),
            }
            for s in info.get("streams", [])
            if s.get("codec_type") == kind
        ]

    def verdict(before, after):
        same = len(before) == len(after) and sorted(
            s["language"] for s in before
        ) == sorted(s["language"] for s in after)
        return "O" if same else "X"

    orig_audio = pick(orig_info, "audio")
    trans_audio = pick(trans_info, "audio")
    orig_subs = pick(orig_info, "subtitle")
    trans_subs = pick(trans_info, "subtitle")

    summary = (
        f"Audio: {len(orig_audio)}=>{len(trans_audio)} "
        f"Subs: {len(orig_subs)}=>{len(trans_subs)}"
    )

    return verdict(orig_audio, trans_audio), verdict(orig_subs, trans_subs), summary
|
def load_existing_results(csv_path):
    """Read a previously written report CSV into a lookup dict.

    Returns {(original_path, transcoded_path): row} so reruns can skip
    pairs already processed. A missing file yields an empty dict; the
    header row is discarded.
    """
    results = {}
    if not os.path.exists(csv_path):
        return results

    with open(csv_path, "r", encoding="utf-8") as handle:
        rows = csv.reader(handle)
        next(rows, None)  # drop the header line
        for row in rows:
            if len(row) > 1:
                results[(row[COL_ORIGINAL], row[COL_TRANSCODED])] = row

    return results
|
def create_new_row(orig, trans):
    """Build a fresh CSV row for one (original, transcoded) pair.

    File sizes and the size ratio are formatted as strings (or replaced by
    an "ERROR: ..." string on stat failure), SSIM and the stream comparison
    are computed immediately, and the VMAF / "accepted" columns start empty
    to be filled in by later phases and manual review.
    """
    # Get file sizes in MB
    try:
        orig_size = os.path.getsize(orig) / (1024 * 1024)
        trans_size = os.path.getsize(trans) / (1024 * 1024)
        size_diff = orig_size - trans_size
        # A zero-byte original would otherwise raise an uncaught
        # ZeroDivisionError and abort the whole run.
        size_ratio = trans_size / orig_size if orig_size else float("inf")
    except OSError as e:
        orig_size = trans_size = size_diff = size_ratio = f"ERROR: {str(e)}"

    # Calculate SSIM
    ssim_score = get_ssim(orig, trans)

    # Compare audio and subtitle streams
    orig_info = get_stream_info(orig)
    trans_info = get_stream_info(trans)
    audio_match, subs_match, stream_summary = compare_streams(
        orig_info, trans_info)

    # Error strings pass through unformatted; floats get fixed precision.
    return [
        orig,
        trans,
        f"{orig_size:.2f}" if isinstance(orig_size, float) else orig_size,
        f"{trans_size:.2f}" if isinstance(trans_size, float) else trans_size,
        f"{size_diff:.2f}" if isinstance(size_diff, float) else size_diff,
        f"{size_ratio:.3f}" if isinstance(size_ratio, float) else size_ratio,
        ssim_score,
        "",  # VMAF score initially empty
        audio_match,
        subs_match,
        stream_summary,
        "",  # Accepted initially empty
    ]
|
def main():
    """CLI entry point: scan a tree, score every transcode pair, write CSV.

    Results are appended incrementally so an interrupted run can resume:
    pairs already present in the output CSV are skipped on the next run.
    """
    parser = argparse.ArgumentParser(
        description="Compare AV1 transcodes with originals"
    )
    parser.add_argument(
        "directory", help="Root directory containing video files")
    parser.add_argument(
        "--output", default="av1_quality_report.csv", help="Output CSV filename"
    )
    args = parser.parse_args()

    # Previously computed rows keyed by (original, transcoded) path pair.
    existing_results = load_existing_results(args.output)
    all_pairs = find_video_pairs(args.directory)

    # Ensure CSV file exists with header
    if not os.path.exists(args.output):
        with open(args.output, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(FULL_HEADER)

    # === SSIM Phase ===
    for orig, trans in (pbar := tqdm(all_pairs, desc="Calculating SSIM")):
        key = (orig, trans)
        pbar.set_postfix_str(Path(orig).stem)
        if key not in existing_results:
            new_row = create_new_row(orig, trans)
            existing_results[key] = new_row

            # Write immediately (append mode) so progress survives a crash
            # or interruption mid-run.
            with open(args.output, "a", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(new_row)
                f.flush()

    # === VMAF Phase ===
    # NOTE(review): the queue-population loop below is intentionally
    # disabled, so this phase is currently a no-op. When re-enabled it
    # selects rows manually marked 'r' whose VMAF is missing or errored.
    review_queue = []
    # for key, row in existing_results.items():
    #     if len(row) <= COL_ACCEPTED:
    #         continue
    #     if row[COL_ACCEPTED].lower() == 'r' and (not row[COL_VMAF] or "ERROR" in row[COL_VMAF]):
    #         review_queue.append(key)

    for key in (pbar := tqdm(review_queue, desc="Processing VMAF")):
        orig, trans = key
        pbar.set_postfix_str(Path(orig).stem)

        vmaf_score = get_vmaf(orig, trans)
        existing_results[key][COL_VMAF] = vmaf_score

        # Overwrite CSV row-by-row after each VMAF update
        with open(args.output, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(FULL_HEADER)
            for row in existing_results.values():
                # Pad short rows so every row has the full column set.
                if len(row) < len(FULL_HEADER):
                    row.extend([""] * (len(FULL_HEADER) - len(row)))
                writer.writerow(row)
            f.flush()

    print(f"\nProcessing complete! Results saved to {args.output}")
    print("Review the 'accepted' column and mark:")
    print(" 'o' - Good quality")
    print(" 'x' - Poor quality")
    print(" 'r' - Needs VMAF review")
|
# Script entry point: run only when executed directly, not when imported.
if __name__ == "__main__":
    main()