Source code for zyra.connectors.backends.http

# SPDX-License-Identifier: Apache-2.0
"""HTTP connector backend.

Provides functional helpers to fetch and list resources over HTTP(S), as well
as convenience utilities for GRIB workflows (``.idx`` parsing and byte-range
downloads) and Content-Length probing.

Functions are intentionally small and dependency-light so they can be used by
the CLI, pipelines, or higher-level wrappers without imposing heavy imports.
"""

from __future__ import annotations

import re
from typing import Iterable
from urllib.parse import urljoin

# Make a module-level "requests" attribute patchable in tests even when the
# optional dependency is not installed in the environment.
try:  # pragma: no cover - env dependent
    import requests as requests  # type: ignore
except Exception:  # pragma: no cover

    class _RequestsProxy:
        pass

    requests = _RequestsProxy()  # type: ignore


def fetch_bytes(
    url: str, *, timeout: int = 60, headers: dict[str, str] | None = None
) -> bytes:
    """Return the raw response body for a GET request.

    Parameters
    - url: HTTP(S) URL to fetch.
    - timeout: request timeout in seconds.
    """
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    r = requests.get(url, timeout=timeout, headers=headers)
    r.raise_for_status()
    return r.content
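
# Example usage (a minimal sketch; the URL and filename below are
# hypothetical):
#
#     data = fetch_bytes("https://example.com/data/file.grib2", timeout=30)
#     with open("file.grib2", "wb") as f:
#         f.write(data)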

def fetch_text(
    url: str, *, timeout: int = 60, headers: dict[str, str] | None = None
) -> str:
    """Return the response body as text for a GET request."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    r = requests.get(url, timeout=timeout, headers=headers)
    r.raise_for_status()
    return r.text

def fetch_json(url: str, *, timeout: int = 60, headers: dict[str, str] | None = None):
    """Return the parsed JSON body for a GET request."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    r = requests.get(url, timeout=timeout, headers=headers)
    r.raise_for_status()
    return r.json()
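
# Example usage (a sketch; the endpoint and the response shape are
# hypothetical):
#
#     catalog = fetch_json("https://example.com/api/catalog.json")
#     names = [item["name"] for item in catalog.get("items", [])]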

def post_data(
    url: str,
    data: bytes,
    *,
    timeout: int = 60,
    content_type: str | None = None,
    headers: dict[str, str] | None = None,
) -> int:
    """POST raw bytes to a URL and return the HTTP status code."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    final_headers = dict(headers or {})
    if content_type and "Content-Type" not in final_headers:
        final_headers["Content-Type"] = content_type
    r = requests.post(
        url,
        data=data,
        headers=final_headers or None,
        timeout=timeout,
    )
    r.raise_for_status()
    return r.status_code
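
# Example usage (a sketch; the upload endpoint is hypothetical). Because the
# helper calls ``raise_for_status``, any returned code is a success status:
#
#     status = post_data(
#         "https://example.com/upload",
#         b'{"ok": true}',
#         content_type="application/json",
#     )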

def post_bytes(
    url: str,
    data: bytes,
    *,
    timeout: int = 60,
    content_type: str | None = None,
    headers: dict[str, str] | None = None,
) -> int:
    """Backward-compat wrapper for ``post_data``."""
    return post_data(
        url,
        data,
        timeout=timeout,
        content_type=content_type,
        headers=headers,
    )

def list_files(
    url: str,
    pattern: str | None = None,
    *,
    timeout: int = 60,
    headers: dict[str, str] | None = None,
) -> list[str]:
    """Best-effort directory listing by scraping anchor tags on index pages.

    Returns absolute URLs; optionally filters them via regex ``pattern``.
    """
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    r = requests.get(url, timeout=timeout, headers=headers)
    r.raise_for_status()
    text = r.text
    hrefs = re.findall(r'href=["\']([^"\']+)["\']', text, re.IGNORECASE)
    results: list[str] = []
    for href in hrefs:
        if href.startswith("?") or href.startswith("#"):
            continue
        results.append(urljoin(url, href))
    if pattern:
        rx = re.compile(pattern)
        results = [u for u in results if rx.search(u)]
    return results
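
# Example usage (a sketch against a hypothetical autoindex page). Note that
# the regex is applied to the absolute URL, not just the link text:
#
#     urls = list_files("https://example.com/pub/", pattern=r"\.grib2$")
#     for u in urls:
#         print(u)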

def get_idx_lines(
    url: str,
    *,
    timeout: int = 60,
    max_retries: int = 3,
    headers: dict[str, str] | None = None,
) -> list[str]:
    """Fetch and parse the GRIB ``.idx`` file for a URL."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    from zyra.utils.grib import ensure_idx_path, parse_idx_lines

    idx_url = ensure_idx_path(url)
    # Determine exception type from requests or fall back to Exception for tests
    ReqExc = getattr(requests, "RequestException", Exception)
    attempt = 0
    last_exc = None
    while attempt < max_retries:
        try:
            r = requests.get(idx_url, timeout=timeout, headers=headers)
            r.raise_for_status()
            return parse_idx_lines(r.content)
        except ReqExc as e:  # pragma: no cover - simple retry wrapper
            last_exc = e
            attempt += 1
    if last_exc:
        raise last_exc
    return []
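
# Example usage (a sketch; the GRIB URL is hypothetical). GRIB ``.idx``
# entries are conventionally colon-separated fields, e.g.
# ``1:0:d=2024010100:TMP:2 m above ground:anl:``, so simple substring
# filters often suffice:
#
#     idx_lines = get_idx_lines("https://example.com/gfs.t00z.pgrb2.0p25.f000")
#     tmp_entries = [ln for ln in idx_lines if ":TMP:" in ln]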

def download_byteranges(
    url: str,
    byte_ranges: Iterable[str],
    *,
    max_workers: int = 10,
    timeout: int = 60,
    headers: dict[str, str] | None = None,
) -> bytes:
    """Download multiple byte ranges and concatenate in the input order."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    base_headers = dict(headers or {})

    def _ranged_get(u: str, range_header: str) -> bytes:
        request_headers = dict(base_headers)
        request_headers["Range"] = range_header
        r = requests.get(u, headers=request_headers, timeout=timeout)
        r.raise_for_status()
        return r.content

    from zyra.utils.grib import parallel_download_byteranges

    return parallel_download_byteranges(
        _ranged_get, url, byte_ranges, max_workers=max_workers
    )
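
# Example usage (a sketch; the URL and byte offsets are hypothetical). Each
# range string is sent verbatim as the HTTP ``Range`` header, and the chunks
# are concatenated in the order supplied, regardless of download order:
#
#     ranges = ["bytes=0-999", "bytes=5000-5999"]
#     blob = download_byteranges(
#         "https://example.com/gfs.t00z.pgrb2.0p25.f000",
#         ranges,
#         max_workers=4,
#     )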

def get_size(
    url: str, *, timeout: int = 60, headers: dict[str, str] | None = None
) -> int | None:
    """Return Content-Length for a URL via HTTP HEAD when provided."""
    try:
        import requests  # type: ignore
    except Exception as exc:  # pragma: no cover - optional dep
        raise RuntimeError("HTTP backend requires the 'requests' extra") from exc
    try:
        r = requests.head(url, timeout=timeout, headers=headers)
        r.raise_for_status()
        val = r.headers.get("Content-Length")
        return int(val) if val is not None else None
    except (requests.RequestException, ValueError, TypeError):
        return None
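
# Example usage (a sketch; the URL and size threshold are hypothetical).
# ``None`` means the probe failed or the server sent no Content-Length:
#
#     size = get_size("https://example.com/data/file.grib2")
#     if size is not None and size > 100 * 1024 * 1024:
#         print("large file; consider ranged downloads")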