
Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end it relies on a growing library of input and output connectors.

Basics

Users configure a connector with a JSON object that describes an external source or sink, such as a database table or a Kafka topic. A SQL table can have multiple source connectors attached to it, specified as a list of JSON connector objects under the 'connectors' attribute in the WITH clause of the table. Similarly, a view can have multiple sink connectors attached to it.

In the following example, the VENDOR table has one connector, configured to fetch and insert JSON data from an HTTP URL, and the VENDOR_VIEW view has one connector, which sends changes to the view to a Kafka topic in a format that Debezium can consume:

create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');

create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "max_queued_records": 1000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;

info

For tables, the WITH clause goes at the end, after the column definitions. For views, it must appear before the AS clause, which avoids parsing ambiguities.
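Because 'connectors' holds a JSON list, attaching a second source to a table amounts to appending another connector object to that list. The following is a minimal sketch of that idea. The table name VENDOR_MULTI and the second URL (https://example.com/more_vendors.json) are hypothetical placeholders introduced only for illustration.

```sql
-- Sketch: one table fed by two url_input connectors.
-- VENDOR_MULTI and the example.com URL are hypothetical placeholders.
create table VENDOR_MULTI (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[
    {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
        },
        "format": { "name": "json" }
    },
    {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://example.com/more_vendors.json"}
        },
        "format": { "name": "json" }
    }
]');
```

Each object in the list is a complete connector specification, so the two sources could just as well use different transports or formats.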

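A connector specification consists of three parts:

  • transport - The external data source or sink, e.g., an HTTP URL or a Kafka topic.

  • format - The format of the data exchanged with the source or sink, e.g., JSON.

  • Generic attributes, such as max_queued_records, that are common to all connectors.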

note

Some transports, e.g., Delta Lake and datagen, use fixed predefined data formats and do not require the format section in the connector specification.

This architecture allows the user to combine different transports and data formats.

These basics apply to all connectors except the HTTP input and output connectors. Users do not manage those connectors: they feed data into and fetch data from a pipeline directly through dedicated pipeline endpoints, so they are not configured in the WITH clauses of tables and views.

Generic attributes

The following attributes are common to all connectors:

  • max_queued_records - The approximate maximum number of records to keep in memory. For an input connector, this is the maximum number that the endpoint will read into memory, before the endpoint pauses further reading until the pipeline has consumed some of the backlog. For an output connector, this is the maximum number that the endpoint will hold in memory waiting for the output endpoint to send them, before the circuit pauses execution until the backlog subsides. By default, this is 1,000,000.

  • max_batch_size - For an input connector, the approximate maximum number of records that the pipeline will process in a single pipeline step. By default, this is 10,000.

  • paused - If set to true, the connector will not fetch or push data to the pipeline when started unless explicitly enabled through the API. By default, this is set to false.
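As a minimal sketch of how the generic attributes above fit into a connector specification, the following variant of the earlier VENDOR table sets all three on its url_input connector. The table name VENDOR_TUNED and the specific values are illustrative assumptions, not recommendations. The attributes sit at the top level of the connector object, next to "transport" and "format", in the same position as max_queued_records in the VENDOR_VIEW example.

```sql
-- Sketch: generic attributes on an input connector.
-- VENDOR_TUNED and the numeric values are hypothetical, chosen for illustration.
create table VENDOR_TUNED (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "max_queued_records": 100000,
    "max_batch_size": 5000,
    "paused": true,
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');
```

With "paused" set to true, this connector stays idle when the pipeline starts and reads nothing until it is enabled through the API.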

Configuring the output buffer

By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka. However, it can cause performance problems for other connectors; the Delta Lake connector, for example, ends up creating a large number of small files.

The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It works by accumulating updates inside the pipeline for up to a user-defined period of time or until accumulating a user-defined number of updates and writing them as a single batch to the output transport.

The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters configure the output buffer:

  • enable_output_buffer - Enable output buffer.

  • max_output_buffer_time_millis - Maximum time in milliseconds data is kept in the output buffer.

    When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, data stays in the buffer for at most max_output_buffer_time_millis milliseconds before the buffer is flushed.

    This configuration option requires the enable_output_buffer flag to be set.

  • max_output_buffer_size_records - Maximum number of updates to be kept in the output buffer.

    This parameter bounds the maximal size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline. Updates to the same record can overwrite or cancel previous updates.

    When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied.

    This configuration option requires the enable_output_buffer flag to be set.

note

When the enable_output_buffer flag is set, at least one of max_output_buffer_time_millis or max_output_buffer_size_records must be specified.

See Delta Lake output connector documentation for an example of configuring the output buffer.
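For orientation, here is a minimal sketch of the three output buffer parameters in a connector specification. It reuses the Kafka sink from the earlier VENDOR_VIEW example purely to show where the attributes go, even though buffering matters more for file-oriented sinks such as Delta Lake. The view name VENDOR_VIEW_BUFFERED and the threshold values are hypothetical, and placing the parameters at the top level of the connector object, alongside max_queued_records, is an assumption based on the description above.

```sql
-- Sketch: output buffer flushed after 10 seconds or 100,000 buffered updates,
-- whichever comes first. The view name and thresholds are hypothetical.
create view VENDOR_VIEW_BUFFERED
WITH (
    'connectors' = '[{
        "enable_output_buffer": true,
        "max_output_buffer_time_millis": 10000,
        "max_output_buffer_size_records": 100000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;
```

Setting both limits is optional; as noted above, enable_output_buffer requires only one of them.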

Fault tolerance

As a preview feature, Feldera supports fault tolerance, meaning that it can gracefully restart from the exact point of an abrupt shutdown or crash, picking up from where it left off without dropping or duplicating input or output. To enable fault tolerance on a pipeline, set "fault_tolerance": "latest_checkpoint" in the pipeline configuration.

When a fault-tolerant pipeline restarts, it loads its state from the most recent checkpoint, then it replays any data from input connectors previously processed beyond that checkpoint. If replay produces output that was previously sent to output connectors, it discards that output. After replay completes, the pipeline continues with new input that has not previously been processed.

For Feldera to run a pipeline in fault-tolerant mode, all of its input connectors must support fault tolerance. Some input connectors do not yet support fault tolerance and therefore cannot be part of a fault-tolerant pipeline. See the documentation for individual input connectors for details.

For a pipeline to be fully fault tolerant, its output connectors must also be fault tolerant. Currently, only the Kafka output connector supports fault tolerance. If a pipeline configured to be fault-tolerant includes non-fault-tolerant output connectors, then in the event of a crash and restart, Feldera may send duplicate output to those connectors, but it will not drop output.

Additional resources

For more information, see: