# SPDX-License-Identifier: Apache-2.0
"""Zyra CLI entrypoint and command wiring.
Organizes commands into groups that mirror the 8-stage pipeline hierarchy:
- import (alias: acquire/ingest): ingress from HTTP/S3/FTP/Vimeo backends
- process (alias: transform): GRIB/NetCDF decoding, extraction, format conversion, and metadata helpers
- simulate: simulation under uncertainty (skeleton)
- decide (alias: optimize): decision/optimization (skeleton)
- visualize (alias: render): static and animated rendering
- narrate: AI-driven storytelling/reporting (skeleton)
- verify: evaluation and metrics (skeleton)
- export (alias: disseminate; legacy: decimate): egress (local, S3, FTP, HTTP POST, Vimeo)
- run: run a config-driven pipeline (YAML/JSON)
Internal helpers support streaming bytes via stdin/stdout, GRIB ``.idx``
subsetting, and S3 URL parsing.
"""
import argparse
import json
import logging
import os
import platform
import re
import shutil
import subprocess
import sys
from importlib import metadata as importlib_metadata
from pathlib import Path
from typing import Tuple
from zyra import __version__ as ZYRA_VERSION
def _print_version_banner(mode: str = "short") -> None:
"""Print version banner.
Modes:
- short: ASCII logo (when found) + version + repo URL
- long: adds runtime diagnostics (tools, libs, env, platform)
- json: machine-readable diagnostics only (no logo)
"""
logo_text: str | None = None
# Candidate locations in order of preference
candidates: list[str] = []
env_logo = os.environ.get("ZYRA_ASCII_LOGO")
if env_logo:
candidates.append(env_logo)
# Project-relative (useful in dev checkouts)
candidates.append("branding/logos/ascii/logo_ascii_tree_tiny.txt")
# Downstream absolute hint (best-effort)
candidates.append("/branding/logos/ascii/logo_ascii_tree_tiny.txt")
# Packaged asset under zyra.assets/ascii if available
try:
try:
from importlib import resources as importlib_resources # type: ignore
except Exception: # pragma: no cover
import importlib_resources # type: ignore
res = (
importlib_resources.files("zyra.assets")
.joinpath("ascii")
.joinpath("logo_ascii_tree_tiny.txt")
)
if getattr(res, "is_file", None) and res.is_file(): # type: ignore[attr-defined]
with importlib_resources.as_file(res) as p:
logo_text = Path(str(p)).read_text(encoding="utf-8", errors="ignore")
else:
# Fallback: older packaging may place the logo at the root of assets
res2 = importlib_resources.files("zyra.assets").joinpath(
"logo_ascii_tree_tiny.txt"
)
if getattr(res2, "is_file", None) and res2.is_file(): # type: ignore[attr-defined]
with importlib_resources.as_file(res2) as p:
logo_text = Path(str(p)).read_text(
encoding="utf-8", errors="ignore"
)
except importlib_metadata.PackageNotFoundError:
# Package metadata not available (editable install or runtime env);
# ignore gracefully.
pass
except Exception as exc: # pragma: no cover - unexpected metadata error
# Avoid hard-failing version banner; log at debug level if possible.
try:
import logging as _log
_log.getLogger(__name__).debug(
"distribution() metadata read failed: %s", exc
)
except Exception:
pass
if logo_text is None:
for c in candidates:
try:
p = Path(c)
if p.exists() and p.is_file():
logo_text = p.read_text(encoding="utf-8", errors="ignore")
break
except Exception:
pass
if mode == "json":
print(json.dumps(_collect_version_info(), indent=2, sort_keys=True))
return
info = _collect_version_info()
lines = []
if logo_text and mode == "short":
lines.append(logo_text.rstrip("\n"))
lines.append(f"Zyra {info.get('version', ZYRA_VERSION)}")
lines.append("https://github.com/NOAA-GSL/zyra")
if mode == "long":
lines.append("")
lines.append(
f"Python: {info['python']['version']} ({info['python']['implementation']})"
)
lines.append(
f"Platform: {info['platform']['system']}/{info['platform']['machine']}"
)
try:
inst = info.get("install", {})
lines.append(
f"Install: {inst.get('module_path','')}\nExec: {inst.get('executable','')}"
)
except Exception:
pass
git = info.get("git") or {}
if git.get("commit") or git.get("date"):
commit = git.get("commit", "unknown")
date = git.get("date", "unknown")
lines.append(f"Git: {commit} ({date})")
tools = info.get("tools", {})
lines.append(
f"FFmpeg: {tools.get('ffmpeg','not found')}; FFprobe: {tools.get('ffprobe','not found')}"
)
lines.append(f"wgrib2: {tools.get('wgrib2','not found')}")
libs = info.get("libs", {})
# Group core libs concisely
core_a = []
for k in ["xarray", "netcdf4", "cfgrib", "eccodes", "pygrib"]:
v = libs.get(k)
core_a.append(f"{k}: {v if v is not None else 'not installed'}")
lines.append("; ".join(core_a))
geo_a = []
for k in ["rasterio", "gdal", "rioxarray", "cartopy", "matplotlib"]:
v = libs.get(k)
geo_a.append(f"{k}: {v if v is not None else 'not installed'}")
lines.append("; ".join(geo_a))
env = info.get("env", {})
lines.append(
f"DATA_DIR: {env.get('DATA_DIR') or ''}; LOG_LEVEL: {env.get('LOG_LEVEL') or ''}"
)
print("\n".join(lines))
def _get_tool_version(cmd: str) -> str | None:
exe = shutil.which(cmd)
if not exe:
return None
try:
proc = subprocess.run(
[exe, "-version"], capture_output=True, text=True, timeout=3
)
except Exception:
return None
out = (proc.stdout or proc.stderr or "").strip().splitlines()
return out[0] if out else None
def _collect_version_info() -> dict:
# Basic metadata
import sys as _sys
info: dict = {
"version": ZYRA_VERSION,
"repo": "https://github.com/NOAA-GSL/zyra",
"python": {
"version": platform.python_version(),
"implementation": platform.python_implementation(),
},
"platform": {
"system": platform.system().lower() or sys.platform,
"machine": platform.machine().lower(),
},
"install": {
"module_path": str(Path(__file__).resolve()),
"executable": _sys.executable,
},
"env": {
"DATA_DIR": os.environ.get("DATA_DIR"),
"LOG_LEVEL": os.environ.get("LOG_LEVEL"),
},
}
# Distribution info (best-effort)
try:
dist = importlib_metadata.distribution("zyra")
info["distribution"] = {
"name": dist.metadata.get("Name", "zyra"),
"version": dist.version,
}
except Exception:
pass
# Git metadata via env (optional)
git_commit = os.environ.get("ZYRA_GIT_COMMIT")
git_date = os.environ.get("ZYRA_BUILD_DATE")
if git_commit or git_date:
info["git"] = {"commit": git_commit, "date": git_date}
# External tool versions
info["tools"] = {
"ffmpeg": _get_tool_version("ffmpeg") or "not found",
"ffprobe": _get_tool_version("ffprobe") or "not found",
"wgrib2": _get_tool_version("wgrib2") or "not found",
}
# Library versions
libs: dict[str, str | None] = {}
def ver(mod: str, attr: str = "__version__") -> str | None:
try:
m = __import__(mod, fromlist=["_"])
v = getattr(m, attr, None)
return str(v) if v is not None else None
except Exception:
return None
libs["xarray"] = ver("xarray")
libs["netcdf4"] = ver("netCDF4") or ver("netcdf4")
libs["cfgrib"] = ver("cfgrib")
libs["eccodes"] = ver("eccodes")
libs["pygrib"] = ver("pygrib")
# Raster/GDAL stack
try:
import rasterio # type: ignore
libs["rasterio"] = getattr(rasterio, "__version__", None)
libs["gdal"] = getattr(rasterio, "__gdal_version__", None)
except Exception:
libs["rasterio"] = None
libs["gdal"] = None
libs["rioxarray"] = ver("rioxarray")
libs["cartopy"] = ver("cartopy")
libs["matplotlib"] = ver("matplotlib")
info["libs"] = libs
# Heuristic extras presence from libs
extras: dict[str, bool] = {
"connectors": any(
(libs.get(k) is not None) for k in ("boto3", "requests", "PyVimeo")
),
"processing": any(
(libs.get(k) is not None)
for k in ("xarray", "netcdf4", "cfgrib", "rasterio", "rioxarray")
),
"visualization": any(
(libs.get(k) is not None) for k in ("cartopy", "matplotlib")
),
"wizard": False,
"api": False,
}
try:
import prompt_toolkit # type: ignore # noqa: F401
extras["wizard"] = True
except Exception:
pass
try:
import fastapi # type: ignore # noqa: F401
import uvicorn # type: ignore # noqa: F401
extras["api"] = True
except Exception:
pass
info["extras"] = extras
return info
def _parse_s3_url(url: str) -> Tuple[str, str]:
m = re.match(r"^s3://([^/]+)/(.+)$", url)
if not m:
raise ValueError("Invalid s3 URL. Expected s3://bucket/key")
return m.group(1), m.group(2)
def _normalize_group_name(name: str) -> str:
"""Normalize top-level group aliases to canonical names.
Keeps canonical groups stable for internal wiring while accepting user-friendly
aliases at the CLI entry: import→acquire, render→visualize,
disseminate/export/decimation→decimate. The legacy name 'decimate' remains
accepted for back-compat.
"""
n = (name or "").strip().lower()
alias_map = {
"import": "acquire",
"ingest": "acquire",
"render": "visualize",
# Egress: keep 'decimate' as internal canonical for back-compat
"export": "decimate",
"disseminate": "decimate",
"decimation": "decimate",
"optimize": "decide",
}
return alias_map.get(n, n)
def _read_bytes(
path_or_url: str, *, idx_pattern: str | None = None, unsigned: bool = False
) -> bytes:
# stdin
if path_or_url == "-":
return sys.stdin.buffer.read()
p = Path(path_or_url)
if p.exists():
return p.read_bytes()
# HTTP(S)
if path_or_url.startswith("http://") or path_or_url.startswith("https://"):
try:
from zyra.connectors.backends import http as http_backend
from zyra.utils.grib import idx_to_byteranges
if idx_pattern:
lines = http_backend.get_idx_lines(path_or_url)
ranges = idx_to_byteranges(lines, idx_pattern)
return http_backend.download_byteranges(path_or_url, ranges.keys())
return http_backend.fetch_bytes(path_or_url)
except Exception as exc: # pragma: no cover - optional dep
raise SystemExit(f"Failed to fetch from URL: {exc}") from exc
# S3
if path_or_url.startswith("s3://"):
try:
from zyra.connectors.backends import s3 as s3_backend
from zyra.utils.grib import idx_to_byteranges
if idx_pattern:
lines = s3_backend.get_idx_lines(path_or_url, unsigned=unsigned)
ranges = idx_to_byteranges(lines, idx_pattern)
return s3_backend.download_byteranges(
path_or_url, None, ranges.keys(), unsigned=unsigned
)
return s3_backend.fetch_bytes(path_or_url, unsigned=unsigned)
except Exception as exc: # pragma: no cover - optional dep
raise SystemExit(f"Failed to fetch from S3: {exc}") from exc
raise SystemExit(f"Input not found or unsupported scheme: {path_or_url}")
[docs]
def cmd_decode_grib2(args: argparse.Namespace) -> int:
from zyra.processing import grib_decode
from zyra.processing.grib_utils import extract_metadata
data = _read_bytes(
args.file_or_url, idx_pattern=args.pattern, unsigned=args.unsigned
)
if getattr(args, "raw", False):
# Emit the (optionally subsetted) raw GRIB2 bytes directly to stdout
sys.stdout.buffer.write(data)
return 0
decoded = grib_decode(data, backend=args.backend)
meta = extract_metadata(decoded)
# Print variables and basic metadata
print(meta)
return 0
def _viz_heatmap_cmd(ns: argparse.Namespace) -> int:
# Local import to avoid importing visualization deps unless used
from zyra.visualization.heatmap_manager import HeatmapManager
mgr = HeatmapManager(basemap=ns.basemap, cmap=ns.cmap)
mgr.configure(extent=ns.extent)
# Build features list with negations
features = None
if getattr(ns, "features", None):
features = [f.strip() for f in (ns.features.split(",")) if f.strip()]
else:
features = None
if features is None:
# use default from styles
from zyra.visualization.styles import MAP_STYLES
features = list(MAP_STYLES.get("features", []) or [])
# Apply negations
if getattr(ns, "no_coastline", False) and "coastline" in features:
features = [f for f in features if f != "coastline"]
if getattr(ns, "no_borders", False) and "borders" in features:
features = [f for f in features if f != "borders"]
if getattr(ns, "no_gridlines", False) and "gridlines" in features:
features = [f for f in features if f != "gridlines"]
mgr.render(
input_path=ns.input,
var=ns.var,
width=ns.width,
height=ns.height,
dpi=ns.dpi,
# CRS handling
crs=getattr(ns, "crs", None),
reproject=getattr(ns, "reproject", False),
colorbar=getattr(ns, "colorbar", False),
label=getattr(ns, "label", None),
units=getattr(ns, "units", None),
features=features,
timestamp=getattr(ns, "timestamp", None),
timestamp_loc=getattr(ns, "timestamp_loc", "lower_right"),
map_type=getattr(ns, "map_type", "image"),
tile_source=getattr(ns, "tile_source", None),
tile_zoom=getattr(ns, "tile_zoom", 3),
)
out = mgr.save(ns.output)
print(out or "")
return 0
def _viz_contour_cmd(ns: argparse.Namespace) -> int:
from zyra.visualization.contour_manager import ContourManager
levels = ns.levels
if not isinstance(levels, int):
try:
# If provided as a simple integer string (e.g., "5"), treat as count
levels = int(str(levels))
except Exception:
try:
# Otherwise, parse comma-separated explicit level values
s = str(levels)
levels = [float(x) for x in s.split(",") if x.strip()]
except Exception:
levels = 10
mgr = ContourManager(basemap=ns.basemap, cmap=ns.cmap, filled=ns.filled)
mgr.configure(extent=ns.extent)
features = None
if getattr(ns, "features", None):
features = [f.strip() for f in (ns.features.split(",")) if f.strip()]
else:
features = None
if features is None:
from zyra.visualization.styles import MAP_STYLES
features = list(MAP_STYLES.get("features", []) or [])
if getattr(ns, "no_coastline", False) and "coastline" in features:
features = [f for f in features if f != "coastline"]
if getattr(ns, "no_borders", False) and "borders" in features:
features = [f for f in features if f != "borders"]
if getattr(ns, "no_gridlines", False) and "gridlines" in features:
features = [f for f in features if f != "gridlines"]
mgr.render(
input_path=ns.input,
var=ns.var,
width=ns.width,
height=ns.height,
dpi=ns.dpi,
levels=levels,
# CRS handling
crs=getattr(ns, "crs", None),
reproject=getattr(ns, "reproject", False),
colorbar=getattr(ns, "colorbar", False),
label=getattr(ns, "label", None),
units=getattr(ns, "units", None),
features=features,
timestamp=getattr(ns, "timestamp", None),
timestamp_loc=getattr(ns, "timestamp_loc", "lower_right"),
map_type=getattr(ns, "map_type", "image"),
tile_source=getattr(ns, "tile_source", None),
tile_zoom=getattr(ns, "tile_zoom", 3),
)
out = mgr.save(ns.output)
print(out or "")
return 0
def _viz_timeseries_cmd(ns: argparse.Namespace) -> int:
from zyra.visualization.timeseries_manager import TimeSeriesManager
mgr = TimeSeriesManager(
title=ns.title, xlabel=ns.xlabel, ylabel=ns.ylabel, style=ns.style
)
mgr.render(
input_path=ns.input,
x=ns.x,
y=ns.y,
var=ns.var,
width=ns.width,
height=ns.height,
dpi=ns.dpi,
)
out = mgr.save(ns.output)
print(out or "")
return 0
def _viz_vector_cmd(ns: argparse.Namespace) -> int:
from zyra.visualization.vector_field_manager import VectorFieldManager
mgr = VectorFieldManager(
basemap=ns.basemap,
color=ns.color,
density=ns.density,
scale=ns.scale,
streamlines=getattr(ns, "streamlines", False),
)
mgr.configure(extent=ns.extent)
features = None
if getattr(ns, "features", None):
features = [f.strip() for f in (ns.features.split(",")) if f.strip()]
else:
features = None
if features is None:
from zyra.visualization.styles import MAP_STYLES
features = list(MAP_STYLES.get("features", []) or [])
if getattr(ns, "no_coastline", False) and "coastline" in features:
features = [f for f in features if f != "coastline"]
if getattr(ns, "no_borders", False) and "borders" in features:
features = [f for f in features if f != "borders"]
if getattr(ns, "no_gridlines", False) and "gridlines" in features:
features = [f for f in features if f != "gridlines"]
mgr.render(
input_path=ns.input,
uvar=ns.uvar,
vvar=ns.vvar,
u=ns.u,
v=ns.v,
width=ns.width,
height=ns.height,
dpi=ns.dpi,
# CRS handling
crs=getattr(ns, "crs", None),
reproject=getattr(ns, "reproject", False),
features=features,
map_type=getattr(ns, "map_type", "image"),
tile_source=getattr(ns, "tile_source", None),
tile_zoom=getattr(ns, "tile_zoom", 3),
)
out = mgr.save(ns.output)
print(out or "")
return 0
def _viz_wind_cmd(ns: argparse.Namespace) -> int:
# Back-compat alias for vector
import sys
print("[deprecated] 'wind' is deprecated; use 'vector' instead", file=sys.stderr)
return _viz_vector_cmd(ns)
[docs]
def main(argv: list[str] | None = None) -> int:
# Pre-scan argv to support --version without requiring a subcommand
args_list = argv if argv is not None else sys.argv[1:]
if any(a in {"--version", "-V"} for a in args_list):
mode = "short"
if "--json" in args_list:
mode = "json"
elif "--long" in args_list:
mode = "long"
_print_version_banner(mode)
return 0
parser = argparse.ArgumentParser(prog="zyra")
try:
from zyra import plugins as _plugins
epilog = _plugins.help_epilog()
if epilog:
parser.epilog = epilog
parser.formatter_class = argparse.RawDescriptionHelpFormatter
except ImportError:
# Non-fatal: plugin epilog is best-effort.
pass
except Exception as exc: # pragma: no cover - defensive
logging.getLogger(__name__).warning(
"plugin help epilog disabled due to error: %s", exc
)
# Global verbosity controls for all commands
vgrp = parser.add_mutually_exclusive_group()
vgrp.add_argument(
"-v",
"--verbose",
action="store_true",
help="Verbose output (sets ZYRA_VERBOSITY=debug)",
)
vgrp.add_argument(
"--quiet", action="store_true", help="Quiet output (sets ZYRA_VERBOSITY=quiet)"
)
sub = parser.add_subparsers(dest="cmd", required=True)
# Pre-scan argv to support lazy registration and avoid importing heavy stacks unnecessarily
first_non_flag_raw = next((a for a in args_list if not a.startswith("-")), None)
first_non_flag = (
_normalize_group_name(first_non_flag_raw) if first_non_flag_raw else None
)
# Always make 'run' available (lightweight)
from zyra.pipeline_runner import register_cli_run as _register_run
_register_run(sub)
# Lazy-register only the requested top-level group when possible
if first_non_flag == "acquire":
from zyra.connectors import ingest as _ingest_mod
p_acq = sub.add_parser(
"acquire", help="Acquire/ingest data from sources (alias: import/ingest)"
)
acq_sub = p_acq.add_subparsers(dest="acquire_cmd", required=True)
_ingest_mod.register_cli(acq_sub)
# Alias top-level: import → acquire
p_import = sub.add_parser("import", help=argparse.SUPPRESS)
import_sub = p_import.add_subparsers(dest="acquire_cmd", required=True)
_ingest_mod.register_cli(import_sub)
elif first_non_flag == "process":
import zyra.transform as _transform_mod
from zyra import processing as _process_mod
p_proc = sub.add_parser(
"process", help="Processing commands (GRIB/NetCDF/GeoTIFF) + transforms"
)
proc_sub = p_proc.add_subparsers(dest="process_cmd", required=True)
# Combine transform commands under process group
_process_mod.register_cli(proc_sub)
_transform_mod.register_cli(proc_sub)
elif first_non_flag == "visualize":
from zyra.visualization import cli_register as _visual_mod
p_viz = sub.add_parser(
"visualize",
help="Visualization commands (static/interactive/animation) (alias: render)",
)
viz_sub = p_viz.add_subparsers(dest="visualize_cmd", required=True)
_visual_mod.register_cli(viz_sub)
# Alias top-level: render → visualize
p_render = sub.add_parser("render", help=argparse.SUPPRESS)
render_sub = p_render.add_subparsers(dest="visualize_cmd", required=True)
_visual_mod.register_cli(render_sub)
elif first_non_flag == "disseminate":
from zyra.connectors import egress as _egress_mod
p_disseminate = sub.add_parser(
"disseminate",
help="Write/egress data to destinations (alias: export; legacy: decimate)",
)
dis_sub = p_disseminate.add_subparsers(dest="disseminate_cmd", required=True)
_egress_mod.register_cli(dis_sub)
# Legacy alias top-level: decimate → disseminate
p_dec_alias = sub.add_parser("decimate", help=argparse.SUPPRESS)
dec_alias_sub = p_dec_alias.add_subparsers(
dest="disseminate_cmd", required=True
)
_egress_mod.register_cli(dec_alias_sub)
elif first_non_flag == "decimate":
# Legacy top-level alias retained for back-compat
from zyra.connectors import egress as _egress_mod
p_dec = sub.add_parser(
"decimate",
help="[deprecated] Write/egress data (use 'disseminate' or 'export')",
)
dec_sub = p_dec.add_subparsers(dest="decimate_cmd", required=True)
_egress_mod.register_cli(dec_sub)
# Alias top-level: disseminate/export → decimate
p_disseminate = sub.add_parser("disseminate", help=argparse.SUPPRESS)
dis_sub = p_disseminate.add_subparsers(dest="decimate_cmd", required=True)
_egress_mod.register_cli(dis_sub)
p_export = sub.add_parser("export", help=argparse.SUPPRESS)
exp_sub = p_export.add_subparsers(dest="decimate_cmd", required=True)
_egress_mod.register_cli(exp_sub)
elif first_non_flag == "swarm":
from zyra.swarm import cli as _swarm_cli
p_swarm = sub.add_parser(
"swarm",
help="Run the multi-stage swarm orchestrator",
)
_swarm_cli.register_cli(p_swarm)
elif first_non_flag == "plan":
from zyra.swarm import planner as _swarm_planner
p_plan = sub.add_parser(
"plan",
help="Generate a swarm manifest from user intent (planner preview)",
)
_swarm_planner.register_cli(p_plan)
elif first_non_flag == "simulate":
import zyra.simulate as _simulate_mod
p_sim = sub.add_parser("simulate", help="Simulate under uncertainty (skeleton)")
sim_sub = p_sim.add_subparsers(dest="simulate_cmd", required=True)
_simulate_mod.register_cli(sim_sub)
elif first_non_flag == "decide":
import zyra.decide as _decide_mod
p_dec = sub.add_parser("decide", help="Decision/optimization (skeleton)")
d_sub = p_dec.add_subparsers(dest="decide_cmd", required=True)
_decide_mod.register_cli(d_sub)
# Alias top-level: optimize → decide
p_opt = sub.add_parser("optimize", help=argparse.SUPPRESS)
opt_sub = p_opt.add_subparsers(dest="decide_cmd", required=True)
_decide_mod.register_cli(opt_sub)
elif first_non_flag == "narrate":
import zyra.narrate as _narrate_mod
p_nar = sub.add_parser("narrate", help="Narrate/report (skeleton)")
n_sub = p_nar.add_subparsers(dest="narrate_cmd", required=True)
_narrate_mod.register_cli(n_sub)
elif first_non_flag == "verify":
import zyra.verify as _verify_mod
p_ver = sub.add_parser(
"verify", help="Evaluation/metrics/validation (skeleton)"
)
v_sub = p_ver.add_subparsers(dest="verify_cmd", required=True)
_verify_mod.register_cli(v_sub)
elif first_non_flag == "transform":
import zyra.transform as _transform_mod
p_tr = sub.add_parser("transform", help="Transform helpers (metadata, etc.)")
tr_sub = p_tr.add_subparsers(dest="transform_cmd", required=True)
_transform_mod.register_cli(tr_sub)
elif first_non_flag == "run":
# Already registered above
pass
elif first_non_flag == "search":
# Single command for dataset discovery
from zyra.connectors import discovery as _discovery_mod
p_search = sub.add_parser(
"search", help="Search datasets (local SOS catalog; JSON/YAML export)"
)
_discovery_mod.register_cli(p_search)
elif first_non_flag == "wizard":
# Lightweight: registers a single command with optional LLM backends
from zyra import wizard as _wizard_mod
p_wiz = sub.add_parser(
"wizard", help="Interactive assistant that suggests/runs CLI commands"
)
_wizard_mod.register_cli(p_wiz)
elif first_non_flag == "generate-manifest":
# Developer utility to generate capabilities manifest
from zyra.connectors import discovery as _discovery_mod
from zyra.wizard.manifest import save_manifest as _save_manifest
p_gen = sub.add_parser(
"generate-manifest", help="Generate capabilities JSON manifest"
)
p_gen.add_argument(
"-o",
"--output",
default=str(Path(__file__).parent / "wizard" / "zyra_capabilities"),
help=(
"Directory for split manifests (default). Provide a .json path to emit a legacy single-file manifest."
),
)
p_gen.add_argument(
"--legacy-output",
default=str(Path(__file__).parent / "wizard" / "zyra_capabilities.json"),
help="Path for legacy zyra_capabilities.json when --legacy-json is enabled",
)
p_gen.add_argument(
"--legacy-json",
dest="legacy_json",
action="store_true",
default=True,
help="Also write legacy zyra_capabilities.json (default)",
)
p_gen.add_argument(
"--no-legacy-json",
dest="legacy_json",
action="store_false",
help="Skip writing the legacy zyra_capabilities.json",
)
def _cmd_gen(ns: argparse.Namespace) -> int:
_save_manifest(
ns.output,
include_legacy=ns.legacy_json,
legacy_path=ns.legacy_output,
)
print(f"Capabilities written to {ns.output}")
if ns.legacy_json:
print(f"Legacy manifest written to {ns.legacy_output}")
return 0
p_gen.set_defaults(func=_cmd_gen)
else:
# Fallback: register the full CLI tree when we cannot infer the target
import zyra.decide as _decide_mod
import zyra.narrate as _narrate_mod
import zyra.simulate as _simulate_mod
import zyra.transform as _transform_mod
from zyra import processing as _process_mod
from zyra import wizard as _wizard_mod
from zyra.connectors import discovery as _discovery_mod
from zyra.connectors import egress as _egress_mod
from zyra.connectors import ingest as _ingest_mod
from zyra.visualization import cli_register as _visual_mod
from zyra.wizard.manifest import save_manifest as _save_manifest
p_acq = sub.add_parser(
"acquire", help="Acquire/ingest data from sources (alias: import/ingest)"
)
acq_sub = p_acq.add_subparsers(dest="acquire_cmd", required=True)
_ingest_mod.register_cli(acq_sub)
# Alias: import → acquire
p_acq_alias = sub.add_parser("import", help=argparse.SUPPRESS)
acq_alias_sub = p_acq_alias.add_subparsers(dest="acquire_cmd", required=True)
_ingest_mod.register_cli(acq_alias_sub)
p_proc = sub.add_parser(
"process", help="Processing commands (GRIB/NetCDF/GeoTIFF) + transforms"
)
proc_sub = p_proc.add_subparsers(dest="process_cmd", required=True)
# Combine transform commands under process group
_process_mod.register_cli(proc_sub)
_transform_mod.register_cli(proc_sub)
p_viz = sub.add_parser(
"visualize",
help="Visualization commands (static/interactive/animation) (alias: render)",
)
viz_sub = p_viz.add_subparsers(dest="visualize_cmd", required=True)
_visual_mod.register_cli(viz_sub)
# Alias: render → visualize
p_viz_alias = sub.add_parser("render", help=argparse.SUPPRESS)
viz_alias_sub = p_viz_alias.add_subparsers(dest="visualize_cmd", required=True)
_visual_mod.register_cli(viz_alias_sub)
p_disseminate = sub.add_parser(
"disseminate",
help="Write/egress data to destinations (alias: export; legacy: decimate)",
)
dis_sub = p_disseminate.add_subparsers(dest="disseminate_cmd", required=True)
_egress_mod.register_cli(dis_sub)
# Aliases: export/decimate → disseminate
p_export = sub.add_parser("export", help=argparse.SUPPRESS)
exp_sub = p_export.add_subparsers(dest="disseminate_cmd", required=True)
_egress_mod.register_cli(exp_sub)
p_dec_alias = sub.add_parser("decimate", help=argparse.SUPPRESS)
dec_alias_sub = p_dec_alias.add_subparsers(
dest="disseminate_cmd", required=True
)
_egress_mod.register_cli(dec_alias_sub)
p_tr = sub.add_parser("transform", help="Transform helpers (metadata, etc.)")
tr_sub = p_tr.add_subparsers(dest="transform_cmd", required=True)
_transform_mod.register_cli(tr_sub)
# New skeleton groups: simulate/decide/narrate
p_sim = sub.add_parser("simulate", help="Simulate under uncertainty (skeleton)")
sim_sub = p_sim.add_subparsers(dest="simulate_cmd", required=True)
_simulate_mod.register_cli(sim_sub)
p_dec = sub.add_parser("decide", help="Decision/optimization (skeleton)")
dec_sub2 = p_dec.add_subparsers(dest="decide_cmd", required=True)
_decide_mod.register_cli(dec_sub2)
p_nar = sub.add_parser("narrate", help="Narrate/report (skeleton)")
nar_sub = p_nar.add_subparsers(dest="narrate_cmd", required=True)
_narrate_mod.register_cli(nar_sub)
# Wizard (single command, no subcommands)
p_wiz = sub.add_parser(
"wizard", help="Interactive assistant that suggests/runs CLI commands"
)
_wizard_mod.register_cli(p_wiz)
# Verify stage
import zyra.verify as _verify_mod
p_ver = sub.add_parser(
"verify", help="Evaluation/metrics/validation (skeleton)"
)
ver_sub = p_ver.add_subparsers(dest="verify_cmd", required=True)
_verify_mod.register_cli(ver_sub)
# Search (single command)
p_search = sub.add_parser(
"search", help="Search datasets (local SOS catalog; JSON/YAML export)"
)
_discovery_mod.register_cli(p_search)
# Generate-manifest
p_gen = sub.add_parser(
"generate-manifest", help="Generate capabilities JSON manifest"
)
p_gen.add_argument(
"-o",
"--output",
default=str(Path(__file__).parent / "wizard" / "zyra_capabilities"),
help=(
"Directory for split manifests (default). Provide a .json path to emit a legacy single-file manifest."
),
)
p_gen.add_argument(
"--legacy-output",
default=str(Path(__file__).parent / "wizard" / "zyra_capabilities.json"),
help="Path for legacy zyra_capabilities.json when --legacy-json is enabled",
)
p_gen.add_argument(
"--legacy-json",
dest="legacy_json",
action="store_true",
default=True,
help="Also write legacy zyra_capabilities.json (default)",
)
p_gen.add_argument(
"--no-legacy-json",
dest="legacy_json",
action="store_false",
help="Skip writing the legacy zyra_capabilities.json",
)
def _cmd_gen(ns: argparse.Namespace) -> int:
_save_manifest(
ns.output,
include_legacy=ns.legacy_json,
legacy_path=ns.legacy_output,
)
print(f"Capabilities written to {ns.output}")
if ns.legacy_json:
print(f"Legacy manifest written to {ns.legacy_output}")
return 0
p_gen.set_defaults(func=_cmd_gen)
# No separate workflow group; use `zyra run` for workflows
args = parser.parse_args(args_list)
# Apply global verbosity to environment so downstream modules pick it up
# Deprecation notice for legacy 'decimate' and 'transform' groups
try:
import warnings
cmd = getattr(args, "cmd", None)
if cmd == "decimate":
warnings.warn(
"'decimate' is deprecated; use 'export' or 'disseminate'",
category=UserWarning,
stacklevel=1,
)
if cmd == "transform":
warnings.warn(
"'transform' is merged into 'process'; use 'process'",
category=UserWarning,
stacklevel=1,
)
except Exception:
pass
if getattr(args, "verbose", False):
os.environ["ZYRA_VERBOSITY"] = "debug"
os.environ["DATAVIZHUB_VERBOSITY"] = "debug"
elif getattr(args, "quiet", False):
os.environ["ZYRA_VERBOSITY"] = "quiet"
os.environ["DATAVIZHUB_VERBOSITY"] = "quiet"
else:
os.environ.setdefault("ZYRA_VERBOSITY", "info")
os.environ.setdefault("DATAVIZHUB_VERBOSITY", "info")
# Configure logging based on env verbosity (idempotent)
try:
from zyra.utils.cli_helpers import configure_logging_from_env as _cfg_log
_cfg_log(default=os.environ.get("ZYRA_VERBOSITY", "info"))
except Exception:
pass
return args.func(args)
if __name__ == "__main__":
raise SystemExit(main())