Connectors: connect to data sources and sinks
A Feldera pipeline can process data from multiple heterogeneous sources and produce outputs to multiple heterogeneous destinations. To this end it relies on a growing library of input and output connectors.
Basics
Users configure connectors using a JSON object, that describes an external source or sink such as a database
table or a Kafka topic. A SQL table can have multiple source connectors attached to it, specified as a
list of JSON connector objects under the 'connectors'
attribute in the WITH
clause of the table.
Similarly, a views can have multiple sink connectors attached to it.
Here is an example, where the VENDOR
table has one connector, configured to fetch and insert JSON data
from an HTTP URL, and a VENDOR_VIEW
which sends the changes of the view
to a Kafka topic in a format that can be consumed by Debezium:
create table VENDOR (
id bigint not null primary key,
name varchar,
address varchar
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
},
"format": { "name": "json" }
}]');
create view VENDOR_VIEW
WITH (
'connectors' = '[{
"max_queued_records": 1000,
"format": {
"name": "json",
"config": {
"update_format": "debezium"
}
},
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "redpanda:9092",
"topic": "test_view"
}
}
}]'
)
as select * from VENDOR;
The WITH
clause for tables needs to be put at the end, after the column definitions, whereas for views it has to
appear before the AS
clause to resolve any parsing ambiguities.
A connector specification consists of three parts:
- Generic attributes common to all connectors, such as backpressure thresholds.
- Transport specification (
transport
) for either input or output defines the data transport to be used by the connector. Example transports include Kafka, URL, Delta Lake, etc. - Data format specification (
format
), which defines the data format for the connector. Example data formats include CSV, JSON, Parquet, or Avro).
Some transports, e.g., Delta Lake and datagen, use fixed predefined data formats and do not require the format section in the connector specification.
This architecture allows the user to combine different transports and data formats.
These basics apply to all connectors except the HTTP input and
output connectors which are not managed by the user, as they directly feed/fetch data
into/from a pipeline via dedicated pipeline endpoints and therefore do not need to be configured in the WITH
clauses
of tables and views.
Generic attributes
The following attributes are common to all connectors:
-
max_queued_records
- The approximate maximum number of records to keep in memory. For an input connector, this is the maximum number that the endpoint will read into memory, before the endpoint pauses further reading until the pipeline has consumed some of the backlog. For an output connector, this is the maximum number that the endpoint will hold in memory waiting for the output endpoint to send them, before the circuit pauses execution until the backlog subsides. By default, this is 1,000,000. -
max_batch_size
- For an input connector, the approximate maximum number of records that the pipeline will process in a single pipeline step. By default, this is 10,000. -
paused
- If set to to true the connector will not fetch or push data to the pipeline when started unless explicitly enabled through the API. By default this is set to false.
Configuring the output buffer
By default a Feldera pipeline sends a batch of changes to the output transport for each batch of input updates it processes. This can result in a stream of small updates, which is normal and even preferable for output transports like Kafka; however it can cause performance problems for other connectors, such as the Delta Lake connector by creating a large number of small files.
The output buffer mechanism is designed to solve this problem by decoupling the rate at which the pipeline pushes changes to the output transport from the rate of input changes. It works by accumulating updates inside the pipeline for up to a user-defined period of time or until accumulating a user-defined number of updates and writing them as a single batch to the output transport.
The output buffer can be setup for each individual output connector as part of connector configuration. The following parameters are used to configure the output buffer:
-
enable_output_buffer
- Enable output buffer. -
max_output_buffer_time_millis
- Maximum time in milliseconds data is kept in the output buffer.When not specified, data is kept in the buffer indefinitely until one of the other trigger conditions is satisfied. When this option is set the buffer will be flushed at most every
max_output_buffer_time_millis
milliseconds.This configuration option requires the
enable_output_buffer
flag to be set. -
max_output_buffer_size_records
- Maximum number of updates to be kept in the output buffer.This parameter bounds the maximal size of the buffer. Note that the size of the buffer is not always equal to the total number of updates output by the pipeline. Updates to the same record can overwrite or cancel previous updates.
When not specified, the buffer can grow indefinitely until one of the other trigger conditions is satisfied.
This configuration option requires the
enable_output_buffer
flag to be set.
When the enable_output_buffer
flag is set, at least one of
max_output_buffer_time_millis
or max_output_buffer_size_records
must be
specified.
See Delta Lake output connector documentation for an example of configuring the output buffer.
Fault tolerance
As a preview feature in the enterprise edition only, Feldera supports
fault tolerance, meaning that it can gracefully restart from the exact
point of an abrupt shutdown or crash, picking up from where it left
off without dropping or duplicating input or output. To enable fault
tolerance on a pipeline, set "fault_tolerance": {}
in the pipeline
configuration (optional configuration for fault tolerance may be
specified inside {}
).
When a fault-tolerant pipeline restarts, it loads its state from the most recent checkpoint, then it replays any data from input connectors previously processed beyond that checkpoint. If replay produces output that was previously sent to output connectors, it discards that output. After replay completes, the pipeline continues with new input that has not previously been processed.
For Feldera to run a pipeline in fault-tolerant mode, all of its input connectors must support fault tolerance. Some input adapters do not yet support fault tolerance and therefore may not be part of a fault-tolerant pipeline. See the documentation for individual input connectors for details.
For a pipeline to be fully fault tolerant, a pipeline's output connectors must also be fault tolerant. Only the Kafka output connector supports fault tolerance as of now. If a pipeline configured to be fault-tolerant includes non-fault-tolerant output connectors, then in the event of a crash and restart, Feldera may send duplicate output to those connectors, but it will not drop output.
Additional resources
For more information, see: