Source code for zyra.utils.credential_manager

# SPDX-License-Identifier: Apache-2.0
"""Credential storage and retrieval utilities.

This module provides `CredentialManager`, a small helper class to securely
load, access, and manage credentials from a `.env`-style file without leaking
them into the global process environment.

Examples
--------
Load and access values::

    from zyra.utils.credential_manager import CredentialManager

    cm = CredentialManager("./.env")
    cm.read_credentials(expected_keys=["API_KEY"])
    token = cm.get_credential("API_KEY")

Use as a context manager::

    with CredentialManager("./.env") as cm:
        cm.read_credentials()
        do_work(cm.get_credential("ACCESS_TOKEN"))
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Iterable

# Defer optional dependency import to method calls to avoid crashing
# modules that import CredentialManager when python-dotenv is unavailable.


class CredentialManager:
    """Manage app credentials from a dotenv file.

    Values are held in the instance's ``credentials`` dictionary; every key is
    stored with the configured ``namespace`` prefix (if any) prepended.

    Parameters
    ----------
    filename : str, optional
        Path to a dotenv file containing key=value pairs. When omitted,
        ``read_credentials`` asks ``dotenv.find_dotenv`` to locate one.
    namespace : str, optional
        Optional prefix to apply to all keys when stored/retrieved.

    Examples
    --------
    Namespaced keys::

        cm = CredentialManager(".env", namespace="MYAPP_")
        cm.read_credentials(expected_keys=["API_KEY"])  # stored as MYAPP_API_KEY
        cm.get_credential("API_KEY")                    # looks up MYAPP_API_KEY
    """

    def __init__(self, filename: str | None = None, namespace: str | None = None):
        self.filename = filename
        # Normalise ``None`` to "" so the prefix can always be concatenated.
        self.namespace = namespace or ""
        self.credentials: dict[str, str] = {}

    def __enter__(self):
        """Load credentials when entering a context manager block.

        Credentials are only read automatically when a ``filename`` was given;
        otherwise the manager starts empty.

        Returns
        -------
        CredentialManager
            The manager instance.
        """
        if self.filename:
            self.read_credentials()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clear credentials on exit and propagate exceptions if any.

        Returns
        -------
        bool
            ``False`` to propagate exceptions.
        """
        self.clear_credentials()
        if exc_type is not None:
            logging.error("Exception occurred: %s", exc_val)
        return False

    def _namespaced_key(self, key: str) -> str:
        """Return namespaced key when a namespace is configured."""
        return f"{self.namespace}{key}" if self.namespace else key

    def _missing_keys(self, expected_keys: Iterable[str] | None) -> list[str]:
        """Return the expected keys that are not tracked, sorted for stable output.

        A key counts as present when either its namespaced form or the name
        exactly as given is stored, so callers may pass base names (matching
        ``get_credential``) or fully-qualified names.
        """
        # Materialise once: ``expected_keys`` may be a one-shot iterator.
        wanted = set(expected_keys or ())
        return sorted(
            key
            for key in wanted
            if self._namespaced_key(key) not in self.credentials
            and key not in self.credentials
        )

    def read_credentials(self, expected_keys: Iterable[str] | None = None) -> None:
        """Read credentials from the dotenv file into memory.

        Parameters
        ----------
        expected_keys : Iterable[str], optional
            Keys that must be present; raises if any are missing. Base names
            are resolved through the configured namespace.

        Raises
        ------
        ImportError
            If python-dotenv is not installed.
        FileNotFoundError
            If the dotenv path cannot be resolved to an existing file.
        KeyError
            If any expected keys are missing after reading. Values that were
            read remain loaded in that case.
        """
        try:
            from dotenv import dotenv_values, find_dotenv  # type: ignore
        except (
            ImportError,
            ModuleNotFoundError,
        ) as exc:  # pragma: no cover - optional dependency path
            raise ImportError(
                "python-dotenv is required to read credentials; install the 'dev' extra or add python-dotenv"
            ) from exc

        if self.filename:
            dotenv_path = Path(self.filename)
        else:
            # find_dotenv() yields "" when nothing is found; Path("") would be
            # the current directory, which "exists", so test the raw string.
            located = find_dotenv()
            dotenv_path = Path(located) if located else None

        # A directory is not a readable dotenv file; treat it as not found
        # rather than letting the parser fail while opening it.
        if dotenv_path is None or not dotenv_path.exists() or dotenv_path.is_dir():
            if self.filename:
                raise FileNotFoundError(f"The file {self.filename} was not found.")
            raise FileNotFoundError("No dotenv file was found.")

        for key, value in dotenv_values(dotenv_path).items():
            namespaced_key = self._namespaced_key(key)
            self.credentials[namespaced_key] = value  # type: ignore[assignment]
            # Only the key name is logged, never the value.
            logging.debug("Added credential key: %s", namespaced_key)

        missing = self._missing_keys(expected_keys)
        if missing:
            raise KeyError(f"Missing expected keys: {', '.join(missing)}")

    @property
    def tracked_keys(self) -> set[str]:
        """Return the set of keys currently tracked in memory."""
        return set(self.credentials)

    def list_credentials(self, expected_keys: Iterable[str] | None = None) -> list[str]:
        """List tracked credential keys, checking for expected ones when provided.

        Parameters
        ----------
        expected_keys : Iterable[str], optional
            Keys to verify are present. Base names are resolved through the
            configured namespace.

        Returns
        -------
        list of str
            Keys currently stored in memory (with namespace prefix applied).

        Raises
        ------
        KeyError
            If any expected keys are missing.
        """
        missing_keys = self._missing_keys(expected_keys)
        if missing_keys:
            raise KeyError(f"Missing expected keys: {', '.join(missing_keys)}")
        return list(self.credentials)

    def get_credential(self, key: str) -> str:
        """Retrieve a credential value by key (with namespace applied).

        Parameters
        ----------
        key : str
            Base key name (namespace is applied automatically).

        Returns
        -------
        str
            Stored value for the namespaced key.

        Raises
        ------
        KeyError
            If the key is not present in memory.
        """
        namespaced_key = self._namespaced_key(key)
        try:
            return self.credentials[namespaced_key]
        except KeyError:
            raise KeyError(
                f"Credential key '{namespaced_key}' not found in credentials."
            ) from None

    def add_credential(self, key: str, value: str) -> None:
        """Add or update a credential value in memory.

        Parameters
        ----------
        key : str
            Base key name (namespace is applied automatically).
        value : str
            Credential value to store.
        """
        namespaced_key = self._namespaced_key(key)
        self.credentials[namespaced_key] = value
        logging.debug("Added/Updated credential key: %s", namespaced_key)

    def delete_credential(self, key: str) -> None:
        """Delete a credential by key if present.

        Parameters
        ----------
        key : str
            Base key name (namespace is applied automatically).
        """
        # pop() with a default makes a missing key a silent no-op.
        self.credentials.pop(self._namespaced_key(key), None)

    def clear_credentials(self) -> None:
        """Remove all tracked credentials from memory."""
        self.credentials.clear()