Source code for zyra.transform

# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import argparse
import contextlib
import json
import os
import re
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from zyra.utils.cli_helpers import configure_logging_from_env
from zyra.utils.date_manager import DateManager
from zyra.utils.io_utils import open_output


def _compute_frames_metadata(
    frames_dir: str,
    *,
    pattern: str | None = None,
    datetime_format: str | None = None,
    period_seconds: int | None = None,
) -> dict[str, Any]:
    """Compute summary metadata for a directory of frame images.

    Scans a directory for image files (optionally filtered by regex), parses
    timestamps embedded in filenames using ``datetime_format`` or a fallback,
    and returns a JSON-serializable mapping with start/end timestamps, the
    number of frames, expected count for a cadence (if provided), and a list
    of missing timestamps on the cadence grid.
    """
    p = Path(frames_dir)
    if not p.exists() or not p.is_dir():
        raise SystemExit(f"Frames directory not found: {frames_dir}")

    # Collect candidate files
    names = [f.name for f in p.iterdir() if f.is_file()]
    if pattern:
        rx = re.compile(pattern)
        names = [n for n in names if rx.search(n)]
    else:
        exts = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".dds"}
        names = [n for n in names if Path(n).suffix.lower() in exts]
    names.sort()

    # Parse timestamps from filenames
    entries: list[tuple[datetime, Path]] = []
    timestamps: list[datetime] = []
    if datetime_format:
        dm = DateManager([datetime_format])
        parsed = dm.parse_timestamps_from_filenames(names, datetime_format)
        for name, dt in zip(names, parsed):
            if dt is not None:
                entries.append((dt, p / name))
                timestamps.append(dt)
    else:
        dm = DateManager()
        for n in names:
            s = dm.extract_date_time(n)
            if s:
                with contextlib.suppress(Exception):
                    dt = datetime.fromisoformat(s)
                    entries.append((dt, p / n))
                    timestamps.append(dt)
    entries.sort(key=lambda item: item[0])
    timestamps.sort()

    start_dt = timestamps[0] if timestamps else None
    end_dt = timestamps[-1] if timestamps else None

    out: dict[str, Any] = {
        "frames_dir": str(p),
        "pattern": pattern,
        "datetime_format": datetime_format,
        "period_seconds": period_seconds,
        "frame_count_actual": len(timestamps),
        "start_datetime": start_dt.isoformat() if start_dt else None,
        "end_datetime": end_dt.isoformat() if end_dt else None,
    }

    if period_seconds and start_dt and end_dt:
        exp = DateManager().calculate_expected_frames(start_dt, end_dt, period_seconds)
        out["frame_count_expected"] = exp
        # Compute missing timestamps grid
        have: set[str] = {t.isoformat() for t in timestamps}
        miss: list[str] = []
        cur = start_dt
        step = timedelta(seconds=int(period_seconds))
        for _ in range(exp):
            s = cur.isoformat()
            if s not in have:
                miss.append(s)
            cur += step
        out["missing_count"] = len(miss)
        out["missing_timestamps"] = miss
    else:
        out["frame_count_expected"] = None
        out["missing_count"] = None
        out["missing_timestamps"] = []

    analysis: dict[str, Any] = {}
    if start_dt and end_dt:
        analysis["span_seconds"] = int((end_dt - start_dt).total_seconds())
    if entries:
        unique_seen: set[str] = set()
        duplicates: list[str] = []
        for dt, _ in entries:
            iso = dt.isoformat()
            if iso in unique_seen:
                duplicates.append(iso)
            else:
                unique_seen.add(iso)
        analysis["frame_count_unique"] = len(unique_seen)
        analysis["duplicate_timestamps"] = duplicates
        sample_indexes = sorted({0, len(entries) // 2, len(entries) - 1})
        samples: list[dict[str, Any]] = []
        for idx in sample_indexes:
            if idx < 0 or idx >= len(entries):
                continue
            dt, file_path = entries[idx]
            sample: dict[str, Any] = {
                "timestamp": dt.isoformat(),
                "path": str(file_path),
            }
            with contextlib.suppress(OSError, ValueError):
                stat = file_path.stat()
                sample["size_bytes"] = stat.st_size
            samples.append(sample)
        analysis["sample_frames"] = samples

        sizes = []
        for _, fp in entries:
            with contextlib.suppress(OSError, ValueError):
                sizes.append(fp.stat().st_size)
        if sizes:
            analysis["file_size_summary"] = {
                "min_bytes": min(sizes),
                "max_bytes": max(sizes),
                "total_bytes": sum(sizes),
            }

    if analysis:
        out["analysis"] = analysis

    return out


def _cmd_metadata(ns: argparse.Namespace) -> int:
    """CLI: compute frames metadata and write JSON to stdout or a file."""
    if getattr(ns, "verbose", False):
        os.environ["ZYRA_VERBOSITY"] = "debug"
    elif getattr(ns, "quiet", False):
        os.environ["ZYRA_VERBOSITY"] = "quiet"
    if getattr(ns, "trace", False):
        os.environ["ZYRA_SHELL_TRACE"] = "1"
    configure_logging_from_env()
    alias = getattr(ns, "_command_alias", "metadata")
    if alias == "metadata":
        import logging

        logging.info(
            "Note: 'transform metadata' is also available as 'transform scan-frames'."
        )
    meta = _compute_frames_metadata(
        ns.frames_dir,
        pattern=ns.pattern,
        datetime_format=ns.datetime_format,
        period_seconds=ns.period_seconds,
    )
    payload = (json.dumps(meta, indent=2) + "\n").encode("utf-8")
    # Write to stdout or file
    # Ensure parent directories exist when writing to a file path
    if ns.output and ns.output != "-":
        try:
            out_path = Path(ns.output)
            if out_path.parent:
                out_path.parent.mkdir(parents=True, exist_ok=True)
        except Exception:
            # Fall through; open_output will surface any remaining errors
            pass
    with open_output(ns.output) as f:
        f.write(payload)
    return 0


[docs] def register_cli(subparsers: Any) -> None: """Register transform subcommands (metadata, enrich-metadata, enrich-datasets, update-dataset-json).""" from zyra.cli_common import add_output_option def _configure_metadata_parser( parser: argparse.ArgumentParser, *, alias_name: str ) -> None: parser.add_argument( "--frames-dir", required=True, dest="frames_dir", help="Directory containing frames", ) parser.add_argument("--pattern", help="Regex filter for frame filenames") parser.add_argument( "--datetime-format", dest="datetime_format", help="Datetime format used in filenames (e.g., %Y%m%d%H%M%S)", ) parser.add_argument( "--period-seconds", type=int, help="Expected cadence to compute missing frames", ) add_output_option(parser) parser.add_argument( "--verbose", action="store_true", help="Verbose logging for this command" ) parser.add_argument( "--quiet", action="store_true", help="Quiet logging for this command" ) parser.add_argument( "--trace", action="store_true", help="Shell-style trace of key steps and external commands", ) parser.set_defaults(func=_cmd_metadata, _command_alias=alias_name) p = subparsers.add_parser( "metadata", help="Compute frames metadata as JSON", description=( "Scan a frames directory to compute start/end timestamps, counts, and missing frames on a cadence." ), ) _configure_metadata_parser(p, alias_name="metadata") p_scan = subparsers.add_parser( "scan-frames", help="Alias of 'metadata' with a descriptive name", description=( "Alias of 'metadata'. Scan a frames directory and report timestamps, counts, and missing frames." ), ) _configure_metadata_parser(p_scan, alias_name="scan-frames") # Enrich metadata with dataset_id, vimeo_uri, and updated_at def _cmd_enrich(ns: argparse.Namespace) -> int: """CLI: enrich a frames metadata JSON with dataset id and Vimeo URI. Accepts a base metadata JSON (e.g., from ``metadata``), merges optional ``dataset_id`` and ``vimeo_uri`` (read from arg or stdin), and stamps ``updated_at``. """ if getattr(ns, "verbose", False): os.environ["ZYRA_VERBOSITY"] = "debug" elif getattr(ns, "quiet", False): os.environ["ZYRA_VERBOSITY"] = "quiet" if getattr(ns, "trace", False): os.environ["ZYRA_SHELL_TRACE"] = "1" configure_logging_from_env() import sys from zyra.utils.json_file_manager import JSONFileManager fm = JSONFileManager() # Load base metadata JSON from file or stdin when requested try: if getattr(ns, "read_frames_meta_stdin", False): raw = sys.stdin.buffer.read() try: js = raw.decode("utf-8") except UnicodeDecodeError as e: raise SystemExit( f"Failed to decode stdin as UTF-8 for frames metadata: {e}" ) from e try: base = json.loads(js) except json.JSONDecodeError as e: raise SystemExit( f"Invalid JSON on stdin for frames metadata: {e}" ) from e else: base = fm.read_json(ns.frames_meta) except Exception as exc: raise SystemExit(f"Failed to read frames metadata: {exc}") from exc if not isinstance(base, dict): base = {} # Attach dataset_id if getattr(ns, "dataset_id", None): base["dataset_id"] = ns.dataset_id # Attach vimeo_uri from arg or stdin vuri = getattr(ns, "vimeo_uri", None) if getattr(ns, "read_vimeo_uri", False): raw = sys.stdin.buffer.read() try: data = raw.decode("utf-8").strip() except UnicodeDecodeError as e: raise SystemExit( f"Failed to decode stdin as UTF-8 for Vimeo URI: {e}" ) from e if data: vuri = data.splitlines()[0].strip() if vuri: base["vimeo_uri"] = vuri # Add updated_at timestamp base["updated_at"] = datetime.now().replace(microsecond=0).isoformat() payload = (json.dumps(base, indent=2) + "\n").encode("utf-8") with open_output(ns.output) as f: f.write(payload) return 0 p2 = subparsers.add_parser( "enrich-metadata", help="Enrich frames metadata with dataset id and Vimeo URI", description=( "Enrich a frames metadata JSON with dataset_id, Vimeo URI, and updated_at; read from file or stdin." ), ) # Source of base frames metadata: file or stdin srcgrp = p2.add_mutually_exclusive_group(required=True) srcgrp.add_argument( "--frames-meta", dest="frames_meta", help="Path to frames metadata JSON", ) srcgrp.add_argument( "--read-frames-meta-stdin", dest="read_frames_meta_stdin", action="store_true", help="Read frames metadata JSON from stdin", ) p2.add_argument( "--dataset-id", dest="dataset_id", help="Dataset identifier to embed" ) grp = p2.add_mutually_exclusive_group() grp.add_argument("--vimeo-uri", help="Vimeo video URI to embed in metadata") grp.add_argument( "--read-vimeo-uri", action="store_true", help="Read Vimeo URI from stdin (first line)", ) add_output_option(p2) p2.add_argument( "--verbose", action="store_true", help="Verbose logging for this command" ) p2.add_argument( "--quiet", action="store_true", help="Quiet logging for this command" ) p2.add_argument( "--trace", action="store_true", help="Shell-style trace of key steps and external commands", ) p2.set_defaults(func=_cmd_enrich) # Enrich a list of dataset items (id,name,description,source,format,uri) def _cmd_enrich_datasets(ns: argparse.Namespace) -> int: """CLI: enrich dataset items provided in a JSON file. Input JSON can be either a list of items or an object with an `items` array. Each item should contain: id, name, description, source, format, uri. """ if getattr(ns, "verbose", False): os.environ["ZYRA_VERBOSITY"] = "debug" elif getattr(ns, "quiet", False): os.environ["ZYRA_VERBOSITY"] = "quiet" if getattr(ns, "trace", False): os.environ["ZYRA_SHELL_TRACE"] = "1" configure_logging_from_env() from zyra.connectors.discovery import DatasetMetadata from zyra.transform.enrich import enrich_items from zyra.utils.json_file_manager import JSONFileManager from zyra.utils.serialize import to_list fm = JSONFileManager() try: data = fm.read_json(ns.items_file) except Exception as exc: raise SystemExit(f"Failed to read items JSON: {exc}") from exc if isinstance(data, dict) and isinstance(data.get("items"), list): items_in_raw = data.get("items") elif isinstance(data, list): items_in_raw = data else: raise SystemExit( "Input JSON must be a list or an object with an 'items' array" ) # Optional profiles for defaults and license policy prof_defaults: dict[str, Any] = {} prof_license_policy: dict[str, Any] = {} if getattr(ns, "profile", None): try: from importlib import resources as importlib_resources pkg = "zyra.assets.profiles" res = f"{ns.profile}.json" path = importlib_resources.files(pkg).joinpath(res) with importlib_resources.as_file(path) as p: import json as _json prof0 = _json.loads(p.read_text(encoding="utf-8")) enr = prof0.get("enrichment") or {} ed = enr.get("defaults") or {} if isinstance(ed, dict): prof_defaults.update(ed) lp = enr.get("license_policy") or {} if isinstance(lp, dict): prof_license_policy.update(lp) except Exception as exc: raise SystemExit( f"Failed to load bundled profile '{ns.profile}': {exc}" ) from exc if getattr(ns, "profile_file", None): try: import json as _json prof1 = _json.loads(Path(ns.profile_file).read_text(encoding="utf-8")) enr = prof1.get("enrichment") or {} ed = enr.get("defaults") or {} if isinstance(ed, dict): prof_defaults.update(ed) lp = enr.get("license_policy") or {} if isinstance(lp, dict): prof_license_policy.update(lp) except Exception as exc: raise SystemExit(f"Failed to load profile file: {exc}") from exc # Normalize to DatasetMetadata items_in: list[DatasetMetadata] = [] for d in items_in_raw: try: items_in.append( DatasetMetadata( id=str(d.get("id")), name=str(d.get("name")), description=d.get("description"), source=str(d.get("source")), format=str(d.get("format")), uri=str(d.get("uri")), ) ) except Exception: continue enriched = enrich_items( items_in, level=str(ns.enrich), timeout=float(getattr(ns, "enrich_timeout", 3.0) or 3.0), workers=int(getattr(ns, "enrich_workers", 4) or 4), cache_ttl=int(getattr(ns, "cache_ttl", 86400) or 86400), offline=bool(getattr(ns, "offline", False) or False), https_only=bool(getattr(ns, "https_only", False) or False), allow_hosts=list(getattr(ns, "allow_host", []) or []), deny_hosts=list(getattr(ns, "deny_host", []) or []), max_probe_bytes=(getattr(ns, "max_probe_bytes", None)), profile_defaults=prof_defaults, profile_license_policy=prof_license_policy, ) payload = (json.dumps(to_list(enriched), indent=2) + "\n").encode("utf-8") with open_output(ns.output) as f: f.write(payload) return 0 p3 = subparsers.add_parser( "enrich-datasets", help=( "Enrich dataset items JSON (id,name,description,source,format,uri) with metadata\n" "Use --profile/--profile-file for defaults and license policy" ), ) p3.add_argument( "--items-file", required=True, dest="items_file", help="Path to items JSON" ) p3.add_argument("--profile", help="Bundled profile name under zyra.assets.profiles") p3.add_argument("--profile-file", help="External profile JSON path") p3.add_argument( "--enrich", required=True, choices=["shallow", "capabilities", "probe"], help="Enrichment level", ) p3.add_argument( "--enrich-timeout", type=float, default=3.0, help="Per-item timeout (s)" ) p3.add_argument( "--enrich-workers", type=int, default=4, help="Concurrency (workers)" ) p3.add_argument("--cache-ttl", type=int, default=86400, help="Cache TTL seconds") p3.add_argument( "--offline", action="store_true", help="Disable network during enrichment" ) p3.add_argument( "--https-only", action="store_true", help="Require HTTPS for remote probing" ) p3.add_argument( "--allow-host", action="append", help="Allow host suffix (repeatable)" ) p3.add_argument( "--deny-host", action="append", help="Deny host suffix (repeatable)" ) p3.add_argument( "--max-probe-bytes", type=int, help="Skip probing when larger than this size" ) add_output_option(p3) p3.add_argument( "--verbose", action="store_true", help="Verbose logging for this command" ) p3.add_argument( "--quiet", action="store_true", help="Quiet logging for this command" ) p3.add_argument( "--trace", action="store_true", help="Shell-style trace of key steps and external commands", ) p3.set_defaults(func=_cmd_enrich_datasets) # Update a dataset.json entry's startTime/endTime (and optionally dataLink) by dataset id def _cmd_update_dataset(ns: argparse.Namespace) -> int: """CLI: update an entry in dataset.json by dataset id. Loads a dataset index JSON from a local path or URL (HTTP or s3), updates the entry matching ``--dataset-id`` with ``startTime`` and ``endTime`` (from metadata or explicit flags), and optionally updates ``dataLink`` from a Vimeo URI. Writes the updated JSON to ``--output``. """ configure_logging_from_env() import sys # Fetch input JSON raw: bytes src = ns.input_url or ns.input_file if not src: raise SystemExit("--input-url or --input-file is required") try: if ns.input_url: url = ns.input_url if url.startswith("s3://"): from zyra.connectors.backends import s3 as s3_backend raw = s3_backend.fetch_bytes(url) else: from zyra.connectors.backends import http as http_backend raw = http_backend.fetch_bytes(url) else: raw = Path(ns.input_file).read_bytes() except Exception as exc: raise SystemExit(f"Failed to read dataset JSON: {exc}") from exc # Load metadata source (either explicit args or meta file/stdin) start = ns.start end = ns.end vimeo_uri = ns.vimeo_uri if ns.meta: try: meta = json.loads(Path(ns.meta).read_text(encoding="utf-8")) start = start or meta.get("start_datetime") end = end or meta.get("end_datetime") vimeo_uri = vimeo_uri or meta.get("vimeo_uri") except Exception: pass if ns.read_meta_stdin: raw_meta = sys.stdin.buffer.read() try: js = raw_meta.decode("utf-8") except UnicodeDecodeError as e: raise SystemExit( f"Failed to decode stdin as UTF-8 for metadata JSON: {e}" ) from e try: meta2 = json.loads(js) except json.JSONDecodeError as e: raise SystemExit(f"Invalid metadata JSON on stdin: {e}") from e start = start or meta2.get("start_datetime") end = end or meta2.get("end_datetime") vimeo_uri = vimeo_uri or meta2.get("vimeo_uri") # Parse dataset JSON try: text = raw.decode("utf-8") except UnicodeDecodeError as e: raise SystemExit(f"Dataset JSON is not valid UTF-8: {e}") from e try: data = json.loads(text) except json.JSONDecodeError as exc: raise SystemExit(f"Invalid dataset JSON: {exc}") from exc # Build dataLink from Vimeo if requested data_link = None if vimeo_uri and ns.set_data_link: vid = vimeo_uri.rsplit("/", 1)[-1] if vid.isdigit(): data_link = f"https://vimeo.com/{vid}" else: # If full URL already if vimeo_uri.startswith("http"): data_link = vimeo_uri # Update entry matching dataset id did = ns.dataset_id updated = False def _update_entry(entry: dict) -> bool: if not isinstance(entry, dict): return False if str(entry.get("id")) != str(did): return False if start is not None: entry["startTime"] = start if end is not None: entry["endTime"] = end if data_link is not None: entry["dataLink"] = data_link return True if isinstance(data, list): for ent in data: if _update_entry(ent): updated = True elif isinstance(data, dict) and isinstance(data.get("datasets"), list): for ent in data["datasets"]: if _update_entry(ent): updated = True else: # Single object case if isinstance(data, dict) and _update_entry(data): updated = True if not updated: raise SystemExit(f"Dataset id not found: {did}") out_bytes = (json.dumps(data, indent=2) + "\n").encode("utf-8") with open_output(ns.output) as f: f.write(out_bytes) return 0 p3 = subparsers.add_parser( "update-dataset-json", help="Update start/end (and dataLink) for a dataset id in dataset.json", description=( "Update a dataset.json entry by id using metadata (start/end and Vimeo URI) from a file, stdin, or args." ), ) srcgrp = p3.add_mutually_exclusive_group(required=True) srcgrp.add_argument("--input-url", help="HTTP(S) or s3:// URL of dataset.json") srcgrp.add_argument("--input-file", help="Local dataset.json path") p3.add_argument("--dataset-id", required=True, help="Dataset id to update") # Metadata sources p3.add_argument( "--meta", help="Path to metadata JSON containing start_datetime/end_datetime/vimeo_uri", ) p3.add_argument( "--read-meta-stdin", action="store_true", help="Read metadata JSON from stdin" ) p3.add_argument("--start", help="Explicit startTime override (ISO)") p3.add_argument("--end", help="Explicit endTime override (ISO)") p3.add_argument("--vimeo-uri", help="Explicit Vimeo URI (e.g., /videos/12345)") p3.add_argument( "--no-set-data-link", dest="set_data_link", action="store_false", help="Do not update dataLink from Vimeo URI", ) p3.set_defaults(set_data_link=True) add_output_option(p3) p3.add_argument( "--verbose", action="store_true", help="Verbose logging for this command" ) p3.add_argument( "--quiet", action="store_true", help="Quiet logging for this command" ) p3.add_argument( "--trace", action="store_true", help="Shell-style trace of key steps and external commands", ) p3.set_defaults(func=_cmd_update_dataset)