Source code for zyra.processing.video_processor

# SPDX-License-Identifier: Apache-2.0
import contextlib
import logging
import os
import shlex
import subprocess
from pathlib import Path
from typing import Any, Optional

try:  # Prefer standard library importlib.resources
    from importlib import resources as importlib_resources
except Exception:  # pragma: no cover - fallback for very old Python
    import importlib_resources  # type: ignore

from zyra.processing.base import DataProcessor


[docs] class VideoProcessor(DataProcessor): """Create videos from image sequences via FFmpeg. Processes image frames into a cohesive video file. Optionally overlays a static basemap beneath the frames. Requires system FFmpeg and FFprobe to be installed and accessible on PATH. Parameters ---------- input_directory : str Directory where input images are stored. output_file : str Destination path for the rendered video file. basemap : str, optional Optional background image path to overlay beneath frames. Examples -------- Render a video from PNG frames:: from zyra.processing.video_processor import VideoProcessor vp = VideoProcessor(input_directory="./frames", output_file="./out.mp4") vp.load("./frames") vp.process() vp.save("./out.mp4") """ def __init__( self, input_directory: str, output_file: str, basemap: Optional[str] = None, fps: int = 30, input_glob: Optional[str] = None, ): self.input_directory = input_directory self.output_file = output_file self.basemap = basemap self.fps = int(fps) self.input_glob = input_glob FEATURES = {"load", "process", "save", "validate"} # --- DataProcessor interface --------------------------------------------------------
[docs] def load(self, input_source: Any) -> None: """Set or update the input directory. Parameters ---------- input_source : Any Path to the directory containing input frames. Converted to str. """ self.input_directory = str(input_source)
[docs] def process(self, **kwargs: Any) -> Optional[str]: """Compile image frames into a video. Returns ------- str or None The output file path on success; ``None`` if processing failed. """ fps = int(kwargs.get("fps", self.fps)) input_glob = kwargs.get("input_glob", self.input_glob) success = self.process_video(fps=fps, input_glob=input_glob) return self.output_file if success else None
[docs] def save(self, output_path: Optional[str] = None) -> Optional[str]: """Finalize the configured output path. Parameters ---------- output_path : str, optional If provided, updates the configured output path before returning it. Returns ------- str or None The output path the processor will write or has written to. """ if output_path: self.output_file = output_path return self.output_file
[docs] def validate(self) -> bool: """Check FFmpeg/FFprobe availability. Returns ------- bool True if FFmpeg and FFprobe executables are available. """ return self.check_ffmpeg_installed()
# --- Original implementation --------------------------------------------------------
[docs] def check_ffmpeg_installed(self) -> bool: """Check that FFmpeg and FFprobe are available on PATH. Returns ------- bool True when both tools return version info without error. """ try: result_ffmpeg = subprocess.run( ["ffmpeg", "-version"], capture_output=True, text=True ) if result_ffmpeg.returncode != 0: logging.error("FFmpeg is not installed or not found in system path.") return False result_ffprobe = subprocess.run( ["ffprobe", "-version"], capture_output=True, text=True ) if result_ffprobe.returncode != 0: logging.error("FFprobe is not installed or not found in system path.") return False return True except Exception as e: logging.error(f"An error occurred while checking FFmpeg installation: {e}") return False
[docs] def process_video( self, *, fps: int | None = None, input_glob: Optional[str] = None ) -> bool: """Build the video using FFmpeg from frames in ``input_directory``. Notes ----- - Uses glob pattern matching on the first file's extension to include all frames with that extension in the directory. - When ``basemap`` is provided, overlays frames on top of the basemap. - Logs FFmpeg output lines at debug level. """ if not self.check_ffmpeg_installed(): logging.error("Cannot process video as FFmpeg is not installed.") return False try: input_dir = Path(self.input_directory) logging.debug("Scanning directory for files...") if input_glob: files = sorted(input_dir.glob(str(input_glob))) else: files = sorted( [f for f in input_dir.iterdir() if f.is_file()], key=lambda f: f.name, ) if not files: logging.error("No files found in the video input directory.") return False logging.debug(f"Found {len(files)} files.") if input_glob: input_pattern = f"{self.input_directory}/{input_glob}" file_info = f"glob='{input_glob}'" else: file_extension = files[0].suffix input_pattern = f"{self.input_directory}/*{file_extension}" file_info = f"extension: {file_extension}" logging.debug(f"Processing files with {file_info}") trace = os.environ.get("ZYRA_SHELL_TRACE") if trace: logging.info("+ frames=%s", str(len(files))) logging.info("+ pattern='%s'", input_pattern) output_path = self.output_file ffmpeg_cmd = "ffmpeg" # Resolve optional basemap; support pkg:package/resource form in addition to plain paths. basemap_path: str | None = self.basemap basemap_guard: contextlib.ExitStack | None = None if basemap_path and str(basemap_path).startswith("pkg:"): spec = str(basemap_path)[4:] try: if ":" in spec and "/" not in spec: pkg, res = spec.split(":", 1) else: parts = spec.split("/", 1) pkg = parts[0] res = parts[1] if len(parts) > 1 else "" if res: basemap_guard = contextlib.ExitStack() path = importlib_resources.files(pkg).joinpath(res) p = basemap_guard.enter_context( importlib_resources.as_file(path) ) basemap_path = str(p) logging.debug( "Resolved basemap '%s' to '%s'", self.basemap, basemap_path ) except Exception: # Fall back to original value; ffmpeg will likely fail if protocol-like pass if basemap_path: ffmpeg_cmd += f" -framerate {fps or self.fps} -loop 1 -i {basemap_path}" if trace: try: # Allow override via env to avoid hangs in CI try: timeout_s = float( os.environ.get("ZYRA_FFPROBE_TIMEOUT", "3") ) except (ValueError, TypeError): timeout_s = 3.0 proc = subprocess.run( [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of", "csv=p=0:s=x", basemap_path, ], capture_output=True, text=True, timeout=timeout_s, ) dims = (proc.stdout or "").strip() if dims: logging.info("+ basemap='%s' (%s)", basemap_path, dims) else: logging.info("+ basemap='%s'", basemap_path) except Exception: logging.info("+ basemap='%s'", basemap_path) ffmpeg_cmd += ( f" -framerate {fps or self.fps} -pattern_type glob -i '{input_pattern}'" ) if basemap_path: ffmpeg_cmd += " -filter_complex '[0:v][1:v]overlay=shortest=1'" ffmpeg_cmd += f" -r {fps or self.fps} -vcodec libx264 -pix_fmt yuv420p -y {output_path}" from zyra.utils.cli_helpers import sanitize_for_log if trace: logging.info("+ %s", sanitize_for_log(ffmpeg_cmd)) else: logging.info(f"Starting video processing using:{ffmpeg_cmd}") cmd = shlex.split(ffmpeg_cmd) rc = 0 try: with subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True ) as proc: for line in proc.stdout: msg = line.strip() if trace: logging.info(msg) else: logging.debug(msg) rc = proc.wait() finally: if basemap_guard is not None: with contextlib.suppress(Exception): basemap_guard.close() logging.debug("Video processing complete (rc=%s).", rc) if rc != 0: logging.error("ffmpeg exited with non-zero status: %s", rc) return False logging.info(f"Video created at {self.output_file}") # Consider success if the expected output file exists try: outp = Path(self.output_file) if outp.exists() and outp.stat().st_size > 0: return True except Exception: pass return False except Exception as e: logging.error(f"An error occurred: {e}") return False
[docs] def validate_video_file(self, video_file: str) -> bool: """Validate codec, resolution, and frame rate of an output video. Parameters ---------- video_file : str Path to the video file to validate. Returns ------- bool True if video matches allowed codec/resolution/frame rate. """ if not self.check_ffmpeg_installed(): logging.error("Cannot validate video file as FFmpeg is not installed.") return False valid_codecs = ["h264", "hevc"] valid_resolutions = ["1920x1080", "2048x1024", "4096x2048", "3600x1800"] valid_frame_rates = ["30"] cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name,width,height,r_frame_rate", "-of", "default=noprint_wrappers=1:nokey=1", video_file, ] process = subprocess.run(cmd, capture_output=True, text=True) if process.returncode != 0: logging.error(f"FFprobe error: {process.stderr}") return False output = process.stdout.splitlines() if len(output) < 4: logging.error("Could not retrieve all video properties.") return False codec, width, height, frame_rate_str = output resolution = f"{width}x{height}" # Safely parse r_frame_rate which is typically a fraction like "30000/1001" try: if "/" in frame_rate_str: num_s, den_s = frame_rate_str.split("/", 1) # Avoid converting a zero denominator; treat as invalid den_s_stripped = den_s.strip() try: if float(den_s_stripped) == 0.0: logging.error( "Frame rate reports zero denominator: %s", frame_rate_str ) return False except ValueError: logging.error( "Frame rate denominator is not a valid float: %s", den_s ) return False num = float(num_s) den = float(den_s_stripped) frame_rate = float(num) / float(den) else: frame_rate = float(frame_rate_str) except ValueError: logging.error(f"Unable to parse frame rate: {frame_rate_str}") return False # Tolerance-based frame rate validation tolerance = 0.05 valid_frame_rates_float = [float(fps) for fps in valid_frame_rates] if codec not in valid_codecs: logging.error(f"Invalid codec: {codec}") return False if resolution not in valid_resolutions: logging.error(f"Invalid resolution: {resolution}") return False if not any( abs(frame_rate - valid_fps) <= tolerance for valid_fps in valid_frame_rates_float ): logging.error( f"Invalid frame rate: {frame_rate} (expected one of {valid_frame_rates})" ) return False logging.info(f"{video_file} is a valid video file") return True
[docs] def validate_frame_count(self, video_file: str, expected_frame_count: int) -> bool: """Validate the total number of frames in a video. Parameters ---------- video_file : str Path to the video file to inspect. expected_frame_count : int Expected total number of frames. Returns ------- bool True when expected frame count matches the probed value. """ if not self.check_ffmpeg_installed(): logging.error("Cannot validate frame count as FFmpeg is not installed.") return False cmd = [ "ffprobe", "-v", "error", "-select_streams", "v:0", "-count_frames", "-show_entries", "stream=nb_read_frames", "-of", "default=nokey=1:noprint_wrappers=1", video_file, ] process = subprocess.run(cmd, capture_output=True, text=True) if process.returncode != 0: logging.error(f"FFprobe error: {process.stderr}") return False total_frames = process.stdout.strip() if not total_frames.isdigit() or int(total_frames) != expected_frame_count: logging.error( f"Invalid frame count: expected {expected_frame_count}, got {total_frames}" ) return False logging.info( f"{video_file} has the correct number of frames ({expected_frame_count})" ) return True