# /// script # requires-python = ">=3.13" # dependencies = [ # "ffmpeg", # "watchdog", # ] # /// import argparse import time import sys import subprocess import os import logging import json from pathlib import Path from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler # Supported video extensions to monitor VIDEO_EXTENSIONS = {'.mkv', '.mp4', '.avi', '.mov', '.wmv', '.flv', '.webm', '.m4v', '.ts'} def get_config_dir(): """Returns the main configuration directory path.""" xdg_config = os.environ.get('XDG_CONFIG_HOME', os.path.expanduser('~/.config')) return Path(xdg_config) / "transcoder" def setup_logging(): """ Sets up logging to both console (INFO) and file (DEBUG/Verbose). Log file is stored in $XDG_CONFIG_HOME/transcoder/transcoder.log """ log_dir = get_config_dir() log_dir.mkdir(parents=True, exist_ok=True) log_file = log_dir / "transcoder.log" # Create logger logger = logging.getLogger() logger.setLevel(logging.DEBUG) # Capture everything # formatter formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # File Handler (Verbose) file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) # Console Handler (Concise) console_handler = logging.StreamHandler(sys.stdout) console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('[%(levelname)s] %(message)s')) logger.addHandler(console_handler) logging.info(f"Logging started. Verbose logs at: {log_file}") def load_config(): """ Loads configuration from config.json in the config directory. Returns a dictionary of config values. """ config_path = get_config_dir() / "config.json" if not config_path.exists(): return {} try: with open(config_path, 'r') as f: config = json.load(f) logging.debug(f"Loaded config from {config_path}: {config}") return config except Exception as e: logging.error(f"Failed to load config file: {e}") return {} def get_video_codec(input_path): """ Uses ffprobe to determine the codec of the first video stream. Returns the codec name (e.g., 'av1', 'h264', 'hevc') or None if detection fails. """ cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=noprint_wrappers=1:nokey=1", str(input_path) ] try: # We don't log this subprocess call to avoid spamming debug logs, # unless it fails. result = subprocess.run(cmd, capture_output=True, text=True, check=True) codec = result.stdout.strip() logging.debug(f"Detected codec for {input_path.name}: {codec}") return codec except subprocess.CalledProcessError as e: logging.warning(f"Failed to probe codec for {input_path.name}: {e}") return None except FileNotFoundError: logging.error("ffprobe not found. Please ensure ffmpeg/ffprobe is installed.") return None def get_ffmpeg_command(input_path, output_path): """ Constructs the FFmpeg command based on the Handbrake preset requirements. """ cmd = [ "ffmpeg", "-n", # Never overwrite output files "-i", str(input_path), # Input file "-c:v", "av1_nvenc", # Video Encoder "-pix_fmt", "p010le", # 10-bit color "-preset", "p4", # Medium preset "-rc", "vbr", # Variable Bit Rate control "-cq", "35", # Constant Quality factor # Filter chain: Deinterlace -> Scale down if >1080p -> Cap FPS at 30 "-vf", "yadif,scale='min(1920,iw)':-2,fps=30", "-c:a", "aac", # Audio Encoder "-b:a", "160k", # Audio Bitrate "-ac", "2", # Audio Channels (Stereo) "-color_range", "tv", # Limited color range "-movflags", "+faststart", # Web optimization str(output_path) # Output file ] return cmd def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=False): input_path = Path(input_file) logging.debug(f"Processing request for: {input_path}") target_path = None use_temp_file = False if replace_mode: logging.warning("Policy: --replace is enabled. --output-dir is ignored. Transcoding in-place.") target_path = input_path.with_suffix('.mp4') # If the target is the same as input (e.g. input is already .mp4), use a temp file if target_path == input_path: use_temp_file = True transcode_output_path = input_path.with_suffix('.tmp.mp4') logging.debug(f"Input and output filenames match. Using temp file: {transcode_output_path}") else: transcode_output_path = target_path else: if not output_file: logging.error("No output file specified and not in replace mode.") return target_path = Path(output_file) transcode_output_path = target_path logging.debug(f"Final transcode target: {transcode_output_path}") # 1. Check existence logic (Skip if target exists, unless we are using a temp file for replacement) # If replace_mode is True and extensions differ, we still want to respect -n (no overwrite) # for the destination file if it already exists. if transcode_output_path.exists(): logging.info(f"SKIP: Output file already exists: {transcode_output_path.name}") return # 2. Check if file is ready (simple size stability check) logging.info(f"WAIT: Ensuring file is ready: {input_path.name}...") try: historical_size = -1 while True: current_size = input_path.stat().st_size logging.debug(f"File stability check - Current: {current_size}, Previous: {historical_size}") if current_size == historical_size and current_size > 0: logging.debug("File size stable. Proceeding.") break historical_size = current_size time.sleep(2) except FileNotFoundError: logging.warning(f"File vanished during checks: {input_path.name}") return # 3. Check Codec (Optional Skip) if skip_av1: codec = get_video_codec(input_path) if codec == 'av1': logging.info(f"SKIP: Input file is already AV1: {input_path.name}") return # Prepare individual FFmpeg log file ffmpeg_logs_dir = get_config_dir() / "ffmpeg_logs" ffmpeg_logs_dir.mkdir(parents=True, exist_ok=True) # We append .log to the full filename to avoid collisions (e.g. video.mp4.log) ffmpeg_log_file = ffmpeg_logs_dir / f"{input_path.name}.log" logging.info(f"START: Transcoding {input_path.name}") if replace_mode: logging.info(f" Outputting to temporary file: {input_path.name}") logging.info(f" FFmpeg details logging to: {ffmpeg_log_file}") cmd = get_ffmpeg_command(input_path, transcode_output_path) logging.debug(f"Executing FFmpeg command: {' '.join(cmd)}") try: with open(ffmpeg_log_file, "w", encoding="utf-8", errors="replace") as f_log: # Write the command itself to the top of the log f_log.write(f"COMMAND: {' '.join(cmd)}\n\n") f_log.flush() # Run FFmpeg, redirecting both stdout and stderr to the individual log file result = subprocess.run(cmd, stdout=f_log, stderr=subprocess.STDOUT, text=True) if result.returncode == 0: logging.info(f"DONE: Successfully created {transcode_output_path.name}") if replace_mode: if use_temp_file: # Rename temp file to overwrite original try: transcode_output_path.replace(input_path) logging.info(f"REPLACE: Overwrote original file {input_path.name} with new version.") except OSError as e: logging.error(f"Failed to replace original file: {e}") else: # Different extensions (e.g. mkv -> mp4). Delete original. try: input_path.unlink() logging.info(f"DELETE: Removed original file {input_path.name}") except OSError as e: logging.error(f"Failed to delete original file: {e}") else: logging.error(f"FFmpeg failed for {input_path.name}. See log at {ffmpeg_log_file}") except Exception as e: logging.exception(f"Unexpected error during transcoding of {input_path.name}") except KeyboardInterrupt: logging.warning("Interrupted by user during transcoding.") sys.exit(0) class NewFileHandler(FileSystemEventHandler): def __init__(self, output_dir=None, skip_av1=True, replace_mode=False): self.output_dir = Path(output_dir) if output_dir else None self.skip_av1 = skip_av1 self.replace_mode = replace_mode def on_created(self, event): if event.is_directory: return logging.debug(f"Watchdog event (created): {event.src_path}") self.process(event.src_path) def on_moved(self, event): if event.is_directory: return logging.debug(f"Watchdog event (moved): {event.dest_path}") self.process(event.dest_path) def process(self, file_path_str): input_path = Path(file_path_str) # Filter for video extensions if input_path.suffix.lower() not in VIDEO_EXTENSIONS: logging.debug(f"Ignored non-video file: {input_path.name}") return output_path = None if not self.replace_mode and self.output_dir: new_filename = input_path.stem + ".mp4" output_path = self.output_dir / new_filename transcode_file(input_path, output_path, skip_av1=self.skip_av1, replace_mode=self.replace_mode) def main(): setup_logging() config = load_config() parser = argparse.ArgumentParser(description="Nvidia AV1 Transcoder & Watcher") parser.add_argument("--input", type=str, help="Single file to transcode (Overrides watch mode)") parser.add_argument("--watch-dir", type=str, help="Directory to monitor for new files") parser.add_argument("--output-dir", type=str, help="Output directory") parser.add_argument("--no-skip-av1", action="store_true", help="Force transcoding even if input is already AV1") parser.add_argument("--replace", action="store_true", help="Replace original files with transcoded versions (Ignores --output-dir)") # Set defaults from config # We accept both hyphenated and underscore keys from JSON for user convenience default_watch = config.get("watch-dir") or config.get("watch_dir") default_output = config.get("output-dir") or config.get("output_dir") default_no_skip = config.get("no-skip-av1") or config.get("no_skip_av1") or False default_replace = config.get("replace") or False parser.set_defaults( watch_dir=default_watch, output_dir=default_output, no_skip_av1=default_no_skip, replace=default_replace ) args = parser.parse_args() skip_av1 = not args.no_skip_av1 # --- Initial Policy Logging --- if args.replace: logging.info("Policy: REPLACE mode enabled. Original files will be overwritten/deleted.") if args.output_dir: logging.warning("Warning: --output-dir is specified but will be IGNORED due to --replace mode.") if args.no_skip_av1: logging.info("Policy: Force transcoding all files (including AV1).") else: logging.info("Policy: Skipping files that are already AV1.") # --- Mode 1: Single File --- if args.input: input_path = Path(args.input) if not input_path.exists(): logging.critical(f"Input file '{args.input}' not found.") sys.exit(1) output_path = None if not args.replace: if args.output_dir: out_dir = Path(args.output_dir) out_dir.mkdir(parents=True, exist_ok=True) output_path = out_dir / (input_path.stem + ".mp4") else: output_path = input_path.parent / (input_path.stem + "_av1.mp4") transcode_file(input_path, output_path, skip_av1=skip_av1, replace_mode=args.replace) return # --- Mode 2: Watch Directory --- if args.watch_dir: # If replace mode is OFF, output-dir is required. if not args.replace and not args.output_dir: logging.critical("Output directory is not specified in CLI (--output-dir) or Config.") logging.critical("Either specify --output-dir OR enable --replace mode.") sys.exit(1) watch_dir = Path(args.watch_dir) if not watch_dir.exists(): logging.critical(f"Watch directory '{watch_dir}' does not exist.") sys.exit(1) logging.info(f"Monitoring {watch_dir}...") output_dir_path = None if not args.replace: output_dir_path = Path(args.output_dir) output_dir_path.mkdir(parents=True, exist_ok=True) logging.info(f"Outputting to {output_dir_path}") else: logging.info("Outputting in-place (replacing originals).") logging.info("Press Ctrl+C to stop.") event_handler = NewFileHandler(output_dir_path, skip_av1=skip_av1, replace_mode=args.replace) observer = Observer() observer.schedule(event_handler, str(watch_dir), recursive=False) observer.start() try: while True: time.sleep(1) except KeyboardInterrupt: logging.info("Stopping observer...") observer.stop() observer.join() else: logging.critical("No operation mode selected.") logging.critical("Please provide --input for single file OR configure watch-dir via CLI or Config.") sys.exit(1) if __name__ == "__main__": main()