# /// script
# requires-python = ">=3.13"
# dependencies = [
#     "ffmpeg",
#     "watchdog",
# ]
# ///
"""Nvidia AV1 transcoder and directory watcher.

Transcodes video files to AV1 (``av1_nvenc``) with FFmpeg, either once for a
file/directory given via ``--input`` or continuously for new files appearing
in a watched directory (``--watch``). Configuration, logs, the codec cache and
the statistics CSV all live in ``$XDG_CONFIG_HOME/transcoder``.
"""

import argparse
import csv
import datetime
import hashlib
import json
import logging
import os
import re
import select
import signal
import subprocess
import sys
import time
from pathlib import Path

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

# Supported video extensions to monitor
VIDEO_EXTENSIONS = {
    ".mkv",
    ".mp4",
    ".avi",
    ".mov",
    ".wmv",
    ".flv",
    ".webm",
    ".m4v",
    ".ts",
}

# Global cache dictionary
# Structure: { "filesize-md5hash": "codec_name" }
TRANSCODE_CACHE = {}

# Column order of the statistics CSV written after every successful transcode.
STATS_HEADER = [
    "filename",
    "original_size",
    "transcoded_size",
    "original_codec",
    "transcoded_codec",
    "video_duration",
    "transcoding_duration",
]

# Matches the "elapsed=H:MM:SS.xx" field of FFmpeg's progress output.
ELAPSED_PATTERN = re.compile(r"elapsed=([0-9:.]+)")


def get_config_dir():
    """Returns the main configuration directory path."""
    xdg_config = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
    return Path(xdg_config) / "transcoder"


def get_cache_file():
    """Returns the path to the cache file."""
    return get_config_dir() / "cache.json"


def load_cache():
    """
    Loads the codec cache from disk into the global variable.

    A missing, unreadable or malformed cache file results in an empty cache;
    the problem is logged but never raised.
    """
    global TRANSCODE_CACHE
    cache_path = get_cache_file()
    if not cache_path.exists():
        TRANSCODE_CACHE = {}
        return

    try:
        with open(cache_path, "r") as f:
            loaded = json.load(f)
        # The rest of the script indexes the cache by signature, so anything
        # other than a JSON object is treated as a corrupt cache.
        if not isinstance(loaded, dict):
            raise ValueError("cache file does not contain a JSON object")
        TRANSCODE_CACHE = loaded
        logging.debug(f"Loaded {len(TRANSCODE_CACHE)} entries from cache.")
    except Exception as e:
        logging.error(f"Failed to load cache: {e}")
        TRANSCODE_CACHE = {}


def save_cache():
    """Saves the global cache to disk. Failures are logged, not raised."""
    cache_path = get_cache_file()
    try:
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        with open(cache_path, "w") as f:
            json.dump(TRANSCODE_CACHE, f, indent=2)
    except Exception as e:
        logging.warning(f"Failed to save cache: {e}")


def get_file_signature(file_path):
    """
    Generates a signature for the file based on size and header hash.
    This allows detection even if filename or modification date changes.

    Returns "<size>-<md5 of first 32KB>", or None if the file cannot be read.
    """
    try:
        stat = file_path.stat()
        file_size = stat.st_size

        # Read the first 32KB of the file to generate a partial hash.
        # MD5 is used purely as a fast fingerprint here, not for security.
        with open(file_path, "rb") as f:
            header = f.read(32 * 1024)
        header_hash = hashlib.md5(header).hexdigest()

        return f"{file_size}-{header_hash}"
    except Exception as e:
        logging.debug(f"Could not generate signature for {file_path}: {e}")
        return None


def update_cache_entry(file_path, codec):
    """Updates the cache for a specific file using its signature."""
    signature = get_file_signature(file_path)
    if signature:
        TRANSCODE_CACHE[signature] = codec
        save_cache()


def setup_logging():
    """
    Sets up logging to both console (INFO) and file (DEBUG/Verbose).
    Log file is stored in $XDG_CONFIG_HOME/transcoder/transcoder.log
    """
    log_dir = get_config_dir()
    log_dir.mkdir(parents=True, exist_ok=True)
    log_file = log_dir / "transcoder.log"

    # Create logger
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)  # Capture everything

    # formatter
    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

    # File Handler (Verbose)
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    # Console Handler (Concise)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    logger.addHandler(console_handler)

    logging.info(f"Logging started. Verbose logs at: {log_file}")


def load_config():
    """
    Loads configuration from config.json in the config directory.
    Returns a dictionary of config values (empty if missing or unreadable).
    """
    config_path = get_config_dir() / "config.json"
    if not config_path.exists():
        return {}

    try:
        with open(config_path, "r") as f:
            config = json.load(f)
        logging.debug(f"Loaded config from {config_path}: {config}")
        return config
    except Exception as e:
        logging.error(f"Failed to load config file: {e}")
        return {}


def _run_ffprobe(cmd):
    """
    Helper function to run ffprobe commands with consistent error handling.

    `cmd` is the list of extra arguments (normally just the file path) that is
    appended to a fixed base command selecting the first video stream and
    JSON output. Returns the parsed JSON as a dict ({} when ffprobe printed
    nothing), or None when ffprobe failed or is not installed. Malformed JSON
    raises and is left to the caller.
    """
    cmd_base = [
        "ffprobe",
        "-v",
        "quiet",
        "-print_format",
        "json",
        "-show_streams",
        "-select_streams",
        "v:0",
    ]
    try:
        result = subprocess.run(
            cmd_base + cmd, capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError as e:
        logging.warning(f"ffprobe command failed: {e}")
        return None
    except FileNotFoundError:
        logging.error("ffprobe not found. Please ensure ffmpeg/ffprobe is installed.")
        return None

    payload = result.stdout.strip()
    return json.loads(payload) if payload else {}


def get_video_codec(file_path):
    """
    Uses ffprobe to determine the codec of the first video stream.
    Checks memory cache first using file signatures.
    Returns the codec name (e.g., 'av1', 'h264', 'hevc') or None if detection fails.
    """
    try:
        # 1. Check Cache using Signature
        signature = get_file_signature(file_path)
        if signature and signature in TRANSCODE_CACHE:
            codec = TRANSCODE_CACHE[signature]
            logging.debug(
                f"Cache hit for {file_path} (sig: {signature[:10]}...): {codec}"
            )
            return codec

        # 2. Run FFprobe (Cache Miss)
        ffprobe_data = _run_ffprobe([str(file_path)])
        if ffprobe_data:
            codec = ffprobe_data["streams"][0]["codec_name"]
            logging.debug(f"Detected codec for {file_path}: {codec}")

            # 3. Update Cache
            if signature:
                TRANSCODE_CACHE[signature] = codec
                save_cache()

            return codec
        return None
    except Exception as e:
        logging.warning(f"Failed to probe codec for {file_path}: {e}")
        return None


def get_video_duration(file_path):
    """
    Get video duration using ffprobe.

    Returns the duration rounded to whole minutes as a string with an "m"
    suffix (e.g. "42m"), or "0m" if the duration cannot be determined.
    """
    try:
        # -show_format is requested as well because some containers (e.g.
        # Matroska) report the duration only at container level, not on the
        # video stream itself.
        ffprobe_data = _run_ffprobe(["-show_format", str(file_path)])
        if ffprobe_data:
            streams = ffprobe_data.get("streams") or [{}]
            raw_duration = streams[0].get("duration")
            if raw_duration is None:
                raw_duration = ffprobe_data.get("format", {}).get("duration")
            if raw_duration is None:
                raise ValueError("ffprobe reported no duration")

            duration = float(raw_duration)
            # format to minutes with "m" suffix
            duration_formatted = f"{duration / 60:.0f}m"
            logging.debug(f"Detected duration for {file_path}: {duration_formatted}")
            return duration_formatted
        return "0m"
    except Exception as e:
        logging.warning(f"Could not get duration for {file_path}: {e}")
        return "0m"


def leftalign(text):
    """Indents a per-file status message by five spaces for console output."""
    return 5 * " " + text


def get_ffmpeg_command(input_path, output_path):
    """
    Constructs the FFmpeg command based on the Handbrake preset requirements.
    Returns the command as an argument list suitable for subprocess.
    """
    cmd = [
        "ffmpeg",
        "-n",  # Never overwrite output files
        "-i",
        str(input_path),  # Input file
        "-c:v",
        "av1_nvenc",  # Video Encoder
        "-pix_fmt",
        "p010le",  # 10-bit color
        "-preset",
        "p4",  # Medium preset
        "-rc",
        "vbr",  # Variable Bit Rate control
        "-cq",
        "35",  # Constant Quality factor
        # Filter chain: Deinterlace -> Scale down if >1080p -> Cap FPS at 30
        "-vf",
        "yadif,scale='min(1920,iw)':-2,fps=30",
        "-c:a",
        "aac",  # Audio Encoder
        "-b:a",
        "160k",  # Audio Bitrate
        "-ac",
        "2",  # Audio Channels (Stereo)
        "-color_range",
        "tv",  # Limited color range
        "-movflags",
        "+faststart",  # Web optimization
        str(output_path),  # Output file
    ]
    return cmd


def _stop_process(process):
    """Terminates a child process, escalating to kill after five seconds."""
    process.terminate()
    try:
        process.wait(timeout=5)
    except subprocess.TimeoutExpired:
        process.kill()


def _remove_partial_output(path):
    """Best-effort removal of an incomplete output file; failures are logged."""
    try:
        if path.exists():
            path.unlink()
    except OSError as e:
        logging.debug(f"Could not remove partial output {path}: {e}")


def _size_in_mb(path):
    """Returns the size of `path` as a "<n>MB" string, or "0MB" if it is gone."""
    try:
        return f"{path.stat().st_size / (1024 * 1024):.0f}MB"
    except FileNotFoundError:
        return "0MB"


def _append_stats_row(row):
    """
    Appends one row to stats.csv in the config directory, writing the header
    first when the file does not exist yet. Statistics are informational, so
    an I/O failure is logged rather than allowed to abort the transcode.
    """
    stats_file = get_config_dir() / "stats.csv"
    try:
        write_header = not stats_file.exists()
        with open(stats_file, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.writer(csvfile)
            if write_header:
                writer.writerow(STATS_HEADER)
            writer.writerow(row)
    except OSError as e:
        logging.warning(f"Failed to write stats to {stats_file}: {e}")


def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=False):
    """
    Transcodes a single video file to AV1 with FFmpeg.

    Args:
        input_file: Path of the source video.
        output_file: Destination path; required unless replace_mode is set.
        skip_av1: When True, files whose first video stream is already AV1
            are skipped.
        replace_mode: When True the result replaces the source: it is written
            next to the input as "<stem>.mp4" (through a "<stem>.tmp.mp4"
            temp file if the input already has that name) and the original is
            overwritten/deleted on success.

    The function returns None in all cases. It waits until the input's size
    is stable before starting, streams FFmpeg's output into a timestamped
    per-file log, and appends a row to stats.csv on success.

    Ctrl+C handling: the first interrupt lets the current transcode finish
    and then exits the script with status 0; a second interrupt stops FFmpeg
    and exits with status 1.
    """
    input_path = Path(input_file)
    logging.debug(f"Processing request for: {input_path}")

    use_temp_file = False

    if replace_mode:
        target_path = input_path.with_suffix(".mp4")
        # If the target is the same as input (e.g. input is already .mp4), use a temp file
        if target_path == input_path:
            use_temp_file = True
            transcode_output_path = input_path.with_suffix(".tmp.mp4")
            logging.debug(
                f"Input and output filenames match. Using temp file: {transcode_output_path}"
            )
        else:
            transcode_output_path = target_path
    else:
        if not output_file:
            logging.error("No output file specified and not in replace mode.")
            return
        transcode_output_path = Path(output_file)

    logging.debug(f"Final transcode target: {transcode_output_path}")

    # 1. Check existence logic
    if transcode_output_path.exists():
        logging.info(
            leftalign(f"SKIP: Output file already exists: {transcode_output_path}")
        )
        return

    # 2. Check if file is ready (simple size stability check)
    if not input_path.exists():
        logging.warning(f"File vanished during checks: {input_path}")
        return

    logging.info(leftalign("WAIT: Ensuring file is ready..."))
    try:
        historical_size = -1
        # NOTE: a file that stays at zero bytes keeps this loop waiting; it
        # only proceeds once two consecutive samples match and are non-zero.
        while True:
            current_size = input_path.stat().st_size
            logging.debug(
                f"File stability check - Current: {current_size}, Previous: {historical_size}"
            )
            if current_size == historical_size and current_size > 0:
                logging.debug("File size stable. Proceeding.")
                break
            historical_size = current_size
            time.sleep(2)
    except FileNotFoundError:
        logging.warning(f"File vanished during checks: {input_path}")
        return

    # 3. Check Codec (Optional Skip)
    if skip_av1:
        codec = get_video_codec(input_path)
        if codec == "av1":
            logging.info(leftalign(f"SKIP: Input file is already AV1: {input_path}"))
            return
        else:
            logging.info(leftalign(f"Codec is {codec}"))

    # Prepare individual FFmpeg log file
    ffmpeg_logs_dir = get_config_dir() / "ffmpeg_logs"
    ffmpeg_logs_dir.mkdir(parents=True, exist_ok=True)

    logfile_timestamp = datetime.datetime.now().strftime("%Y-%m-%d-%H:%M:%S")
    # We also append .log to the full filename to avoid collisions (e.g. video.mp4.log)
    ffmpeg_log_file = ffmpeg_logs_dir / f"{logfile_timestamp}_{input_path.name}.log"

    logging.info(leftalign("START: Transcoding..."))
    if replace_mode:
        logging.info(
            leftalign(f"Outputting to temporary file: {transcode_output_path}")
        )
    logging.info(leftalign(f"FFmpeg details logging to: {ffmpeg_log_file}"))

    cmd = get_ffmpeg_command(input_path, transcode_output_path)
    logging.debug(f"Executing FFmpeg command: {' '.join(cmd)}")

    graceful_exit = False
    transcoding_duration = None
    process = None
    succeeded = False
    # Only delete an output on failure if it did not exist right before FFmpeg
    # was launched, i.e. if this run is what created it. ("-n" makes FFmpeg
    # refuse to touch a file that appeared in the meantime.)
    output_is_ours = not transcode_output_path.exists()

    def should_discard_output():
        # Temp files are always disposable; a regular output only while the
        # transcode has not completed successfully.
        return output_is_ours and (use_temp_file or not succeeded)

    try:
        with open(ffmpeg_log_file, "w", encoding="utf-8", errors="replace") as f_log:
            # Write the command itself to the top of the log
            f_log.write(f"COMMAND: {' '.join(cmd)}\n\n")
            f_log.flush()

            # Run FFmpeg and process output in real-time with timestamps
            # NOTE: start_new_session=True ensures Ctrl+C doesn't kill ffmpeg immediately
            # allowing us to handle the first press gracefully in Python.
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                start_new_session=True,
            )

            # Helper to check if process is alive
            def is_alive(p):
                return p.poll() is None

            def record_line(line):
                """Writes one FFmpeg output line to the log, tracking elapsed time."""
                nonlocal transcoding_duration
                if matches := ELAPSED_PATTERN.findall(line):
                    transcoding_duration = matches[-1]
                timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                f_log.write(f"[{timestamp}] {line.rstrip()}\n")
                f_log.flush()

            def pump_once():
                """Reads at most one line of FFmpeg output, waiting up to 0.5s."""
                # select.select() waits up to 0.5s for data to be ready.
                # This prevents the script from blocking indefinitely on .readline()
                # if the FFmpeg process hangs (e.g., D-state on network drive).
                fd = process.stdout.fileno()
                ready, _, _ = select.select([fd], [], [], 0.5)
                if fd in ready:
                    line = process.stdout.readline()
                    if line:
                        record_line(line)
                    elif is_alive(process):
                        # Output is empty but process is alive?
                        # Could be buffering or momentary pause.
                        time.sleep(0.1)

            try:
                while is_alive(process):
                    pump_once()
            except KeyboardInterrupt:
                logging.info(
                    " > [GRACEFUL EXIT] Signal received. Processing will finish, then script will exit."
                )
                logging.info(" > Press Ctrl+C again to FORCE QUIT immediately.")
                graceful_exit = True

                # Resume waiting for the process to finish naturally. We still
                # need to drain the output pipe to prevent FFmpeg from blocking.
                try:
                    while is_alive(process):
                        pump_once()
                except KeyboardInterrupt:
                    # User pressed Ctrl+C a second time inside the graceful wait loop
                    logging.warning(
                        " >> [FORCE QUIT] Signal received again. Killing process..."
                    )
                    _stop_process(process)
                    raise  # Re-raise to trigger outer cleanup

            # FFmpeg has exited, but its last lines (typically the final
            # progress line or the error message) may still sit in the pipe.
            for line in process.stdout:
                record_line(line)

            # Wait for process to complete and get return code
            result = process.wait()

        if result == 0:
            succeeded = True
            logging.info(
                leftalign(f"DONE: Successfully transcoded to {transcode_output_path}")
            )
            logging.debug(f"Transcoding duration was {transcoding_duration}.")

            # Both probes run before the original is replaced/deleted, so
            # input_path still refers to the source content here.
            original_codec = get_video_codec(input_path)
            transcoded_codec = get_video_codec(transcode_output_path)
            video_duration = get_video_duration(input_path)

            original_size = _size_in_mb(input_path)
            transcoded_size = _size_in_mb(transcode_output_path)
            logging.debug(f"Original file size: {original_size}")
            logging.debug(f"Transcoded file size: {transcoded_size}")

            # Write to stats CSV file
            _append_stats_row(
                [
                    input_path.name,
                    original_size,
                    transcoded_size,
                    original_codec,
                    transcoded_codec,
                    video_duration,
                    transcoding_duration,
                ]
            )

            if replace_mode:
                if use_temp_file:
                    # Rename temp file to overwrite original
                    try:
                        transcode_output_path.replace(input_path)
                        logging.info(leftalign("REPLACE: Overwrote original file."))
                        # The content at input_path is now the AV1 result, so
                        # record its signature; a restart then skips it
                        # without probing.
                        update_cache_entry(input_path, "av1")
                    except OSError as e:
                        logging.error(
                            leftalign(f"Failed to replace original file: {e}")
                        )
                else:
                    # Different extensions (e.g. mkv -> mp4). Delete original.
                    try:
                        input_path.unlink()
                        logging.info(leftalign("DELETE: Removed original file."))
                    except OSError as e:
                        logging.error(leftalign(f"Failed to delete original file: {e}"))
        else:
            logging.error(
                f"FFmpeg failed for {input_path} with exit code {result}. See log at {ffmpeg_log_file}"
            )
            # A leftover partial output would make every later run skip this
            # file as "already exists".
            if should_discard_output():
                _remove_partial_output(transcode_output_path)

        # If a graceful exit was requested, we exit now that the file is fully processed
        if graceful_exit:
            logging.info("Exiting script gracefully as requested.")
            sys.exit(0)

    except Exception as e:
        logging.exception(f"Unexpected error during transcoding of {input_path}: {e}")
        # FFmpeg runs in its own session; don't leave it running unattended.
        if process is not None and process.poll() is None:
            _stop_process(process)
        if should_discard_output():
            _remove_partial_output(transcode_output_path)
    except KeyboardInterrupt:
        # This catches the re-raised exception from the inner loop (2nd Ctrl+C / Force Quit)
        logging.info("Transcoding aborted by user.")
        if process is not None and process.poll() is None:
            _stop_process(process)
        if should_discard_output():
            _remove_partial_output(transcode_output_path)
        sys.exit(1)


class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that transcodes video files as they appear."""

    def __init__(self, output_dir=None, skip_av1=True, replace_mode=False):
        """
        Args:
            output_dir: Directory receiving "<stem>.mp4" outputs; unused in
                replace mode.
            skip_av1: Passed through to transcode_file.
            replace_mode: Passed through to transcode_file.
        """
        super().__init__()
        self.output_dir = Path(output_dir) if output_dir else None
        self.skip_av1 = skip_av1
        self.replace_mode = replace_mode

    def on_created(self, event):
        """Handles newly created files (directories are ignored)."""
        if event.is_directory:
            return
        logging.debug(f"Watchdog event (created): {event.src_path}")
        self.process(event.src_path)

    def on_moved(self, event):
        """Handles files moved/renamed into place, using the destination path."""
        if event.is_directory:
            return
        logging.debug(f"Watchdog event (moved): {event.dest_path}")
        self.process(event.dest_path)

    def process(self, file_path_str):
        """Transcodes the file at `file_path_str` if it has a video extension."""
        input_path = Path(file_path_str)

        # Filter for video extensions
        if input_path.suffix.lower() not in VIDEO_EXTENSIONS:
            logging.debug(f"Ignored non-video file: {input_path}")
            return

        output_path = None
        if not self.replace_mode and self.output_dir:
            new_filename = input_path.stem + ".mp4"
            output_path = self.output_dir / new_filename

        transcode_file(
            input_path,
            output_path,
            skip_av1=self.skip_av1,
            replace_mode=self.replace_mode,
        )


def _resolve_output_path(video_file, args):
    """
    Returns where the transcode of `video_file` should be written.

    None in replace mode (transcode_file derives the target itself),
    "<output-dir>/<stem>.mp4" when an output directory is configured (the
    directory is created if needed), otherwise "<stem>_av1.mp4" next to the
    input.
    """
    if args.replace:
        return None
    if args.output_dir:
        out_dir = Path(args.output_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        return out_dir / (video_file.stem + ".mp4")
    return video_file.parent / (video_file.stem + "_av1.mp4")


def _collect_video_files(directory, recursive):
    """
    Returns a sorted list of the video files in `directory`.

    The listing is taken up front: transcoding may write new files into the
    same tree, and a lazily evaluated directory walk could pick those up.
    """
    entries = directory.rglob("*") if recursive else directory.iterdir()
    return sorted(
        entry
        for entry in entries
        if entry.is_file() and entry.suffix.lower() in VIDEO_EXTENSIONS
    )


def main():
    """
    Command-line entry point.

    Modes:
      * --input FILE: transcode that file.
      * --input DIR: transcode the videos in DIR (add --recursive for
        subdirectories).
      * --watch [--input DIR]: monitor DIR (default: current directory) and
        transcode new files as they appear.

    Defaults for the flags are read from config.json; CLI flags override them.
    """
    setup_logging()
    load_cache()  # Load the cache at startup
    config = load_config()

    parser = argparse.ArgumentParser(description="Nvidia AV1 Transcoder & Watcher")
    parser.add_argument("--input", type=str, help="File or directory to transcode")
    parser.add_argument(
        "--watch", action="store_true", help="Enable filesystem watching mode"
    )
    parser.add_argument("--output-dir", type=str, help="Output directory")
    parser.add_argument(
        "--no-skip-av1",
        action="store_true",
        help="Force transcoding even if input is already AV1",
    )
    parser.add_argument(
        "--replace",
        action="store_true",
        help="Replace original files with transcoded versions (Ignores --output-dir)",
    )
    parser.add_argument(
        "--recursive",
        action="store_true",
        help="Recursively scan directories for input files",
    )

    # Set defaults from config
    default_watch = config.get("watch") or False
    default_output = config.get("output-dir") or config.get("output_dir")
    default_no_skip = config.get("no-skip-av1") or config.get("no_skip_av1") or False
    default_replace = config.get("replace") or False
    default_recursive = config.get("recursive") or False

    parser.set_defaults(
        watch=default_watch,
        output_dir=default_output,
        no_skip_av1=default_no_skip,
        replace=default_replace,
        recursive=default_recursive,
    )

    args = parser.parse_args()

    skip_av1 = not args.no_skip_av1

    # --- Initial Policy Logging ---
    if args.replace:
        logging.info("POLICY: --replace: Original files will be overwritten/deleted.")
        if args.output_dir:
            logging.warning(
                "--output-dir is specified but will be IGNORED due to --replace mode."
            )

    if args.no_skip_av1:
        logging.info("POLICY: --no-skip-av1: Do not skip AV1 files.")
    else:
        logging.info("POLICY: Skip AV1 files. Change with --no-skip-av1.")

    input_path = Path(args.input) if args.input else None
    # --watch uses --input as the directory to monitor. A plain file given as
    # --input is always transcoded directly, even when watch is switched on
    # (e.g. by the config file).
    watch_requested = args.watch and (input_path is None or input_path.is_dir())

    # --- Mode 1: Single File / Directory ---
    if input_path is not None and not watch_requested:
        if not input_path.exists():
            logging.critical(f"Input file '{args.input}' not found.")
            sys.exit(1)

        # If input is a directory, process accordingly
        if input_path.is_dir():
            if args.recursive:
                # Process directory recursively
                process_recursive_directory(input_path, args, skip_av1)
                return

            # Process directory non-recursively (all files in this directory only)
            logging.info(f"Processing directory non-recursively: {input_path}")
            for video_file in _collect_video_files(input_path, recursive=False):
                logging.info(f"FILE: {video_file}")
                transcode_file(
                    video_file,
                    _resolve_output_path(video_file, args),
                    skip_av1=skip_av1,
                    replace_mode=args.replace,
                )
            return

        transcode_file(
            input_path,
            _resolve_output_path(input_path, args),
            skip_av1=skip_av1,
            replace_mode=args.replace,
        )
        return

    # --- Mode 2: Watch Directory ---
    if args.watch:
        # If replace mode is OFF, output-dir is required.
        if not args.replace and not args.output_dir:
            logging.critical(
                "Output directory is not specified in CLI (--output-dir) or Config."
            )
            logging.critical("Either specify --output-dir OR enable --replace mode.")
            sys.exit(1)

        # Use --input as the watch directory if provided, otherwise default to current directory
        if input_path is not None:
            watch_dir = input_path
        else:
            watch_dir = Path(".")
            logging.info("No watch directory specified, monitoring current directory.")

        if not watch_dir.exists():
            logging.critical(f"Watch directory '{watch_dir}' does not exist.")
            sys.exit(1)

        logging.info(f"Monitoring {watch_dir}...")

        output_dir_path = None
        if not args.replace:
            output_dir_path = Path(args.output_dir)
            output_dir_path.mkdir(parents=True, exist_ok=True)
            logging.info(f"Outputting to {output_dir_path}")
        else:
            logging.info("Outputting in-place (replacing originals).")

        logging.info("Press Ctrl+C to stop.")

        event_handler = NewFileHandler(
            output_dir_path, skip_av1=skip_av1, replace_mode=args.replace
        )
        observer = Observer()
        # Use recursive monitoring if --recursive is specified
        recursive_watch = args.recursive
        observer.schedule(event_handler, str(watch_dir), recursive=recursive_watch)
        observer.start()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logging.info("Stopping observer...")
            observer.stop()
        observer.join()
    else:
        logging.critical("No operation mode selected.")
        logging.critical(
            "Please provide --input for single file OR enable --watch mode."
        )
        sys.exit(1)


def process_recursive_directory(input_path, args, skip_av1):
    """
    Process all video files in a directory recursively.

    Args:
        input_path: Root directory to scan.
        args: Parsed CLI namespace; `replace` and `output_dir` are read.
        skip_av1: Passed through to transcode_file.
    """
    logging.info(f"Scanning directory recursively for video files: {input_path}")

    for video_file in _collect_video_files(input_path, recursive=True):
        logging.info(f"FILE: {video_file}")
        transcode_file(
            video_file,
            _resolve_output_path(video_file, args),
            skip_av1=skip_av1,
            replace_mode=args.replace,
        )


if __name__ == "__main__":
    main()