# SPDX-License-Identifier: Apache-2.0
"""Optional OO connector clients.
These thin wrappers provide a small amount of state (e.g., host or bucket)
and delegate to the functional backends for each operation. They are useful
when you want to reuse configuration across several calls in a script while
keeping the core API functional and composable.
"""
from __future__ import annotations
from typing import Iterable
from zyra.connectors.backends import ftp as ftp_backend
from zyra.connectors.backends import s3 as s3_backend
from .base import Connector
class FTPConnector(Connector):
    """Thin OO wrapper around the FTP backend for convenience.

    Stores host/credentials and exposes methods that accept path-only
    inputs. All methods delegate to the functional :mod:`ftp` backend.
    """

    CAPABILITIES = {"fetch", "upload", "list"}

    def __init__(
        self,
        host: str,
        port: int = 21,
        username: str = "anonymous",
        password: str = "test@test.com",
        timeout: int = 30,
    ) -> None:
        """Store connection settings for later path-only calls.

        Parameters
        ----------
        host : str
            FTP server hostname.
        port : int
            Server port; included in generated URLs when non-default.
        username, password : str
            Credentials; defaults follow the anonymous-FTP convention.
        timeout : int
            Connection timeout in seconds.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        # NOTE(review): timeout is stored but not forwarded to any backend
        # call below — confirm whether the functional backend accepts a
        # timeout keyword before threading it through.
        self.timeout = timeout

    def _url(self, path: str) -> str:
        """Build an ``ftp://`` URL for ``path`` on this host.

        The port is appended only when it differs from the FTP default
        (21), so URLs for default-port hosts are unchanged.
        """
        rel = path.lstrip("/")
        # Fix: a non-default port was previously dropped from the URL.
        authority = self.host if self.port == 21 else f"{self.host}:{self.port}"
        return f"ftp://{authority}/{rel}"

    # ---- Simple operations -------------------------------------------------

    def fetch_bytes(self, path: str) -> bytes:
        """Fetch a remote file from this FTP host as bytes."""
        return ftp_backend.fetch_bytes(
            self._url(path), username=self.username, password=self.password
        )

    def upload_bytes(self, data: bytes, path: str) -> bool:
        """Upload bytes to a remote path on this FTP host."""
        return ftp_backend.upload_bytes(
            data,
            self._url(path),
            username=self.username,
            password=self.password,
        )

    def list_files(
        self,
        remote_dir: str,
        pattern: str | None = None,
        *,
        since: str | None = None,
        until: str | None = None,
        date_format: str | None = None,
    ) -> list[str] | None:
        """List files under a remote directory with optional filters.

        ``pattern`` is a name filter; ``since``/``until`` bound the listing
        by date strings interpreted via ``date_format`` (semantics are
        defined by the functional backend).
        """
        return ftp_backend.list_files(
            self._url(remote_dir),
            pattern=pattern,
            since=since,
            until=until,
            date_format=date_format,
            username=self.username,
            password=self.password,
        )

    def exists(self, path: str) -> bool:
        """Return True if the path exists on this FTP host."""
        return ftp_backend.exists(
            self._url(path), username=self.username, password=self.password
        )

    def delete(self, path: str) -> bool:
        """Delete a remote path on this FTP host."""
        return ftp_backend.delete(
            self._url(path), username=self.username, password=self.password
        )

    def stat(self, path: str):
        """Return minimal metadata mapping for a remote path."""
        return ftp_backend.stat(
            self._url(path), username=self.username, password=self.password
        )

    def sync_directory(
        self,
        remote_dir: str,
        local_dir: str,
        *,
        pattern: str | None = None,
        since: str | None = None,
        until: str | None = None,
        date_format: str | None = None,
    ) -> None:
        """Mirror a remote directory on this FTP host to a local directory."""
        return ftp_backend.sync_directory(
            self._url(remote_dir),
            local_dir,
            pattern=pattern,
            since=since,
            until=until,
            date_format=date_format,
            username=self.username,
            password=self.password,
        )
class S3Connector(Connector):
    """Convenience OO facade over the functional S3 backend.

    Holds the target bucket (and an optional unsigned-request flag) so
    callers can pass bare object keys instead of full ``s3://`` URLs.
    """

    CAPABILITIES = {"fetch", "upload", "list"}

    def __init__(self, bucket: str, *, unsigned: bool = False) -> None:
        """Remember the bucket name and whether to use unsigned requests."""
        self.bucket = bucket
        self.unsigned = bool(unsigned)

    def _url(self, key: str) -> str:
        """Return the full ``s3://`` URL for ``key`` in this bucket."""
        return "s3://" + self.bucket + "/" + key.lstrip("/")

    def fetch_bytes(self, key: str) -> bytes:
        """Fetch object bytes from the configured bucket."""
        return s3_backend.fetch_bytes(self._url(key), unsigned=self.unsigned)

    def upload_bytes(self, data: bytes, key: str) -> bool:
        """Upload bytes as an object to the configured bucket."""
        return s3_backend.upload_bytes(data, self._url(key))

    def list_files(
        self,
        prefix: str | None = None,
        *,
        pattern: str | None = None,
        since: str | None = None,
        until: str | None = None,
        date_format: str | None = None,
    ) -> list[str] | None:
        """List keys under an optional prefix with optional filters.

        NOTE(review): unlike ``fetch_bytes``, this does not forward the
        ``unsigned`` flag to the backend — verify whether the functional
        ``list_files`` supports it before relying on unsigned listings.
        """
        if prefix:
            target = self._url(prefix)
        else:
            target = f"s3://{self.bucket}/"
        return s3_backend.list_files(
            target,
            pattern=pattern,
            since=since,
            until=until,
            date_format=date_format,
        )

    def exists(self, key: str) -> bool:
        """Return True if the object exists in the configured bucket."""
        return s3_backend.exists(self._url(key))

    def delete(self, key: str) -> bool:
        """Delete an object from the configured bucket."""
        return s3_backend.delete(self._url(key))

    def stat(self, key: str):
        """Return basic metadata mapping for an object in the bucket."""
        return s3_backend.stat(self._url(key))

    # ---- GRIB helpers ------------------------------------------------------

    def get_idx_lines(self, key: str) -> list[str]:
        """Fetch and parse the GRIB ``.idx`` for an object in the bucket."""
        return s3_backend.get_idx_lines(self._url(key), unsigned=self.unsigned)

    def download_byteranges(
        self, key: str, byte_ranges: Iterable[str], *, max_workers: int = 10
    ) -> bytes:
        """Download multiple byte ranges and concatenate them in order."""
        # Second positional argument is passed as None, mirroring the
        # functional backend's signature as used elsewhere in this module.
        return s3_backend.download_byteranges(
            self._url(key),
            None,
            byte_ranges,
            unsigned=self.unsigned,
            max_workers=max_workers,
        )