Kafka output connector

Feldera can output a stream of changes to a SQL table or view to a Kafka topic.

The Kafka output connector supports fault tolerance.

The Kafka output connector uses librdkafka in its implementation and supports the relevant librdkafka options for producers.

Example usage

We will create a Kafka output connector named total-sales. The Kafka broker is located at example.com:9092 and the topic is total-sales.

CREATE VIEW V
WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topic": "total-sales"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
)
AS ...
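
A reader of the topic has to fold the change stream back into table state. The sketch below shows one way to do that in Python. It assumes that with update_format set to insert_delete each update arrives as a JSON object of the form {"insert": {...}} or {"delete": {...}}, and that a message payload may carry several such objects separated by newlines. The apply_changes and current_rows helpers and the region/total columns are hypothetical names used only for illustration. Fetching messages from Kafka is left out.

```python
import json
from collections import Counter


def apply_changes(state: Counter, payload: str) -> Counter:
    """Apply one message payload of insert/delete updates to `state`.

    Assumption: every non-empty line of `payload` is a JSON object shaped
    like {"insert": {...}} or {"delete": {...}}.
    """
    for line in payload.splitlines():
        line = line.strip()
        if not line:
            continue
        update = json.loads(line)
        if "insert" in update:
            # Rows are keyed by their canonical JSON text so that
            # duplicate rows are counted rather than collapsed.
            key = json.dumps(update["insert"], sort_keys=True)
            state[key] += 1
        elif "delete" in update:
            key = json.dumps(update["delete"], sort_keys=True)
            state[key] -= 1
            if state[key] == 0:
                del state[key]
        else:
            raise ValueError(f"unexpected update: {update!r}")
    return state


def current_rows(state: Counter) -> list:
    """Expand the multiset back into a sorted list of rows."""
    rows = []
    for key, count in sorted(state.items()):
        rows.extend([json.loads(key)] * count)
    return rows


state = Counter()
# Hypothetical payloads for a view with columns `region` and `total`.
apply_changes(state, '{"insert": {"region": "EU", "total": 10}}\n'
                     '{"insert": {"region": "US", "total": 5}}')
apply_changes(state, '{"delete": {"region": "EU", "total": 10}}\n'
                     '{"insert": {"region": "EU", "total": 12}}')
print(current_rows(state))
```

Keeping a count per row, rather than a set, lets the same row appear more than once in the view without losing track of how many copies remain after a delete.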

Authentication & Encryption

SSL with PEM keys

"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topic": "book-fair-sales",
"security.protocol": "SSL",
"ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
"ssl.key.pem": "-----BEGIN PRIVATE KEY-----TOPSECRET1\n-----END PRIVATE KEY-----\n",
"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n"
}

PEM-encoded certificates can be passed directly in the configuration using ssl.*.pem keys.
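
PEM data spans multiple lines, so each line break has to appear as an escaped \n inside the JSON string. Rather than escaping by hand, the config can be generated. The following Python sketch builds the "config" object from PEM text and lets json.dumps do the escaping. The kafka_ssl_config helper is a hypothetical name, and the PEM bodies are placeholders; in practice the PEM text would be read from your own certificate and key files.

```python
import json


def kafka_ssl_config(bootstrap_servers, topic, ca_pem, key_pem, cert_pem):
    """Build the transport "config" object with inline PEM values.

    Only the keys shown in the example above are produced; the PEM
    arguments are passed through unchanged as strings.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "topic": topic,
        "security.protocol": "SSL",
        "ssl.ca.pem": ca_pem,
        "ssl.key.pem": key_pem,
        "ssl.certificate.pem": cert_pem,
    }


# Placeholder PEM text; real values would come from files on disk.
ca = "-----BEGIN CERTIFICATE-----\nAAA\n-----END CERTIFICATE-----\n"
key = "-----BEGIN PRIVATE KEY-----\nBBB\n-----END PRIVATE KEY-----\n"
cert = "-----BEGIN CERTIFICATE-----\nCCC\n-----END CERTIFICATE-----\n"

config = kafka_ssl_config("example.com:9092", "book-fair-sales", ca, key, cert)

# json.dumps turns each real newline in a PEM value into the two
# characters backslash + n, which is what the JSON config expects.
config_text = json.dumps(config, indent=2)
print(config_text)
```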

Multiple certificates

librdkafka accepts multiple certificates only when they are provided through the ssl.certificate.location key (a file path), not when they are passed inline through ssl.certificate.pem.

This is documented in librdkafka issue #3225.

To work around this limitation, Feldera handles PEM-encoded certificates and keys by:

  1. Storing the value passed to ssl.certificate.pem into a file.
  2. Naming the file using the SHA256 hash of the data.
  3. Replacing the ssl.certificate.pem configuration option with the ssl.certificate.location option, which points to the newly saved file.

Example: after this rewrite, the configuration would look like:

"config": {
...,
"ssl.certificate.location": "path/to/certificate.pem"
}

⚠️ If both ssl.certificate.pem and ssl.certificate.location are set, the latter will be overwritten.
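
The three steps and the overwrite behavior can be sketched in Python as follows. This is an illustration of the described workaround, not Feldera's actual implementation: the rewrite_certificate_option function, the cert_dir parameter, and the .pem file extension are assumptions made for the example.

```python
import hashlib
import os
import tempfile


def rewrite_certificate_option(config: dict, cert_dir: str) -> dict:
    """Return a copy of `config` with ssl.certificate.pem moved to a file."""
    rewritten = dict(config)
    pem = rewritten.pop("ssl.certificate.pem", None)
    if pem is None:
        # Nothing to rewrite; any ssl.certificate.location is left alone.
        return rewritten

    data = pem.encode("utf-8")
    # Step 2: the file name is the SHA256 hash of the certificate data.
    digest = hashlib.sha256(data).hexdigest()
    path = os.path.join(cert_dir, digest + ".pem")  # extension is assumed

    # Step 1: store the value passed to ssl.certificate.pem in that file.
    with open(path, "wb") as f:
        f.write(data)

    # Step 3: point ssl.certificate.location at the saved file. This
    # overwrites a user-supplied ssl.certificate.location, if present.
    rewritten["ssl.certificate.location"] = path
    return rewritten


cert_dir = tempfile.mkdtemp()
original = {
    "topic": "book-fair-sales",
    "ssl.certificate.pem": "-----BEGIN CERTIFICATE-----\nAAA\n-----END CERTIFICATE-----\n",
    "ssl.certificate.location": "old/certificate.pem",
}
updated = rewrite_certificate_option(original, cert_dir)
print(updated["ssl.certificate.location"])
```

Naming the file after the hash of its contents means that the same certificate data always maps to the same path, so repeated rewrites do not pile up duplicate files.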

Additional resources

For more information, see:

  • Kafka input connector for examples on how to write the configuration to connect to Kafka brokers (e.g., how to specify authentication and encryption).

  • Tutorial section that walks through creating a Kafka output connector.

  • Data formats such as JSON and CSV

  • Overview of Kafka configuration options: librdkafka options