Source code for feldera.rest.errors

from requests import Response
import json
from datetime import datetime, timezone
from typing import Optional
from urllib.parse import urlparse


# RFC 9110 ยง5.6.7: HTTP-date "IMF-fixdate" format used by `Retry-After`.
_HTTP_DATE_FMT = "%a, %d %b %Y %H:%M:%S GMT"


def _parse_retry_after(value: Optional[str]) -> Optional[float]:
    """
    Parse an HTTP `Retry-After` header into a delay in seconds.

    Returns `None` if missing or unparseable. Per RFC 9110, the value is
    either a non-negative integer of seconds or an IMF-fixdate HTTP-date.
    """
    if not value:
        return None
    try:
        return max(float(value), 0.0)
    except ValueError:
        pass
    try:
        retry_at = datetime.strptime(value, _HTTP_DATE_FMT).replace(tzinfo=timezone.utc)
    except ValueError:
        return None
    return max((retry_at - datetime.now(timezone.utc)).total_seconds(), 0.0)


[docs] class FelderaError(Exception): """ Generic class for Feldera error handling """
[docs] def __init__(self, message: str) -> None: self.message = message super().__init__(self.message)
def __str__(self) -> str: return f"FelderaError. Error message: {self.message}"
[docs] class FelderaAPIError(FelderaError): """Error sent by Feldera API"""
[docs] def __init__(self, error: str, request: Response) -> None: self.status_code = request.status_code self.error = error self.error_code = None self.message = None self.details = None self.retry_after: Optional[float] = _parse_retry_after( request.headers.get("Retry-After") ) err_msg = "" if request.text: try: json_data = json.loads(request.text) self.error_code = json_data.get("error_code") if self.error_code: err_msg += f"\nError Code: {self.error_code}" self.message = json_data.get("message") if self.message: err_msg += f"\nMessage: {self.message}" self.details = json_data.get("details") if self.details: err_msg += f"\nDetails: {self.details}" except Exception: self.message = request.text err_msg += request.text err_msg += f"\nResponse Status: {request.status_code}" if int(request.status_code) == 401: parsed = urlparse(request.request.url) auth_err = f"\nAuthorization error: Failed to connect to '{parsed.scheme}://{parsed.hostname}': " auth = request.request.headers.get("Authorization") if auth is None: err_msg += f"{auth_err} API key not set" else: err_msg += f"{auth_err} invalid API key" err_msg = err_msg.strip() super().__init__(err_msg)
[docs] class FelderaTimeoutError(FelderaError): """Error when Feldera operation takes longer than expected"""
[docs] def __init__(self, err: str) -> None: super().__init__(f"Timeout connecting to Feldera: {err}")
[docs] class FelderaCommunicationError(FelderaError): """Error when connection to Feldera"""
[docs] def __init__(self, err: str) -> None: super().__init__(f"Cannot connect to Feldera API: {err}")