Connectors: connect to data sources and sinks

A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end it relies on a growing library of input and output connectors.

Basics

Users configure a connector with a JSON object that describes an external source or sink, such as a database table or a Kafka topic. A SQL table can have multiple source connectors attached to it, specified as a list of JSON connector objects under the 'connectors' attribute in the WITH clause of the table. Similarly, a view can have multiple sink connectors attached to it.

In the following example, the VENDOR table has one connector, configured to fetch and insert JSON data from an HTTP URL, and the VENDOR_VIEW view has one connector, which sends the changes of the view to a Kafka topic in a format that can be consumed by Debezium:

create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');

create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "max_queued_records": 1000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;
info

The WITH clause for tables needs to be put at the end, after the column definitions, whereas for views it has to appear before the AS clause to resolve any parsing ambiguities.

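A connector specification consists of three parts:

  • Generic attributes, such as max_queued_records, that are common to all connectors.

  • transport - The data transport, such as url_input or kafka_output, and its configuration.

  • format - The data format, such as json, and its configuration.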

note

Some transports, e.g., Delta Lake and datagen, use fixed predefined data formats and do not require the format section in the connector specification.

This architecture allows the user to combine different transports and data formats.
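As a rough illustration of mixing transports and formats, the sketch below keeps the url_input transport from the VENDOR example but swaps the format section to CSV. The table name VENDOR_CSV and the URL are hypothetical placeholders, and the "csv" format name is an assumption to verify against the data format documentation.

```sql
-- Sketch only: same transport as the VENDOR example, different data format.
-- VENDOR_CSV and the example.com URL are hypothetical placeholders.
create table VENDOR_CSV (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "transport": {
        "name": "url_input",
        "config": {"path": "https://example.com/vendor.csv"}
    },
    "format": { "name": "csv" }
}]');
```

Only the format section changes between the two tables; the transport section is independent of it.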

These basics apply to all connectors except the HTTP input and output connectors, which are not managed by the user: they feed data into and fetch data from a pipeline directly via dedicated pipeline endpoints, and therefore do not need to be configured in the WITH clauses of tables and views.

Generic attributes

The following attributes are common to all connectors:

  • name - The name of the connector, which must be unique among the connectors of the table or view. Setting it explicitly is useful when you need to refer to the connector, for example to start or pause it at runtime. By default, the name is randomly generated.

  • paused - If set to true, the connector will not fetch or push data to the pipeline when started unless explicitly enabled through the API. By default, this is set to false.

  • max_queued_records - The approximate maximum number of records to keep in memory. For an input connector, this is the maximum number that the endpoint will read into memory, before the endpoint pauses further reading until the pipeline has consumed some of the backlog. For an output connector, this is the maximum number that the endpoint will hold in memory waiting for the output endpoint to send them, before the circuit pauses execution until the backlog subsides. By default, this is 1,000,000.

  • max_batch_size - For an input connector, the approximate maximum number of records that the pipeline will process in a single pipeline step. By default, this is 10,000.
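The attributes above sit at the top level of the connector object, next to transport and format. The following sketch sets all four on the VENDOR table's connector; the connector name vendor_http and the numeric limits are illustrative assumptions, not recommended values.

```sql
-- Sketch only: generic attributes on the VENDOR input connector.
-- "vendor_http" and the numeric limits are hypothetical example values.
create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "name": "vendor_http",
    "paused": true,
    "max_queued_records": 100000,
    "max_batch_size": 5000,
    "transport": {
        "name": "url_input",
        "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');
```

Because paused is true here, this connector would stay idle after the pipeline starts until it is started explicitly by its name.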

Configuring the output buffer

By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka; however, it can cause performance problems for other connectors, such as the Delta Lake connector, where it creates a large number of small files.

The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It works by accumulating updates inside the pipeline for up to a user-defined period of time, or until a user-defined number of updates has accumulated, and then writing them as a single batch to the output transport.

The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters are used to configure the output buffer:

  • enable_output_buffer - Enable output buffer.

  • max_output_buffer_time_millis - Maximum time in milliseconds data is kept in the output buffer.

    When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, buffered data is flushed no later than max_output_buffer_time_millis milliseconds after it entered the buffer.

    This configuration option requires the enable_output_buffer flag to be set.

  • max_output_buffer_size_records - Maximum number of updates to be kept in the output buffer.

    This parameter bounds the maximal size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline. Updates to the same record can overwrite or cancel previous updates.

    When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied.

    This configuration option requires the enable_output_buffer flag to be set.

note

When the enable_output_buffer flag is set, at least one of max_output_buffer_time_millis or max_output_buffer_size_records must be specified.

See the Delta Lake output connector documentation for an example of configuring the output buffer.
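For orientation, here is a minimal sketch of a view whose output connector enables the buffer with both trigger conditions. The three buffer parameters are the ones described above; the delta_table_output transport name, its uri and mode settings, the local path, and the numeric thresholds are assumptions for illustration and should be checked against the Delta Lake output connector documentation.

```sql
-- Sketch only: buffer output for up to 10 seconds or 100,000 updates,
-- whichever comes first. The transport name, "uri", "mode", and the path
-- are assumptions; the format section is omitted because Delta Lake uses
-- a fixed predefined data format.
create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "enable_output_buffer": true,
        "max_output_buffer_time_millis": 10000,
        "max_output_buffer_size_records": 100000,
        "transport": {
            "name": "delta_table_output",
            "config": {
                "uri": "file:///tmp/vendor_view",
                "mode": "truncate"
            }
        }
    }]'
)
as select * from VENDOR;
```

Setting both limits bounds both the latency and the memory used by the buffer; setting only one of them is also valid as long as enable_output_buffer is true.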

Input connector orchestration

Connector Orchestration enables users to activate or deactivate input connectors on demand, giving them control over the timing and order of data ingestion from multiple sources. It can, for example, be used to backfill a pipeline with historical data from a database or data lake before switching over to real-time ingestion from a streaming source like Kafka.

Input connectors can be in either the Running or Paused state. By default, connectors are initialized in the Running state when a pipeline is deployed. In this state, the connector actively fetches data from its configured data source and forwards it to the pipeline. If needed, a connector can be created in the Paused state by setting its paused property to true. The current connector state can be retrieved via the pipeline statistics endpoint.

When paused, the connector remains idle until it is reactivated. Conversely, a connector in the Running state can be paused at any time. This can be done by calling the connector's start/pause endpoint, specifying the table name and the connector name.

Note that an input connector is active only if both the pipeline and the connector are in the Running state. The following table illustrates this:

Pipeline state    Connector state    Connector is active?
--------------    ---------------    --------------------
Paused            Paused             No
Paused            Running            No
Running           Paused             No
Running           Running            Yes

Orchestration example

  1. Create and start a pipeline named example with the following SQL:

    CREATE TABLE numbers (
        num INT
    ) WITH (
        'connectors' = '[
            {
                "name": "c1",
                "paused": true,
                "transport": {
                    "name": "datagen",
                    "config": {"plan": [{ "rate": 1, "fields": { "num": { "range": [0, 10], "strategy": "uniform" } } }]}
                }
            },
            {
                "name": "c2",
                "paused": false,
                "transport": {
                    "name": "datagen",
                    "config": {"plan": [{ "rate": 1, "fields": { "num": { "range": [10, 20], "strategy": "uniform" } } }]}
                }
            }
        ]'
    );

    Note that the numbers table has two input connectors, one of which has its paused property set to true. This connector will be created in the Paused state.

  2. Check the numbers table checkbox in the Change Stream tab. Observe that although the pipeline is Running, the change stream only shows input records from connector c2 (i.e., [10, 20)) but not from connector c1 (i.e., [0, 10)).

  3. Start connector c1:

    fda table-connector example numbers c1 start

    Now the Change Stream tab will show new input records from both connectors.

  4. Pause connector c2:

    fda table-connector example numbers c2 pause

    Now the Change Stream tab will no longer show new input records from connector c2.

Additional resources

For more information, see: