Source code for zyra.connectors.backends.ftp

# SPDX-License-Identifier: Apache-2.0
"""FTP connector backend.

Thin functional wrappers around the FTPManager to support simple byte fetches
and uploads, directory listing with regex/date filtering, sync-to-local flows,
and advanced GRIB workflows (``.idx`` handling, ranged downloads).

The URL parser supports anonymous and credentialed forms, e.g.:
``ftp://host/path``, ``ftp://user@host/path``, ``ftp://user:pass@host/path``.
"""

from __future__ import annotations

import contextlib
import logging
import re
import warnings
from datetime import datetime
from ftplib import FTP, all_errors
from io import BytesIO
from typing import Iterable

from zyra.utils.date_manager import DateManager
from zyra.utils.grib import compute_chunks, ensure_idx_path, parse_idx_lines

_DELEGATE_NONE = object()


class FTPManager:  # pragma: no cover - test patch hook
    """Patch point for tests; the placeholder itself defines no methods.

    Backend functions that support delegation try to call the matching
    method on an instance of this class before talking to the server
    directly. Tests replace this module attribute with a mock class that
    exposes the methods they expect.
    """
def _maybe_delegate(method: str, *args, **kwargs):  # pragma: no cover - test hook
    """Call ``method`` on a fresh ``FTPManager`` instance when that is possible.

    ``FTPManager`` is a module attribute that tests patch with a mock class.
    It is instantiated without arguments and the named method is invoked
    with ``*args`` and ``**kwargs``.

    Returns the method's result, or the ``_DELEGATE_NONE`` sentinel when
    instantiation, the attribute lookup, or the call itself raises
    ``Exception``.
    """
    try:
        manager = FTPManager()  # type: ignore[name-defined, call-arg]
        return getattr(manager, method)(*args, **kwargs)
    except Exception:
        return _DELEGATE_NONE
def parse_ftp_path(
    url_or_path: str,
    *,
    username: str | None = None,
    password: str | None = None,
) -> tuple[str, str, str | None, str | None]:
    """Return ``(host, remote_path, username, password)`` parsed from an FTP path.

    Accepts ``host/path``, ``ftp://host/path``, ``ftp://user@host/path`` and
    ``ftp://user:pass@host/path``. The host part ends at the first ``/``;
    credentials are only looked for in that part, so an ``@`` inside the
    remote path is kept as part of the path, and the last ``@`` of the host
    part separates credentials from the host (allowing ``@`` in a password).

    Args:
        url_or_path: FTP URL or bare ``host/path`` string.
        username: Explicit username; takes precedence over one embedded in
            the URL. A ``UserWarning`` is emitted when the two differ.
        password: Explicit password; takes precedence over one embedded in
            the URL. A ``UserWarning`` is emitted when the two differ.

    Returns:
        ``(host, remote_path, username, password)``; the credentials are
        ``None`` when neither the URL nor the keyword arguments supply them.

    Raises:
        ValueError: If there is no ``/`` separating host and path.
    """
    target = url_or_path
    if target.startswith("ftp://"):
        target = target[len("ftp://") :]
    if "/" not in target:
        raise ValueError("FTP path must be host/path")
    # Split host from path first so that "@" characters in the path are never
    # mistaken for the credentials separator.
    authority, path = target.split("/", 1)
    user = None
    pwd = None
    host = authority
    if "@" in authority:
        auth, host = authority.rsplit("@", 1)
        if ":" in auth:
            user, pwd = auth.split(":", 1)
        else:
            user = auth
    if username is not None:
        if user is not None and username != user:
            warnings.warn(
                "Explicit FTP username overrides username embedded in the URL.",
                UserWarning,
                stacklevel=2,
            )
        user = username
    if password is not None:
        if pwd is not None and password != pwd:
            warnings.warn(
                "Explicit FTP password overrides credentials embedded in the URL.",
                UserWarning,
                stacklevel=2,
            )
        pwd = password
    return host, path, user, pwd
def fetch_bytes(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bytes:
    """Fetch a remote file as bytes from an FTP server.

    Connects with a 30 second timeout in passive mode, logging in as
    ``anonymous`` when no credentials are available, changes into the file's
    directory (if any) and retrieves the file into memory.

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        The complete file contents.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Any ``ftplib``/socket error from connecting, logging in,
            changing directory or retrieving the file is propagated.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    buf = BytesIO()
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        ftp.retrbinary(f"RETR {filename}", buf.write)
    finally:
        # Always release the control connection, including on failure;
        # errors while quitting are irrelevant to the caller.
        with contextlib.suppress(Exception):
            ftp.quit()
    return buf.getvalue()
def upload_bytes(
    data: bytes,
    url_or_path: str,
    *,
    username: str | None = None,
    password: str | None = None,
) -> bool:
    """Upload bytes to a remote FTP path.

    Connects with a 30 second timeout in passive mode, logging in as
    ``anonymous`` when no credentials are available, changes into the target
    directory (if any) and stores ``data`` under the target file name.

    Args:
        data: Payload to store.
        url_or_path: FTP URL or ``host/path`` of the destination file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        ``True`` once the upload has completed.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Any ``ftplib``/socket error from connecting, logging in,
            changing directory or storing the file is propagated.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        with BytesIO(data) as bio:
            ftp.storbinary(f"STOR {filename}", bio)
    finally:
        # Always release the control connection, including on failure.
        with contextlib.suppress(Exception):
            ftp.quit()
    return True
def list_files(
    url_or_dir: str,
    pattern: str | None = None,
    *,
    since: str | None = None,
    until: str | None = None,
    date_format: str | None = None,
    username: str | None = None,
    password: str | None = None,
) -> list[str] | None:
    """List FTP directory contents with optional regex and date filtering.

    Args:
        url_or_dir: FTP URL or ``host/dir`` of the remote directory.
        pattern: Optional regular expression; only names for which
            ``re.search`` matches are kept.
        since: Optional ISO-8601 lower bound (parsed with
            ``datetime.fromisoformat``) for the date-range filter.
        until: Optional ISO-8601 upper bound for the date-range filter.
        date_format: Optional format passed to ``DateManager`` for the
            date-range check on each name.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        The filtered names. A listing that fails with an ``ftplib`` error is
        treated as empty. ``None`` is returned only if the server listing
        itself is ``None``.

    Raises:
        ValueError: If the path cannot be parsed or a bound is not ISO-8601.
        Exception: Errors from connecting, logging in or changing directory
            are propagated.
    """
    host, remote_dir, user, pwd = parse_ftp_path(
        url_or_dir, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        ftp.cwd(remote_dir)
        try:
            names = ftp.nlst()
        except all_errors:
            names = []
    finally:
        # The listing is fully materialized; release the connection before
        # filtering, and also when any step above failed.
        with contextlib.suppress(Exception):
            ftp.quit()
    # Check for a missing listing before any filter iterates over it.
    if names is None:
        return None
    if pattern:
        rx = re.compile(pattern)
        names = [n for n in names if rx.search(n)]
    if since or until:
        dm = DateManager([date_format] if date_format else None)
        start = datetime.min if not since else datetime.fromisoformat(since)
        end = datetime.max if not until else datetime.fromisoformat(until)
        names = [n for n in names if dm.is_date_in_range(n, start, end)]
    return names
def exists(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bool:
    """Return True if the remote path exists on the FTP server.

    Existence is decided by listing the containing directory and checking
    whether the file name appears in that listing.

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        ``True`` when the name is listed; ``False`` when it is not or when
        the listing fails with an ``ftplib`` error.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Errors from connecting, logging in or changing directory
            are propagated.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        try:
            return filename in ftp.nlst()
        except all_errors:
            return False
    finally:
        # Release the control connection on every exit path.
        with contextlib.suppress(Exception):
            ftp.quit()
def delete(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bool:
    """Delete a remote FTP path (file).

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        ``True`` when the server accepted the delete, ``False`` when the
        delete command raised.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Errors from connecting, logging in or changing directory
            are propagated.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        try:
            ftp.delete(filename)
        except Exception:
            # Covers ftplib's ``all_errors`` as well as the plain Exception
            # that test doubles may raise. Both mean "delete failed / missing
            # file" and are reported as False so that tests are not coupled
            # to ftplib's exception hierarchy.
            return False
        return True
    finally:
        # Release the control connection on every exit path.
        with contextlib.suppress(Exception):
            ftp.quit()
def stat(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> dict[str, int | None] | None:
    """Return minimal metadata mapping for a remote path (e.g., size).

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        ``{"size": <bytes or None>}`` from the FTP ``SIZE`` command, or
        ``None`` when that command fails with an ``ftplib`` error.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Errors from connecting, logging in or changing directory
            are propagated.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        try:
            size = ftp.size(filename)
            return {"size": int(size) if size is not None else None}
        except all_errors:
            return None
    finally:
        # Release the control connection on every exit path.
        with contextlib.suppress(Exception):
            ftp.quit()
def sync_directory(
    url_or_dir: str,
    local_dir: str,
    *,
    pattern: str | None = None,
    since: str | None = None,
    until: str | None = None,
    date_format: str | None = None,
    clean_zero_bytes: bool = False,
    username: str | None = None,
    password: str | None = None,
) -> None:
    """Sync files from a remote FTP directory to a local directory.

    Applies regex/date filters prior to download; optionally removes local
    zero-byte files before syncing and deletes local files that are no longer
    present on the server. Only files that are missing locally or have size
    zero are downloaded.

    Args:
        url_or_dir: FTP URL or ``host/dir`` of the remote directory.
        local_dir: Destination directory; created if it does not exist.
        pattern: Optional regular expression applied to remote names.
        since: Optional ISO-8601 lower bound for the date filter.
        until: Optional ISO-8601 upper bound for the date filter.
        date_format: Optional format passed to ``DateManager``.
        clean_zero_bytes: When true, delete zero-byte local files first.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Raises:
        ValueError: If the path cannot be parsed or a bound is not ISO-8601.
        Exception: Errors while listing or downloading are propagated; the
            partially written destination file of a failed download is
            removed first.
    """
    from pathlib import Path

    host, remote_dir, user, pwd = parse_ftp_path(
        url_or_dir, username=username, password=password
    )
    # List, filter, then fetch missing/zero-size files
    names = (
        list_files(
            url_or_dir,
            pattern,
            since=since,
            until=until,
            date_format=date_format,
            username=username,
            password=password,
        )
        or []
    )
    if not names and (since or until):
        logging.warning(
            "FTP sync found no files for date range (since=%s until=%s); retrying without date filter",
            since,
            until,
        )
        names = (
            list_files(
                url_or_dir,
                pattern,
                since=None,
                until=None,
                date_format=date_format,
                username=username,
                password=password,
            )
            or []
        )
    if since or until:
        # Enforce the date window on whatever listing was obtained; this is
        # a no-op for names that were already filtered by the same window.
        dm = DateManager([date_format] if date_format else None)
        start = datetime.min if not since else datetime.fromisoformat(since)
        end = datetime.max if not until else datetime.fromisoformat(until)
        names = [n for n in names if dm.is_date_in_range(n, start, end)]

    local_root = Path(local_dir)
    local_root.mkdir(parents=True, exist_ok=True)
    if clean_zero_bytes:
        # Best effort: a failure here must not abort the sync.
        with contextlib.suppress(Exception):
            for fp in local_root.iterdir():
                if fp.is_file() and fp.stat().st_size == 0:
                    fp.unlink()
    local_set = {p.name for p in local_root.iterdir() if p.is_file()}
    # Remove locals not on server
    remote_set = {Path(n).name for n in names}
    for fname in local_set - remote_set:
        with contextlib.suppress(Exception):
            (local_root / fname).unlink()

    for name in names:
        filename = Path(name).name
        dest = local_root / filename
        if dest.exists() and dest.stat().st_size != 0:
            continue  # already present with content
        ftp = FTP(timeout=30)
        completed = False
        try:
            ftp.connect(host)
            ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
            ftp.set_pasv(True)
            if remote_dir:
                ftp.cwd(remote_dir)
            with dest.open("wb") as lf:
                ftp.retrbinary(f"RETR {filename}", lf.write)
            completed = True
        finally:
            with contextlib.suppress(Exception):
                ftp.quit()
            if not completed:
                # A truncated, non-empty file would be treated as complete by
                # the size check above on the next sync, so discard it.
                with contextlib.suppress(OSError):
                    dest.unlink()
def get_size(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> int | None:
    """Return remote file size in bytes via FTP SIZE.

    The call is first offered to a patched ``FTPManager`` through
    ``_maybe_delegate``; only when that yields the ``_DELEGATE_NONE``
    sentinel is the server contacted directly.

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        The size in bytes, or ``None`` when the server reports no size or
        the ``SIZE`` command fails with an ``ftplib`` error.

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Errors from connecting, logging in or changing directory
            are propagated.
    """
    v = _maybe_delegate("get_size", url_or_path, username=username, password=password)
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        try:
            sz = ftp.size(filename)
            return int(sz) if sz is not None else None
        except all_errors:
            return None
    finally:
        # Release the control connection on every exit path.
        with contextlib.suppress(Exception):
            ftp.quit()
def get_idx_lines(
    url_or_path: str,
    *,
    write_to: str | None = None,
    timeout: int = 30,
    max_retries: int = 3,
    username: str | None = None,
    password: str | None = None,
) -> list[str] | None:
    """Fetch and parse the GRIB ``.idx`` for a remote path via FTP.

    The call is first offered to a patched ``FTPManager`` through
    ``_maybe_delegate``; only when that yields the ``_DELEGATE_NONE``
    sentinel is the server contacted directly.

    Args:
        url_or_path: FTP URL or ``host/path`` of the GRIB file (or its index);
            the index path is derived with ``ensure_idx_path``.
        write_to: Optional local path to also write the parsed lines to;
            ``.idx`` is appended when missing. Write failures are logged and
            otherwise ignored.
        timeout: Socket timeout in seconds for the FTP connection.
        max_retries: Forwarded to the delegate only; the direct FTP path
            performs a single attempt.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        The index lines as produced by ``parse_idx_lines`` (or whatever the
        delegate returned).

    Raises:
        ValueError: If the path cannot be parsed into host and path.
        Exception: Errors from connecting, logging in, changing directory or
            retrieving the index are propagated.
    """
    v = _maybe_delegate(
        "get_idx_lines",
        url_or_path,
        write_to=write_to,
        timeout=timeout,
        max_retries=max_retries,
        username=username,
        password=password,
    )
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    idx_path = ensure_idx_path(remote_path)
    directory, _, filename = idx_path.rpartition("/")
    buf = BytesIO()
    # Honor the caller's timeout instead of a hard-coded value.
    ftp = FTP(timeout=timeout)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        if directory:
            ftp.cwd(directory)
        ftp.retrbinary(f"RETR {filename}", buf.write)
    finally:
        # Always release the control connection, including on failure.
        with contextlib.suppress(Exception):
            ftp.quit()
    lines = parse_idx_lines(buf.getvalue())
    if write_to:
        outp = write_to if write_to.endswith(".idx") else f"{write_to}.idx"
        try:
            from pathlib import Path as _P

            with _P(outp).open("w", encoding="utf8") as f:
                f.write("\n".join(lines))
        except Exception as exc:
            # Saving the local copy is best effort, but leave a trace rather
            # than failing silently.
            logging.warning("Could not write idx file %s: %s", outp, exc)
    return lines
def get_chunks(
    url_or_path: str,
    chunk_size: int = 500 * 1024 * 1024,
    *,
    username: str | None = None,
    password: str | None = None,
) -> list[str]:
    """Compute contiguous chunk ranges for an FTP file.

    A patched ``FTPManager`` gets the first chance to answer through
    ``_maybe_delegate``. Otherwise the remote size is looked up with
    ``get_size`` and handed to ``compute_chunks`` together with
    ``chunk_size``; an unknown size produces an empty list.
    """
    delegated = _maybe_delegate(
        "get_chunks",
        url_or_path,
        chunk_size,
        username=username,
        password=password,
    )
    if delegated is _DELEGATE_NONE:
        total = get_size(url_or_path, username=username, password=password)
        return [] if total is None else compute_chunks(total, chunk_size)
    return delegated  # type: ignore[return-value]
def download_byteranges(
    url_or_path: str,
    byte_ranges: Iterable[str],
    *,
    max_workers: int = 10,
    timeout: int = 30,
    username: str | None = None,
    password: str | None = None,
) -> bytes:
    """Download multiple ranges via FTP REST and concatenate in the input order.

    The call is first offered to a patched ``FTPManager`` through
    ``_maybe_delegate``. Otherwise every range is fetched on its own FTP
    connection in a thread pool: the transfer is restarted at the range's
    start offset and aborted once the requested number of bytes has arrived.

    Args:
        url_or_path: FTP URL or ``host/path`` of the remote file.
        byte_ranges: Range specs of the form ``"bytes=START-END"`` (the
            ``bytes=`` prefix is optional, ``END`` is inclusive). A missing
            ``START`` means 0; a missing ``END`` means end of file, resolved
            through ``get_size``.
        max_workers: Thread pool size.
        timeout: Socket timeout in seconds for each FTP connection.
        username: Optional username overriding one embedded in the URL.
        password: Optional password overriding one embedded in the URL.

    Returns:
        The bytes of all ranges joined in the order of ``byte_ranges``.

    Raises:
        ValueError: If the path cannot be parsed or a range bound is not an
            integer.
        Exception: FTP/socket errors raised by a worker are propagated.
    """
    from concurrent.futures import ThreadPoolExecutor

    v = _maybe_delegate(
        "download_byteranges",
        url_or_path,
        byte_ranges,
        max_workers=max_workers,
        timeout=timeout,
        username=username,
        password=password,
    )
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")

    class _Stop(Exception):
        """Signals from the data callback that the range is complete."""

    def _worker(_range: str) -> bytes:
        parts = _range.replace("bytes=", "").split("-")
        start_text = parts[0]
        # Tolerate a spec without "-" by treating it as open-ended.
        end_text = parts[1] if len(parts) > 1 else ""
        start = int(start_text) if start_text else 0
        if end_text:
            end = int(end_text)
        else:
            # Use the same credential overrides as the download itself.
            sz = get_size(url_or_path, username=username, password=password) or 0
            end = max(sz - 1, start)
        remaining = end - start + 1
        out = BytesIO()

        def _cb(chunk: bytes) -> None:
            nonlocal remaining
            if remaining <= 0:
                raise _Stop()
            take = min(len(chunk), remaining)
            if take:
                out.write(chunk[:take])
                remaining -= take
            if remaining <= 0:
                raise _Stop()

        ftp = FTP(timeout=timeout)
        try:
            ftp.connect(host)
            ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
            ftp.set_pasv(True)
            if directory:
                ftp.cwd(directory)
            try:
                ftp.retrbinary(f"RETR {filename}", _cb, rest=start)
            except _Stop:
                # Enough data received; cancel the rest of the transfer.
                with contextlib.suppress(Exception):
                    ftp.abort()
        finally:
            # Release the control connection on every exit path.
            with contextlib.suppress(Exception):
                ftp.quit()
        return out.getvalue()

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(_worker, list(byte_ranges)))
    return b"".join(results)