From 31a50a09a8d9b80f5380a6da6c1fb4f503da7c32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Kucharczyk?= Date: Thu, 4 Dec 2025 15:26:28 +0100 Subject: [PATCH] transcode.py: make it possible to skip AV1 --- transcode.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/transcode.py b/transcode.py index da33dc3..5b455a5 100644 --- a/transcode.py +++ b/transcode.py @@ -53,6 +53,33 @@ def setup_logging(): logging.info(f"Logging started. Verbose logs at: {log_file}") +def get_video_codec(input_path): + """ + Uses ffprobe to determine the codec of the first video stream. + Returns the codec name (e.g., 'av1', 'h264', 'hevc') or None if detection fails. + """ + cmd = [ + "ffprobe", + "-v", "error", + "-select_streams", "v:0", + "-show_entries", "stream=codec_name", + "-of", "default=noprint_wrappers=1:nokey=1", + str(input_path) + ] + try: + # We don't log this subprocess call to avoid spamming debug logs, + # unless it fails. + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + codec = result.stdout.strip() + logging.debug(f"Detected codec for {input_path.name}: {codec}") + return codec + except subprocess.CalledProcessError as e: + logging.warning(f"Failed to probe codec for {input_path.name}: {e}") + return None + except FileNotFoundError: + logging.error("ffprobe not found. Please ensure ffmpeg/ffprobe is installed.") + return None + def get_ffmpeg_command(input_path, output_path): """ Constructs the FFmpeg command based on the Handbrake preset requirements. @@ -77,7 +104,7 @@ def get_ffmpeg_command(input_path, output_path): ] return cmd -def transcode_file(input_file, output_file): +def transcode_file(input_file, output_file, skip_av1=True): input_path = Path(input_file) output_path = Path(output_file) @@ -106,6 +133,13 @@ def transcode_file(input_file, output_file): logging.warning(f"File vanished during checks: {input_path.name}") return + # 3. Check Codec (Optional Skip) + if skip_av1: + codec = get_video_codec(input_path) + if codec == 'av1': + logging.info(f"SKIP: Input file is already AV1: {input_path.name}") + return + # Prepare individual FFmpeg log file ffmpeg_logs_dir = get_config_dir() / "ffmpeg_logs" ffmpeg_logs_dir.mkdir(parents=True, exist_ok=True) @@ -140,8 +174,9 @@ def transcode_file(input_file, output_file): sys.exit(0) class NewFileHandler(FileSystemEventHandler): - def __init__(self, output_dir): + def __init__(self, output_dir, skip_av1=True): self.output_dir = Path(output_dir) + self.skip_av1 = skip_av1 def on_created(self, event): if event.is_directory: @@ -166,7 +201,7 @@ class NewFileHandler(FileSystemEventHandler): new_filename = input_path.stem + ".mp4" output_path = self.output_dir / new_filename - transcode_file(input_path, output_path) + transcode_file(input_path, output_path, skip_av1=self.skip_av1) def main(): setup_logging() @@ -178,9 +213,13 @@ def main(): group.add_argument("--watch-dir", type=str, help="Directory to monitor for new files") parser.add_argument("--output-dir", type=str, help="Output directory (Required for --watch-dir)") + parser.add_argument("--no-skip-av1", action="store_true", help="Force transcoding even if input is already AV1") args = parser.parse_args() + # Determine skip policy + skip_av1 = not args.no_skip_av1 + # --- Mode 1: Single File --- if args.input: input_path = Path(args.input) @@ -195,7 +234,7 @@ def main(): else: output_path = input_path.parent / (input_path.stem + "_av1.mp4") - transcode_file(input_path, output_path) + transcode_file(input_path, output_path, skip_av1=skip_av1) # --- Mode 2: Watch Directory --- elif args.watch_dir: @@ -214,9 +253,13 @@ def main(): logging.info(f"Monitoring {watch_dir}...") logging.info(f"Outputting to {output_dir}") + if skip_av1: + logging.info("Policy: Skipping files that are already AV1.") + else: + logging.info("Policy: Force transcoding all files (including AV1).") logging.info("Press Ctrl+C to stop.") - event_handler = NewFileHandler(output_dir) + event_handler = NewFileHandler(output_dir, skip_av1=skip_av1) observer = Observer() observer.schedule(event_handler, str(watch_dir), recursive=False) observer.start()