Kafka output connector
Feldera can output a stream of changes to a SQL table or view to a Kafka topic.
- The Kafka output connector supports fault tolerance.
- The Kafka connector uses librdkafka in its implementation. Relevant options supported by librdkafka can be defined in the connector configuration.
Example usage
We will create a Kafka output connector named total-sales.
The Kafka broker is located at example.com:9092 and the topic is total-sales.
CREATE VIEW V AS ...
WITH (
  'connectors' = '[
    {
      "transport": {
        "name": "kafka_output",
        "config": {
          "bootstrap.servers": "example.com:9092",
          "auto.offset.reset": "earliest",
          "topic": "total-sales"
        }
      },
      "format": {
        "name": "json",
        "config": {
          "update_format": "insert_delete",
          "array": false
        }
      }
    }
  ]'
)
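A connector definition like the one above can also be generated instead of typed by hand, which helps keep the embedded JSON valid. The following Python sketch is illustrative only: the helper names kafka_output_connector and connectors_clause are hypothetical, and doubling single quotes for the SQL string literal is an assumption based on standard SQL escaping, not documented Feldera behavior.

```python
import json


def kafka_output_connector(bootstrap_servers, topic, extra_options=None):
    """Build one connector entry shaped like the example above (hypothetical helper)."""
    config = {"bootstrap.servers": bootstrap_servers, "topic": topic}
    # Additional librdkafka options, if any, are passed through unchanged.
    config.update(extra_options or {})
    return {
        "transport": {"name": "kafka_output", "config": config},
        "format": {
            "name": "json",
            "config": {"update_format": "insert_delete", "array": False},
        },
    }


def connectors_clause(connectors):
    """Render the WITH clause as text (hypothetical helper).

    Assumption: single quotes inside the JSON are doubled, as in standard
    SQL string literals.
    """
    payload = json.dumps(connectors, indent=2).replace("'", "''")
    return "WITH (\n'connectors' = '" + payload + "'\n)"


clause = connectors_clause(
    [kafka_output_connector("example.com:9092", "total-sales")]
)
print(clause)
```

The printed text can be appended to a CREATE VIEW statement. Generating it from a dictionary means a missing brace or a trailing comma cannot slip into the JSON.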
Authentication & Encryption
SSL with PEM keys
"config": {
  "bootstrap.servers": "example.com:9092",
  "auto.offset.reset": "earliest",
  "topic": "book-fair-sales",
  "security.protocol": "SSL",
  "ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
  "ssl.key.pem": "-----BEGIN PRIVATE KEY-----TOPSECRET1\n-----END PRIVATE KEY-----\n",
  "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n"
}
PEM-encoded certificates can be passed directly in the configuration using ssl.*.pem keys.
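Because PEM data spans several lines, each newline has to appear as \n inside the JSON string. The sketch below shows one way to produce such a configuration in Python. It is a minimal illustration: the helper name ssl_pem_options and the PEM bodies are placeholders, not part of Feldera.

```python
import json


def ssl_pem_options(ca_pem, key_pem, certificate_pem):
    """Return the SSL-related keys used in the configuration above (hypothetical helper)."""
    return {
        "security.protocol": "SSL",
        "ssl.ca.pem": ca_pem,
        "ssl.key.pem": key_pem,
        "ssl.certificate.pem": certificate_pem,
    }


# Placeholder PEM text. Real values would typically be read from files,
# for example with pathlib.Path("ca.pem").read_text().
ca = "-----BEGIN CERTIFICATE-----\nTOPSECRET0\n-----END CERTIFICATE-----\n"
key = "-----BEGIN PRIVATE KEY-----\nTOPSECRET1\n-----END PRIVATE KEY-----\n"
cert = "-----BEGIN CERTIFICATE-----\nTOPSECRET2\n-----END CERTIFICATE-----\n"

config = {
    "bootstrap.servers": "example.com:9092",
    "topic": "book-fair-sales",
    **ssl_pem_options(ca, key, cert),
}

# json.dumps escapes every newline inside a PEM value as \n.
encoded = json.dumps(config, indent=2)
print(encoded)
```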
During development, we encountered an issue with librdkafka where the SSL handshake fails if the certificate PEM string contains more than one certificate.
Feldera circumvents this issue as follows:
Issue with librdkafka
librdkafka only accepts multiple certificates when they are provided via the ssl.certificate.location key (a file path) rather than directly with ssl.certificate.pem.
This is documented in librdkafka issue #3225.
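To check whether a given PEM string falls into this case, it is enough to count the certificate blocks it contains. A minimal Python sketch, with a hypothetical helper count_certificates and placeholder certificate bodies:

```python
BEGIN_MARKER = "-----BEGIN CERTIFICATE-----"


def count_certificates(pem_text):
    """Count the certificate blocks in a PEM string (hypothetical helper)."""
    return pem_text.count(BEGIN_MARKER)


# Placeholder chain with two certificates, e.g. a leaf and an intermediate.
chain = (
    "-----BEGIN CERTIFICATE-----\nLEAF\n-----END CERTIFICATE-----\n"
    "-----BEGIN CERTIFICATE-----\nINTERMEDIATE\n-----END CERTIFICATE-----\n"
)

certificate_count = count_certificates(chain)
# More than one block means the string is a chain, which is the case
# described above.
is_chain = certificate_count > 1
print(certificate_count, is_chain)
```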
Feldera's Approach
To work around this limitation, Feldera handles PEM-encoded certificates and keys by:
- Storing the value passed to ssl.certificate.pem into a file.
- Naming the file using the SHA256 hash of the data.
- Replacing the ssl.certificate.pem configuration option with the ssl.certificate.location option that points to the newly saved file.
Example:
The updated configuration would look like:
"config": {
  ...,
  "ssl.certificate.location": "path/to/certificate.pem"
}
⚠️ If both ssl.certificate.pem and ssl.certificate.location are set, the latter will be overwritten with the path of the file generated from ssl.certificate.pem.
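The workaround can be sketched in Python as follows. This is an illustration of the steps described in this section, not Feldera's actual implementation: the function name replace_pem_with_location, the target directory, and the .pem file suffix are assumptions.

```python
import hashlib
import tempfile
from pathlib import Path


def replace_pem_with_location(config, directory):
    """Return a copy of config with ssl.certificate.pem replaced by a file path.

    Hypothetical helper that mirrors the steps described in this section.
    """
    updated = dict(config)
    pem = updated.pop("ssl.certificate.pem", None)
    if pem is None:
        return updated  # nothing to rewrite
    data = pem.encode("utf-8")
    # The file is named after the SHA256 hex digest of the PEM data.
    # The ".pem" suffix is an assumption made for this sketch.
    path = Path(directory) / (hashlib.sha256(data).hexdigest() + ".pem")
    path.write_bytes(data)
    # Any existing ssl.certificate.location value is overwritten here.
    updated["ssl.certificate.location"] = str(path)
    return updated


pem = "-----BEGIN CERTIFICATE-----\nTOPSECRET2\n-----END CERTIFICATE-----\n"
original = {
    "bootstrap.servers": "example.com:9092",
    "ssl.certificate.pem": pem,
    "ssl.certificate.location": "old/path.pem",  # will be replaced
}

with tempfile.TemporaryDirectory() as tmp:
    result = replace_pem_with_location(original, tmp)
    # Read the file back while the temporary directory still exists.
    stored = Path(result["ssl.certificate.location"]).read_text()

print(result)
```

Naming the file after the hash of its content means that identical PEM data always maps to the same file name.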
Additional resources
For more information, see:
- Kafka input connector for examples on how to write the configuration to connect to Kafka brokers (e.g., how to specify authentication and encryption).
- Tutorial section which involves creating a Kafka output connector.
- Overview of Kafka configuration options: librdkafka options