Files
scripts/transcode.py
T

835 lines
28 KiB
Python

# /// script
# requires-python = ">=3.13"
# dependencies = [
# "ffmpeg",
# "watchdog",
# ]
# ///
import argparse
import csv
import datetime
import hashlib
import json
import logging
import os
import re
import select
import signal
import subprocess
import sys
import time
from pathlib import Path
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
# Supported video extensions to monitor
VIDEO_EXTENSIONS = {
".mkv",
".mp4",
".avi",
".mov",
".wmv",
".flv",
".webm",
".m4v",
".ts",
}
# Global cache dictionary
# Structure: { "filesize-md5hash": "codec_name" }
TRANSCODE_CACHE = {}
STOP_REQUESTED = False
CURRENT_PROCESS = None
def signal_handler(sig, frame):
"""
Handles Ctrl+C (SIGINT) without raising exceptions.
1st Press: Sets STOP_REQUESTED flag (Graceful Exit).
2nd Press: Force kills the current subprocess and exits.
"""
global STOP_REQUESTED, CURRENT_PROCESS
if not STOP_REQUESTED:
logging.info(
"\n[INFO] > [GRACEFUL EXIT] Signal received. Finishing current file, then exiting script."
)
logging.info("[INFO] > Press Ctrl+C again to FORCE QUIT immediately.")
STOP_REQUESTED = True
else:
logging.warning(
"\n[WARN] >> [FORCE QUIT] Signal received again. Terminating process..."
)
if CURRENT_PROCESS:
try:
CURRENT_PROCESS.terminate()
# Give it a tiny moment to die before we hard exit,
# but don't block the signal handler too long
time.sleep(0.5)
if CURRENT_PROCESS.poll() is None:
CURRENT_PROCESS.kill()
except Exception as e:
logging.error(f"Error killing process: {e}")
sys.exit(1)
def get_config_dir():
"""Returns the main configuration directory path."""
xdg_config = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
return Path(xdg_config) / "transcoder"
def get_cache_file():
"""Returns the path to the cache file."""
return get_config_dir() / "cache.json"
def load_cache():
"""Loads the codec cache from disk into the global variable."""
global TRANSCODE_CACHE
cache_path = get_cache_file()
if cache_path.exists():
try:
with open(cache_path, "r") as f:
TRANSCODE_CACHE = json.load(f)
logging.debug(f"Loaded {len(TRANSCODE_CACHE)} entries from cache.")
except Exception as e:
logging.error(f"Failed to load cache: {e}")
TRANSCODE_CACHE = {}
else:
TRANSCODE_CACHE = {}
def save_cache():
"""Saves the global cache to disk."""
cache_path = get_cache_file()
try:
with open(cache_path, "w") as f:
json.dump(TRANSCODE_CACHE, f, indent=2)
except Exception as e:
logging.warning(f"Failed to save cache: {e}")
def get_file_signature(file_path):
"""
Generates a robust signature for the file based on size and header hash.
This allows detection even if filename or modification date changes.
"""
try:
stat = file_path.stat()
file_size = stat.st_size
# Read the first 32KB of the file to generate a partial hash
# This is extremely fast but unique enough for video files
with open(file_path, "rb") as f:
header = f.read(32 * 1024)
header_hash = hashlib.md5(header).hexdigest()
return f"{file_size}-{header_hash}"
except Exception as e:
logging.debug(f"Could not generate signature for {file_path}: {e}")
return None
def update_cache_entry(file_path, codec):
"""Updates the cache for a specific file using its signature."""
signature = get_file_signature(file_path)
if signature:
TRANSCODE_CACHE[signature] = codec
save_cache()
def setup_logging():
"""
Sets up logging to both console (INFO) and file (DEBUG/Verbose).
Log file is stored in $XDG_CONFIG_HOME/transcoder/transcoder.log
"""
log_dir = get_config_dir()
log_dir.mkdir(parents=True, exist_ok=True)
log_file = log_dir / "transcoder.log"
# Create logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Capture everything
# formatter
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# File Handler (Verbose)
file_handler = logging.FileHandler(log_file)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
# Console Handler (Concise)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
logger.addHandler(console_handler)
logging.info(f"Logging started. Verbose logs at: {log_file}")
def load_config():
"""
Loads configuration from config.json in the config directory.
Returns a dictionary of config values.
"""
config_path = get_config_dir() / "config.json"
if not config_path.exists():
return {}
try:
with open(config_path, "r") as f:
config = json.load(f)
logging.debug(f"Loaded config from {config_path}: {config}")
return config
except Exception as e:
logging.error(f"Failed to load config file: {e}")
return {}
def _run_ffprobe(cmd):
"""
Helper function to run ffprobe commands with consistent error handling.
"""
cmd_base = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"v:0",
]
try:
result = subprocess.run(
cmd_base + cmd, capture_output=True, text=True, check=True
)
if result.stdout.strip():
return json.loads(result.stdout.strip())
else:
return json.loads("{}")
except subprocess.CalledProcessError as e:
logging.warning(f"ffprobe command failed: {e}")
return None
except FileNotFoundError:
logging.error("ffprobe not found. Please ensure ffmpeg/ffprobe is installed.")
return None
def get_video_codec(file_path):
"""
Uses ffprobe to determine the codec of the first video stream.
Checks memory cache first using robust file signatures.
Returns the codec name (e.g., 'av1', 'h264', 'hevc') or None if detection fails.
"""
try:
# 1. Check Cache using Signature
signature = get_file_signature(file_path)
if signature and signature in TRANSCODE_CACHE:
codec = TRANSCODE_CACHE[signature]
logging.debug(
f"Cache hit for {file_path} (sig: {signature[:10]}...): {codec}"
)
return codec
# 2. Run FFprobe (Cache Miss)
ffprobe_data = _run_ffprobe([str(file_path)])
if ffprobe_data:
codec = ffprobe_data["streams"][0]["codec_name"]
logging.debug(f"Detected codec for {file_path}: {codec}")
# 3. Update Cache
if signature:
TRANSCODE_CACHE[signature] = codec
save_cache()
return codec
return None
except Exception as e:
logging.warning(f"Failed to probe codec for {file_path}: {e}")
return None
def get_video_duration(file_path):
"""
Get video duration using ffprobe.
Returns duration in seconds as a float, or 0.0 if error occurs.
"""
try:
ffprobe_data = _run_ffprobe([str(file_path)])
if ffprobe_data:
duration = float(ffprobe_data["streams"][0]["duration"])
# format to minutes with "m" suffix
duration_formatted = f"{duration / 60:.0f}m"
logging.debug(f"Detected duration for {file_path}: {duration_formatted}")
return duration_formatted
return "0m"
except (ValueError, Exception) as e:
logging.warning(f"Could not get duration for {file_path}: {e}")
return "0m"
def leftalign(str):
return 5 * " " + str
def get_ffmpeg_command(input_path, output_path):
"""
Constructs the FFmpeg command based on the Handbrake preset requirements.
"""
cmd = [
"ffmpeg",
"-n", # Never overwrite output files
"-i",
str(input_path), # Input file
"-c:v",
"av1_nvenc", # Video Encoder
"-pix_fmt",
"p010le", # 10-bit color
"-preset",
"p4", # Medium preset
"-rc",
"vbr", # Variable Bit Rate control
"-cq",
"35", # Constant Quality factor
# Filter chain: Deinterlace -> Scale down if >1080p -> Cap FPS at 30
"-vf",
"yadif,scale='min(1920,iw)':-2,fps=30",
"-c:a",
"aac", # Audio Encoder
"-b:a",
"160k", # Audio Bitrate
"-ac",
"2", # Audio Channels (Stereo)
"-color_range",
"tv", # Limited color range
"-movflags",
"+faststart", # Web optimization
str(output_path), # Output file
]
return cmd
def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=False):
global STOP_REQUESTED, CURRENT_PROCESS
# If a stop was requested during the directory scan before we even entered here, abort.
if STOP_REQUESTED:
return
input_path = Path(input_file)
logging.debug(f"Processing request for: {input_path}")
target_path = None
use_temp_file = False
if replace_mode:
target_path = input_path.with_suffix(".mp4")
# If the target is the same as input (e.g. input is already .mp4), use a temp file
if target_path == input_path:
use_temp_file = True
transcode_output_path = input_path.with_suffix(".tmp.mp4")
logging.debug(
f"Input and output filenames match. Using temp file: {transcode_output_path}"
)
else:
transcode_output_path = target_path
else:
if not output_file:
logging.error("No output file specified and not in replace mode.")
return
target_path = Path(output_file)
transcode_output_path = target_path
logging.debug(f"Final transcode target: {transcode_output_path}")
# 1. Check existence logic
if transcode_output_path.exists():
logging.info(
leftalign(f"SKIP: Output file already exists: {transcode_output_path}")
)
return
# 2. Check if file is ready (simple size stability check)
if not input_path.exists():
logging.warning(f"File vanished during checks: {input_path}")
return
logging.info(leftalign("WAIT: Ensuring file is ready..."))
try:
historical_size = -1
while True:
# Check for stop request during wait
if STOP_REQUESTED:
logging.info(leftalign("Aborting file check due to stop request."))
return
current_size = input_path.stat().st_size
logging.debug(
f"File stability check - Current: {current_size}, Previous: {historical_size}"
)
if current_size == historical_size and current_size > 0:
logging.debug("File size stable. Proceeding.")
break
historical_size = current_size
time.sleep(2)
except FileNotFoundError:
logging.warning(f"File vanished during checks: {input_path}")
return
# 3. Check Codec (Optional Skip)
if skip_av1:
codec = get_video_codec(input_path)
if codec == "av1":
logging.info(leftalign(f"SKIP: Input file is already AV1: {input_path}"))
return
else:
logging.info(leftalign(f"Codec is {codec}"))
# Prepare individual FFmpeg log file
ffmpeg_logs_dir = get_config_dir() / "ffmpeg_logs"
ffmpeg_logs_dir.mkdir(parents=True, exist_ok=True)
logfile_timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
# We also append .log to the full filename to avoid collisions (e.g. video.mp4.log)
ffmpeg_log_file = ffmpeg_logs_dir / f"{logfile_timestamp}_{input_path.name}.log"
logging.info(leftalign("START: Transcoding..."))
if replace_mode:
logging.info(
leftalign(f"Outputting to temporary file: {transcode_output_path}")
)
logging.info(leftalign(f"FFmpeg details logging to: {ffmpeg_log_file}"))
cmd = get_ffmpeg_command(input_path, transcode_output_path)
logging.debug(f"Executing FFmpeg command: {' '.join(cmd)}")
try:
with open(ffmpeg_log_file, "w", encoding="utf-8", errors="replace") as f_log:
# Write the command itself to the top of the log
f_log.write(f"COMMAND: {' '.join(cmd)}\n\n")
f_log.flush()
# Run FFmpeg and process output in real-time with timestamps
# NOTE: start_new_session=True ensures Ctrl+C doesn't kill ffmpeg immediately
# allowing us to handle the first press gracefully in Python.
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
start_new_session=True,
)
# Set global process so signal handler can kill it if needed
CURRENT_PROCESS = process
regex_elapsed = re.compile(r"elapsed=([0-9:.]+)")
transcoding_duration = None
# Helper to check if process is alive
def is_alive(p):
return p.poll() is None
# Main read loop
while is_alive(process):
# NOTE: If signal handler is called, it does NOT interrupt select in Python 3.5+
# unless the handler raises an exception. We don't raise exception on 1st press.
# We check STOP_REQUESTED only to decide logic, but we KEEP WAITING for this file.
reads = [process.stdout.fileno()]
# Wait up to 0.5s for output
ret = select.select(reads, [], [], 0.5)
if reads[0] in ret[0]:
output = process.stdout.readline()
if output:
if regex_result := re.findall(regex_elapsed, output):
transcoding_duration = regex_result[0]
timestamp = datetime.datetime.now().strftime(
"%Y-%m-%d %H:%M:%S"
)
f_log.write(f"[{timestamp}] {output.rstrip()}\n")
f_log.flush()
elif is_alive(process):
time.sleep(0.1)
# We do NOT break here if STOP_REQUESTED is True.
# We expressly want to finish this file.
CURRENT_PROCESS = None
result = process.wait()
# Process Finished
if result == 0:
logging.info(
leftalign(f"DONE: Successfully transcoded to {transcode_output_path}")
)
logging.debug(f"Transcoding duration was {transcoding_duration}.")
original_codec = get_video_codec(input_path)
# Update cache for the NEW file (which is now AV1)
# This ensures if we restart, we know this new file is already done
if replace_mode and use_temp_file:
# We replaced the input file with the temp file.
# Update signature for the new file content
update_cache_entry(input_path, "av1")
transcoded_codec = get_video_codec(transcode_output_path)
video_duration = get_video_duration(input_path)
# Safe size check
try:
original_size = f"{input_path.stat().st_size / (1024 * 1024):.0f}MB"
except FileNotFoundError:
original_size = "0MB"
transcoded_size = (
f"{transcode_output_path.stat().st_size / (1024 * 1024):.0f}MB"
)
logging.debug(f"Original file size: {original_size}")
logging.debug(f"Transcoded file size: {transcoded_size}")
# Write to stats CSV file
stats_file = get_config_dir() / "stats.csv"
file_exists = stats_file.exists()
with open(stats_file, "a", newline="", encoding="utf-8") as csvfile:
writer = csv.writer(csvfile)
# Write header if file doesn't exist
if not file_exists:
writer.writerow(
[
"filename",
"original_size",
"transcoded_size",
"original_codec",
"transcoded_codec",
"video_duration",
"transcoding_duration",
]
)
writer.writerow(
[
input_path.name,
original_size,
transcoded_size,
original_codec,
transcoded_codec,
video_duration,
transcoding_duration,
]
)
if replace_mode:
if use_temp_file:
# Rename temp file to overwrite original
try:
transcode_output_path.replace(input_path)
logging.info(leftalign("REPLACE: Overwrote original file."))
# IMPORTANT: Update cache again because file content at input_path changed
update_cache_entry(input_path, "av1")
except OSError as e:
logging.error(
leftalign(f"Failed to replace original file: {e}")
)
else:
# Different extensions (e.g. mkv -> mp4). Delete original.
try:
input_path.unlink()
logging.info(leftalign("DELETE: Removed original file."))
except OSError as e:
logging.error(leftalign(f"Failed to delete original file: {e}"))
else:
# If stopped via signal (SIGTERM/KILL), exit code will be non-zero
if STOP_REQUESTED:
logging.info(leftalign("Process stopped by user (Incomplete)."))
else:
logging.error(
f"FFmpeg failed for {input_path} with exit code {result}. See log at {ffmpeg_log_file}"
)
if replace_mode and use_temp_file and transcode_output_path.exists():
try:
transcode_output_path.unlink()
except Exception:
pass
except Exception as e:
logging.exception(f"Unexpected error during transcoding of {input_path}: {e}")
if replace_mode and use_temp_file and transcode_output_path.exists():
try:
transcode_output_path.unlink()
except Exception:
pass
class NewFileHandler(FileSystemEventHandler):
def __init__(self, output_dir=None, skip_av1=True, replace_mode=False):
self.output_dir = Path(output_dir) if output_dir else None
self.skip_av1 = skip_av1
self.replace_mode = replace_mode
def on_created(self, event):
if event.is_directory:
return
logging.debug(f"Watchdog event (created): {event.src_path}")
self.process(event.src_path)
def on_moved(self, event):
if event.is_directory:
return
logging.debug(f"Watchdog event (moved): {event.dest_path}")
self.process(event.dest_path)
def process(self, file_path_str):
if STOP_REQUESTED:
return
input_path = Path(file_path_str)
# Filter for video extensions
if input_path.suffix.lower() not in VIDEO_EXTENSIONS:
logging.debug(f"Ignored non-video file: {input_path}")
return
output_path = None
if not self.replace_mode and self.output_dir:
new_filename = input_path.stem + ".mp4"
output_path = self.output_dir / new_filename
transcode_file(
input_path,
output_path,
skip_av1=self.skip_av1,
replace_mode=self.replace_mode,
)
def process_recursive_directory(input_path, args, skip_av1):
"""Process all video files in a directory recursively."""
logging.info(f"Scanning directory recursively for video files: {input_path}")
# We collect files first to avoid modifying directory while iterating if possible,
# though rglob is a generator. We check STOP_REQUESTED inside loop.
for video_file in input_path.rglob("*"):
if STOP_REQUESTED:
logging.info("Stopping recursive scan due to signal.")
break
if video_file.is_file() and video_file.suffix.lower() in VIDEO_EXTENSIONS:
logging.info(f"FILE: {video_file}")
output_path = None
if not args.replace:
if args.output_dir:
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
output_path = out_dir / (video_file.stem + ".mp4")
else:
output_path = video_file.parent / (video_file.stem + "_av1.mp4")
transcode_file(
video_file,
output_path,
skip_av1=skip_av1,
replace_mode=args.replace,
)
def main():
# Register Signal Handler
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
setup_logging()
load_cache()
config = load_config()
parser = argparse.ArgumentParser(description="Nvidia AV1 Transcoder & Watcher")
parser.add_argument("--input", type=str, help="File or directory to transcode")
parser.add_argument(
"--watch", action="store_true", help="Enable filesystem watching mode"
)
parser.add_argument("--output-dir", type=str, help="Output directory")
parser.add_argument(
"--no-skip-av1",
action="store_true",
help="Force transcoding even if input is already AV1",
)
parser.add_argument(
"--replace",
action="store_true",
help="Replace original files with transcoded versions (Ignores --output-dir)",
)
parser.add_argument(
"--recursive",
action="store_true",
help="Recursively scan directories for input files",
)
# Set defaults from config
default_watch = config.get("watch") or False
default_output = config.get("output-dir") or config.get("output_dir")
default_no_skip = config.get("no-skip-av1") or config.get("no_skip_av1") or False
default_replace = config.get("replace") or False
default_recursive = config.get("recursive") or False
parser.set_defaults(
watch=default_watch,
output_dir=default_output,
no_skip_av1=default_no_skip,
replace=default_replace,
recursive=default_recursive,
)
args = parser.parse_args()
skip_av1 = not args.no_skip_av1
# --- Initial Policy Logging ---
if args.replace:
logging.info("POLICY: --replace: Original files will be overwritten/deleted.")
if args.output_dir:
logging.warning(
"--output-dir is specified but will be IGNORED due to --replace mode."
)
if args.no_skip_av1:
logging.info("POLICY: --no-skip-av1: Do not skip AV1 files.")
else:
logging.info("POLICY: Skip AV1 files. Change with --no-skip-av1.")
# --- Mode 1: Single File / Recursive ---
if args.input:
input_path = Path(args.input)
if not input_path.exists():
logging.critical(f"Input file '{args.input}' not found.")
sys.exit(1)
# If input is a directory, process accordingly
if input_path.is_dir():
if args.recursive:
# Process directory recursively
process_recursive_directory(input_path, args, skip_av1)
else:
# Process directory non-recursively (all files in this directory only)
logging.info(f"Processing directory non-recursively: {input_path}")
for video_file in input_path.iterdir():
if STOP_REQUESTED:
break
if (
video_file.is_file()
and video_file.suffix.lower() in VIDEO_EXTENSIONS
):
# ... (Simplified logic for non-recursive dir) ...
logging.info(f"FILE: {video_file}")
output_path = None
if not args.replace:
if args.output_dir:
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
output_path = out_dir / (video_file.stem + ".mp4")
else:
output_path = video_file.parent / (
video_file.stem + "_av1.mp4"
)
transcode_file(
video_file,
output_path,
skip_av1=skip_av1,
replace_mode=args.replace,
)
else:
# Single File
output_path = None
if not args.replace:
if args.output_dir:
out_dir = Path(args.output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
output_path = out_dir / (input_path.stem + ".mp4")
else:
output_path = input_path.parent / (input_path.stem + "_av1.mp4")
transcode_file(
input_path, output_path, skip_av1=skip_av1, replace_mode=args.replace
)
if STOP_REQUESTED:
logging.info("Exiting script gracefully as requested.")
sys.exit(0)
# --- Mode 2: Watch Directory ---
if args.watch:
# If replace mode is OFF, output-dir is required.
if not args.replace and not args.output_dir:
logging.critical("Either specify --output-dir OR enable --replace mode.")
sys.exit(1)
# Use --input as the watch directory if provided, otherwise default to current directory
if args.input:
watch_dir = Path(args.input)
else:
watch_dir = Path(".")
logging.info("No watch directory specified, monitoring current directory.")
if not watch_dir.exists():
logging.critical(f"Watch directory '{watch_dir}' does not exist.")
sys.exit(1)
logging.info(f"Monitoring {watch_dir}...")
logging.info("Press Ctrl+C to stop.")
output_dir_path = (
Path(args.output_dir) if (args.output_dir and not args.replace) else None
)
if output_dir_path:
output_dir_path.mkdir(parents=True, exist_ok=True)
logging.info(f"Outputting to {output_dir_path}")
else:
logging.info("Outputting in-place (replacing originals).")
event_handler = NewFileHandler(
output_dir_path, skip_av1=skip_av1, replace_mode=args.replace
)
observer = Observer()
observer.schedule(event_handler, str(watch_dir), recursive=args.recursive)
observer.start()
try:
while not STOP_REQUESTED:
time.sleep(1)
except KeyboardInterrupt:
# Fallback if signal handler doesn't catch it for some reason
pass
logging.info("Stopping observer...")
observer.stop()
observer.join()
logging.info("Exiting script gracefully.")
else:
logging.critical("No operation mode selected.")
logging.critical(
"Please provide --input for single file OR enable --watch mode."
)
sys.exit(1)
if __name__ == "__main__":
main()