Kafka input connector
Feldera can consume a stream of changes to a SQL table from Kafka with
the kafka_input connector.
The Kafka input connector supports fault tolerance.
Kafka Input Connector Configuration
| Property | Type | Default | Description |
|---|---|---|---|
| topic (required) | string | | The Kafka topic to subscribe to. |
| bootstrap.servers (required) | string | | A comma-separated list of Kafka brokers to connect to. |
| start_from | variant | latest | The starting point for reading from the topic: one of "earliest", "latest", {"offsets": [1, 2, 3, ...]}, where [1, 2, 3, ...] are particular offsets within each partition in the topic, or {"timestamp": <timestamp>}, where <timestamp> is a Kafka timestamp as an integer number of milliseconds since the epoch. |
| partitions | integer list | | The list of Kafka partitions to read from. Only the specified partitions are consumed. If this field is not set, the connector consumes from all available partitions. If offsets are provided for all partitions, this field can be omitted. |
| log_level | string | | The log level for the Kafka client. |
| group_join_timeout_secs | seconds | 10 | Maximum timeout (in seconds) for the endpoint to join the Kafka consumer group during initialization. |
| poller_threads | positive integer | 3 | Number of threads used to poll Kafka messages. Using multiple threads can improve performance with small messages. |
| resume_earliest_if_data_expires | boolean | false | See Tolerating missing data on resume. |
| include_headers | boolean | false | Whether to include Kafka headers in connector metadata (see Accessing Kafka metadata). |
| include_topic | boolean | false | Whether to include the Kafka topic name in connector metadata (see Accessing Kafka metadata). |
| include_partition | boolean | false | Whether to include the Kafka partition in connector metadata (see Accessing Kafka metadata). |
| include_offset | boolean | false | Whether to include the Kafka offset in connector metadata (see Accessing Kafka metadata). |
| include_timestamp | boolean | false | Whether to include the Kafka timestamp in connector metadata (see Accessing Kafka metadata). |
The connector passes any additional options directly to librdkafka. Note the following:
- group.id is ignored. The connector never uses consumer groups, so enable.auto.commit, enable.auto.offset.store, and other options related to consumer groups are also ignored.
- auto.offset.reset is ignored. Use start_from (described in the table above) instead.
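For example, the minimal sketch below restricts the connector to two specific partitions and increases the number of polling threads, using the partitions and poller_threads options from the table above. The topic name and broker address are placeholders, and the sketch assumes the topic has at least three partitions:
"config": {
    "topic": "my-topic",
    "bootstrap.servers": "example.com:9092",
    "start_from": "earliest",
    "partitions": [0, 2],
    "poller_threads": 4
}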
Example usage
We will create a Kafka connector named book-fair-sales.
The Kafka broker is located at example.com:9092 and the topic is book-fair-sales.
The format in this example is newline-delimited JSON (NDJSON). For example, two messages might together contain three rows:
Message 1:
{"insert": {"sid": 123, pid": 2, "sold_at": "2024-01-01 12:00:04", "price": 5.0}}
{"insert": {"sid": 124, pid": 12, "sold_at": "2024-01-01 12:00:08", "price": 20.5}}
Message 2:
{"insert": {"sid": 125, pid": 8, "sold_at": "2024-01-01 12:01:02", "price": 1.10}}
CREATE TABLE INPUT (
... -- columns omitted
) WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_input",
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}]'
);
Starting from a specific offset
Feldera supports starting a Kafka connector from a specific offset in a specific partition.
CREATE TABLE INPUT (
... -- columns omitted
) WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_input",
"config": {
"topic": "book-fair-sales",
"start_from": {"offsets": [42]},
"bootstrap.servers": "example.com:9092"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}]'
);
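The start_from option can also take a timestamp, expressed as an integer number of milliseconds since the epoch (see the configuration table above). A minimal sketch of the transport config, where 1704067200000 corresponds to 2024-01-01 00:00:00 UTC and the broker address is a placeholder:
"config": {
    "topic": "book-fair-sales",
    "start_from": {"timestamp": 1704067200000},
    "bootstrap.servers": "example.com:9092"
}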
How to write connector config
Below are a couple of examples of how to connect to a Kafka broker by specifying the options passed to the underlying librdkafka client. Note that outside of test/dev environments, encrypted and authenticated communication is strongly recommended. In addition, to avoid storing secrets in plaintext in connector configurations, it is recommended to use secret references.
No authentication and plaintext (no encryption)
The default value for security.protocol is PLAINTEXT, so it does not need to be specified explicitly.
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092"
}
SASL authentication and plaintext (no encryption)
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092",
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "<USER>",
"sasl.password": "<PASSWORD>"
}
No authentication and with encryption (SSL)
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092",
"security.protocol": "SSL"
}
SASL authentication and with encryption (SSL)
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092",
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "<USER>",
"sasl.password": "<PASSWORD>"
}
For example, at the time of writing (15 May 2024), the Confluent Cloud Kafka C client tutorial configuration uses SASL_SSL with the PLAIN SASL mechanism (see their tutorial, select Kafka Location: Confluent Cloud, and then go to the Configuration tab).
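Based on that tutorial, a connection to Confluent Cloud might look like the sketch below. The bootstrap server address is a placeholder, and the cluster API key and secret are assumed to be supplied as the SASL username and password:
"config": {
    "topic": "book-fair-sales",
    "start_from": "earliest",
    "bootstrap.servers": "<CLUSTER_ENDPOINT>.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<API_KEY>",
    "sasl.password": "<API_SECRET>"
}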
SSL with PEM keys
"config": {
"topic": "book-fair-sales",
"start_from": "earliest",
"bootstrap.servers": "example.com:9092",
"security.protocol": "SSL",
"ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
"ssl.key.pem": "-----BEGIN CERTIFICATE-----TOPSECRET1\n-----END CERTIFICATE-----\n",
"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n"
}
PEM-encoded certificates can be passed directly in the configuration using
ssl.*.pem keys.
Multiple certificates
librdkafka only accepts multiple certificates when they are provided via the
ssl.certificate.location key (a file path) rather than directly
with ssl.certificate.pem.
This is documented in librdkafka issue #3225.
To work around this limitation, Feldera handles PEM-encoded certificates and keys by:
- Storing the value passed to ssl.certificate.pem in a file.
- Naming the file using the SHA256 hash of the data.
- Replacing the ssl.certificate.pem configuration option with an ssl.certificate.location option that points to the newly saved file.
For example, the updated configuration would look like this:
"config": {
...,
"ssl.certificate.location": "path/to/certificate.pem"
}
⚠️ If both ssl.certificate.pem and ssl.certificate.location are set, the latter will be overwritten.
Using Kafka as a Debezium transport
The Kafka connector can be used to ingest data from a source via Debezium. For information on how to set up Debezium integration for Feldera, see the Debezium connector documentation.
Connecting to AWS MSK with IAM SASL
Example of reading data from AWS MSK with IAM SASL.
- AWS credentials must either be set as environment variables or be present in ~/.aws/credentials.
- sasl.mechanism must be set to OAUTHBEARER.
- security.protocol must be set to SASL_SSL.
- When sasl.mechanism is OAUTHBEARER, the AWS region for MSK must be set either via the AWS_REGION environment variable or via the region field in the connector definition, as in the example below.
Other protocols and mechanisms aren't supported.
CREATE TABLE INPUT (
... -- columns omitted
) WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_input",
"config": {
"bootstrap.servers": "broker-1.kafka.region.amazonaws.com:9098,broker-2.kafka.region.amazonaws.com:9098",
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_SSL",
"region": "<AWS_REGION>",
"topic": "<TOPIC>"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
);
Accessing Kafka metadata
Kafka messages include several metadata attributes in addition to the payload. These can be extracted by the Kafka connector and accessed from SQL:
| Metadata attribute | SQL type | CONNECTOR_METADATA() field | Configuration option |
|---|---|---|---|
| Message headers | MAP<STRING, VARBINARY> | kafka_headers | include_headers |
| Topic name | VARCHAR | kafka_topic | include_topic |
| Partition | INT | kafka_partition | include_partition |
| Message offset | BIGINT | kafka_offset | include_offset |
| Timestamp | TIMESTAMP | kafka_timestamp | include_timestamp |
Some applications need to ingest and store these attributes alongside the message payload. The steps below describe how to extract and use Kafka metadata in SQL tables.
- Enable metadata extraction in the Kafka connector. Use the configuration options listed in the table above to enable only the metadata fields your application needs. Extracting unnecessary attributes adds overhead to ingestion and processing.
- Use metadata values to populate table columns. Enabled metadata attributes are exposed via the CONNECTOR_METADATA() function, which returns a VARIANT containing a map with all selected attributes. You can reference these values in DEFAULT expressions to initialize table columns:
create table my_table(
x int,
kafka_headers MAP<STRING, VARBINARY> DEFAULT CAST(CONNECTOR_METADATA()['kafka_headers'] as MAP<STRING, VARBINARY>),
kafka_timestamp TIMESTAMP DEFAULT CAST(CONNECTOR_METADATA()['kafka_timestamp'] as TIMESTAMP),
kafka_topic VARCHAR DEFAULT CAST(CONNECTOR_METADATA()['kafka_topic'] AS VARCHAR),
kafka_offset BIGINT DEFAULT CAST(CONNECTOR_METADATA()['kafka_offset'] AS BIGINT),
kafka_partition INT DEFAULT CAST(CONNECTOR_METADATA()['kafka_partition'] AS INT)
) with (
'materialized' = 'true',
'connectors' = '[{
"transport": {
"name": "kafka_input",
"config": {
"topic": "meta_topic",
"start_from": "earliest",
"bootstrap.servers": "localhost:19092",
"include_headers": true,
"include_topic": true,
"include_offset": true,
"include_partition": true,
"include_timestamp": true
}
},
"format": {
"name": "json",
"config": {
"update_format": "raw",
"array": false
}
}
}]');
Converting Kafka header values to strings
Kafka headers can contain arbitrary byte arrays, but in practice they typically hold UTF-8–encoded strings.
Use the BIN2UTF8 function to convert binary values to text:
create materialized view v as
select
BIN2UTF8(kafka_headers['my_header']) as my_header
from my_table;
Tolerating missing data on resume
The resume_earliest_if_data_expires setting controls how the Kafka
input connector behaves when a pipeline configured with at-least-once
fault tolerance resumes from a
checkpoint and the configured Kafka topic no longer has data at the
offsets saved in the checkpoint:
- If resume_earliest_if_data_expires is false, which is the default, then the connector will report an error and the pipeline will fail to start. This behavior makes sense because it is no longer possible to continue the pipeline from where it left off.
- If resume_earliest_if_data_expires is true, then the connector will log a warning and start reading data from the earliest offsets now available in the topic. This is reasonable behavior in the special case where errors were detected in the data in the Kafka topic, the topic was deleted and recreated with correct data starting at the point of an earlier checkpoint, and the pipeline was restarted from that checkpoint.
This setting only has an effect when a Kafka topic cannot be read at the checkpointed offsets. It has no effect if fault tolerance is not enabled, if exactly-once fault tolerance is enabled, or at any time other than the point of resuming from a checkpoint.
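For example, a minimal sketch of a connector that falls back to the earliest available offsets when the checkpointed offsets have expired; the topic name and broker address are placeholders:
"config": {
    "topic": "book-fair-sales",
    "start_from": "earliest",
    "bootstrap.servers": "example.com:9092",
    "resume_earliest_if_data_expires": true
}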
Partition changes
Kafka supports increasing, but not decreasing, the number of partitions in a topic. Feldera will only read partitions that existed in a topic at the time that the pipeline was started or resumed from a checkpoint. To make a running Feldera pipeline start to read newly added partitions, stop the pipeline with a checkpoint and then resume it.
If partitions is set to a list of partition numbers or start_from is set to a list of partition offsets, this is not possible. Instead, force-stop the pipeline, clear its storage, change the configuration, and restart the pipeline from an empty state.
Additional resources
For more information, see:
- Tutorial section which involves creating a Kafka input connector.
- Overview of Kafka configuration options: librdkafka options