
Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end it relies on a growing library of input and output connectors.

Basics

Users configure a connector with a JSON object that describes an external source or sink, such as a database table or a Kafka topic. A SQL table can have multiple source connectors attached to it, specified as a list of JSON connector objects under the 'connectors' attribute in the WITH clause of the table. Similarly, a view can have multiple sink connectors attached to it.

In the following example, the VENDOR table has one connector, configured to fetch and insert JSON data from an HTTP URL, and the VENDOR_VIEW view has one connector, which sends the changes of the view to a Kafka topic in a format that can be consumed by Debezium:

create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');

create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "max_queued_records": 1000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;

info

For tables, the WITH clause goes at the end, after the column definitions. For views, it must appear before the AS clause to avoid parsing ambiguities.
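Because the 'connectors' attribute holds a JSON list, attaching several source connectors to one table amounts to adding more objects to that list. The following is a minimal sketch under that reading, reusing the url_input transport from the example above. The second URL is a hypothetical placeholder introduced only for illustration and does not point to a real dataset.

```sql
-- Sketch: one table fed by two source connectors.
-- The first connector is taken from the example above; the second
-- path (example.com) is a hypothetical placeholder.
create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[
    {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
        },
        "format": { "name": "json" }
    },
    {
        "transport": {
            "name": "url_input",
            "config": {"path": "https://example.com/more_vendors.json"}
        },
        "format": { "name": "json" }
    }
]');
```

Each object in the list is a complete connector specification, so the two connectors could just as well use different transports or formats.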

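A connector specification consists of three parts:

  • Generic attributes - Settings that apply to every connector, such as max_queued_records (see Generic attributes below).
  • transport - The transport that carries the data, such as url_input or kafka_output, together with its transport-specific config.
  • format - The data format, such as json, together with its format-specific config.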

note

Some transports, e.g., Delta Lake and datagen, use fixed predefined data formats and do not require the format section in the connector specification.

This architecture allows the user to combine different transports and data formats.

These basics apply to all connectors except the HTTP input and output connectors. Those are not managed by the user: they feed data into and fetch data from a pipeline directly through dedicated pipeline endpoints, so they do not need to be configured in the WITH clauses of tables and views.

Generic attributes

The following attributes are common to all connectors:

  • max_queued_records - Backpressure threshold. Maximum number of records queued by the endpoint before the endpoint is paused by the backpressure mechanism. The default is 1 million.
  • paused - If set to true, the connector does not fetch or push data when the pipeline starts, unless it is explicitly enabled through the API. The default is false.
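As a rough illustration, the sketch below sets both generic attributes on the VENDOR input connector from the Basics section. They are placed at the top level of the connector object, next to transport and format, which mirrors where max_queued_records appears in the VENDOR_VIEW example. The threshold of 100000 is an arbitrary value chosen for the sketch, not a recommendation.

```sql
-- Sketch: generic attributes on an input connector.
-- "max_queued_records" lowers the backpressure threshold from the
-- default of 1 million to an arbitrary illustrative value.
-- "paused" keeps the connector idle at startup until it is
-- explicitly enabled through the API.
create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "max_queued_records": 100000,
    "paused": true,
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');
```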

Configuring the output buffer

By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can produce a stream of small updates, which is normal and even preferable for output transports like Kafka. For other connectors it can cause performance problems: the Delta Lake connector, for example, ends up creating a large number of small files.

The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It accumulates updates inside the pipeline for up to a user-defined period of time, or until a user-defined number of updates has accumulated, and then writes them to the output transport as a single batch.

The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters configure the output buffer:

  • enable_output_buffer - Enable output buffer.

  • max_output_buffer_time_millis - Maximum time in milliseconds data is kept in the output buffer.

    When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, the buffer is flushed at least once every max_output_buffer_time_millis milliseconds, so no update stays in the buffer longer than that.

    This configuration option requires the enable_output_buffer flag to be set.

  • max_output_buffer_size_records - Maximum number of updates to be kept in the output buffer.

    This parameter bounds the maximum size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline, because updates to the same record can overwrite or cancel previous updates.

    When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied.

    This configuration option requires the enable_output_buffer flag to be set.

note

When the enable_output_buffer flag is set, at least one of max_output_buffer_time_millis or max_output_buffer_size_records must be specified.

See Delta Lake output connector documentation for an example of configuring the output buffer.
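For a quick picture of how the three parameters fit together, here is a minimal sketch that adds them to the VENDOR_VIEW connector from the Basics section. It reuses the kafka_output transport only so that every transport setting comes from this page; buffering matters most for transports such as Delta Lake. Placing the buffer parameters at the top level of the connector object, alongside max_queued_records, is an assumption of this sketch, and the limits of 10000 milliseconds and 100000 records are arbitrary illustrative values.

```sql
-- Sketch: output buffer on an output connector.
-- The buffer is flushed when either limit is reached: updates have
-- waited 10000 ms, or 100000 updates have accumulated.
-- Both limits are arbitrary values chosen for illustration.
create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "enable_output_buffer": true,
        "max_output_buffer_time_millis": 10000,
        "max_output_buffer_size_records": 100000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;
```

Specifying both limits is optional; per the note above, one of them is enough once enable_output_buffer is set.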

Additional resources

For more information, see: