from typing import Mapping, Any, Optional, List
from feldera._helpers import (
expect_bool,
expect_int,
expect_mapping,
expect_str,
parse_datetime,
)
from feldera.enums import PipelineStatus, TransactionStatus
from datetime import datetime
import uuid
[docs]
class PipelineStatistics:
"""
Represents statistics reported by a pipeline's "/stats" endpoint.
"""
[docs]
def __init__(self):
"""
Initializes as an empty set of statistics.
"""
self.global_metrics: GlobalPipelineMetrics = GlobalPipelineMetrics()
self.suspend_error: Optional[Any] = None
self.inputs: Mapping[List[InputEndpointStatus]] = {}
self.outputs: Mapping[List[OutputEndpointStatus]] = {}
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
pipeline = cls()
pipeline.global_metrics = GlobalPipelineMetrics.from_dict(d["global_metrics"])
pipeline.inputs = [
InputEndpointStatus.from_dict(input) for input in d["inputs"]
]
pipeline.outputs = [
OutputEndpointStatus.from_dict(output) for output in d["outputs"]
]
return pipeline
[docs]
class GlobalPipelineMetrics:
"""Represents the "global_metrics" object within the pipeline's
"/stats" endpoint reply.
"""
[docs]
def __init__(self):
"""
Initializes as an empty set of metrics.
"""
self.state: Optional[PipelineStatus] = None
self.bootstrap_in_progress: Optional[bool] = None
self.transaction_status: Optional[TransactionStatus] = None
self.transaction_id: Optional[int] = None
self.transaction_msecs: Optional[int] = None
self.transaction_records: Optional[int] = None
self.commit_progress: Optional[CommitProgressSummary] = None
self.transaction_initiators: Optional[TransactionInitiators] = None
self.rss_bytes: Optional[int] = None
self.cpu_msecs: Optional[int] = None
self.uptime_msecs: Optional[float] = None
self.start_time: Optional[datetime] = None
self.incarnation_uuid: Optional[uuid.UUID] = None
self.initial_start_time: Optional[datetime] = None
self.storage_bytes: Optional[int] = None
self.storage_mb_secs: Optional[int] = None
self.runtime_elapsed_msecs: Optional[int] = None
self.buffered_input_records: Optional[int] = None
self.total_input_records: Optional[int] = None
self.buffered_input_bytes: Optional[int] = None
self.total_input_bytes: Optional[int] = None
self.total_processed_records: Optional[int] = None
self.total_processed_bytes: Optional[int] = None
self.total_completed_records: Optional[int] = None
self.output_stall_msecs: Optional[int] = None
self.total_initiated_steps: Optional[int] = None
self.total_completed_steps: Optional[int] = None
self.pipeline_complete: Optional[bool] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
metrics = cls()
metrics.__dict__.update(d)
metrics.state = PipelineStatus.from_str(d["state"])
metrics.incarnation_uuid = uuid.UUID(d["incarnation_uuid"])
metrics.start_time = datetime.fromtimestamp(d["start_time"])
metrics.initial_start_time = datetime.fromtimestamp(d["start_time"])
metrics.transaction_status = TransactionStatus.from_str(d["transaction_status"])
return metrics
[docs]
class ConnectorError:
"""Represents a connector error item reported by connector status endpoints."""
[docs]
def __init__(self):
self.timestamp: datetime = datetime.fromtimestamp(0)
self.index: int = 0
self.tag: Optional[str] = None
self.message: str = ""
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
error = cls()
error.timestamp = parse_datetime(expect_str(d, "timestamp"), "timestamp")
error.index = expect_int(d, "index")
tag = d.get("tag")
if tag is not None and not isinstance(tag, str):
raise ValueError("invalid optional field 'tag': expected string or null")
error.tag = tag
error.message = expect_str(d, "message")
return error
[docs]
class ConnectorHealth:
"""Health status reported for a connector."""
HEALTHY = "Healthy"
UNHEALTHY = "Unhealthy"
[docs]
def __init__(self):
self.status: str = ConnectorHealth.HEALTHY
self.description: Optional[str] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
health = cls()
status = d.get("status", ConnectorHealth.HEALTHY)
if not isinstance(status, str):
raise ValueError("invalid field 'health.status': expected string")
if status not in (ConnectorHealth.HEALTHY, ConnectorHealth.UNHEALTHY):
raise ValueError(
"invalid field 'health.status': expected 'Healthy' or 'Unhealthy'"
)
health.status = status
description = d.get("description")
if description is not None and not isinstance(description, str):
raise ValueError(
"invalid optional field 'health.description': expected string or null"
)
health.description = description
return health
[docs]
class CompletedWatermark:
"""Latest completed watermark reported by input connector status."""
[docs]
def __init__(self):
self.metadata: Any = None
self.ingested_at: datetime = datetime.fromtimestamp(0)
self.processed_at: datetime = datetime.fromtimestamp(0)
self.completed_at: datetime = datetime.fromtimestamp(0)
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
watermark = cls()
if "metadata" not in d:
raise ValueError("missing required field 'metadata'")
watermark.metadata = d.get("metadata")
watermark.ingested_at = parse_datetime(
expect_str(d, "ingested_at"), "ingested_at"
)
watermark.processed_at = parse_datetime(
expect_str(d, "processed_at"), "processed_at"
)
watermark.completed_at = parse_datetime(
expect_str(d, "completed_at"), "completed_at"
)
return watermark
[docs]
class OutputEndpointStatus:
"""Represents one member of the "outputs" array within the
pipeline's "/stats" endpoint reply.
"""
[docs]
def __init__(self):
"""Initializes an empty status."""
self.endpoint_name: Optional[str] = None
self.config: Optional[Mapping] = None
self.metrics: Optional[OutputEndpointMetrics] = None
self.fatal_error: Optional[str] = None
self.encode_errors: Optional[list[ConnectorError]] = None
self.transport_errors: Optional[list[ConnectorError]] = None
self.health: Optional[ConnectorHealth] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
status = cls()
status.endpoint_name = expect_str(d, "endpoint_name")
status.config = expect_mapping(d, "config")
status.metrics = OutputEndpointMetrics.from_dict(expect_mapping(d, "metrics"))
fatal_error = d.get("fatal_error")
if fatal_error is not None and not isinstance(fatal_error, str):
raise ValueError(
"invalid optional field 'fatal_error': expected string or null"
)
status.fatal_error = fatal_error
encode_errors = d.get("encode_errors")
status.encode_errors = (
[ConnectorError.from_dict(error) for error in encode_errors]
if isinstance(encode_errors, list)
else None
)
transport_errors = d.get("transport_errors")
status.transport_errors = (
[ConnectorError.from_dict(error) for error in transport_errors]
if isinstance(transport_errors, list)
else None
)
health = d.get("health")
status.health = (
ConnectorHealth.from_dict(health) if isinstance(health, Mapping) else None
)
return status
[docs]
class OutputEndpointMetrics:
"""Represents the "metrics" member within an output endpoint status
in the pipeline's "/stats" endpoint reply.
"""
[docs]
def __init__(self):
self.transmitted_records: Optional[int] = None
self.transmitted_bytes: Optional[int] = None
self.queued_records: Optional[int] = None
self.queued_batches: Optional[int] = None
self.buffered_records: Optional[int] = None
self.buffered_batches: Optional[int] = None
self.num_encode_errors: Optional[int] = None
self.num_transport_errors: Optional[int] = None
self.total_processed_input_records: Optional[int] = None
self.total_processed_steps: Optional[int] = None
self.memory: Optional[int] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
metrics = cls()
metrics.__dict__.update(d)
return metrics
[docs]
class CommitProgressSummary:
"""Progress of a transaction commit."""
[docs]
def __init__(self):
"""Initializes an empty status."""
self.completed: Optional[int] = None
self.in_progress: Optional[int] = None
self.remaining: Optional[int] = None
self.in_progress_processed_records: Optional[int] = None
self.in_progress_total_records: Optional[int] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
status = cls()
status.__dict__.update(d)
return status
[docs]
class TransactionInitiators:
"""Initiators for an ongoing transaction."""
[docs]
def __init__(self):
"""Initializes an empty status."""
self.transaction_id: Optional[int] = None
self.initiated_by_api: Optional[str] = None
self.initiated_by_connectors: Optional[
Mapping[str, ConnectorTransactionPhase]
] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
status = cls()
status.__dict__.update(d)
status.initiated_by_connectors = ConnectorTransactionPhase.from_dict(
d["initiated_by_connectors"]
)
return status
[docs]
class ConnectorTransactionPhase:
"""Connector transaction phase with optional label"""
[docs]
def __init__(self):
"""Initializes an empty status."""
self.phase: Optional[str] = None
self.label: Optional[str] = None
[docs]
@classmethod
def from_dict(cls, d: Mapping[str, Any]):
status = cls()
status.__dict__.update(d)
return status