Kafka input connector
Feldera can consume a stream of changes to a SQL table from Kafka with the kafka_input connector.
The Kafka input connector supports fault tolerance.
- One or more Kafka topics can be defined.
- The Kafka connector is implemented using librdkafka. Relevant librdkafka options can be set directly in the connector configuration.
Kafka Input Connector Configuration

Property | Type | Default | Description |
---|---|---|---|
`bootstrap.servers`* | string | | A comma-separated list of Kafka brokers to connect to. |
`topics`* | list | | A list of Kafka topics to subscribe to. |
`log_level` | string | | The log level for the Kafka client. |
`group_join_timeout_secs` | seconds | 10 | Maximum timeout (in seconds) for the endpoint to join the Kafka consumer group during initialization. |
`poller_threads` | positive integer | 3 | Number of threads used to poll Kafka messages. Using multiple threads can improve performance with small messages. |
`start_from` | list | | Specifies the offsets and partitions from which to begin reading Kafka topics. When specified, this property must contain a list of JSON objects with the fields `topic`, `partition`, and `offset` (described below). |
Any other configuration options are passed directly to librdkafka.
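For instance, librdkafka's `fetch.max.bytes` consumer option can be included alongside the connector's own properties. A sketch, using the placeholder broker and topic from this page (`fetch.max.bytes` is a standard librdkafka setting; the value shown is illustrative):

```json
"config": {
    "bootstrap.servers": "example.com:9092",
    "topics": ["book-fair-sales"],
    "fetch.max.bytes": "10485760"
}
```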
Example usage

We will create a Kafka connector named book-fair-sales. The Kafka broker is located at example.com:9092 and the topic is book-fair-sales.

The format in this example is newline-delimited JSON (NDJSON). For example, two messages can carry three rows in total:
Message 1:
{"insert": {"sid": 123, pid": 2, "sold_at": "2024-01-01 12:00:04", "price": 5.0}}
{"insert": {"sid": 124, pid": 12, "sold_at": "2024-01-01 12:00:08", "price": 20.5}}
Message 2:
{"insert": {"sid": 125, pid": 8, "sold_at": "2024-01-01 12:01:02", "price": 1.10}}
CREATE TABLE INPUT (
    ... -- columns omitted
) WITH (
    'connectors' = '[
        {
            "transport": {
                "name": "kafka_input",
                "config": {
                    "bootstrap.servers": "example.com:9092",
                    "auto.offset.reset": "earliest",
                    "topics": ["book-fair-sales"]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "update_format": "insert_delete",
                    "array": false
                }
            }
        }
    ]'
)
Starting from a specific offset

Reading Kafka messages from a specific offset is an experimental feature. Fault-tolerant Kafka connectors do not yet support it.

Feldera supports starting a Kafka connector from a specific offset in a specific partition.
CREATE TABLE INPUT (
    ... -- columns omitted
) WITH (
    'connectors' = '[
        {
            "transport": {
                "name": "kafka_input",
                "config": {
                    "bootstrap.servers": "example.com:9092",
                    "topics": ["book-fair-sales"],
                    "start_from": [{
                        "topic": "book-fair-sales",
                        "partition": 0,
                        "offset": 42
                    }]
                }
            },
            "format": {
                "name": "json",
                "config": {
                    "update_format": "insert_delete",
                    "array": false
                }
            }
        }
    ]'
)
The start_from field takes a list of JSON objects that must include the following fields:

- `topic`: The name of the Kafka topic.
- `partition`: The partition number within the topic.
- `offset`: The specific offset from which to start consuming messages.

Note:

- The topic specified in `start_from` must also be included in the `topics` field.
- If a topic has multiple partitions but only one partition is defined in `start_from`, only that partition will be read.
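To read several partitions of the same topic from specific offsets, list each partition as its own object. A sketch (the partition numbers and offsets are illustrative):

```json
"start_from": [
    { "topic": "book-fair-sales", "partition": 0, "offset": 42 },
    { "topic": "book-fair-sales", "partition": 1, "offset": 7 }
]
```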
How to write connector config

Below are several examples of how to connect to a Kafka broker by specifying the options that are passed to librdkafka. Outside of test/dev environments, it is strongly recommended to use encrypted and authenticated communication. In addition, to avoid storing secrets in plaintext in connector configurations, it is recommended to use secret management.
No authentication and plaintext (no encryption)

The default value for security.protocol is PLAINTEXT, so it does not need to be specified explicitly.
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topics": ["book-fair-sales"]
}
SASL authentication and plaintext (no encryption)
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topics": ["book-fair-sales"],
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "<USER>",
"sasl.password": "<PASSWORD>"
}
No authentication and with encryption (SSL)
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topics": ["book-fair-sales"],
"security.protocol": "SSL"
}
SASL authentication and with encryption (SSL)
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topics": ["book-fair-sales"],
"security.protocol": "SASL_SSL",
"sasl.mechanism": "SCRAM-SHA-256",
"sasl.username": "<USER>",
"sasl.password": "<PASSWORD>"
}
For example, at the time of writing (15 May 2024), the Confluent Cloud Kafka C client tutorial configuration uses SASL_SSL with the SASL mechanism PLAIN (see their tutorial: select Kafka Location: Confluent Cloud, then go to the Configuration tab).
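A hedged sketch of such a configuration, assuming a Confluent Cloud bootstrap address and an API key/secret pair used as the SASL credentials (all values are placeholders):

```json
"config": {
    "bootstrap.servers": "<CLUSTER>.confluent.cloud:9092",
    "auto.offset.reset": "earliest",
    "topics": ["book-fair-sales"],
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "<API_KEY>",
    "sasl.password": "<API_SECRET>"
}
```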
SSL with PEM keys

"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topics": ["book-fair-sales"],
"security.protocol": "SSL",
"ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
"ssl.key.pem": "-----BEGIN PRIVATE KEY-----TOPSECRET1\n-----END PRIVATE KEY-----\n",
"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n"
}
PEM-encoded certificates can be passed directly in the configuration using the ssl.*.pem keys.

During development, we encountered an issue with librdkafka where the SSL handshake fails if the certificate PEM string contains more than one certificate. Feldera circumvents this issue as follows:

Issue with librdkafka

librdkafka only accepts multiple certificates when they are provided via the ssl.certificate.location key (a file path) rather than directly with ssl.certificate.pem. This is documented in librdkafka issue #3225.
Feldera's Approach

To work around this limitation, Feldera handles PEM-encoded certificates and keys by:

- Storing the value passed to `ssl.certificate.pem` in a file.
- Naming the file using the SHA256 hash of the data.
- Replacing the `ssl.certificate.pem` configuration option with an `ssl.certificate.location` option that points to the newly saved file.

Example: the updated configuration would look like:

"config": {
    ...,
    "ssl.certificate.location": "path/to/certificate.pem"
}
⚠️ If both ssl.certificate.pem and ssl.certificate.location are set, the latter will be overwritten.
Using Kafka as a Debezium transport

The Kafka connector can be used to ingest data from a source via Debezium. For information on how to set up Debezium integration for Feldera, see the Debezium connector documentation.
Additional resources

For more information, see:

- The tutorial section on creating a Kafka input connector.
- An overview of Kafka configuration options: librdkafka options