Source code for zyra.connectors.discovery

# SPDX-License-Identifier: Apache-2.0
"""Dataset discovery backends and CLI for `zyra search`.

Implements a lightweight local backend that searches the packaged
SOS dataset catalog at `zyra.assets.metadata/sos_dataset_metadata.json`.

Also includes a federated API search subcommand, ``zyra search api``, that can
query one or more Zyra-like APIs exposing ``/search`` (or equivalent via
OpenAPI). Results are normalized and can be emitted as JSON/CSV or displayed
as a compact table.

Usage examples:
  - zyra search "tsunami"
  - zyra search "GFS" --json
  - Use the selected URI with the matching connector, e.g.:
      zyra acquire ftp "$(zyra search 'earthquake' --select 1)" -o out.bin

API search examples:
  - zyra search api --url https://zyra.noaa.gov/api --query "temperature"
  - zyra search api --url https://zyra.a.edu/api --url https://zyra.b.edu/api \
        --query "precipitation" --table
"""

from __future__ import annotations

import argparse
import json
import re
from dataclasses import dataclass
from typing import Any, Iterable

try:  # optional observability helpers for redaction
    from zyra.api.utils.obs import _redact
except Exception:  # pragma: no cover - fallback when API extras missing

    def _redact(value: Any) -> Any:  # type: ignore[override]
        return value


from zyra.connectors.credentials import (
    CredentialResolutionError,
    apply_auth_header,
    apply_http_credentials,
    resolve_credentials,
)

try:  # Prefer standard library importlib.resources
    from importlib import resources as importlib_resources
except Exception:  # pragma: no cover - fallback for very old Python
    import importlib_resources  # type: ignore


@dataclass
class DatasetMetadata:
    id: str
    name: str
    description: str | None
    source: str
    format: str
    uri: str
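
# Illustrative record (not part of the module) as the local backend's
# _normalize() would produce it; the FTP URI is a hypothetical placeholder:
#
#     DatasetMetadata(
#         id="tsunami-history",
#         name="Tsunami History",
#         description="...",
#         source="sos-catalog",
#         format="FTP",
#         uri="ftp://example.invalid/sos/tsunami-history/",
#     )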

class DiscoveryBackend:
    """Interface for dataset discovery backends."""

    def search(self, query: str, *, limit: int = 10) -> list[DatasetMetadata]:
        raise NotImplementedError
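
# Minimal sketch (illustrative, not part of the module) of a custom backend
# satisfying this interface, e.g. for tests with an in-memory catalog:
#
#     class InMemoryBackend(DiscoveryBackend):
#         def __init__(self, datasets: list[DatasetMetadata]) -> None:
#             self._datasets = datasets
#
#         def search(self, query: str, *, limit: int = 10) -> list[DatasetMetadata]:
#             rx = re.compile(re.escape(query), re.IGNORECASE)
#             return [d for d in self._datasets if rx.search(d.name)][:limit]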

class LocalCatalogBackend(DiscoveryBackend):
    """Local backend backed by the packaged SOS catalog JSON.

    File: `zyra.assets.metadata/sos_dataset_metadata.json`

    Schema (subset of fields used here):

    - url (str): public catalog URL
    - title (str): dataset title
    - description (str): dataset description
    - keywords (list[str]): search tags
    - ftp_download (str|None): FTP base path to assets (preferred URI)
    """

    _cache: list[dict[str, Any]] | None = None

    def __init__(
        self, catalog_path: str | None = None, *, weights: dict[str, int] | None = None
    ) -> None:
        self._catalog_path = catalog_path
        self._weights = weights or {}

    def _load(self) -> list[dict[str, Any]]:
        if self._cache is not None:
            return self._cache
        data: list[dict[str, Any]]
        if self._catalog_path:
            from pathlib import Path

            # Support packaged resource references: pkg:package/resource or pkg:package:resource
            cp = str(self._catalog_path)
            if cp.startswith("pkg:"):
                ref = cp[4:]
                if ":" in ref and "/" not in ref:
                    pkg, res = ref.split(":", 1)
                else:
                    parts = ref.split("/", 1)
                    pkg = parts[0]
                    res = parts[1] if len(parts) > 1 else "sos_dataset_metadata.json"
                path = importlib_resources.files(pkg).joinpath(res)
                with importlib_resources.as_file(path) as p:
                    data = json.loads(p.read_text(encoding="utf-8"))
            else:
                # Optional allowlist: if env vars are set, require catalog under one of them
                try:
                    # First, perform a syntactic containment check without resolving symlinks.
                    # This guards obvious traversal attempts before any filesystem resolution.
                    try:
                        import os as _os

                        def _syntactic_allowed(p0: str, envs0: list[str]) -> bool:
                            tgt = _os.path.abspath(_os.path.normpath(str(p0)))  # noqa: PTH100
                            have_env = False
                            for _env in envs0:
                                base = _os.getenv(_env)
                                if not base:
                                    continue
                                have_env = True
                                base_abs = _os.path.abspath(_os.path.normpath(base))  # noqa: PTH100
                                try:
                                    if _os.path.commonpath([tgt, base_abs]) == base_abs:
                                        return True
                                except Exception:
                                    continue
                            return not have_env

                        allowed_envs = ["ZYRA_CATALOG_DIR", "DATA_DIR"]
                        if not _syntactic_allowed(cp, allowed_envs):
                            raise ValueError(
                                "catalog_file not allowed; must be under ZYRA_CATALOG_DIR or DATA_DIR"
                            )
                    except Exception:
                        pass

                    def _is_under_allowed(p: str, envs: list[str]) -> bool:
                        """Robust path containment: resolve symlinks and compare real paths.

                        Uses Path.resolve() on both target and base to prevent
                        traversal via symlinks and to normalize case/segments
                        where applicable.
                        """
                        try:
                            import os as _os

                            tgt_res = Path(
                                str(p)
                            ).resolve()  # lgtm [py/uncontrolled-data-in-path-expression]
                            allowed_set = False
                            for env in envs:
                                base = _os.getenv(env)
                                if not base:
                                    continue
                                allowed_set = True
                                try:
                                    base_res = Path(base).resolve()
                                    # Python 3.9+: is_relative_to ensures proper path semantics
                                    if hasattr(tgt_res, "is_relative_to"):
                                        if tgt_res.is_relative_to(base_res):
                                            return True
                                    else:  # pragma: no cover - old Python fallback
                                        if str(tgt_res).startswith(str(base_res)):
                                            return True
                                except Exception:
                                    continue
                            # If no allowlist env is set, allow by default; otherwise deny
                            return not allowed_set
                        except Exception:
                            return False

                    allowed_envs = ["ZYRA_CATALOG_DIR", "DATA_DIR"]
                    if not _is_under_allowed(cp, allowed_envs):
                        raise ValueError(
                            "catalog_file not allowed; must be under ZYRA_CATALOG_DIR or DATA_DIR"
                        )
                except Exception:
                    # Fall through; Path read may still raise appropriately
                    pass
                # Optional allowlist already enforced above when envs are set.
                # Explicitly deny suspicious traversal patterns prior to resolving.
                _raw = str(cp)
                _scan = _raw.replace("\\", "/")
                _parts = [p for p in _scan.split("/") if p]
                if any(p == ".." for p in _parts):
                    raise ValueError(
                        "catalog_file contains parent traversal segment ('..')"
                    )
                # Resolve to a normalized absolute path (handles symlinks) before reading.
                resolved = Path(cp).resolve()
                # Re-validate containment post-resolve for defense-in-depth
                try:
                    allowed_envs = ["ZYRA_CATALOG_DIR", "DATA_DIR"]
                    # Reuse helper from above scope
                    if not _is_under_allowed(str(resolved), allowed_envs):
                        raise ValueError(
                            "catalog_file not allowed after resolve; must be under ZYRA_CATALOG_DIR or DATA_DIR"
                        )
                except Exception:
                    # If the helper isn't available in scope due to refactor,
                    # fall back to direct read error
                    pass
                if not resolved.is_file():
                    raise FileNotFoundError(str(resolved))
                data = json.loads(
                    resolved.read_text(encoding="utf-8")
                )  # lgtm [py/path-injection] [py/uncontrolled-data-in-path-expression]
        else:
            pkg = "zyra.assets.metadata"
            path = importlib_resources.files(pkg).joinpath("sos_dataset_metadata.json")
            with importlib_resources.as_file(path) as p:
                data = json.loads(p.read_text(encoding="utf-8"))
        # Store as-is; we'll normalize per-result on demand
        self._cache = data
        return data

    def _match_score(self, item: dict[str, Any], rx: re.Pattern[str]) -> int:
        title = str(item.get("title") or "")
        desc = str(item.get("description") or "")
        keywords = item.get("keywords") or []
        score = 0
        if rx.search(title):
            score += int(self._weights.get("title", 3))
        if rx.search(desc):
            score += int(self._weights.get("description", 2))
        for kw in keywords:
            if isinstance(kw, str) and rx.search(kw):
                score += int(self._weights.get("keywords", 1))
        return score

    @staticmethod
    def _slug_from_url(url: str) -> str:
        # e.g., https://sos.noaa.gov/catalog/datasets/tsunami-history/ -> tsunami-history
        m = re.search(r"/datasets/([^/]+)/?", url)
        if m:
            return m.group(1)
        # Fallback: derive a compact slug and cap length with a hash suffix for stability
        try:
            import hashlib
            from urllib.parse import urlparse

            parsed = urlparse(url)
            path_parts = [p for p in (parsed.path or "").split("/") if p]
            # Prefer last path segment; otherwise use host or whole URL as candidate
            candidate = path_parts[-1] if path_parts else (parsed.netloc or url)
            slug = re.sub(r"\W+", "-", candidate).strip("-") or "item"
            MAX = 64
            if len(slug) > MAX:
                h = hashlib.sha1(url.encode("utf-8")).hexdigest()[:8]
                slug = slug[: MAX - 9].rstrip("-") + "-" + h
            return slug
        except Exception:
            return re.sub(r"\W+", "-", url).strip("-")[:64]

    def _normalize(self, item: dict[str, Any]) -> DatasetMetadata:
        url = str(item.get("url") or "")
        title = str(item.get("title") or "")
        desc = str(item.get("description") or "") or None
        ftp = item.get("ftp_download")
        uri = str(ftp or url)
        fmt = "FTP" if ftp else "HTML"
        return DatasetMetadata(
            id=self._slug_from_url(url) if url else title.lower().replace(" ", "-"),
            name=title or (url or uri),
            description=desc,
            source="sos-catalog",
            format=fmt,
            uri=uri,
        )

    def search(self, query: str, *, limit: int = 10) -> list[DatasetMetadata]:
        data = self._load()
        # Token-aware matching: break the query into words and score per-token
        tokens = [t for t in re.split(r"\W+", query) if t]
        token_patterns = [
            re.compile(re.escape(t), re.IGNORECASE) for t in tokens if len(t) >= 3
        ]
        scored: list[tuple[int, dict[str, Any]]] = []
        if token_patterns:
            for it in data:
                s = 0
                for pat in token_patterns:
                    s += self._match_score(it, pat)
                if s > 0:
                    scored.append((s, it))
        else:
            # Fall back to phrase search when no meaningful tokens are extracted
            rx = re.compile(re.escape(query), re.IGNORECASE)
            for it in data:
                s = self._match_score(it, rx)
                if s > 0:
                    scored.append((s, it))
        # Sort by score desc, then title asc for stability
        scored.sort(key=lambda t: (-t[0], str(t[1].get("title") or "")))
        results = [self._normalize(it) for _, it in scored[: max(0, limit) or None]]
        return results
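
# Illustrative usage of the local backend (assumes the packaged catalog is
# installed with zyra):
#
#     >>> backend = LocalCatalogBackend()  # packaged SOS catalog
#     >>> hits = backend.search("tsunami", limit=5)
#     >>> [d.uri for d in hits]  # FTP URIs preferred when available
#
# A packaged resource can be referenced explicitly, and a plain filesystem
# path is honored when ZYRA_CATALOG_DIR/DATA_DIR are unset or contain it
# (the path below is a hypothetical example of such a setup):
#
#     >>> LocalCatalogBackend("pkg:zyra.assets.metadata/sos_dataset_metadata.json")
#     >>> LocalCatalogBackend("/data/catalogs/my_catalog.json")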

def _print_table(items: Iterable[DatasetMetadata]) -> None:
    rows = [("ID", "Name", "Format", "URI")]
    for d in items:
        rows.append((d.id, d.name, d.format, d.uri))
    # Compute simple column widths with caps
    caps = (32, 40, 10, 60)
    widths = [
        min(max(len(str(r[i])) for r in rows), caps[i]) for i in range(len(rows[0]))
    ]

    def _fit(s: str, w: int) -> str:
        if len(s) <= w:
            return s.ljust(w)
        return s[: max(0, w - 1)] + "\u2026"

    for i, r in enumerate(rows):
        line = " ".join(_fit(str(r[j]), widths[j]) for j in range(len(r)))
        print(line)
        if i == 0:
            print(" ".join("-" * w for w in widths))

def register_cli(p: argparse.ArgumentParser) -> None:
    # Optional subcommands under `zyra search`.
    # Keep base (no subcommand) behavior intact for local/OGC discovery.
    sub = p.add_subparsers(dest="search_cmd", required=False)

    # -----------------------------
    # api: federated API search
    # -----------------------------
    p_api = sub.add_parser(
        "api",
        help=("Query remote Zyra-like APIs via /search; supports federated URLs"),
    )
    p_api.add_argument(
        "--url",
        action="append",
        required=True,
        help=("Base API URL (repeatable). Examples: https://zyra.noaa.gov/api"),
    )
    p_api.add_argument(
        "--query",
        dest="api_query",
        required=True,
        help="Search query to pass to remote APIs",
    )
    p_api.add_argument(
        "-l",
        "--limit",
        type=int,
        default=10,
        help="Maximum results per source (default: 10)",
    )
    p_api.add_argument(
        "--json",
        action="store_true",
        help="Output in JSON (default)",
    )
    p_api.add_argument(
        "--csv",
        action="store_true",
        help="Output in CSV (columns: source,dataset,description,link)",
    )
    p_api.add_argument(
        "--table",
        action="store_true",
        help="Pretty table output to the terminal",
    )
    p_api.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Print debug info about requests and response shapes",
    )
    # Request shaping
    p_api.add_argument(
        "--param",
        action="append",
        help="Extra query param k=v (repeatable)",
    )
    p_api.add_argument(
        "--header",
        action="append",
        help="HTTP header `key=value` or `Key: Value` (repeatable)",
    )
    p_api.add_argument(
        "--timeout",
        type=float,
        default=30.0,
        help="Request timeout seconds (default: 30)",
    )
    p_api.add_argument(
        "--retries",
        type=int,
        default=0,
        help="Retry attempts for transient failures (default: 0)",
    )
    p_api.add_argument(
        "--concurrency",
        type=int,
        default=4,
        help="Parallel requests across URLs (default: 4)",
    )
    # Endpoint control
    p_api.add_argument(
        "--no-openapi",
        action="store_true",
        help="Skip OpenAPI discovery and use /search",
    )
    p_api.add_argument(
        "--endpoint",
        help="Override endpoint path (e.g., /search/items)",
    )
    p_api.add_argument(
        "--qp",
        help="Override query parameter name (default: q)",
    )
    p_api.add_argument(
        "--result-key",
        help="Read array from this key when response is an object (e.g., items)",
    )
    # POST support
    p_api.add_argument(
        "--post",
        action="store_true",
        help="Use POST /search with JSON body instead of GET",
    )
    p_api.add_argument(
        "--json-param",
        action="append",
        help="JSON body field k=v (repeatable)",
    )
    p_api.add_argument(
        "--json-body",
        help="JSON body as raw string or @/path/to/file.json",
    )
    p_api.add_argument(
        "--credential",
        action="append",
        dest="credential",
        help=(
            "Credential slot (repeatable), e.g., token=$API_TOKEN or header.Authorization=Bearer abc"
        ),
    )
    p_api.add_argument(
        "--credential-file",
        help="Optional dotenv file for @NAME credential lookups",
    )
    p_api.add_argument(
        "--auth",
        help=(
            "Convenience auth helper (e.g., bearer:$TOKEN, basic:user:pass, header:Name:Value)"
        ),
    )
    p_api.add_argument(
        "--url-credential",
        action="append",
        nargs=2,
        metavar=("URL", "ENTRY"),
        help=(
            "Per-URL credential entry (repeatable), e.g., --url-credential https://api.example token=$TOKEN"
        ),
    )
    p_api.add_argument(
        "--url-auth",
        action="append",
        nargs=2,
        metavar=("URL", "AUTH"),
        help=(
            "Per-URL auth helper (same syntax as --auth), e.g., --url-auth https://api.example bearer:$TOKEN"
        ),
    )
    # Output shaping
    p_api.add_argument(
        "--fields",
        help=(
            "Comma-separated output fields for CSV/table (default: source,dataset,link)"
        ),
    )
    p_api.add_argument(
        "--limit-total",
        type=int,
        help="Limit total aggregated results across URLs",
    )
    p_api.add_argument(
        "--dedupe",
        choices=["dataset", "link"],
        help="Drop duplicates by key (dataset or link)",
    )
    p_api.add_argument(
        "--sort",
        help="Sort by comma-separated keys (e.g., source,dataset)",
    )
    # OpenAPI diagnostics
    p_api.add_argument(
        "--print-openapi",
        action="store_true",
        help="Print discovered endpoint and query params for each URL",
    )
    p_api.add_argument(
        "--suggest-flags",
        action="store_true",
        help="Suggest simple OpenAPI query params that can be passed via --param",
    )

    def _parse_header_args(values: Iterable[str] | None) -> dict[str, str]:
        parsed: dict[str, str] = {}
        if not values:
            return parsed
        for raw in values:
            if raw is None:
                continue
            s = str(raw)
            if ":" in s:
                key, val = s.split(":", 1)
            elif "=" in s:
                key, val = s.split("=", 1)
            else:
                continue
            key = key.strip()
            val = val.strip()
            if key:
                parsed[key] = val
        return parsed

    def _cmd_api(ns: argparse.Namespace) -> int:
        try:
            from .api_search import federated_api_search, print_api_table

            urls: list[str] = [u.strip() for u in (ns.url or []) if u and u.strip()]
            if not urls:
                print(
                    "error: provide at least one --url", file=__import__("sys").stderr
                )
                return 2
            eff_query = (
                getattr(ns, "api_query", None)
                or getattr(ns, "query", None)
                or getattr(ns, "q", None)
            )
            if not eff_query:
                print(
                    "error: missing --query; pass it after the subcommand (e.g., zyra search api --query 'text')",
                    file=__import__("sys").stderr,
                )
                return 2
            # OpenAPI diagnostics
            if ns.print_openapi or ns.suggest_flags:
                from .api_search import print_openapi_info

                for u in urls:
                    print_openapi_info(
                        u, suggest=bool(ns.suggest_flags), verbose=bool(ns.verbose)
                    )
                return 0
            header_entries = _parse_header_args(getattr(ns, "header", None))
            credential_entries = list(getattr(ns, "credential", []) or [])
            if credential_entries:
                try:
                    resolved = resolve_credentials(
                        credential_entries,
                        credential_file=getattr(ns, "credential_file", None),
                    )
                except CredentialResolutionError as exc:
                    print(f"error: {exc}", file=__import__("sys").stderr)
                    return 2
                apply_http_credentials(header_entries, resolved.values)
            apply_auth_header(header_entries, getattr(ns, "auth", None))
            scoped_credential_entries: dict[str, list[str]] = {}
            for scope, entry in getattr(ns, "url_credential", []) or []:
                if scope and entry:
                    scoped_credential_entries.setdefault(str(scope), []).append(
                        str(entry)
                    )
            headers_by_url: dict[str, dict[str, str]] = {}

            def _ensure_scoped_headers(scope: str) -> dict[str, str]:
                scoped = headers_by_url.get(scope)
                if scoped is None:
                    scoped = header_entries.copy() if header_entries else {}
                    headers_by_url[scope] = scoped
                return scoped

            for scope, entries in scoped_credential_entries.items():
                try:
                    resolved = resolve_credentials(
                        entries,
                        credential_file=getattr(ns, "credential_file", None),
                    )
                except CredentialResolutionError as exc:
                    print(f"error: {exc}", file=__import__("sys").stderr)
                    return 2
                scoped_headers = _ensure_scoped_headers(scope)
                scoped_headers.pop("Authorization", None)
                apply_http_credentials(scoped_headers, resolved.values)
            for scope, auth_value in getattr(ns, "url_auth", []) or []:
                if not scope or not auth_value:
                    continue
                scoped_headers = _ensure_scoped_headers(str(scope))
                scoped_headers.pop("Authorization", None)
                apply_auth_header(scoped_headers, str(auth_value))
            rows = federated_api_search(
                urls,
                eff_query,
                limit=int(ns.limit or 10),
                verbose=bool(ns.verbose),
                params=list(ns.param or []) if ns.param else None,
                headers=header_entries if header_entries else None,
                headers_by_url=headers_by_url if headers_by_url else None,
                timeout=float(ns.timeout or 30.0),
                retries=int(ns.retries or 0),
                concurrency=int(ns.concurrency or 4),
                no_openapi=bool(ns.no_openapi),
                endpoint=ns.endpoint,
                qp_name=ns.qp,
                result_key=ns.result_key,
                use_post=bool(ns.post),
                json_params=list(ns.json_param or []) if ns.json_param else None,
                json_body=ns.json_body,
            )
            # Output shaping
            if ns.dedupe:
                key = ns.dedupe
                seen: set[str] = set()
                deduped = []
                for r in rows:
                    v = str(r.get(key) or "")
                    if v and v not in seen:
                        seen.add(v)
                        deduped.append(r)
                rows = deduped
            if ns.sort:
                keys = [k.strip() for k in ns.sort.split(",") if k.strip()]
                rows = sorted(
                    rows, key=lambda r: tuple(str(r.get(k) or "") for k in keys)
                )
            if ns.limit_total:
                rows = rows[: max(0, int(ns.limit_total)) or None]
            # Default to JSON unless CSV/table explicitly requested
            fields = (
                [s.strip() for s in ns.fields.split(",") if s.strip()]
                if ns.fields
                else None
            )
            if ns.csv:
                import csv
                import sys

                cols = fields or ["source", "dataset", "description", "link"]
                w = csv.DictWriter(sys.stdout, fieldnames=cols)
                w.writeheader()
                for r in rows:
                    w.writerow({k: r.get(k) for k in cols})
                return 0
            if ns.table:
                print_api_table(rows, fields=fields)
                return 0
            # JSON (default)
            print(__import__("json").dumps(rows, indent=2))
            return 0
        except Exception as e:  # pragma: no cover - defensive
            print(f"API search failed: {e}", file=__import__("sys").stderr)
            return 2

    p_api.set_defaults(func=_cmd_api)

    # -----------------------------
    # Base `zyra search` arguments
    # -----------------------------
    p.add_argument(
        "query",
        nargs="?",
        help="Search query (matches title/keywords/description)",
    )
    p.add_argument(
        "--query",
        "-q",
        dest="q",
        help="Search query (alternative to positional)",
    )
    p.add_argument(
        "-l",
        "--limit",
        type=int,
        default=10,
        help="Maximum number of results (default: 10)",
    )
    p.add_argument(
        "--catalog-file",
        help=("Path to a local catalog JSON file (overrides packaged SOS catalog)"),
    )
    p.add_argument(
        "--include-local",
        action="store_true",
        help=("When remote sources are provided, also include local catalog results"),
    )
    p.add_argument(
        "--profile",
        help=("Name of a bundled profile under zyra.assets.profiles (e.g., sos)"),
    )
    p.add_argument(
        "--profile-file",
        help=(
            "Path to a JSON profile describing sources (local/ogc) and optional scoring weights"
        ),
    )
    p.add_argument(
        "--semantic-analyze",
        action="store_true",
        help=(
            "Perform general search and send results to LLM for analysis/ranking (prints summary and picks)"
        ),
    )
    p.add_argument(
        "--analysis-limit",
        type=int,
        default=20,
        help="Max number of results to include in LLM analysis (default: 20)",
    )
    p.add_argument(
        "--semantic",
        help=(
            "Natural-language semantic search; plans sources via LLM and executes backends"
        ),
    )
    p.add_argument(
        "--show-plan",
        action="store_true",
        help="Print the generated semantic search plan (JSON)",
    )
    # Optional remote discovery: OGC WMS capabilities
    p.add_argument(
        "--ogc-wms",
        help=(
            "WMS GetCapabilities URL to search (remote). If provided, results"
            " include remote matches; use --remote-only to skip local catalog."
        ),
    )
    p.add_argument(
        "--remote-only",
        action="store_true",
        help="When used with --ogc-wms, only show remote results",
    )
    p.add_argument(
        "--ogc-records",
        help=(
            "OGC API - Records items URL to search (remote)."
            " Often ends with /collections/{id}/items"
        ),
    )
    p.add_argument(
        "--json",
        action="store_true",
        help="Output results in JSON format",
    )
    p.add_argument(
        "--yaml",
        action="store_true",
        help="Output results in YAML format",
    )
    p.add_argument(
        "--select",
        type=int,
        help=("Select a single result by 1-based index and print its URI only"),
    )
    # Enrichment flags (Phase 2)
    p.add_argument(
        "--enrich",
        choices=["shallow", "capabilities", "probe"],
        help=(
            "Optional metadata enrichment level: shallow|capabilities|probe (bounded, cached)"
        ),
    )
    p.add_argument(
        "--enrich-timeout",
        type=float,
        default=3.0,
        help="Per-item soft timeout seconds for enrichment (default: 3.0)",
    )
    p.add_argument(
        "--enrich-workers",
        type=int,
        default=4,
        help="Enrichment concurrency (workers). Default: 4",
    )
    p.add_argument(
        "--cache-ttl",
        type=int,
        default=86400,
        help="Enrichment cache TTL seconds. Default: 86400 (1 day)",
    )
    p.add_argument(
        "--enrich-async",
        action="store_true",
        help=(
            "Request async enrichment (API-driven). CLI-only runs print a warning and run sync."
        ),
    )
    p.add_argument(
        "--offline",
        action="store_true",
        help=("Disable network fetches during enrichment; local files only"),
    )
    p.add_argument(
        "--https-only",
        action="store_true",
        help=("Require HTTPS for any remote probing"),
    )
    p.add_argument(
        "--allow-host",
        action="append",
        dest="allow_hosts",
        help=("Explicitly allow a host suffix for remote probing (can repeat)"),
    )
    p.add_argument(
        "--deny-host",
        action="append",
        dest="deny_hosts",
        help=("Explicitly deny a host suffix for remote probing (can repeat)"),
    )
    p.add_argument(
        "--max-probe-bytes",
        type=int,
        help=("Maximum content length to probe (bytes); larger assets are skipped"),
    )

    def _cmd(ns: argparse.Namespace) -> int:
        items: list[DatasetMetadata] = []
        # Semantic search (LLM-planned)
        if getattr(ns, "semantic", None):
            try:
                items = _semantic_search(ns)
            except Exception as e:
                print(f"Semantic search failed: {e}", file=__import__("sys").stderr)
                return 2
            return _emit_results(ns, items)
        # Semantic analysis: general search then LLM analysis over results
        if getattr(ns, "semantic_analyze", None):
            try:
                return _semantic_analyze(ns)
            except Exception as e:
                print(f"Semantic analysis failed: {e}", file=__import__("sys").stderr)
                return 2
        # Non-semantic path requires a query
        effective_query = getattr(ns, "query", None) or getattr(ns, "q", None)
        if not effective_query:
            print(
                "error: the following arguments are required: query",
                file=__import__("sys").stderr,
            )
            return 2
        # Optional profile(s)
        prof_sources: dict[str, Any] = {}
        prof_weights: dict[str, int] = {}
        prof_defaults: dict[str, Any] = {}
        prof_license_policy: dict[str, Any] = {}
        defaults_sources: list[str] = []
        # Bundled profile by name
        if getattr(ns, "profile", None):
            try:
                pkg = "zyra.assets.profiles"
                res = f"{ns.profile}.json"
                path = importlib_resources.files(pkg).joinpath(res)
                with importlib_resources.as_file(path) as p:
                    prof0 = json.loads(p.read_text(encoding="utf-8"))
                prof_sources.update(dict(prof0.get("sources") or {}))
                prof_weights.update(
                    {k: int(v) for k, v in (prof0.get("weights") or {}).items()}
                )
                enr = prof0.get("enrichment") or {}
                ed = enr.get("defaults") or {}
                if isinstance(ed, dict):
                    prof_defaults.update(ed)
                lp = enr.get("license_policy") or {}
                if isinstance(lp, dict):
                    prof_license_policy.update(lp)
            except Exception as e:
                print(
                    f"Failed to load bundled profile '{ns.profile}': {e}",
                    file=__import__("sys").stderr,
                )
                return 2
        # External profile file (overrides bundled)
        if getattr(ns, "profile_file", None):
            try:
                from pathlib import Path

                prof = json.loads(Path(ns.profile_file).read_text(encoding="utf-8"))
                prof_sources.update(dict(prof.get("sources") or {}))
                prof_weights.update(
                    {k: int(v) for k, v in (prof.get("weights") or {}).items()}
                )
                enr = prof.get("enrichment") or {}
                ed = enr.get("defaults") or {}
                if isinstance(ed, dict):
                    prof_defaults.update(ed)
                lp = enr.get("license_policy") or {}
                if isinstance(lp, dict):
                    prof_license_policy.update(lp)
            except Exception as e:
                print(f"Failed to load profile: {e}", file=__import__("sys").stderr)
                return 2
        # Decide local inclusion consistently via helper
        from .utils import compute_inclusion

        include_local, any_remote, cat_path = compute_inclusion(
            getattr(ns, "ogc_wms", None),
            getattr(ns, "ogc_records", None),
            prof_sources,
            remote_only=bool(getattr(ns, "remote_only", False)),
            include_local_flag=bool(getattr(ns, "include_local", False)),
            catalog_file_flag=getattr(ns, "catalog_file", None),
        )
        if include_local:
            items.extend(
                LocalCatalogBackend(cat_path, weights=prof_weights).search(
                    effective_query, limit=ns.limit
                )
            )
        # If no profile provided, auto-scope SOS defaults to local items
        if not getattr(ns, "profile", None) and not getattr(
            ns, "profile_file", None
        ):
            try:
                pkg = "zyra.assets.profiles"
                res = "sos.json"
                path = importlib_resources.files(pkg).joinpath(res)
                with importlib_resources.as_file(path) as p:
                    import json as _json

                    prof0 = _json.loads(p.read_text(encoding="utf-8"))
                enr = prof0.get("enrichment") or {}
                ed = enr.get("defaults") or {}
                if isinstance(ed, dict):
                    prof_defaults.update(ed)
                lp = enr.get("license_policy") or {}
                if isinstance(lp, dict):
                    prof_license_policy.update(lp)
                defaults_sources = ["sos-catalog"]
            except Exception:
                pass
        # Optional remote OGC WMS
        # Remote WMS: combine CLI flag and profile list
        wms_urls = []
        if getattr(ns, "ogc_wms", None):
            wms_urls.append(ns.ogc_wms)
        prof_wms = prof_sources.get("ogc_wms") or []
        if isinstance(prof_wms, list):
            wms_urls.extend([u for u in prof_wms if isinstance(u, str)])
        for wurl in wms_urls:
            try:
                from .ogc import OGCWMSBackend

                rem = OGCWMSBackend(wurl, weights=prof_weights).search(
                    effective_query, limit=ns.limit
                )
                items.extend(rem)
            except Exception as e:
                print(
                    f"Remote OGC WMS search failed: {e}",
                    file=__import__("sys").stderr,
                )
        # Remote OGC API - Records: combine CLI flag and profile list
        rec_urls = []
        if getattr(ns, "ogc_records", None):
            rec_urls.append(ns.ogc_records)
        prof_rec = prof_sources.get("ogc_records") or []
        if isinstance(prof_rec, list):
            rec_urls.extend([u for u in prof_rec if isinstance(u, str)])
        for rurl in rec_urls:
            try:
                from .ogc_records import OGCRecordsBackend

                rec = OGCRecordsBackend(rurl, weights=prof_weights).search(
                    effective_query, limit=ns.limit
                )
                items.extend(rec)
            except Exception as e:
                print(
                    f"Remote OGC Records search failed: {e}",
                    file=__import__("sys").stderr,
                )
        # Optional enrichment
        if getattr(ns, "enrich", None):
            try:
                from zyra.transform.enrich import enrich_items

                items = enrich_items(
                    items,
                    level=str(ns.enrich),
                    timeout=float(getattr(ns, "enrich_timeout", 3.0) or 3.0),
                    workers=int(getattr(ns, "enrich_workers", 4) or 4),
                    cache_ttl=int(getattr(ns, "cache_ttl", 86400) or 86400),
                    offline=bool(getattr(ns, "offline", False) or False),
                    https_only=bool(getattr(ns, "https_only", False) or False),
                    allow_hosts=list(getattr(ns, "allow_hosts", []) or []),
                    deny_hosts=list(getattr(ns, "deny_hosts", []) or []),
                    max_probe_bytes=(getattr(ns, "max_probe_bytes", None)),
                    profile_defaults=prof_defaults,
                    profile_license_policy=prof_license_policy,
                    defaults_sources=defaults_sources,
                )
            except Exception as e:
                print(f"Enrichment failed: {e}", file=__import__("sys").stderr)
        if getattr(ns, "enrich_async", False):
            print(
                "note: --enrich-async is only available via the API; returned sync results",
                file=__import__("sys").stderr,
            )
        return _emit_results(ns, items)

    p.set_defaults(func=_cmd)

def _emit_results(ns: argparse.Namespace, items: list[DatasetMetadata]) -> int:
    # Respect overall limit
    items = items[: max(0, ns.limit) or None]
    if ns.select is not None:
        idx = ns.select
        if idx < 1 or idx > len(items):
            print(
                f"--select index out of range (1..{len(items)})",
                file=__import__("sys").stderr,
            )
            return 2
        print(items[idx - 1].uri)
        return 0
    if ns.json and ns.yaml:
        print("Choose one of --json or --yaml", file=__import__("sys").stderr)
        return 2
    if ns.json:
        from zyra.utils.serialize import to_list

        out = to_list(items)
        print(json.dumps(out, indent=2))
        return 0
    if ns.yaml:
        try:
            import yaml  # type: ignore

            from zyra.utils.serialize import to_list

            out = to_list(items)
            print(yaml.safe_dump(out, sort_keys=False))
            return 0
        except Exception:
            print(
                "PyYAML is not installed. Use --json or install PyYAML.",
                file=__import__("sys").stderr,
            )
            return 2
    _print_table(items)
    return 0


def _semantic_search(ns: argparse.Namespace) -> list[DatasetMetadata]:
    # Plan with the same LLM provider/model as Wizard
    from zyra.wizard import _select_provider  # type: ignore[attr-defined]
    from zyra.wizard.prompts import load_semantic_search_prompt

    client = _select_provider(None, None)
    sys_prompt = load_semantic_search_prompt()
    user = (
        "Given a user's dataset request, produce a minimal JSON search plan.\n"
        f"User request: {ns.semantic}\n"
        "If unsure about endpoints, prefer profile 'sos'. Keep keys minimal."
    )
    plan_raw = client.generate(sys_prompt, user)
    try:
        plan = json.loads(plan_raw.strip())
    except Exception:
        plan = {"query": ns.semantic, "profile": "sos"}
    # Apply CLI overrides
    if getattr(ns, "limit", None):
        plan["limit"] = ns.limit
    # Heuristic: switch from 'sos' when SST/NASA or pygeoapi terms detected
    q = str(plan.get("query") or ns.semantic)
    wms_urls = plan.get("ogc_wms") or []
    rec_urls = plan.get("ogc_records") or []
    profile = plan.get("profile")
    if (not profile or profile == "sos") and not wms_urls and not rec_urls:
        ql = q.lower()
        if "sea surface temperature" in ql or "sst" in ql or "nasa" in ql:
            plan["profile"] = "gibs"
        elif "lake" in ql or "pygeoapi" in ql:
            plan["profile"] = "pygeoapi"
    # Execute using the same backends as normal search
    from zyra.connectors.discovery.ogc import OGCWMSBackend
    from zyra.connectors.discovery.ogc_records import OGCRecordsBackend

    q = str(plan.get("query") or ns.semantic)
    limit = int(plan.get("limit", ns.limit or 10))
    include_local = bool(plan.get("include_local", False))
    remote_only = bool(plan.get("remote_only", False))
    profile = plan.get("profile")
    catalog_file = plan.get("catalog_file")
    wms_urls = plan.get("ogc_wms") or []
    rec_urls = plan.get("ogc_records") or []
    prof_sources: dict[str, Any] = {}
    prof_weights: dict[str, int] = {}
    if isinstance(profile, str) and profile:
        from contextlib import suppress
        from importlib import resources as ir

        with suppress(Exception):
            base = ir.files("zyra.assets.profiles").joinpath(profile + ".json")
            with ir.as_file(base) as p:
                pr = json.loads(p.read_text(encoding="utf-8"))
            prof_sources = dict(pr.get("sources") or {})
            prof_weights = {k: int(v) for k, v in (pr.get("weights") or {}).items()}
    results: list[DatasetMetadata] = []
    from .utils import compute_inclusion as _compute_inclusion

    include_local_eff, _any_remote, cat = _compute_inclusion(
        wms_urls,
        rec_urls,
        prof_sources,
        remote_only=bool(remote_only),
        include_local_flag=bool(include_local),
        catalog_file_flag=catalog_file,
    )
    if include_local_eff:
        results.extend(
            LocalCatalogBackend(cat, weights=prof_weights).search(q, limit=limit)
        )
    # Remote WMS
    prof_wms = prof_sources.get("ogc_wms") or []
    if isinstance(prof_wms, list):
        wms_urls = list(wms_urls) + [u for u in prof_wms if isinstance(u, str)]
    from contextlib import suppress

    for u in wms_urls:
        with suppress(Exception):
            results.extend(
                OGCWMSBackend(u, weights=prof_weights).search(q, limit=limit)
            )
    # Remote Records
    prof_rec = prof_sources.get("ogc_records") or []
    if isinstance(prof_rec, list):
        rec_urls = list(rec_urls) + [u for u in prof_rec if isinstance(u, str)]
    for u in rec_urls:
        with suppress(Exception):
            results.extend(
                OGCRecordsBackend(u, weights=prof_weights).search(q, limit=limit)
            )
    # Optional show-plan
    if getattr(ns, "show_plan", False):
        try:
            effective = {
                "query": q,
                "limit": limit,
                "profile": profile,
                "catalog_file": catalog_file,
                "include_local": include_local,
                "remote_only": remote_only,
                "ogc_wms": wms_urls or None,
                "ogc_records": rec_urls or None,
            }
            safe_plan = _redact(plan)
            safe_effective = _redact({k: v for k, v in effective.items() if v})
            print(json.dumps(safe_plan, indent=2))
            print(json.dumps(safe_effective, indent=2))
        except Exception:
            pass
    return results


def _semantic_analyze(ns: argparse.Namespace) -> int:
    # 1) Perform a broad search using provided flags (reuse normal path)
    # We'll emulate the non-semantic search execution path to collect items
    from types import SimpleNamespace

    temp_ns = SimpleNamespace(**vars(ns))
    temp_ns.semantic = None
    # Build up items using the same code paths
    items: list[DatasetMetadata] = []
    # Local
    eff_query = getattr(ns, "query", None) or getattr(ns, "q", None) or ""
    if not getattr(ns, "remote_only", False):
        items.extend(
            LocalCatalogBackend(getattr(ns, "catalog_file", None)).search(
                eff_query, limit=ns.limit
            )
        )
    # WMS
    if getattr(ns, "ogc_wms", None):
        from .ogc import OGCWMSBackend

        items.extend(OGCWMSBackend(ns.ogc_wms).search(eff_query, limit=ns.limit))
    # Records
    if getattr(ns, "ogc_records", None):
        from .ogc_records import OGCRecordsBackend

        items.extend(
            OGCRecordsBackend(ns.ogc_records).search(eff_query, limit=ns.limit)
        )
    # 2) Analyze via LLM
    import json as _json

    from zyra.utils.serialize import compact_dataset
    from zyra.wizard import _select_provider  # type: ignore[attr-defined]
    from zyra.wizard.prompts import load_semantic_analysis_prompt

    ctx_items = [
        compact_dataset(i, max_desc_len=240)
        for i in items[: max(1, getattr(ns, "analysis_limit", 20))]
    ]
    client = _select_provider(None, None)
    sys_prompt = load_semantic_analysis_prompt()
    user = _json.dumps({"query": eff_query, "results": ctx_items})
    raw = client.generate(sys_prompt, user)
    try:
        analysis = _json.loads(raw.strip())
    except Exception:
        analysis = {"summary": raw.strip(), "picks": []}
    # Emit analysis; respect --json
    out = {
        "query": eff_query,
        "items": ctx_items,
        "analysis": analysis,
    }
    safe_out = _redact(out)
    if getattr(ns, "json", False):
        print(_json.dumps(safe_out, indent=2))
    else:
        summary = safe_out.get("analysis", {}).get("summary", "")
        print(summary)
        picks = safe_out.get("analysis", {}).get("picks", []) or []
        if picks:
            print("")
            print("Top picks:")
            for p in picks:
                print(f"- {p.get('id')}: {p.get('reason')}")
    return 0
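
# Illustrative (assumed, not normative) shape of the LLM-produced plan that
# _semantic_search() consumes; unknown keys are ignored and missing keys fall
# back to CLI values or defaults:
#
#     {
#         "query": "sea surface temperature",
#         "limit": 10,
#         "profile": "gibs",
#         "include_local": false,
#         "remote_only": true,
#         "ogc_wms": ["https://example.invalid/wms"],
#         "ogc_records": []
#     }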