# SPDX-License-Identifier: Apache-2.0
"""FTP connector backend.
Thin functional wrappers around ``ftplib`` (with a patchable ``FTPManager`` test hook) to support simple byte fetches
and uploads, directory listing with regex/date filtering, sync-to-local flows,
and advanced GRIB workflows (``.idx`` handling, ranged downloads).
The URL parser supports anonymous and credentialed forms, e.g.:
``ftp://host/path``, ``ftp://user@host/path``, ``ftp://user:pass@host/path``.
"""
from __future__ import annotations
import contextlib
import json
import logging
import re
import warnings
from collections.abc import Iterable
from dataclasses import dataclass
from datetime import datetime, timezone
from ftplib import FTP, all_errors
from io import BytesIO
from pathlib import Path
from zyra.utils.date_manager import DateManager
from zyra.utils.grib import compute_chunks, ensure_idx_path, parse_idx_lines
_DELEGATE_NONE = object()
[docs]
@dataclass(frozen=True)
class SyncOptions:
    """Immutable settings that govern file replacement during FTP sync.

    ``sync_directory`` consults these options when deciding whether a remote
    file should replace a local one. Rules apply in this order; the first
    rule that fires wins:

    1. ``skip_if_local_done`` - skip when a ``<name>.done`` marker exists
    2. local file missing - always download
    3. local file is zero bytes - always replace
    4. ``overwrite_existing`` - replace unconditionally
    5. ``prefer_remote`` - always take the remote version
    6. ``prefer_remote_if_meta_newer`` - replace when the frames-meta.json
       timestamp is newer than the local file
    7. ``recheck_missing_meta`` - replace when frames-meta.json lacks an entry
    8. ``min_remote_size`` - replace when the remote size reaches the threshold
    9. ``recheck_existing`` - compare sizes when no remote mtime is available
    10. fallback - replace when the remote MDTM time is newer than the local one
    """

    # Replace local files unconditionally regardless of timestamps.
    overwrite_existing: bool = False
    # Compare file sizes when timestamps are unavailable.
    recheck_existing: bool = False
    # Replacement threshold: absolute bytes (int or numeric string) or a
    # growth percentage over the local size such as ``"10%"``.
    min_remote_size: int | str | None = None
    # Always prioritize remote versions over local copies.
    prefer_remote: bool = False
    # Use frames-meta.json timestamps for comparison instead of MDTM.
    prefer_remote_if_meta_newer: bool = False
    # Skip files that have a companion ``.done`` marker file.
    skip_if_local_done: bool = False
    # Re-download files that lack a companion entry in frames-meta.json.
    recheck_missing_meta: bool = False
    # Path to frames-meta.json for metadata-aware sync operations.
    frames_meta_path: str | None = None
[docs]
class FTPManager:  # pragma: no cover - test patch hook
    """Patch point for tests; intentionally empty in production.

    ``_maybe_delegate`` instantiates whatever class is bound to this name and
    looks up the requested method on the instance. Tests replace the name
    with a mock class exposing the methods they need. This default defines
    none, so every delegation attempt falls back to the real ftplib code.
    """
def _maybe_delegate(method: str, *args, **kwargs):  # pragma: no cover - test hook
    """Forward ``method(*args, **kwargs)`` to a fresh ``FTPManager`` instance.

    Tests patch ``FTPManager`` on this module with a mock class; the default
    placeholder defines no methods. Any exception - constructing the manager,
    looking up ``method`` or calling it - produces the ``_DELEGATE_NONE``
    sentinel, telling the caller to run its own ftplib implementation.
    """
    try:
        bound = getattr(FTPManager(), method)  # type: ignore[name-defined, call-arg]
        return bound(*args, **kwargs)
    except Exception:
        return _DELEGATE_NONE
[docs]
def parse_ftp_path(
    url_or_path: str,
    *,
    username: str | None = None,
    password: str | None = None,
) -> tuple[str, str, str | None, str | None]:
    """Return ``(host, remote_path, username, password)`` parsed from an FTP path.

    Accepts ``ftp://host/path``, ``ftp://user@host/path`` and
    ``ftp://user:pass@host/path``; the ``ftp://`` prefix is optional.

    Credentials are only recognised in the authority part, i.e. the text
    before the first ``/``. An ``@`` inside the remote path (for example
    ``dir/img@2x.png``) therefore stays part of the path. Within the
    authority the *last* ``@`` separates credentials from the host, so a raw
    ``@`` in the password still works. A password containing ``/`` must be
    supplied through ``password=`` instead of the URL.

    Args:
        url_or_path: ``ftp://`` URL or bare ``host/path`` string.
        username: Explicit username; overrides one embedded in the URL.
        password: Explicit password; overrides one embedded in the URL.

    Returns:
        Tuple of host, remote path (without the leading ``/`` separator),
        username (or ``None``) and password (or ``None``).

    Raises:
        ValueError: If no ``/`` separates the host from the remote path.

    Warns:
        UserWarning: When an explicit username/password differs from the one
        embedded in the URL.
    """
    s = url_or_path
    if s.startswith("ftp://"):
        s = s[len("ftp://") :]
    if "/" not in s:
        raise ValueError("FTP path must be host/path")
    # Split off the authority first so '@' characters in the path are never
    # mistaken for a credentials separator.
    authority, path = s.split("/", 1)
    host = authority
    user: str | None = None
    pwd: str | None = None
    if "@" in authority:
        auth, host = authority.rsplit("@", 1)
        if ":" in auth:
            user, pwd = auth.split(":", 1)
        else:
            user = auth
    if username is not None:
        if user is not None and username != user:
            warnings.warn(
                "Explicit FTP username overrides username embedded in the URL.",
                UserWarning,
                stacklevel=2,
            )
        user = username
    if password is not None:
        if pwd is not None and password != pwd:
            warnings.warn(
                "Explicit FTP password overrides credentials embedded in the URL.",
                UserWarning,
                stacklevel=2,
            )
        pwd = password
    return host, path, user, pwd
[docs]
def fetch_bytes(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bytes:
    """Fetch a remote file as bytes from an FTP server.

    Logs in (anonymously when no username is given), switches to passive
    mode, changes into the file's parent directory and downloads it with
    ``RETR``. The control connection is closed even when the transfer fails.

    Args:
        url_or_path: FTP URL or ``host/path`` of the file.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        The complete file contents.

    Raises:
        Errors from ``ftplib.all_errors`` for connect/login/CWD/RETR failures.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        buf = BytesIO()
        ftp.retrbinary(f"RETR {filename}", buf.write)
        return buf.getvalue()
    finally:
        # Best-effort close; never mask the original error.
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def upload_bytes(
    data: bytes,
    url_or_path: str,
    *,
    username: str | None = None,
    password: str | None = None,
) -> bool:
    """Upload bytes to a remote FTP path.

    Changes into the target's parent directory and stores ``data`` there
    with ``STOR``. The control connection is closed even when the upload
    fails.

    Args:
        data: Payload to upload.
        url_or_path: FTP URL or ``host/path`` of the destination file.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        True once ``STOR`` completes.

    Raises:
        Errors from ``ftplib.all_errors`` for connect/login/CWD/STOR failures.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        with BytesIO(data) as bio:
            ftp.storbinary(f"STOR {filename}", bio)
        return True
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def list_files(
    url_or_dir: str,
    pattern: str | None = None,
    *,
    since: str | None = None,
    until: str | None = None,
    date_format: str | None = None,
    username: str | None = None,
    password: str | None = None,
) -> list[str] | None:
    """List FTP directory contents with optional regex and date filtering.

    Args:
        url_or_dir: FTP URL or ``host/dir`` of the directory to list.
        pattern: Optional regex; entries are kept when ``re.search`` matches.
        since: Optional ISO date/time; entries dated before it are dropped.
        until: Optional ISO date/time; entries dated after it are dropped.
        date_format: Custom format passed to ``DateManager`` for parsing
            dates embedded in entry names.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        Matching entry names as returned by ``NLST``. A failing ``NLST``
        yields an empty list (some servers reply with an error for an empty
        directory).

    Raises:
        Connect/login/CWD errors from ``ftplib.all_errors``; ``re.error`` or
        ``ValueError`` for an invalid pattern or ISO date.
    """
    host, remote_dir, user, pwd = parse_ftp_path(
        url_or_dir, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        ftp.cwd(remote_dir)
        try:
            names = ftp.nlst()
        except all_errors:
            names = []
    finally:
        # The connection is no longer needed for the local filtering below.
        with contextlib.suppress(Exception):
            ftp.quit()
    if pattern:
        rx = re.compile(pattern)
        names = [n for n in names if rx.search(n)]
    if since or until:
        dm = DateManager([date_format] if date_format else None)
        start = datetime.min if not since else datetime.fromisoformat(since)
        end = datetime.max if not until else datetime.fromisoformat(until)
        names = [n for n in names if dm.is_date_in_range(n, start, end)]
    return names
[docs]
def exists(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bool:
    """Return True if the remote path exists on the FTP server.

    Existence is checked by listing the parent directory with ``NLST`` and
    looking for the file name. A missing parent directory, or any listing
    failure, counts as "does not exist". Connection and login failures still
    raise. The control connection is always closed.

    Args:
        url_or_path: FTP URL or ``host/path`` of the file.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        True when the file name appears in the parent directory listing.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        try:
            if directory:
                ftp.cwd(directory)
            return filename in ftp.nlst()
        except all_errors:
            return False
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def delete(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> bool:
    """Delete a remote FTP path (file).

    Args:
        url_or_path: FTP URL or ``host/path`` of the file to delete.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        True after a successful ``DELE``; False when the delete itself fails
        (for example, a missing file or denied permission).

    Raises:
        Connect/login/CWD errors from ``ftplib.all_errors``. The control
        connection is always closed.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        try:
            ftp.delete(filename)
        except Exception:
            # Covers ftplib.all_errors from real servers as well as plain
            # Exception raised by unit-test doubles that do not use ftplib's
            # exception hierarchy: either way "delete failed / missing file".
            return False
        return True
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def stat(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> dict[str, int | None] | None:
    """Return minimal metadata mapping for a remote path (e.g., size).

    Args:
        url_or_path: FTP URL or ``host/path`` of the file.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        ``{"size": <bytes or None>}`` from the ``SIZE`` command, or ``None``
        when ``SIZE`` fails.

    Raises:
        Connect/login/CWD errors from ``ftplib.all_errors``. The control
        connection is always closed.
    """
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        try:
            size = ftp.size(filename)
        except all_errors:
            return None
        return {"size": int(size) if size is not None else None}
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def sync_directory(
    url_or_dir: str,
    local_dir: str,
    *,
    pattern: str | None = None,
    since: str | None = None,
    until: str | None = None,
    date_format: str | None = None,
    clean_zero_bytes: bool = False,
    username: str | None = None,
    password: str | None = None,
    sync_options: SyncOptions | None = None,
) -> None:
    """Sync files from a remote FTP directory to a local directory.

    Applies regex/date filters prior to download; optionally removes local
    zero-byte files before syncing and deletes local files that are no
    longer present in the (filtered) remote listing.

    Local deletion is guarded so that a bad listing cannot destroy the mirror:

    * ``.done`` marker files are never deleted;
    * the ``sync_options.frames_meta_path`` file is never deleted, and it is
      loaded before any deletion happens;
    * when the filtered remote listing is empty nothing is deleted, because
      ``list_files`` reports a failed ``NLST`` as an empty list.

    Each download is written to ``<name>.part`` and moved over the destination
    only after ``RETR`` completes. An interrupted transfer therefore never
    leaves a truncated file whose fresh local mtime would make it look newer
    than the remote copy on the next run.

    Args:
        url_or_dir: FTP URL or path to the remote directory.
        local_dir: Local directory path to sync files to.
        pattern: Optional regex pattern to filter filenames.
        since: ISO date string for start of date range filter.
        until: ISO date string for end of date range filter.
        date_format: Custom date format for parsing dates in filenames.
        clean_zero_bytes: Remove zero-byte local files before syncing.
        username: FTP username (overrides URL-embedded credentials).
        password: FTP password (overrides URL-embedded credentials).
        sync_options: Configuration for file replacement behavior. If None,
            a default SyncOptions instance is used, which downloads files
            that are missing, zero-byte, or have a newer remote modification
            time than the local copy.

    Raises:
        Errors from ``ftplib.all_errors`` or ``OSError`` raised while listing,
        connecting or downloading propagate. The shared FTP connection is
        closed in all cases.
    """
    host, remote_dir, user, pwd = parse_ftp_path(
        url_or_dir, username=username, password=password
    )
    options = sync_options or SyncOptions()

    # --- List and filter the remote directory -------------------------------
    names = (
        list_files(
            url_or_dir,
            pattern,
            since=since,
            until=until,
            date_format=date_format,
            username=username,
            password=password,
        )
        or []
    )
    if not names and (since or until):
        logging.warning(
            "FTP sync found no files for date range (since=%s until=%s); retrying without date filter",
            since,
            until,
        )
        names = (
            list_files(
                url_or_dir,
                pattern,
                since=None,
                until=None,
                date_format=date_format,
                username=username,
                password=password,
            )
            or []
        )
    if since or until:
        # The fallback listing above is unfiltered, so the date window is
        # re-applied here. NOTE(review): because the same window is used, the
        # retry only helps when the first listing failed transiently.
        dm = DateManager([date_format] if date_format else None)
        start = datetime.min if not since else datetime.fromisoformat(since)
        end = datetime.max if not until else datetime.fromisoformat(until)
        names = [n for n in names if dm.is_date_in_range(n, start, end)]

    local_dir_path = Path(local_dir)
    local_dir_path.mkdir(parents=True, exist_ok=True)
    if clean_zero_bytes:
        with contextlib.suppress(Exception):
            for fp in local_dir_path.iterdir():
                if fp.is_file() and fp.stat().st_size == 0:
                    fp.unlink()

    # Load frames metadata before pruning: the metadata file may live inside
    # local_dir without existing on the server.
    frames_meta = None
    if options.prefer_remote_if_meta_newer or options.recheck_missing_meta:
        frames_meta = _load_frames_meta(options.frames_meta_path)

    # --- Prune local files that are no longer listed remotely ---------------
    if names:
        remote_set = {Path(n).name for n in names}
        local_set = {p.name for p in local_dir_path.iterdir() if p.is_file()}
        meta_file: Path | None = None
        if options.frames_meta_path:
            with contextlib.suppress(Exception):
                meta_file = Path(options.frames_meta_path).resolve()
        for fname in sorted(local_set - remote_set):
            if fname.endswith(".done"):
                continue  # Preserve marker files for skip_if_local_done
            stale = local_dir_path / fname
            with contextlib.suppress(Exception):
                if meta_file is None or stale.resolve() != meta_file:
                    stale.unlink()
    else:
        logging.warning(
            "FTP sync: no remote files listed for %s/%s; skipping deletion of local files",
            host,
            remote_dir,
        )

    # Determine if we need remote metadata for decision-making
    needs_remote_size = options.recheck_existing or options.min_remote_size is not None
    # MDTM is skipped only when overwrite_existing or prefer_remote is set,
    # because those modes return early in should_download() (steps 4-5) before
    # the default MDTM comparison (step 10). All other modes -- including
    # prefer_remote_if_meta_newer -- may fall through to step 10, so MDTM is
    # still fetched as a fallback.
    needs_remote_mtime = not (options.overwrite_existing or options.prefer_remote)

    # A single FTP connection serves all metadata queries and downloads.
    ftp: FTP | None = None

    def ensure_ftp_connection() -> FTP:
        """Return the shared connection, opening and CWD-ing it on first use.

        The connection is stored only once fully set up, so a failed attempt
        is retried on the next call instead of reusing a half-initialised
        ``FTP`` object.
        """
        nonlocal ftp
        if ftp is None:
            conn = FTP(timeout=30)
            try:
                conn.connect(host)
                conn.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
                conn.set_pasv(True)
                if remote_dir:
                    conn.cwd(remote_dir)
            except BaseException:
                with contextlib.suppress(Exception):
                    conn.close()
                raise
            ftp = conn
        return ftp

    def get_size_via_conn(filename: str) -> int | None:
        """Get file size using the shared connection; None on failure."""
        try:
            conn = ensure_ftp_connection()
            return conn.size(filename)
        except all_errors as exc:
            logging.debug("FTP SIZE failed for %s: %s", filename, exc)
            return None

    def get_mtime_via_conn(filename: str) -> datetime | None:
        """Get file mtime using the shared connection via MDTM.

        Accepts ``YYYYMMDDhhmmss`` with optional fractional seconds
        (RFC 3659 ``time-val``). Returns a UTC-aware datetime, or None.
        """
        try:
            conn = ensure_ftp_connection()
            resp = conn.sendcmd(f"MDTM {filename}")
        except all_errors as exc:
            logging.debug("FTP MDTM failed for %s: %s", filename, exc)
            return None
        if not resp.startswith("213 "):
            return None
        ts_str = resp[4:].strip()
        whole, _, frac = ts_str.partition(".")
        try:
            parsed = datetime.strptime(whole, "%Y%m%d%H%M%S")
            if frac:
                parsed = parsed.replace(microsecond=int(frac[:6].ljust(6, "0")))
        except (ValueError, TypeError) as exc:
            logging.debug(
                "FTP MDTM parse failed for %s (raw: %s): %s",
                filename,
                ts_str,
                exc,
            )
            return None
        return parsed.replace(tzinfo=timezone.utc)

    def retrieve(filename: str, local_path: Path) -> None:
        """RETR ``filename`` into ``<name>.part``, then replace ``local_path``."""
        conn = ensure_ftp_connection()
        part_path = local_path.parent / f"{local_path.name}.part"
        try:
            with part_path.open("wb") as fh:
                conn.retrbinary(f"RETR {filename}", fh.write)
            part_path.replace(local_path)
        except BaseException:
            # Keep any previous local copy intact; drop the partial file.
            with contextlib.suppress(OSError):
                part_path.unlink()
            raise

    try:
        for name in names:
            filename = Path(name).name
            local_path = local_dir_path / filename
            # Short-circuit local-only conditions to avoid remote queries.
            # skip_if_local_done has highest precedence, matching
            # should_download(): a .done marker always skips, even if the
            # base file is missing.
            if options.skip_if_local_done and _has_done_marker(local_path):
                do_download, reason = False, ".done marker present"
            elif not local_path.exists() or local_path.stat().st_size == 0:
                do_download, reason = True, "missing or zero-byte"
            else:
                remote_size = get_size_via_conn(filename) if needs_remote_size else None
                remote_mtime = (
                    get_mtime_via_conn(filename) if needs_remote_mtime else None
                )
                do_download, reason = should_download(
                    filename,
                    local_path,
                    remote_size,
                    remote_mtime,
                    options,
                    frames_meta,
                )
            if do_download:
                logging.debug("Downloading %s: %s", filename, reason)
                retrieve(filename, local_path)
            else:
                logging.debug("Skipping %s: %s", filename, reason)
    finally:
        if ftp is not None:
            with contextlib.suppress(Exception):
                ftp.quit()
[docs]
def get_size(
    url_or_path: str, *, username: str | None = None, password: str | None = None
) -> int | None:
    """Return remote file size in bytes via FTP SIZE.

    A patched ``FTPManager`` (tests) gets the first chance to answer.

    Args:
        url_or_path: FTP URL or ``host/path`` of the file.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        Size in bytes, or ``None`` when ``SIZE`` fails or returns nothing.

    Raises:
        Connect/login/CWD errors from ``ftplib.all_errors``. The control
        connection is always closed.
    """
    v = _maybe_delegate("get_size", url_or_path, username=username, password=password)
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        try:
            sz = ftp.size(filename)
        except all_errors:
            return None
        return int(sz) if sz is not None else None
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
[docs]
def get_remote_mtime(
    url_or_path: str,
    *,
    username: str | None = None,
    password: str | None = None,
) -> datetime | None:
    """Return modification time from FTP MDTM command, or None if unavailable.

    The MDTM reply carries a UTC timestamp (RFC 3659) formatted as
    ``YYYYMMDDhhmmss`` with optional fractional seconds
    (``YYYYMMDDhhmmss.sss``); both forms are accepted. Returns a UTC-aware
    datetime. Not all FTP servers support this command; any FTP failure or
    unparseable reply returns None.
    """
    v = _maybe_delegate(
        "get_remote_mtime", url_or_path, username=username, password=password
    )
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    ftp = FTP(timeout=30)
    try:
        ftp.connect(host)
        ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
        ftp.set_pasv(True)
        directory, _, filename = remote_path.rpartition("/")
        if directory:
            ftp.cwd(directory)
        # FTP MDTM returns: "213 YYYYMMDDhhmmss[.sss]" (UTC per RFC 3659)
        resp = ftp.sendcmd(f"MDTM {filename}")
    except all_errors:
        return None
    finally:
        with contextlib.suppress(Exception):
            ftp.quit()
    if not resp.startswith("213 "):
        return None
    ts_str = resp[4:].strip()
    whole, _, frac = ts_str.partition(".")
    try:
        parsed = datetime.strptime(whole, "%Y%m%d%H%M%S")
        if frac:
            # Fractional seconds: keep microsecond precision, pad short values.
            parsed = parsed.replace(microsecond=int(frac[:6].ljust(6, "0")))
    except ValueError:
        logging.debug(
            "Failed to parse MDTM timestamp %r; returning None",
            ts_str,
        )
        return None
    return parsed.replace(tzinfo=timezone.utc)
def _parse_min_size(spec: int | str | None, local_size: int) -> int | None:
"""Parse min_remote_size spec (bytes or percentage) into absolute bytes.
Examples:
- ``1000`` -> 1000 (absolute bytes)
- ``"1000"`` -> 1000 (string form of bytes)
- ``"10%"`` -> local_size * 1.10 (local size plus 10%)
"""
if spec is None:
return None
if isinstance(spec, int):
return spec
spec_str = str(spec).strip()
if spec_str.endswith("%"):
try:
pct = float(spec_str[:-1])
return round(local_size * (1 + pct / 100))
except ValueError:
return None
try:
return int(spec_str)
except ValueError:
return None
def _load_frames_meta(path: str | None) -> dict | None:
"""Load frames-meta.json if provided and exists.
Returns None if the file is missing, unreadable, or not a JSON object.
"""
if not path:
return None
try:
p = Path(path)
if p.exists():
data = json.loads(p.read_text(encoding="utf-8"))
if not isinstance(data, dict):
logging.debug("frames-meta %s is not a JSON object, ignoring", path)
return None
return data
else:
logging.getLogger(__name__).warning(
"Frames meta file %s does not exist; metadata-aware sync disabled",
path,
)
except Exception as exc:
logging.getLogger(__name__).warning(
"Failed to load frames meta from %s: %s", path, exc
)
return None
def _has_done_marker(local_path: Path) -> bool:
"""Check if a .done marker file exists for the given file.
Marker files are named ``<filename>.done``, e.g., ``frame_001.png.done``.
"""
done_path = local_path.parent / (local_path.name + ".done")
return done_path.exists()
def _is_missing_companion_meta(filename: str, frames_meta: dict | None) -> bool:
"""Check if a file is missing companion metadata in frames-meta.json.
Returns False if no metadata source is provided (nothing to check against).
Returns True only if frames_meta exists but the file is not listed in it.
"""
if not frames_meta:
return False # No metadata source, can't be "missing" from it
frames = frames_meta.get("frames", [])
if isinstance(frames, list):
return not any(f.get("filename") == filename for f in frames)
return False
def _get_meta_timestamp(filename: str, frames_meta: dict | None) -> datetime | None:
"""Extract timestamp for a file from frames-meta.json.
Always returns a UTC-aware datetime so comparisons with other
UTC-aware timestamps are safe.
"""
if not frames_meta:
return None
frames = frames_meta.get("frames", [])
if not isinstance(frames, list):
return None
for frame in frames:
if frame.get("filename") == filename:
ts = frame.get("timestamp")
if ts:
try:
dt = datetime.fromisoformat(ts)
# Normalize to UTC-aware
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
else:
dt = dt.astimezone(timezone.utc)
return dt
except ValueError:
pass
break
return None
[docs]
def should_download(
    remote_name: str,
    local_path: Path,
    remote_size: int | None,
    remote_mtime: datetime | None,
    options: SyncOptions,
    frames_meta: dict | None = None,
) -> tuple[bool, str]:
    """Decide whether ``remote_name`` should replace ``local_path``.

    Rules are checked in a fixed order and the first one that fires decides:
    ``.done`` marker (skip), missing local file, zero-byte local file,
    ``overwrite_existing``, ``prefer_remote``, newer frames-meta timestamp,
    missing frames-meta entry, ``min_remote_size`` threshold, size mismatch
    when no remote mtime is known (``recheck_existing``), and finally a newer
    remote MDTM time. When nothing fires, the file is considered up to date.

    Args:
        remote_name: The filename on the remote server.
        local_path: Path to the local file (may not exist).
        remote_size: Remote file size in bytes, or None if unknown.
        remote_mtime: Remote modification time from MDTM, or None if unavailable.
        options: SyncOptions configuration.
        frames_meta: Parsed frames-meta.json content, or None.

    Returns:
        ``(download, reason)``: the decision plus a short, log-friendly reason.
    """
    if options.skip_if_local_done and _has_done_marker(local_path):
        return False, "skip: .done marker exists"
    if not local_path.exists():
        return True, "new file"

    size_here = local_path.stat().st_size
    if size_here == 0:
        return True, "local file is zero bytes"

    # Unconditional replacement modes.
    if options.overwrite_existing:
        return True, "overwrite-existing mode"
    if options.prefer_remote:
        return True, "prefer-remote mode"

    # Metadata-driven rules (frames-meta.json).
    meta_time = (
        _get_meta_timestamp(remote_name, frames_meta)
        if options.prefer_remote_if_meta_newer and frames_meta
        else None
    )
    if meta_time:
        try:
            mtime_here = datetime.fromtimestamp(
                local_path.stat().st_mtime, tz=timezone.utc
            )
        except OSError as exc:
            logging.debug(
                "Failed to read mtime for %s when comparing to meta timestamp: %s",
                local_path,
                exc,
            )
        else:
            if meta_time > mtime_here:
                return True, "meta timestamp newer than local"
    if options.recheck_missing_meta and _is_missing_companion_meta(
        remote_name, frames_meta
    ):
        return True, "missing companion metadata"

    # Size-driven rules.
    if options.min_remote_size is not None and remote_size is not None:
        limit = _parse_min_size(options.min_remote_size, size_here)
        if limit is not None and remote_size >= limit:
            return True, f"remote size {remote_size} >= threshold {limit}"
    size_differs = remote_size is not None and remote_size != size_here
    if options.recheck_existing and remote_mtime is None and size_differs:
        return True, f"size mismatch: local={size_here}, remote={remote_size}"

    # Fallback: MDTM comparison (both sides UTC-aware).
    if remote_mtime is None:
        return False, "up-to-date"
    try:
        mtime_here = datetime.fromtimestamp(local_path.stat().st_mtime, tz=timezone.utc)
    except OSError as exc:
        logging.debug(
            "Failed to read local mtime for %s during MDTM comparison: %s",
            local_path,
            exc,
        )
        return False, "up-to-date"
    if remote_mtime > mtime_here:
        return True, "remote mtime newer"
    return False, "up-to-date"
[docs]
def get_idx_lines(
    url_or_path: str,
    *,
    write_to: str | None = None,
    timeout: int = 30,
    max_retries: int = 3,
    username: str | None = None,
    password: str | None = None,
) -> list[str] | None:
    """Fetch and parse the GRIB ``.idx`` for a remote path via FTP.

    Args:
        url_or_path: FTP URL or ``host/path`` of the GRIB file (or its
            ``.idx``; ``ensure_idx_path`` normalises it).
        write_to: Optional local path; the parsed lines are also written there
            (``.idx`` is appended when missing). Write failures are logged,
            not raised.
        timeout: Socket timeout in seconds for the FTP connection.
        max_retries: Total attempts for transient failures (``4xx`` replies,
            socket errors, dropped connections). Permanent ``5xx`` errors,
            such as a missing ``.idx``, are raised immediately. Values below
            1 mean a single attempt.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        The parsed ``.idx`` lines.

    Raises:
        Errors from ``ftplib.all_errors`` once attempts are exhausted or on a
        permanent error.
    """
    from ftplib import error_temp  # transient-error class used for retries

    v = _maybe_delegate(
        "get_idx_lines",
        url_or_path,
        write_to=write_to,
        timeout=timeout,
        max_retries=max_retries,
        username=username,
        password=password,
    )
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    idx_path = ensure_idx_path(remote_path)
    directory, _, filename = idx_path.rpartition("/")
    attempts = max(1, max_retries)
    raw = b""
    for attempt in range(1, attempts + 1):
        ftp = FTP(timeout=timeout)
        try:
            ftp.connect(host)
            ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
            ftp.set_pasv(True)
            if directory:
                ftp.cwd(directory)
            buf = BytesIO()
            ftp.retrbinary(f"RETR {filename}", buf.write)
            raw = buf.getvalue()
            break
        except (error_temp, OSError, EOFError) as exc:
            if attempt >= attempts:
                raise
            logging.debug(
                "FTP .idx fetch attempt %d/%d for %s failed: %s",
                attempt,
                attempts,
                filename,
                exc,
            )
        finally:
            with contextlib.suppress(Exception):
                ftp.quit()
    lines = parse_idx_lines(raw)
    if write_to:
        outp = write_to if write_to.endswith(".idx") else f"{write_to}.idx"
        try:
            Path(outp).write_text("\n".join(lines), encoding="utf8")
        except Exception as exc:
            # Best effort: the caller still gets the parsed lines.
            logging.warning("Failed to write .idx lines to %s: %s", outp, exc)
    return lines
[docs]
def get_chunks(
    url_or_path: str,
    chunk_size: int = 500 * 1024 * 1024,
    *,
    username: str | None = None,
    password: str | None = None,
) -> list[str]:
    """Split a remote FTP file into contiguous byte-range chunks.

    A patched ``FTPManager`` gets the first chance to answer. Otherwise the
    file size is read with ``get_size`` and handed to ``compute_chunks``.
    An unknown size (``None``) yields an empty list.
    """
    delegated = _maybe_delegate(
        "get_chunks", url_or_path, chunk_size, username=username, password=password
    )
    if delegated is not _DELEGATE_NONE:
        return delegated  # type: ignore[return-value]
    total_bytes = get_size(url_or_path, username=username, password=password)
    return [] if total_bytes is None else compute_chunks(total_bytes, chunk_size)
[docs]
def download_byteranges(
    url_or_path: str,
    byte_ranges: Iterable[str],
    *,
    max_workers: int = 10,
    timeout: int = 30,
    username: str | None = None,
    password: str | None = None,
) -> bytes:
    """Download multiple ranges via FTP REST and concatenate in the input order.

    Each range (``"bytes=START-END"`` or ``"START-END"``, inclusive) is
    fetched on its own connection in a thread pool. Transfers start with
    ``REST START`` and are aborted once ``END - START + 1`` bytes arrive. An
    open-ended range (``"START-"``) reads to the end of the file, whose size
    is looked up with the same credentials. An empty START is treated as 0.

    Args:
        url_or_path: FTP URL or ``host/path`` of the file.
        byte_ranges: Range specs; output order follows this iterable.
        max_workers: Thread-pool size (one FTP connection per active worker).
        timeout: Socket timeout in seconds for each connection.
        username: Optional username (overrides URL-embedded credentials).
        password: Optional password (overrides URL-embedded credentials).

    Returns:
        The concatenated bytes of all ranges.

    Raises:
        Errors from ``ftplib.all_errors`` for connection or transfer failures;
        ``ValueError``/``IndexError`` for malformed range specs.
    """
    v = _maybe_delegate(
        "download_byteranges",
        url_or_path,
        byte_ranges,
        max_workers=max_workers,
        timeout=timeout,
        username=username,
        password=password,
    )
    if v is not _DELEGATE_NONE:
        return v  # type: ignore[return-value]
    host, remote_path, user, pwd = parse_ftp_path(
        url_or_path, username=username, password=password
    )
    directory, _, filename = remote_path.rpartition("/")

    class _Stop(Exception):
        """Raised from the RETR callback once the requested range is complete."""

    def _worker(_range: str) -> bytes:
        start_end = _range.replace("bytes=", "").split("-")
        start = int(start_end[0]) if start_end[0] else 0
        if start_end[1]:
            end = int(start_end[1])
        else:
            # Open-ended range: read to EOF. Use the caller's credentials so
            # authenticated servers can answer SIZE.
            sz = get_size(url_or_path, username=username, password=password) or 0
            end = max(sz - 1, start)
        remaining = end - start + 1
        out = BytesIO()

        def _cb(chunk: bytes) -> None:
            nonlocal remaining
            if remaining <= 0:
                raise _Stop()
            take = min(len(chunk), remaining)
            if take:
                out.write(chunk[:take])
                remaining -= take
            if remaining <= 0:
                raise _Stop()

        ftp = FTP(timeout=timeout)
        try:
            ftp.connect(host)
            ftp.login(user=(user or "anonymous"), passwd=(pwd or "test@test.com"))
            ftp.set_pasv(True)
            if directory:
                ftp.cwd(directory)
            try:
                ftp.retrbinary(f"RETR {filename}", _cb, rest=start)
            except _Stop:
                # Range satisfied early: tell the server to stop sending.
                with contextlib.suppress(Exception):
                    ftp.abort()
        finally:
            with contextlib.suppress(Exception):
                ftp.quit()
        return out.getvalue()

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        results = list(ex.map(_worker, list(byte_ranges)))
    return b"".join(results)