feldera.rest package

Submodules

feldera.rest.config module

class feldera.rest.config.Config(url: str | None = None, api_key: str | None = None, version: str | None = None, timeout: float | None = None, connection_timeout: float | None = None, requests_verify: bool | str | None = None, health_recovery_timeout: int | None = None)[source]

Bases: object

FelderaClient configuration, which includes authentication information and the address of the Feldera API the client will interact with.

__init__(url: str | None = None, api_key: str | None = None, version: str | None = None, timeout: float | None = None, connection_timeout: float | None = None, requests_verify: bool | str | None = None, health_recovery_timeout: int | None = None) None[source]

See documentation of the FelderaClient constructor for the other arguments.

Parameters:
  • version – (Optional) Version of the API to use. Default: v0.

  • health_recovery_timeout – (Optional) Maximum time in seconds to wait for cluster health recovery after a 502 error. Default: 300 (5 minutes).

feldera.rest.errors module

exception feldera.rest.errors.FelderaAPIError(error: str, request: Response)[source]

Bases: FelderaError

Error sent by Feldera API

__init__(error: str, request: Response) None[source]

exception feldera.rest.errors.FelderaCommunicationError(err: str)[source]

Bases: FelderaError

Error when connecting to Feldera

__init__(err: str) None[source]

exception feldera.rest.errors.FelderaError(message: str)[source]

Bases: Exception

Generic class for Feldera error handling

__init__(message: str) None[source]

exception feldera.rest.errors.FelderaTimeoutError(err: str)[source]

Bases: FelderaError

Error when Feldera operation takes longer than expected

__init__(err: str) None[source]
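
Because every exception above derives from FelderaError, callers can decide which subclasses to treat as transient. The following is a minimal sketch of a retry helper, assuming that timeouts and communication failures are worth retrying (that policy is an assumption, not something this reference states). The name call_with_retry and its parameters are hypothetical and not part of feldera.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    delay_s: float = 0.0,
) -> T:
    """Run ``operation`` and retry only the exception types in ``retry_on``.

    Hypothetical helper: any other exception propagates immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            time.sleep(delay_s)
    raise AssertionError("unreachable")
```

With the real library, retry_on would typically be (FelderaTimeoutError, FelderaCommunicationError) imported from feldera.rest.errors, while FelderaAPIError is left to propagate because it reports an error returned by the API itself.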

feldera.rest.feldera_client module

class feldera.rest.feldera_client.FelderaClient(url: str | None = None, api_key: str | None = None, timeout: float | None = None, connection_timeout: float | None = None, requests_verify: bool | str | None = None)[source]

Bases: object

A client for the Feldera HTTP API.

The client is initialized with the configuration needed for interacting with the Feldera HTTP API, which it uses in its calls. Its methods are implemented by issuing one or more HTTP requests to the API, and as such can provide higher level operations (e.g., support waiting for the success of asynchronous HTTP API functionality).

__init__(url: str | None = None, api_key: str | None = None, timeout: float | None = None, connection_timeout: float | None = None, requests_verify: bool | str | None = None) None[source]

Constructs a Feldera client.

Parameters:
  • url – (Optional) URL to the Feldera API. The default is read from the FELDERA_HOST environment variable; if the variable is not set, the default is “http://localhost:8080”.

  • api_key – (Optional) API key to access Feldera (format: “apikey:…”). The default is read from the FELDERA_API_KEY environment variable; if the variable is not set, the default is None (no API key is provided).

  • timeout – (Optional) Duration in seconds that the client will wait to receive a response of an HTTP request, after which it times out. The default is None (wait indefinitely; no timeout is enforced).

  • connection_timeout – (Optional) Duration in seconds that the client will wait to establish the connection of an HTTP request, after which it times out. The default is None (wait indefinitely; no timeout is enforced).

  • requests_verify – (Optional) The verify parameter passed to the requests library, which is used internally to perform HTTP requests. See also: https://requests.readthedocs.io/en/latest/user/advanced/#ssl-cert-verification . The default is based on the FELDERA_HTTPS_TLS_CERT or the FELDERA_TLS_INSECURE environment variable. By setting FELDERA_HTTPS_TLS_CERT to a path, the default becomes the CA bundle it points to. By setting FELDERA_TLS_INSECURE to “1”, “true” or “yes” (all case-insensitive), the default becomes False which means to disable TLS verification by default. If both variables are set, the former takes priority over the latter. If neither variable is set, the default is True.
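
The precedence rules for the requests_verify default can be sketched as a small pure function. This is an illustration of the documented behavior, not the library's own code: the function name default_requests_verify is hypothetical, and treating an empty variable as unset is an assumption.

```python
import os
from typing import Mapping, Optional, Union


def default_requests_verify(
    env: Optional[Mapping[str, str]] = None,
) -> Union[bool, str]:
    """Resolve the documented default for ``requests_verify``.

    Hypothetical helper (not part of feldera) that mirrors the
    precedence described in the parameter documentation.
    """
    env = os.environ if env is None else env

    # A CA bundle path takes priority over the insecure flag.
    # Assumption: an empty value counts as "not set".
    cert_path = env.get("FELDERA_HTTPS_TLS_CERT")
    if cert_path:
        return cert_path

    # "1", "true" or "yes" (case-insensitive) disables TLS verification.
    insecure = env.get("FELDERA_TLS_INSECURE", "")
    if insecure.lower() in {"1", "true", "yes"}:
        return False

    # Neither variable is set: verify certificates as usual.
    return True
```

Passing a plain dictionary instead of os.environ makes the precedence easy to check, for example default_requests_verify({"FELDERA_TLS_INSECURE": "yes"}).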

activate_pipeline(pipeline_name: str, wait: bool = True, timeout_s: float | None = None) PipelineStatus | None[source]
Parameters:
  • pipeline_name – The name of the pipeline to activate

  • wait – Set True to wait for the pipeline to activate. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to activate.

approve_pipeline(pipeline_name: str)[source]

checkpoint_pipeline(pipeline_name: str) int[source]

Checkpoint a pipeline.

Parameters:

pipeline_name – The name of the pipeline to checkpoint

checkpoint_pipeline_status(pipeline_name: str) dict[source]

Gets the checkpoint status

Parameters:

pipeline_name – The name of the pipeline to check the checkpoint status of.

clear_storage(pipeline_name: str, timeout_s: float | None = None)[source]

Clears the storage from the pipeline. This operation cannot be canceled.

Parameters:
  • pipeline_name – The name of the pipeline

  • timeout_s – The amount of time in seconds to wait for the storage to clear.

commit_transaction(pipeline_name: str, transaction_id: int | None = None, wait: bool = True, timeout_s: float | None = None)[source]

Commits the currently active transaction.

Parameters:
  • pipeline_name – The name of the pipeline.

  • transaction_id – If provided, the function verifies that the currently active transaction matches this ID. If the active transaction ID does not match, the function raises an error.

  • wait – If True, the function blocks until the transaction either commits successfully or the timeout is reached. If False, the function initiates the commit and returns immediately without waiting for completion. The default value is True.

  • timeout_s – Maximum time (in seconds) to wait for the transaction to commit when wait is True. If None, the function will wait indefinitely.

Raises:
  • RuntimeError – If there is currently no transaction in progress.

  • ValueError – If the provided transaction_id does not match the current transaction.

  • TimeoutError – If the transaction does not commit within the specified timeout (when wait is True).

  • FelderaAPIError – If the pipeline fails to commit a transaction.
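
A typical use of commit_transaction pairs it with start_transaction and passes the returned ID back for verification. The sketch below assumes a client object that exposes the methods documented in this module; the helper name push_in_transaction is hypothetical. It pushes with wait=False because this reference does not say whether waiting for processing inside an open transaction is safe.

```python
from typing import Any, Iterable, Mapping


def push_in_transaction(
    client: Any,
    pipeline_name: str,
    table_name: str,
    rows: Iterable[Mapping[str, Any]],
    timeout_s: float = 60.0,
) -> int:
    """Start a transaction, push rows, then commit and wait for completion.

    Hypothetical helper; ``client`` is expected to be a FelderaClient.
    """
    transaction_id = client.start_transaction(pipeline_name)

    # Assumption: do not block on processing while the transaction is open.
    client.push_to_pipeline(
        pipeline_name,
        table_name,
        "json",
        [dict(row) for row in rows],
        array=True,
        wait=False,
    )

    # Passing the ID makes the commit fail if another transaction is active.
    client.commit_transaction(
        pipeline_name,
        transaction_id=transaction_id,
        wait=True,
        timeout_s=timeout_s,
    )
    return transaction_id
```

Callers that need to distinguish failure modes can catch the exceptions listed under Raises around the call to push_in_transaction.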

completion_token_processed(pipeline_name: str, token: str) bool[source]

Check whether the pipeline has finished processing all inputs received from the connector before the token was generated.

Parameters:
  • pipeline_name – The name of the pipeline

  • token – The token to check for completion

Returns:

True if the pipeline has finished processing all inputs represented by the token, False otherwise

create_or_update_pipeline(pipeline: Pipeline, wait: bool = True) Pipeline[source]

Create a pipeline if it doesn’t exist or update a pipeline and wait for it to compile

Parameters:
  • pipeline – The pipeline to create or update

  • wait – Whether to wait for the pipeline to compile. True by default

Returns:

The created or updated pipeline

create_pipeline(pipeline: Pipeline, wait: bool = True) Pipeline[source]

Create a pipeline if it doesn’t exist and wait for it to compile

Parameters:
  • pipeline – The pipeline to create

  • wait – Whether to wait for the pipeline to compile. True by default

delete_pipeline(name: str)[source]

Deletes a pipeline by name

Parameters:

name – The name of the pipeline

generate_completion_token(pipeline_name: str, table_name: str, connector_name: str) str[source]

Generate a completion token that can be passed to FelderaClient.completion_token_processed() to check whether the pipeline has finished processing all inputs received from the connector before the token was generated.

Parameters:
  • pipeline_name – The name of the pipeline

  • table_name – The name of the table associated with this connector.

  • connector_name – The name of the connector.

Raises:

FelderaAPIError – If the connector cannot be found, or if the pipeline is not running.
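
The token returned by generate_completion_token is meant to be checked with completion_token_processed. The following sketch polls until the token is processed or a deadline passes. The helper name wait_for_connector_inputs, the polling interval, and the boolean return convention are all assumptions made for illustration.

```python
import time
from typing import Any


def wait_for_connector_inputs(
    client: Any,
    pipeline_name: str,
    table_name: str,
    connector_name: str,
    timeout_s: float = 30.0,
    poll_interval_s: float = 0.5,
) -> bool:
    """Return True once inputs received before the token are processed.

    Hypothetical helper; returns False if ``timeout_s`` elapses first.
    """
    token = client.generate_completion_token(
        pipeline_name, table_name, connector_name
    )
    deadline = time.monotonic() + timeout_s
    while True:
        if client.completion_token_processed(pipeline_name, token):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval_s)
```

When no custom polling logic is needed, wait_for_token offers a blocking alternative with its own timeout_s parameter.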

get_config() FelderaConfig[source]

Retrieves the general Feldera server configuration.

get_pipeline(pipeline_name: str, field_selector: PipelineFieldSelector) Pipeline[source]

Get a pipeline by name

Parameters:
  • pipeline_name – The name of the pipeline

  • field_selector – Choose what pipeline information to refresh; see PipelineFieldSelector enum definition.

get_pipeline_logs(pipeline_name: str) Generator[str, None, None][source]

Get the pipeline logs

Parameters:

pipeline_name – The name of the pipeline

Returns:

A generator yielding the logs, one line at a time.
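
Since the logs arrive as a generator, they can be filtered lazily without buffering the whole stream. A minimal sketch, assuming a client with the get_pipeline_logs method documented here; the helper name first_matching_log_lines is hypothetical. If the underlying stream stays open, the call may block until enough matching lines arrive.

```python
from itertools import islice
from typing import Any, List


def first_matching_log_lines(
    client: Any,
    pipeline_name: str,
    needle: str,
    limit: int = 10,
) -> List[str]:
    """Collect up to ``limit`` log lines that contain ``needle``.

    Hypothetical helper; lines are consumed lazily from the generator.
    """
    lines = client.get_pipeline_logs(pipeline_name)
    matches = (line for line in lines if needle in line)
    # islice stops pulling from the generator after ``limit`` matches.
    return list(islice(matches, limit))
```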

get_pipeline_stats(name: str) dict[source]

Get the pipeline metrics and performance counters

Parameters:

name – The name of the pipeline

get_pipeline_support_bundle(pipeline_name: str, params: Dict[str, Any] | None = None) bytes[source]

Generate a support bundle containing diagnostic information from a pipeline.

This endpoint collects various diagnostic data from the pipeline including circuit profile, heap profile, metrics, logs, stats, and connector statistics, and packages them into a single ZIP file for support purposes.

Parameters:
  • pipeline_name – The name of the pipeline

  • params – Optional query parameters to control data collection

Returns:

The support bundle as bytes (ZIP file)

Raises:

FelderaAPIError – If the pipeline does not exist or if there’s an error
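
The returned bytes can be written straight to disk and inspected with the standard zipfile module. This sketch assumes a client with the get_pipeline_support_bundle method documented here; the helper name save_support_bundle is hypothetical, and the archive's member names are not specified by this reference.

```python
import io
import zipfile
from pathlib import Path
from typing import Any, List


def save_support_bundle(
    client: Any,
    pipeline_name: str,
    destination: Path,
) -> List[str]:
    """Save the support bundle and return the names of its members.

    Hypothetical helper; ``destination`` is the ZIP file to create.
    """
    bundle = client.get_pipeline_support_bundle(pipeline_name)
    destination.write_bytes(bundle)

    # Open the same bytes in memory to list what the archive contains.
    with zipfile.ZipFile(io.BytesIO(bundle)) as archive:
        return archive.namelist()
```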

get_runtime_config(pipeline_name) Mapping[str, Any][source]

Get the runtime config of a pipeline by name

Parameters:

pipeline_name – The name of the pipeline

listen_to_pipeline(pipeline_name: str, table_name: str, format: str, backpressure: bool = True, array: bool = False, timeout: float | None = None, case_sensitive: bool = False)[source]

Listen for updates to the views of a pipeline, yielding chunks of data.

Parameters:
  • pipeline_name – The name of the pipeline

  • table_name – The name of the table to listen to

  • format – The format of the data, either “json” or “csv”

  • backpressure – When the flag is True (the default), this method waits for the consumer to receive each chunk and blocks the pipeline if the consumer cannot keep up. When this flag is False, the pipeline drops data chunks if the consumer is not keeping up with its output. This prevents a slow consumer from slowing down the entire pipeline.

  • array – Set True to group updates in this stream into JSON arrays. Used in conjunction with the “json” format. The default value is False

  • timeout – The amount of time in seconds to listen to the stream for

  • case_sensitive – True if the table name is case sensitive or a reserved keyword, False by default
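
Because listen_to_pipeline yields chunks, a consumer can bound how much it reads. The sketch below takes the first few chunks with backpressure disabled so that a slow consumer does not stall the pipeline. The helper name collect_chunks is hypothetical, and the chunks are treated as opaque values because their structure is not described in this reference.

```python
from itertools import islice
from typing import Any, List


def collect_chunks(
    client: Any,
    pipeline_name: str,
    view_name: str,
    max_chunks: int = 5,
    timeout_s: float = 10.0,
) -> List[Any]:
    """Read at most ``max_chunks`` chunks of updates from a view.

    Hypothetical helper; ``client`` is expected to be a FelderaClient.
    """
    stream = client.listen_to_pipeline(
        pipeline_name,
        view_name,
        "json",
        backpressure=False,  # drop chunks rather than block the pipeline
        timeout=timeout_s,
    )
    return list(islice(stream, max_chunks))
```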

static localhost(port: int = 8080) FelderaClient[source]

Create a FelderaClient that connects to the local Feldera instance

patch_pipeline(name: str, sql: str | None = None, udf_rust: str | None = None, udf_toml: str | None = None, program_config: Mapping[str, Any] | None = None, runtime_config: Mapping[str, Any] | None = None, description: str | None = None)[source]

Incrementally update pipeline

Parameters:

name – The name of the pipeline

pause_connector(pipeline_name, table_name, connector_name)[source]

Pause the specified input connector.

Connectors allow Feldera to fetch data from a source or write data to a sink. This method allows users to PAUSE a specific INPUT connector. All connectors are RUNNING by default.

Refer to the connector documentation for more information: <https://docs.feldera.com/connectors/#input-connector-orchestration>

Parameters:
  • pipeline_name – The name of the pipeline.

  • table_name – The name of the table associated with this connector.

  • connector_name – The name of the connector.

Raises:

FelderaAPIError – If the connector cannot be found, or if the pipeline is not running.

pause_pipeline(pipeline_name: str, wait: bool = True, timeout_s: float | None = None)[source]

Pause a pipeline

Parameters:
  • pipeline_name – The name of the pipeline to pause

  • error_message – The error message to show if the pipeline is in STOPPED state due to a failure.

  • wait – Set True to wait for the pipeline to pause. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to pause.

pipelines(selector: PipelineFieldSelector = PipelineFieldSelector.STATUS) list[Pipeline][source]

Get all pipelines

push_to_pipeline(pipeline_name: str, table_name: str, format: str, data: list[list | str | dict] | dict | str, array: bool = False, force: bool = False, update_format: str = 'raw', json_flavor: str | None = None, serialize: bool = True, wait: bool = True, wait_timeout_s: float | None = None) str[source]

Insert data into a pipeline

Parameters:
  • pipeline_name – The name of the pipeline

  • table_name – The name of the table

  • format – The format of the data, either “json” or “csv”

  • array – True if updates in this stream are packed into JSON arrays, used in conjunction with the “json” format

  • force – If True, the data will be inserted even if the pipeline is paused

  • update_format – JSON data change event format, used in conjunction with the “json” format. The default value is “raw”; other supported formats: “insert_delete”, “weighted”, “debezium”, “snowflake”

  • json_flavor – JSON encoding used for individual table records, the default value is “default”, other supported encodings: “debezium_mysql”, “snowflake”, “kafka_connect_json_converter”, “pandas”

  • data – The data to insert

  • serialize – If True, the data will be serialized to JSON. True by default

  • wait – If True, blocks until this input has been processed by the pipeline

  • wait_timeout_s – The timeout in seconds to wait for this set of inputs to be processed by the pipeline. None by default

Returns:

The completion token for this input.
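
As an illustration of the update_format parameter, the sketch below builds change events and pushes them with update_format set to “insert_delete”. The event shape, objects keyed by "insert" or "delete", is an assumption based on Feldera's JSON format documentation and is not stated in this reference. Both helper names are hypothetical.

```python
from typing import Any, Iterable, List, Mapping


def to_insert_delete_events(
    inserts: Iterable[Mapping[str, Any]],
    deletes: Iterable[Mapping[str, Any]] = (),
) -> List[dict]:
    """Wrap rows as insert/delete events (assumed event shape)."""
    events = [{"insert": dict(row)} for row in inserts]
    events.extend({"delete": dict(row)} for row in deletes)
    return events


def push_changes(
    client: Any,
    pipeline_name: str,
    table_name: str,
    inserts: Iterable[Mapping[str, Any]],
    deletes: Iterable[Mapping[str, Any]] = (),
) -> str:
    """Push inserts and deletes in one request; return the completion token.

    Hypothetical helper; ``client`` is expected to be a FelderaClient.
    """
    events = to_insert_delete_events(inserts, deletes)
    return client.push_to_pipeline(
        pipeline_name,
        table_name,
        "json",
        events,
        array=True,  # the events are sent as one JSON array
        update_format="insert_delete",
    )
```

The returned token can be passed to wait_for_token or completion_token_processed when the caller needs to know that the changes were processed.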

query_as_hash(pipeline_name: str, query: str) str[source]

Executes an ad-hoc query on the specified pipeline and returns a hash of the result.

Parameters:
  • pipeline_name – The name of the pipeline to query.

  • query – The SQL query to be executed.

Returns:

A string containing the hash of the query result.

query_as_json(pipeline_name: str, query: str) Generator[Mapping[str, Any], None, None][source]

Executes an ad-hoc query on the specified pipeline and returns the result as a generator that yields rows of the query as Python dictionaries. All floating-point numbers are deserialized as Decimal objects to avoid precision loss.

Parameters:
  • pipeline_name – The name of the pipeline to query.

  • query – The SQL query to be executed.

Returns:

A generator that yields each row of the result as a Python dictionary, deserialized from JSON.
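
Because floating-point values arrive as Decimal objects, the rows cannot be passed to json.dumps without a custom encoder. A minimal sketch that re-serializes rows while keeping the exact decimal text; the helper name rows_to_json_lines is hypothetical, and encoding Decimal values as strings is one possible design choice among several.

```python
import json
from decimal import Decimal
from typing import Any, Iterable, Iterator, Mapping


def _encode_decimal(value: Any) -> str:
    """Encode Decimal values as strings to preserve their exact digits."""
    if isinstance(value, Decimal):
        return str(value)
    raise TypeError(f"cannot serialize {type(value).__name__}")


def rows_to_json_lines(rows: Iterable[Mapping[str, Any]]) -> Iterator[str]:
    """Yield one JSON document per row, consuming ``rows`` lazily.

    Hypothetical helper; ``rows`` can be the generator from query_as_json.
    """
    for row in rows:
        yield json.dumps(dict(row), default=_encode_decimal)
```

For example, rows_to_json_lines(client.query_as_json("my_pipeline", "SELECT * FROM my_view")) streams the result as JSON lines, where the pipeline and view names are placeholders.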

query_as_parquet(pipeline_name: str, query: str, path: str)[source]

Executes an ad-hoc query on the specified pipeline and saves the result to a parquet file. If the extension isn’t parquet, it will be automatically appended to path.

Parameters:
  • pipeline_name – The name of the pipeline to query.

  • query – The SQL query to be executed.

  • path – The path including the file name to save the resulting parquet file in.

query_as_text(pipeline_name: str, query: str) Generator[str, None, None][source]

Executes an ad-hoc query on the specified pipeline and returns a generator that yields lines of the table.

Parameters:
  • pipeline_name – The name of the pipeline to query.

  • query – The SQL query to be executed.

Returns:

A generator yielding the query result in tabular format, one line at a time.

resume_connector(pipeline_name: str, table_name: str, connector_name: str)[source]

Resume the specified connector.

Connectors allow Feldera to fetch data from a source or write data to a sink. This method allows users to RESUME / START a specific INPUT connector. All connectors are RUNNING by default.

Refer to the connector documentation for more information: <https://docs.feldera.com/connectors/#input-connector-orchestration>

Parameters:
  • pipeline_name – The name of the pipeline.

  • table_name – The name of the table associated with this connector.

  • connector_name – The name of the connector.

Raises:

FelderaAPIError – If the connector cannot be found, or if the pipeline is not running.
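
pause_connector and resume_connector are naturally used as a pair. One way to guarantee that a paused connector is resumed, even if the work in between fails, is a context manager. This is a sketch under the assumption that resuming is always the desired cleanup; the name connector_paused is hypothetical.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def connector_paused(
    client: Any,
    pipeline_name: str,
    table_name: str,
    connector_name: str,
) -> Iterator[None]:
    """Pause an input connector for the duration of a ``with`` block.

    Hypothetical helper; ``client`` is expected to be a FelderaClient.
    """
    client.pause_connector(pipeline_name, table_name, connector_name)
    try:
        yield
    finally:
        # Resume even when the body of the ``with`` block raises.
        client.resume_connector(pipeline_name, table_name, connector_name)
```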

resume_pipeline(pipeline_name: str, wait: bool = True, timeout_s: float | None = None)[source]

Resume a pipeline

Parameters:
  • pipeline_name – The name of the pipeline to resume

  • wait – Set True to wait for the pipeline to resume. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to resume.

start_pipeline(pipeline_name: str, bootstrap_policy: BootstrapPolicy | None = None, wait: bool = True, timeout_s: float | None = None) PipelineStatus | None[source]
Parameters:
  • pipeline_name – The name of the pipeline to start

  • wait – Set True to wait for the pipeline to start. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to start.

start_pipeline_as_paused(pipeline_name: str, bootstrap_policy: BootstrapPolicy | None = None, wait: bool = True, timeout_s: float | None = None) PipelineStatus | None[source]
Parameters:
  • pipeline_name – The name of the pipeline to start as paused.

  • wait – Set True to wait for the pipeline to start as paused. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to start.

start_pipeline_as_standby(pipeline_name: str, bootstrap_policy: BootstrapPolicy | None = None, wait: bool = True, timeout_s: float | None = None)[source]
Parameters:
  • pipeline_name – The name of the pipeline to start as standby.

  • wait – Set True to wait for the pipeline to start as standby. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to start.

start_transaction(pipeline_name: str) int[source]

Start a new transaction and return its transaction ID.

Parameters:

pipeline_name – The name of the pipeline.

stop_pipeline(pipeline_name: str, force: bool, wait: bool = True, timeout_s: float | None = None)[source]

Stop a pipeline

Parameters:
  • pipeline_name – The name of the pipeline to stop

  • force – Set True to immediately scale compute resources to zero. Set False to automatically checkpoint before stopping.

  • wait – Set True to wait for the pipeline to stop. True by default

  • timeout_s – The amount of time in seconds to wait for the pipeline to stop.
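
The force flag determines whether a checkpoint is taken before the pipeline stops. The sketch below wraps start_pipeline and stop_pipeline in a context manager that stops the pipeline on exit and defaults to force=False, so the checkpoint is taken. The name running_pipeline and its default timeout are hypothetical.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def running_pipeline(
    client: Any,
    pipeline_name: str,
    timeout_s: float = 300.0,
    force_stop: bool = False,
) -> Iterator[None]:
    """Start a pipeline, run the ``with`` block, then stop the pipeline.

    Hypothetical helper; ``client`` is expected to be a FelderaClient.
    """
    client.start_pipeline(pipeline_name, wait=True, timeout_s=timeout_s)
    try:
        yield
    finally:
        # force=False checkpoints before stopping, per the parameter docs.
        client.stop_pipeline(
            pipeline_name,
            force=force_stop,
            wait=True,
            timeout_s=timeout_s,
        )
```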

sync_checkpoint(pipeline_name: str) str[source]

Triggers a checkpoint synchronization for the specified pipeline. Check the status by calling sync_checkpoint_status().

Parameters:

pipeline_name – Name of the pipeline whose checkpoint should be synchronized.

sync_checkpoint_status(pipeline_name: str) dict[source]

Gets the checkpoint sync status of the pipeline

Parameters:

pipeline_name – The name of the pipeline to check the checkpoint synchronization status of.

testing_force_update_platform_version(name: str, platform_version: str)[source]

update_pipeline_runtime(name: str)[source]

Recompile a pipeline with the Feldera runtime version included in the currently installed Feldera platform.

Parameters:

name – The name of the pipeline

wait_for_token(pipeline_name: str, token: str, timeout_s: float | None = None)[source]

Blocks until all records represented by this completion token have been processed.

Parameters:
  • pipeline_name – The name of the pipeline

  • token – The token to check for completion

  • timeout_s – The amount of time in seconds to wait for the pipeline to process these records.

feldera.rest.feldera_config module

class feldera.rest.feldera_config.FelderaConfig(cfg: dict)[source]

Bases: object

General configuration of the current Feldera instance.

__init__(cfg: dict)[source]

class feldera.rest.feldera_config.FelderaEdition(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: Enum

Represents the Feldera edition: either Enterprise or Open source.

ENTERPRISE = 'Enterprise'

The Enterprise version of Feldera.

OPEN_SOURCE = 'Open source'

The open source version of Feldera.

static from_value(value)[source]

is_enterprise()[source]

feldera.rest.pipeline module

class feldera.rest.pipeline.Pipeline(name: str, sql: str, udf_rust: str, udf_toml: str, program_config: Mapping[str, Any], runtime_config: Mapping[str, Any], description: str | None = None)[source]

Bases: object

Represents a Feldera pipeline

__init__(name: str, sql: str, udf_rust: str, udf_toml: str, program_config: Mapping[str, Any], runtime_config: Mapping[str, Any], description: str | None = None)[source]

Initializes a new pipeline

Parameters:
  • name – The name of the pipeline

  • sql – The SQL code of the pipeline

  • udf_rust – Rust code for UDFs

  • udf_toml – Rust dependencies required by UDFs (in the TOML format)

  • program_config – The program config of the pipeline

  • runtime_config – The runtime configuration of the pipeline

  • description – Optional. The description of the pipeline

classmethod from_dict(d: Mapping[str, Any])[source]

feldera.rest.sql_table module

class feldera.rest.sql_table.SQLTable(name: str, fields: list[dict], case_sensitive: bool = False, materialized: bool = False)[source]

Bases: object

Represents a SQL table in Feldera

__init__(name: str, fields: list[dict], case_sensitive: bool = False, materialized: bool = False)[source]

classmethod from_dict(table_dict: dict)[source]

feldera.rest.sql_view module

class feldera.rest.sql_view.SQLView(name: str, fields: list[dict], case_sensitive: bool = False, materialized: bool = False)[source]

Bases: object

Represents a SQL view in Feldera

__init__(name: str, fields: list[dict], case_sensitive: bool = False, materialized: bool = False)[source]

classmethod from_dict(view_dict: dict)[source]

Module contents

This is the lower-level REST client for Feldera, a thin wrapper around the Feldera REST API.

It is recommended to use the higher level abstractions in the feldera package, instead of using the REST client directly.