Kafka output connector
Feldera can output a stream of changes to a SQL table or view to a Kafka topic.
The Kafka output connector supports fault tolerance.
The Kafka connector uses librdkafka in its implementation and supports relevant options for producers.
Example usage
We will create a Kafka output connector named total-sales.
The Kafka broker is located at example.com:9092 and the topic is total-sales.
CREATE VIEW V
WITH (
  'connectors' = '[
    {
      "transport": {
        "name": "kafka_output",
        "config": {
          "bootstrap.servers": "example.com:9092",
          "auto.offset.reset": "earliest",
          "topic": "total-sales"
        }
      },
      "format": {
        "name": "json",
        "config": {
          "update_format": "insert_delete",
          "array": false
        }
      }
    }
  ]'
)
AS ...
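To make the effect of the format settings more concrete, the following Python sketch decodes a message payload as a downstream consumer might. It assumes that with "update_format": "insert_delete" and "array": false each update arrives as one JSON object per line, wrapped in either an "insert" or a "delete" key, and that a single Kafka message may carry several such lines. The function name parse_updates and the columns region and total are hypothetical; the real columns depend on the view definition.

```python
import json
from typing import Iterator, Tuple


def parse_updates(payload: bytes) -> Iterator[Tuple[str, dict]]:
    """Yield (operation, row) pairs from one message payload.

    Assumes newline-delimited JSON where every object has the shape
    {"insert": {...}} or {"delete": {...}}.
    """
    for line in payload.decode("utf-8").splitlines():
        line = line.strip()
        if not line:
            continue  # skip blank lines between records
        update = json.loads(line)
        for op in ("insert", "delete"):
            if op in update:
                yield op, update[op]


# Hypothetical payload; the column names are made up for illustration.
sample = (
    b'{"insert": {"region": "EU", "total": 120}}\n'
    b'{"delete": {"region": "EU", "total": 100}}\n'
)
updates = list(parse_updates(sample))
```

A consumer would typically apply each pair to its own copy of the view: add the row on "insert" and remove it on "delete".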
Authentication & Encryption
SSL with PEM keys
"config": {
  "bootstrap.servers": "example.com:9092",
  "auto.offset.reset": "earliest",
  "topic": "book-fair-sales",
  "security.protocol": "SSL",
  "ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
  "ssl.key.pem": "-----BEGIN PRIVATE KEY-----TOPSECRET1\n-----END PRIVATE KEY-----\n",
  "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n"
}
PEM-encoded certificates can be passed directly in the configuration using ssl.*.pem keys.
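Because PEM data spans multiple lines, its newlines have to appear as \n escapes inside the JSON configuration. The Python sketch below shows one way to assemble such a configuration and let json.dumps do the escaping. The helper name build_ssl_config and the placeholder PEM strings are assumptions made for illustration; in practice the PEM text would come from your own certificate and key material.

```python
import json


def build_ssl_config(bootstrap_servers: str, topic: str,
                     ca_pem: str, key_pem: str, cert_pem: str) -> dict:
    """Assemble a transport config that passes PEM data inline."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "topic": topic,
        "security.protocol": "SSL",
        "ssl.ca.pem": ca_pem,
        "ssl.key.pem": key_pem,
        "ssl.certificate.pem": cert_pem,
    }


# Placeholder PEM strings; real values contain base64 data on several lines.
config = build_ssl_config(
    "example.com:9092",
    "book-fair-sales",
    ca_pem="-----BEGIN CERTIFICATE-----\nCA\n-----END CERTIFICATE-----\n",
    key_pem="-----BEGIN PRIVATE KEY-----\nKEY\n-----END PRIVATE KEY-----\n",
    cert_pem="-----BEGIN CERTIFICATE-----\nCERT\n-----END CERTIFICATE-----\n",
)

# json.dumps escapes each newline in the PEM values as the two characters \n.
config_json = json.dumps(config, indent=2)
print(config_json)
```

The printed object can then be pasted as the "config" value of the transport section.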
Multiple certificates
librdkafka only accepts multiple certificates when provided via ssl.certificate.location keys (file paths) rather than directly with ssl.certificate.pem.
This is documented in librdkafka issue #3225.
To work around this limitation, Feldera handles PEM-encoded certificates and keys by:
- Storing the value passed to ssl.certificate.pem into a file.
- Naming the file using the SHA256 hash of the data.
- Replacing the ssl.certificate.pem configuration option with an ssl.certificate.location option that points to the newly saved file.
For example, the updated configuration would look like:
"config": {
  ...,
  "ssl.certificate.location": "path/to/certificate.pem"
}
⚠️ If both ssl.certificate.pem and ssl.certificate.location are set, the latter will be overwritten.
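The rewrite described in this section can be sketched in a few lines of Python. This is an illustration of the idea only, not Feldera's actual implementation: the function name pem_to_location, the target directory, and the .pem file extension are all assumptions.

```python
import hashlib
import tempfile
from pathlib import Path


def pem_to_location(config: dict, directory: Path) -> dict:
    """Return a copy of config with ssl.certificate.pem replaced by
    ssl.certificate.location pointing at a file holding the PEM data."""
    rewritten = dict(config)
    pem = rewritten.pop("ssl.certificate.pem", None)
    if pem is None:
        return rewritten  # nothing to rewrite

    # Name the file after the SHA256 hash of its contents.
    digest = hashlib.sha256(pem.encode("utf-8")).hexdigest()
    path = directory / f"{digest}.pem"  # the extension is an assumption
    path.write_text(pem, encoding="utf-8")

    # Any existing ssl.certificate.location value is overwritten here.
    rewritten["ssl.certificate.location"] = str(path)
    return rewritten


workdir = Path(tempfile.mkdtemp())
original = {
    "bootstrap.servers": "example.com:9092",
    "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----\nA\n-----END CERTIFICATE-----\n",
    "ssl.certificate.location": "old/path.pem",
}
rewritten = pem_to_location(original, workdir)
print(rewritten["ssl.certificate.location"])
```

Naming the file after the hash means the same certificate data always maps to the same file, so repeated rewrites of an unchanged configuration do not create additional files.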
Additional resources
For more information, see:
- Kafka input connector for examples on how to write the configuration to connect to Kafka brokers (e.g., how to specify authentication and encryption).
- Tutorial section which involves creating a Kafka output connector.
- Overview of Kafka configuration options: librdkafka options