# /// script # requires-python = ">=3.13" # dependencies = [ # "ffmpeg", # "watchdog", # ] # /// import argparse import time import sys import subprocess import os from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # Supported video extensions to monitor VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.ts'} def get_ffmpeg_command(input_path, output_path): """ Constructs the FFmpeg command based on the Handbrake preset requirements. """ return [ "ffmpeg", "-n", # Never overwrite output files (extra safety) "-i", str(input_path), # Input file "-c:v", "av1_nvenc", # Video Encoder "-pix_fmt", "p010le", # 10-bit color "-preset", "p4", # Medium preset "-rc", "vbr", # Variable Bit Rate control "-cq", "35", # Constant Quality factor # Filter chain: Deinterlace -> Scale down if >1080p -> Cap FPS at 30 "-vf", "yadif,scale='min(1920,iw)':-2,fps=30", "-c:a", "aac", # Audio Encoder "-b:a", "160k", # Audio Bitrate "-ac", "2", # Audio Channels (Stereo) "-color_range", "tv", # Limited color range "-movflags", "+faststart", # Web optimization str(output_path) # Output file ] def transcode_file(input_file, output_file): input_path = Path(input_file) output_path = Path(output_file) # 1. Check existence logic if output_path.exists(): print(f"[SKIP] Output file already exists: {output_path.name}") return # 2. Check if file is ready (simple size stability check) # This prevents transcoding files that are still being copied into the folder print(f"[WAIT] Ensuring file is ready: {input_path.name}...") try: historical_size = -1 while True: current_size = input_path.stat().st_size if current_size == historical_size and current_size > 0: break historical_size = current_size time.sleep(2) # Wait 2 seconds between checks except FileNotFoundError: return # File was deleted before we could process it print(f"[START] Transcoding: {input_path.name} -> {output_path.name}") cmd = get_ffmpeg_command(input_path, output_path) try: # Run FFmpeg subprocess.run(cmd, check=True) print(f"[DONE] Successfully created: {output_path.name}") except subprocess.CalledProcessError as e: print(f"[ERROR] FFmpeg failed for {input_path.name}") except KeyboardInterrupt: print("\n[STOP] Interrupted by user.") sys.exit(0) class NewFileHandler(FileSystemEventHandler): def __init__(self, output_dir): self.output_dir = Path(output_dir) def on_created(self, event): if event.is_directory: return self.process(event.src_path) def on_moved(self, event): if event.is_directory: return self.process(event.dest_path) def process(self, file_path_str): input_path = Path(file_path_str) # Filter for video extensions if input_path.suffix.lower() not in VIDEO_EXTENSIONS: return # Determine output path # We rename the file to avoid conflicts if input/output dirs are same (though not recommended) new_filename = input_path.stem + ".mp4" output_path = self.output_dir / new_filename transcode_file(input_path, output_path) def main(): parser = argparse.ArgumentParser(description="Nvidia AV1 Transcoder & Watcher") # Mutually exclusive group for modes group = parser.add_mutually_exclusive_group(required=True) group.add_argument("--input", type=str, help="Single file to transcode") group.add_argument("--watch-dir", type=str, help="Directory to monitor for new files") parser.add_argument("--output-dir", type=str, help="Output directory (Required for --watch-dir)") args = parser.parse_args() # --- Mode 1: Single File --- if args.input: input_path = Path(args.input) if not input_path.exists(): print(f"Error: Input file '{args.input}' not found.") sys.exit(1) # If output dir provided, use it. Otherwise use source folder + _av1 suffix if args.output_dir: out_dir = Path(args.output_dir) out_dir.mkdir(parents=True, exist_ok=True) output_path = out_dir / (input_path.stem + ".mp4") else: output_path = input_path.parent / (input_path.stem + "_av1.mp4") transcode_file(input_path, output_path) # --- Mode 2: Watch Directory --- elif args.watch_dir: if not args.output_dir: print("Error: --output-dir is required when using --watch-dir") sys.exit(1) watch_dir = Path(args.watch_dir) output_dir = Path(args.output_dir) if not watch_dir.exists(): print(f"Error: Watch directory '{watch_dir}' does not exist.") sys.exit(1) # Create output dir if it doesn't exist output_dir.mkdir(parents=True, exist_ok=True) print(f"Monitoring {watch_dir}...") print(f"Outputting to {output_dir}") print("Press Ctrl+C to stop.") event_handler = NewFileHandler(output_dir) observer = Observer() observer.schedule(event_handler, str(watch_dir), recursive=False) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: observer.stop() observer.join() if __name__ == "__main__": main()