Skip to main content

Kafka output connector

Feldera can output a stream of changes to a SQL table or view to a Kafka topic.

The Kafka output connector supports fault tolerance.

Example usage

We will create a Kafka output connector named total-sales. Kafka broker is located at example.com:9092 and the topic is total-sales.

CREATE VIEW V AS ...
WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topic": "total-sales"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
)

Authentication & Encryption

SSL with PEM keys

"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topic": "book-fair-sales",
"security.protocol": "SSL",
"ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
"ssl.key.pem": "-----BEGIN CERTIFICATE-----TOPSECRET1\n-----END CERTIFICATE-----\n",
"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n",
}

PEM-encoded certificates can be passed directly in the configuration using ssl.*.pem keys.

During development, we encountered an issue with librdkafka where the SSL handshake fails if the certificate PEM string contain more than one certificate.

Feldera circumvents this issue as follows:

Issue with librdkafka

librdkafka only accepts multiple certificates when provided via ssl.certificate.location keys (file paths) rather than directly with ssl.certificate.pem.

This is documented in librdkafka issue #3225.

Feldera's Approach

To work around this limitation, Feldera handles PEM-encoded certificates and keys by:

  1. Storing the value passed to ssl.certificate.pem into a file.
  2. Naming the file using the SHA256 hash of the data.
  3. Replacing the ssl.certificate.pem configuration option with ssl.certificate.location option that points to the newly saved file.

Example:

The updated configuration would look like:

"config": {
...,
"ssl.certificate.location": "path/to/certificate.pem"
}

⚠️ If both ssl.certificate.pem and ssl.certificate.location are set the latter will be overwritten.

Additional resources

For more information, see:

  • Kafka input connector for examples on how to write the configuration to connect to Kafka brokers (e.g., how to specify authentication and encryption).

  • Tutorial section which involves creating a Kafka output connector.

  • Data formats such as JSON and CSV

  • Overview of Kafka configuration options: librdkafka options