Kafka output connector
Feldera can output a stream of changes to a SQL table or view to a Kafka topic.
The Kafka output connector supports fault tolerance.
Kafka Output Connector Configuration
| Property | Type | Default | Description |
|---|---|---|---|
topic (required) | string | The Kafka topic to send to. | |
bootstrap.servers (required) | string | A comma separated list of Kafka brokers to connect to. | |
headers | array of objects | Kafka headers to be added to each message produced by this connector. A header specifies a key and, optionally, a value. The value must be a string or an array of bytes, e.g., [{"key": "key1", "value": "val1"}, {"key": "key2", "value": [1,2,3,4,5]}, {"key": "key3"}] | |
log_level | string | The log level for the Kafka client. | |
initialization_timeout_secs | seconds | 60 | Maximum timeout in seconds to wait for the connector to connect to a Kafka broker. |
kafka_service | string | If specified, this service is used to provide defaults for the Kafka options. | |
region | string | The AWS region to use while connecting to AWS Managed Streaming for Kafka (MSK). |
The connector passes additional options directly to librdkafka. See relevant options for producers.
Example usage
We will create a Kafka output connector named total-sales.
Kafka broker is located at example.com:9092 and the topic is total-sales.
CREATE VIEW V
WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "example.com:9092",
"topic": "total-sales"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
)
AS ...
Authentication & Encryption
SSL with PEM keys
"config": {
"bootstrap.servers": "example.com:9092",
"auto.offset.reset": "earliest",
"topic": "book-fair-sales",
"security.protocol": "SSL",
"ssl.ca.pem": "-----BEGIN CERTIFICATE-----TOPSECRET0\n-----END CERTIFICATE-----\n",
"ssl.key.pem": "-----BEGIN CERTIFICATE-----TOPSECRET1\n-----END CERTIFICATE-----\n",
"ssl.certificate.pem": "-----BEGIN CERTIFICATE-----TOPSECRET2\n-----END CERTIFICATE-----\n",
}
PEM-encoded certificates can be passed directly in the configuration using
ssl.*.pem keys.
Multiple certificates
librdkafka only accepts multiple certificates when provided via
ssl.certificate.location keys (file paths) rather than directly
with ssl.certificate.pem.
This is documented in librdkafka issue #3225.
To work around this limitation, Feldera handles PEM-encoded certificates and keys by:
- Storing the value passed to
ssl.certificate.peminto a file. - Naming the file using the SHA256 hash of the data.
- Replacing the
ssl.certificate.pemconfiguration option withssl.certificate.locationoption that points to the newly saved file.
Example:
The updated configuration would look like:
"config": {
...,
"ssl.certificate.location": "path/to/certificate.pem"
}
⚠️ If both
ssl.certificate.pemandssl.certificate.locationare set the latter will be overwritten.
Connecting to AWS MSK with IAM SASL
Example of writing data to AWS MSK with IAM SASL.
- AWS credentials must either be set as Environment Variables or present in
~/.aws/credentials. - Ensure that the defined output topic either exists in AWS MSK or, automatic topic creation is enabled.
sasl.mechanismmust be set toOAUTHBEARER.security.protocolmust be set toSASL_SSL.- When the
sasl.mechanismisOAUTHBEARER, the AWS region for MSK must either be set via the environment variableAWS_REGIONor theregionfield in connector definition as in the example below.
Other protocols and mechanisms aren't supported.
CREATE VIEW OUTPUT
WITH (
'connectors' = '[
{
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "broker-1.kafka.region.amazonaws.com:9098,broker-2.kafka.region.amazonaws.com:9098",
"sasl.mechanism": "OAUTHBEARER",
"security.protocol": "SASL_SSL",
"region": "<AWS_REGION>",
"topic": "<TOPIC>"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
) as select * from INPUT;
Additional resources
For more information, see:
-
Kafka input connector for examples on how to write the configuration to connect to Kafka brokers (e.g., how to specify authentication and encryption).
-
Tutorial section which involves creating a Kafka output connector.
-
Overview of Kafka configuration options: librdkafka options