Connectors: connect to data sources and sinks
A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end, it relies on a growing library of input and output connectors.
Basics
Users configure connectors using a JSON object that describes an external source or sink, such as a database table or a Kafka topic. A SQL table can have multiple source connectors attached to it, specified as a list of JSON connector objects under the `connectors` attribute in the `WITH` clause of the table.
Similarly, a view can have multiple sink connectors attached to it.
Here is an example, where the `VENDOR` table has one connector, configured to fetch and insert JSON data from an HTTP URL, and the `VENDOR_VIEW` view has one connector, which sends the changes of the view to a Kafka topic in a format that can be consumed by Debezium:
```sql
create table VENDOR (
    id bigint not null primary key,
    name varchar,
    address varchar
) WITH ('connectors' = '[{
    "transport": {
        "name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
    },
    "format": { "name": "json" }
}]');
```
```sql
create view VENDOR_VIEW
WITH (
    'connectors' = '[{
        "max_queued_records": 1000,
        "format": {
            "name": "json",
            "config": {
                "update_format": "debezium"
            }
        },
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "redpanda:9092",
                "topic": "test_view"
            }
        }
    }]'
)
as select * from VENDOR;
```
The `WITH` clause for tables must be placed at the end, after the column definitions, whereas for views it has to appear before the `AS` clause to resolve any parsing ambiguities.
A connector specification consists of three parts:
- Generic attributes common to all connectors, such as backpressure thresholds.
- Transport specification (`transport`), which defines the data transport used by the connector for either input or output. Example transports include Kafka, URL, and Delta Lake.
- Data format specification (`format`), which defines the data format for the connector. Example data formats include CSV, JSON, Parquet, and Avro.

Some transports, e.g., Delta Lake and datagen, use fixed predefined data formats and do not require the `format` section in the connector specification.
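For instance, an input connector reading from a Delta table omits the `format` section entirely. A minimal sketch, using the `delta_table_input` transport described in the Delta Lake connector documentation (the table name and S3 URI below are placeholders):

```sql
create table VENDOR_HISTORY (
    id bigint
) WITH ('connectors' = '[{
    "transport": {
        "name": "delta_table_input",
        "config": { "uri": "s3://my-bucket/vendor-history", "mode": "snapshot" }
    }
}]');
```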
This architecture allows the user to combine different transports and data formats.
These basics apply to all connectors except the HTTP input and output connectors, which are not managed by the user: they feed data into or fetch data from a pipeline directly via dedicated pipeline endpoints and therefore do not need to be configured in the `WITH` clauses of tables and views.
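For example, data can be pushed into the `VENDOR` table above through the pipeline's HTTP ingress endpoint. A sketch, assuming a running pipeline named `example` served at `localhost:8080` (both names are illustrative; see the HTTP connector documentation for the exact URL scheme):

```bash
# Insert one record into table VENDOR via the pipeline's HTTP ingress
# endpoint, using newline-delimited JSON in the insert/delete format.
curl -X POST 'http://localhost:8080/v0/pipelines/example/ingress/VENDOR?format=json' \
  -d '{"insert": {"id": 1, "name": "ACME", "address": "1 Main St"}}'
```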
Generic attributes
The following attributes are common to all connectors (a combined example follows the list):

- `name` - The name given to the connector, which must be unique among the connectors of the table or view. Defining a name is particularly useful when you want to refer to the connector later, for example to start or pause it at runtime. By default, a random name is generated.
- `paused` - If set to `true`, the connector will not fetch or push data to the pipeline when started unless explicitly enabled through the API. By default, this is set to `false`.
- `max_queued_records` - The approximate maximum number of records to keep in memory. For an input connector, this is the maximum number of records that the endpoint will read into memory before it pauses further reading until the pipeline has consumed some of the backlog. For an output connector, this is the maximum number of records that the endpoint will hold in memory while waiting to send them, before the circuit pauses execution until the backlog subsides. By default, this is 1,000,000.
- `max_batch_size` - For an input connector, the approximate maximum number of records that the pipeline will process in a single pipeline step. By default, this is 10,000.
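A minimal sketch that combines these generic attributes in a single connector object (the table name and datagen plan are illustrative, modeled on the orchestration example later in this page):

```sql
create table events (
    id bigint
) WITH ('connectors' = '[{
    "name": "gen1",
    "paused": true,
    "max_queued_records": 100000,
    "max_batch_size": 5000,
    "transport": {
        "name": "datagen",
        "config": {"plan": [{ "rate": 10, "fields": { "id": { "range": [0, 100], "strategy": "uniform" } } }]}
    }
}]');
```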
Configuring the output buffer
By default, a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka; however, it can cause performance problems for other connectors, such as the Delta Lake connector, by creating a large number of small files.
The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It works by accumulating updates inside the pipeline for up to a user-defined period of time, or until a user-defined number of updates has accumulated, and then writing them as a single batch to the output transport.
The output buffer can be set up for each individual output connector as part of the connector configuration. The following parameters are used to configure the output buffer:
- `enable_output_buffer` - Enables the output buffer.
- `max_output_buffer_time_millis` - Maximum time in milliseconds that data is kept in the output buffer. When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set, the buffer is flushed at most every `max_output_buffer_time_millis` milliseconds. This configuration option requires the `enable_output_buffer` flag to be set.
- `max_output_buffer_size_records` - Maximum number of updates to be kept in the output buffer. This parameter bounds the maximal size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline, since updates to the same record can overwrite or cancel previous updates. When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied. This configuration option requires the `enable_output_buffer` flag to be set.

When the `enable_output_buffer` flag is set, at least one of `max_output_buffer_time_millis` or `max_output_buffer_size_records` must be specified.
See the Delta Lake output connector documentation for an example of configuring the output buffer.
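As a sketch, the buffer parameters are generic connector attributes that sit next to the `transport` and `format` sections (the view name and Kafka settings below are placeholders):

```sql
create view totals_view
WITH (
    'connectors' = '[{
        "enable_output_buffer": true,
        "max_output_buffer_time_millis": 10000,
        "max_output_buffer_size_records": 100000,
        "format": { "name": "json" },
        "transport": {
            "name": "kafka_output",
            "config": { "bootstrap.servers": "redpanda:9092", "topic": "totals" }
        }
    }]'
)
as select * from VENDOR;
```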
Input connector orchestration
Connector Orchestration enables users to activate or deactivate input connectors on demand, giving them control over the timing and order of data ingestion from multiple sources. It can, for example, be used to backfill a pipeline with historical data from a database or data lake before switching over to real-time ingestion from a streaming source like Kafka.
Input connectors can be in either the `Running` or `Paused` state. By default, connectors are initialized in the `Running` state when a pipeline is deployed. In this state, the connector actively fetches data from its configured data source and forwards it to the pipeline. If needed, a connector can be created in the `Paused` state by setting its `paused` property to `true`. The current connector state can be retrieved via the pipeline statistics endpoint.
When paused, the connector remains idle until it is reactivated. Conversely, a connector in the `Running` state can be paused at any time. Both transitions are performed by calling the connector's start/pause endpoint, which takes the table and connector name. Note that an input connector is only active if both the pipeline and the connector are in the `Running` state. The following table illustrates this:
| Pipeline state | Connector state | Connector is active? |
|----------------|-----------------|----------------------|
| Paused         | Paused          | No                   |
| Paused         | Running         | No                   |
| Running        | Paused          | No                   |
| Running        | Running         | Yes                  |
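Besides the `fda` CLI used in the example below, the connector state can be changed over the REST API. A sketch, assuming a pipeline `example` with table `numbers` and connector `c1` served at `localhost:8080` (names and host are illustrative; consult the API reference for the exact path):

```bash
# Activate input connector c1 of table numbers in pipeline example.
curl -X POST \
  'http://localhost:8080/v0/pipelines/example/tables/numbers/connectors/c1/start'

# Pause it again.
curl -X POST \
  'http://localhost:8080/v0/pipelines/example/tables/numbers/connectors/c1/pause'
```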
Orchestration example
- Create and start a pipeline named `example` with the following SQL:

  ```sql
  CREATE TABLE numbers (
      num INT
  ) WITH (
      'connectors' = '[
          {
              "name": "c1",
              "paused": true,
              "transport": {
                  "name": "datagen",
                  "config": {"plan": [{ "rate": 1, "fields": { "num": { "range": [0, 10], "strategy": "uniform" } } }]}
              }
          },
          {
              "name": "c2",
              "paused": false,
              "transport": {
                  "name": "datagen",
                  "config": {"plan": [{ "rate": 1, "fields": { "num": { "range": [10, 20], "strategy": "uniform" } } }]}
              }
          }
      ]'
  );
  ```

  Note that the `numbers` table has two input connectors, one of which has the `paused` property set to `true`. This connector will be created in the `Paused` state.
- Check the `numbers` table checkbox in the Change Stream tab. Observe that although the pipeline is `Running`, the change stream only shows input records from connector `c2` (i.e., `[10, 20)`) but not from connector `c1` (i.e., `[0, 10)`).
- Start connector `c1`:

  ```bash
  fda table-connector example numbers c1 start
  ```

  Now the Change Stream tab will show new input records from both connectors.
- Pause connector `c2`:

  ```bash
  fda table-connector example numbers c2 pause
  ```

  Now the Change Stream tab will no longer show new input records from connector `c2`.
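At any point, the current state of both connectors can be inspected through the pipeline statistics endpoint mentioned earlier. A sketch, again assuming the pipeline runs at `localhost:8080`:

```bash
# Retrieve pipeline statistics; the response includes per-connector
# metrics and the paused state of each input endpoint.
curl 'http://localhost:8080/v0/pipelines/example/stats'
```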
Additional resources
For more information, see the documentation pages for the individual input and output connectors.