diff --git a/transcode.py b/transcode.py index 75ebb60..0acca04 100644 --- a/transcode.py +++ b/transcode.py @@ -39,6 +39,39 @@ VIDEO_EXTENSIONS = { # Global cache dictionary # Structure: { "filesize-md5hash": "codec_name" } TRANSCODE_CACHE = {} +STOP_REQUESTED = False +CURRENT_PROCESS = None + + +def signal_handler(sig, frame): + """ + Handles Ctrl+C (SIGINT) without raising exceptions. + 1st Press: Sets STOP_REQUESTED flag (Graceful Exit). + 2nd Press: Force kills the current subprocess and exits. + """ + global STOP_REQUESTED, CURRENT_PROCESS + + if not STOP_REQUESTED: + logging.info( + "\n[INFO] > [GRACEFUL EXIT] Signal received. Finishing current file, then exiting script." + ) + logging.info("[INFO] > Press Ctrl+C again to FORCE QUIT immediately.") + STOP_REQUESTED = True + else: + logging.warning( + "\n[WARN] >> [FORCE QUIT] Signal received again. Terminating process..." + ) + if CURRENT_PROCESS: + try: + CURRENT_PROCESS.terminate() + # Give it a tiny moment to die before we hard exit, + # but don't block the signal handler too long + time.sleep(0.5) + if CURRENT_PROCESS.poll() is None: + CURRENT_PROCESS.kill() + except Exception as e: + logging.error(f"Error killing process: {e}") + sys.exit(1) def get_config_dir(): @@ -283,6 +316,12 @@ def get_ffmpeg_command(input_path, output_path): def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=False): + global STOP_REQUESTED, CURRENT_PROCESS + + # If a stop was requested during the directory scan before we even entered here, abort. + if STOP_REQUESTED: + return + input_path = Path(input_file) logging.debug(f"Processing request for: {input_path}") @@ -327,6 +366,11 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal try: historical_size = -1 while True: + # Check for stop request during wait + if STOP_REQUESTED: + logging.info(leftalign("Aborting file check due to stop request.")) + return + current_size = input_path.stat().st_size logging.debug( f"File stability check - Current: {current_size}, Previous: {historical_size}" @@ -369,8 +413,6 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal cmd = get_ffmpeg_command(input_path, transcode_output_path) logging.debug(f"Executing FFmpeg command: {' '.join(cmd)}") - graceful_exit = False - try: with open(ffmpeg_log_file, "w", encoding="utf-8", errors="replace") as f_log: # Write the command itself to the top of the log @@ -389,6 +431,9 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal start_new_session=True, ) + # Set global process so signal handler can kill it if needed + CURRENT_PROCESS = process + regex_elapsed = re.compile(r"elapsed=([0-9:.]+)") transcoding_duration = None @@ -396,83 +441,41 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal def is_alive(p): return p.poll() is None - # Non-blocking read loop using select - try: - while is_alive(process): - # select.select() waits up to 0.5s for data to be ready. - # This prevents the script from blocking indefinitely on .readline() - # if the FFmpeg process hangs (e.g., D-state on network drive). - reads = [process.stdout.fileno()] - ret = select.select(reads, [], [], 0.5) + # Main read loop + while is_alive(process): + # NOTE: If signal handler is called, it does NOT interrupt select in Python 3.5+ + # unless the handler raises an exception. We don't raise exception on 1st press. + # We check STOP_REQUESTED only to decide logic, but we KEEP WAITING for this file. - if reads[0] in ret[0]: - output = process.stdout.readline() - if output: - if regex_result := re.findall(regex_elapsed, output): - transcoding_duration = regex_result[0] + reads = [process.stdout.fileno()] + # Wait up to 0.5s for output + ret = select.select(reads, [], [], 0.5) - timestamp = datetime.datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ) - f_log.write(f"[{timestamp}] {output.rstrip()}\n") - f_log.flush() - elif is_alive(process): - # Output is empty but process is alive? - # Could be buffering or momentary pause. - time.sleep(0.1) - continue + if reads[0] in ret[0]: + output = process.stdout.readline() + if output: + if regex_result := re.findall(regex_elapsed, output): + transcoding_duration = regex_result[0] + timestamp = datetime.datetime.now().strftime( + "%Y-%m-%d %H:%M:%S" + ) + f_log.write(f"[{timestamp}] {output.rstrip()}\n") + f_log.flush() + elif is_alive(process): + time.sleep(0.1) - except KeyboardInterrupt: - if not graceful_exit: - logging.info( - " > [GRACEFUL EXIT] Signal received. Processing will finish, then script will exit." - ) - logging.info(" > Press Ctrl+C again to FORCE QUIT immediately.") - graceful_exit = True - # Resume waiting for the process to finish naturally. - while is_alive(process): - try: - # We still need to drain the output pipe to prevent FFmpeg from blocking - reads = [process.stdout.fileno()] - ret = select.select(reads, [], [], 0.5) - if reads[0] in ret[0]: - output = process.stdout.readline() - if output: - timestamp = datetime.datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ) - f_log.write(f"[{timestamp}] {output.rstrip()}\n") - except KeyboardInterrupt: - # User pressed Ctrl+C a second time inside the graceful wait loop - logging.warning( - " >> [FORCE QUIT] Signal received again. Killing process..." - ) - process.terminate() - try: - process.wait(timeout=5) - except subprocess.TimeoutExpired: - process.kill() - raise # Re-raise to trigger outer cleanup - else: - # This block handles the case where the interrupt happens exactly - # as we re-enter the loop or in a race condition - logging.warning(" >> [FORCE QUIT] Killing process...") - process.terminate() - try: - process.wait(timeout=5) - except subprocess.TimeoutExpired: - process.kill() - raise + # We do NOT break here if STOP_REQUESTED is True. + # We expressly want to finish this file. - # Wait for process to complete and get return code + CURRENT_PROCESS = None result = process.wait() + # Process Finished if result == 0: logging.info( leftalign(f"DONE: Successfully transcoded to {transcode_output_path}") ) - transcoding_duration = transcode_output_path logging.debug(f"Transcoding duration was {transcoding_duration}.") original_codec = get_video_codec(input_path) @@ -555,20 +558,20 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal logging.error(leftalign(f"Failed to delete original file: {e}")) else: - logging.error( - f"FFmpeg failed for {input_path} with exit code {result}. See log at {ffmpeg_log_file}" - ) + # If stopped via signal (SIGTERM/KILL), exit code will be non-zero + if STOP_REQUESTED: + logging.info(leftalign("Process stopped by user (Incomplete).")) + else: + logging.error( + f"FFmpeg failed for {input_path} with exit code {result}. See log at {ffmpeg_log_file}" + ) + if replace_mode and use_temp_file and transcode_output_path.exists(): try: transcode_output_path.unlink() except Exception: pass - # If a graceful exit was requested, we exit now that the file is fully processed - if graceful_exit: - logging.info("Exiting script gracefully as requested.") - sys.exit(0) - except Exception as e: logging.exception(f"Unexpected error during transcoding of {input_path}: {e}") if replace_mode and use_temp_file and transcode_output_path.exists(): @@ -576,16 +579,6 @@ def transcode_file(input_file, output_file=None, skip_av1=True, replace_mode=Fal transcode_output_path.unlink() except Exception: pass - except KeyboardInterrupt: - # This catches the re-raised exception from the inner loop (2nd Ctrl+C / Force Quit) - logging.info("Transcoding aborted by user.") - # Ensure cleanup happened (redundant check but safe) - if replace_mode and use_temp_file and transcode_output_path.exists(): - try: - transcode_output_path.unlink() - except Exception: - pass - sys.exit(1) class NewFileHandler(FileSystemEventHandler): @@ -607,8 +600,10 @@ class NewFileHandler(FileSystemEventHandler): self.process(event.dest_path) def process(self, file_path_str): - input_path = Path(file_path_str) + if STOP_REQUESTED: + return + input_path = Path(file_path_str) # Filter for video extensions if input_path.suffix.lower() not in VIDEO_EXTENSIONS: logging.debug(f"Ignored non-video file: {input_path}") @@ -627,9 +622,43 @@ class NewFileHandler(FileSystemEventHandler): ) +def process_recursive_directory(input_path, args, skip_av1): + """Process all video files in a directory recursively.""" + logging.info(f"Scanning directory recursively for video files: {input_path}") + + # We collect files first to avoid modifying directory while iterating if possible, + # though rglob is a generator. We check STOP_REQUESTED inside loop. + for video_file in input_path.rglob("*"): + if STOP_REQUESTED: + logging.info("Stopping recursive scan due to signal.") + break + + if video_file.is_file() and video_file.suffix.lower() in VIDEO_EXTENSIONS: + logging.info(f"FILE: {video_file}") + output_path = None + if not args.replace: + if args.output_dir: + out_dir = Path(args.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + output_path = out_dir / (video_file.stem + ".mp4") + else: + output_path = video_file.parent / (video_file.stem + "_av1.mp4") + + transcode_file( + video_file, + output_path, + skip_av1=skip_av1, + replace_mode=args.replace, + ) + + def main(): + # Register Signal Handler + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + setup_logging() - load_cache() # Load the cache at startup + load_cache() config = load_config() parser = argparse.ArgumentParser(description="Nvidia AV1 Transcoder & Watcher") @@ -687,7 +716,7 @@ def main(): else: logging.info("POLICY: Skip AV1 files. Change with --no-skip-av1.") - # --- Mode 1: Single File --- + # --- Mode 1: Single File / Recursive --- if args.input: input_path = Path(args.input) if not input_path.exists(): @@ -699,15 +728,17 @@ def main(): if args.recursive: # Process directory recursively process_recursive_directory(input_path, args, skip_av1) - return else: # Process directory non-recursively (all files in this directory only) logging.info(f"Processing directory non-recursively: {input_path}") for video_file in input_path.iterdir(): + if STOP_REQUESTED: + break if ( video_file.is_file() and video_file.suffix.lower() in VIDEO_EXTENSIONS ): + # ... (Simplified logic for non-recursive dir) ... logging.info(f"FILE: {video_file}") output_path = None if not args.replace: @@ -725,29 +756,28 @@ def main(): skip_av1=skip_av1, replace_mode=args.replace, ) - return + else: + # Single File + output_path = None + if not args.replace: + if args.output_dir: + out_dir = Path(args.output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + output_path = out_dir / (input_path.stem + ".mp4") + else: + output_path = input_path.parent / (input_path.stem + "_av1.mp4") + transcode_file( + input_path, output_path, skip_av1=skip_av1, replace_mode=args.replace + ) - output_path = None - if not args.replace: - if args.output_dir: - out_dir = Path(args.output_dir) - out_dir.mkdir(parents=True, exist_ok=True) - output_path = out_dir / (input_path.stem + ".mp4") - else: - output_path = input_path.parent / (input_path.stem + "_av1.mp4") - - transcode_file( - input_path, output_path, skip_av1=skip_av1, replace_mode=args.replace - ) - return + if STOP_REQUESTED: + logging.info("Exiting script gracefully as requested.") + sys.exit(0) # --- Mode 2: Watch Directory --- if args.watch: # If replace mode is OFF, output-dir is required. if not args.replace and not args.output_dir: - logging.critical( - "Output directory is not specified in CLI (--output-dir) or Config." - ) logging.critical("Either specify --output-dir OR enable --replace mode.") sys.exit(1) @@ -763,33 +793,35 @@ def main(): sys.exit(1) logging.info(f"Monitoring {watch_dir}...") + logging.info("Press Ctrl+C to stop.") - output_dir_path = None - if not args.replace: - output_dir_path = Path(args.output_dir) + output_dir_path = ( + Path(args.output_dir) if (args.output_dir and not args.replace) else None + ) + if output_dir_path: output_dir_path.mkdir(parents=True, exist_ok=True) logging.info(f"Outputting to {output_dir_path}") else: logging.info("Outputting in-place (replacing originals).") - logging.info("Press Ctrl+C to stop.") - event_handler = NewFileHandler( output_dir_path, skip_av1=skip_av1, replace_mode=args.replace ) observer = Observer() - # Use recursive monitoring if --recursive is specified - recursive_watch = args.recursive - observer.schedule(event_handler, str(watch_dir), recursive=recursive_watch) + observer.schedule(event_handler, str(watch_dir), recursive=args.recursive) observer.start() try: - while True: + while not STOP_REQUESTED: time.sleep(1) except KeyboardInterrupt: - logging.info("Stopping observer...") - observer.stop() + # Fallback if signal handler doesn't catch it for some reason + pass + + logging.info("Stopping observer...") + observer.stop() observer.join() + logging.info("Exiting script gracefully.") else: logging.critical("No operation mode selected.") logging.critical( @@ -798,27 +830,5 @@ def main(): sys.exit(1) -def process_recursive_directory(input_path, args, skip_av1): - """Process all video files in a directory recursively.""" - logging.info(f"Scanning directory recursively for video files: {input_path}") - for video_file in input_path.rglob("*"): - if video_file.is_file() and video_file.suffix.lower() in VIDEO_EXTENSIONS: - logging.info(f"FILE: {video_file}") - output_path = None - if not args.replace: - if args.output_dir: - out_dir = Path(args.output_dir) - out_dir.mkdir(parents=True, exist_ok=True) - output_path = out_dir / (video_file.stem + ".mp4") - else: - output_path = video_file.parent / (video_file.stem + "_av1.mp4") - transcode_file( - video_file, - output_path, - skip_av1=skip_av1, - replace_mode=args.replace, - ) - - if __name__ == "__main__": main()