NATS input connector
Feldera can consume a stream of changes to a SQL table from NATS JetStream
with the nats_input connector.
The NATS input connector supports exactly-once fault tolerance using JetStream's ordered pull consumer.
NATS support is still experimental, and it may be substantially modified in the future.
How it works
The NATS input connector uses JetStream's ordered pull consumer, which provides:
- Strict ordering: Messages delivered in exact stream order without gaps.
- Automatic recovery: On gap detection, heartbeat loss, or deletion, the consumer automatically recreates itself and resumes from the last processed position.
- Retry loop with health checks: On transient startup/runtime failures, the connector enters retry mode and periodically attempts to reconnect.
- Exactly-once semantics: Combined with Feldera's checkpoint mechanism, ensures each message is processed exactly once.
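Conceptually, the gap detection behind automatic recovery works on JetStream stream sequence numbers. The following is a simplified sketch of the idea, not the connector's actual code:

```python
# Each JetStream message carries a monotonically increasing stream
# sequence number. An ordered consumer expects exactly the next
# sequence; anything else signals a gap, and the consumer must
# recreate itself and resume from the last processed position.

def detect_gap(last_processed_seq: int, incoming_seq: int) -> bool:
    """Return True if the ordered consumer should be recreated."""
    return incoming_seq != last_processed_seq + 1

# After processing sequence 41, receiving 42 is normal;
# receiving 43 means message 42 was missed.
assert not detect_gap(41, 42)
assert detect_gap(41, 43)
```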
NATS Input Connector Configuration
The connector configuration consists of three main sections:
Connection Options
| Property | Type | Required | Description |
|---|---|---|---|
| server_url | string | Yes | NATS server URL (e.g., nats://localhost:4222) |
| auth | object | No | Authentication configuration (see Authentication) |
| connection_timeout_secs | integer | No | Connection timeout in seconds: how long to wait when establishing the initial connection to the NATS server. Default: 10 |
| request_timeout_secs | integer | No | Request timeout in seconds: how long to wait for responses to requests. Default: 10 |
Stream Configuration
| Property | Type | Required | Description |
|---|---|---|---|
| stream_name | string | Yes | The name of the NATS JetStream stream to consume from |
| inactivity_timeout_secs | integer | No | Maximum idle time while waiting for the next message before running a stream/server health check. Must be at least 1. Default: 60 |
| retry_interval_secs | integer | No | Delay between automatic retry attempts while the connector is in retry mode. Must be at least 1. Default: 5 |
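Putting the connection and stream options together, a minimal config fragment might look like the following (values are illustrative; this assumes the stream-level options sit at the top level of the config alongside stream_name, as in the full examples below):

```json
{
  "connection_config": {
    "server_url": "nats://localhost:4222",
    "connection_timeout_secs": 10,
    "request_timeout_secs": 10
  },
  "stream_name": "my_texts",
  "inactivity_timeout_secs": 60,
  "retry_interval_secs": 5
}
```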
Consumer Configuration
| Property | Type | Required | Description |
|---|---|---|---|
| name | string | No | Consumer name for identification |
| description | string | No | Consumer description |
| filter_subjects | string list | No | Filter messages by subject(s). If empty, consumes all subjects in the stream |
| replay_policy | variant | No | Message replay speed: "Instant" (default, fast) or "Original" (rate-limited to the original timing) |
| rate_limit | integer | No | Rate limit in bytes per second. Default: 0 (unlimited) |
| deliver_policy | variant | Yes | Starting point for reading from the stream (see Deliver Policy) |
| max_waiting | integer | No | Maximum outstanding pull requests. Default: 0 |
| metadata | map (string → string) | No | Consumer metadata key-value pairs |
| max_batch | integer | No | Maximum messages per batch |
| max_bytes | integer | No | Maximum bytes per batch |
| max_expires | duration | No | Maximum duration of a pull request |
Deliver Policy
The deliver_policy field determines where in the stream to start consuming messages:
"All"- Start from the earliest available message in the stream"Last"- Start from the last message in the stream"New"- Start from new messages only (messages arriving after consumer creation)"LastPerSubject"- Start with the last message for all subjects received (useful for KV-like workloads){"ByStartSequence": {"start_sequence": 100}}- Start from a specific sequence number{"ByStartTime": {"start_time": "2024-01-01T12:00:00Z"}}- Start from messages at or after the specified timestamp (RFC 3339 format)
Replay Policy
The replay_policy field controls how fast messages are delivered to the consumer:
"Instant"(default) - Delivers messages as quickly as possible. Use for maximum throughput in production workloads."Original"- Delivers messages at the rate they were originally received, preserving the timing between messages. Useful for:- Replaying production traffic patterns in test/staging environments
- Load testing with realistic timing
- Debugging scenarios where message timing matters
If not specified, defaults to "Instant".
Retry, startup and replay behavior
The connector distinguishes between retryable and fatal errors:
- Retryable errors (temporary network/server issues, a missing stream during startup, transient message-stream failures, and temporary failures while fetching the JetStream stream metadata used during startup, resume, or replay validation) move the connector into retry mode. It reports non-fatal endpoint errors and retries automatically every retry_interval_secs seconds.
- Fatal errors stop the connector and report a fatal endpoint error. This happens when checkpoint/replay metadata is incompatible with the current stream's sequence space.
Before reading after startup or resume, the connector validates the checkpoint resume cursor against the stream's available sequence range. During replay, it validates that the requested replay range still exists.
Only transient I/O failures during these validation checks are retried. Once the connector successfully reads the stream metadata, logical validation failures remain fatal.
Typical fatal scenarios include:
- Stream deleted or recreated: The checkpoint references sequence numbers that no longer exist in the current stream. For example, the resume cursor is before the stream's earliest available sequence, or after the stream's latest sequence.
- Stream purged: The stream exists but required replay messages have been removed. The connector detects that the requested replay range falls outside the available sequence range.
- Stream emptied: The checkpoint says to resume from a specific sequence, but the stream now contains zero messages.
- Unexpected sequence during replay: While replaying a checkpoint batch, the connector receives a message with a sequence number beyond the expected replay range, indicating that earlier messages may have been deleted mid-replay.
In all of these cases the connector fails fast with a fatal error instead of retrying forever, because the data needed to maintain exactly-once guarantees is permanently gone.
To recover from a fatal error caused by stream data loss, you typically need to reset the pipeline's checkpoint state (e.g., by recreating the pipeline) so it starts fresh without referencing the now-invalid sequence numbers.
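The sequence-range validation behind these scenarios can be sketched as follows. This is a conceptual illustration only, not the connector's actual code; the function and its return values are hypothetical:

```python
# Conceptual sketch: validate a checkpoint's resume cursor against the
# stream's currently available sequence range [first_seq, last_seq].
# A cursor outside this range means the data needed for exactly-once
# resume is permanently gone, so the connector fails fast with a fatal
# error instead of retrying forever.

def validate_resume_cursor(cursor: int, first_seq: int, last_seq: int,
                           num_messages: int) -> str:
    if num_messages == 0:
        # Checkpoint references a sequence, but the stream is now empty.
        return "fatal: stream emptied"
    if cursor < first_seq:
        # Required messages were purged or deleted.
        return "fatal: messages purged"
    if cursor > last_seq:
        # Cursor beyond the current stream: stream deleted/recreated.
        return "fatal: stream recreated"
    return "ok"

assert validate_resume_cursor(50, 1, 100, 100) == "ok"
assert validate_resume_cursor(50, 60, 100, 41) == "fatal: messages purged"
```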
Authentication
The NATS connector currently supports credentials-based authentication through the auth object.
Credentials File Authentication
Use a credentials file containing JWT and NKey seed:
{
"credentials": {
"FromFile": "/path/to/credentials.creds"
}
}
Or provide credentials directly as a string:
{
"credentials": {
"FromString": "-----BEGIN NATS USER JWT-----\n...\n------END NATS USER JWT------\n\n************************* IMPORTANT *************************\n..."
}
}
Additional authentication methods (JWT, NKey, token, username/password) are defined in the configuration schema but not yet implemented. Only credentials-based authentication is currently supported.
For production environments, it is strongly recommended to use secret references instead of hardcoding credentials in the configuration.
Setting up NATS JetStream
Before using the NATS input connector, you need a NATS server with JetStream enabled and a stream created.
Quickstart
The quickest way to start experimenting with Feldera and NATS is to use Docker Compose:
curl -L https://raw.githubusercontent.com/feldera/feldera/main/deploy/docker-compose.yml -o docker-compose.yml
docker compose --profile nats up
This starts a Feldera pipeline manager, NATS server, and the NATS CLI. Connect to the CLI container with:
docker compose exec nats-cli sh
You can then easily publish messages to the NATS server using the nats CLI.
Creating a Stream
Once the NATS server is running, create a stream and publish test messages:
# Create a stream
nats stream add my_texts --subjects "text.>" --defaults
# Publish test messages
nats pub -J --count 100 text.area.1 '{"unix": {{UnixNano}}, "text": "{{Random 0 20}}"}'
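If you prefer generating test payloads programmatically rather than with the CLI's template functions, a minimal sketch of an equivalent payload builder (the helper name is ours; publishing itself would still go through the nats CLI or a NATS client library):

```python
import json
import random
import string
import time

def make_payload() -> str:
    """Build a JSON payload shaped like the CLI template above:
    a nanosecond Unix timestamp plus a random string of up to 20
    lowercase characters."""
    text = "".join(random.choices(string.ascii_lowercase,
                                  k=random.randint(0, 20)))
    return json.dumps({"unix": time.time_ns(), "text": text})

payload = make_payload()
print(payload)
```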
Example usage
Basic example with raw JSON format
Create a NATS input connector that reads from the my_texts stream:
CREATE TABLE raw_text (
unix BIGINT,
text STRING
) WITH (
'append_only' = 'true',
'connectors' = '[{
"name": "my_text",
"transport": {
"name": "nats_input",
"config": {
"connection_config": {
"server_url": "nats://nats:4222"
},
"stream_name": "my_texts",
"consumer_config": {
"deliver_policy": "All"
}
}
},
"format": {
"name": "json",
"config": {
"update_format": "raw"
}
}
}]'
);
CREATE MATERIALIZED VIEW summary as
SELECT
len(text) as text_length,
  (max(unix)/1e6)::TIMESTAMP as last_received,
count(*) as count
FROM raw_text
GROUP BY text_length
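Because the connectors attribute is a JSON array embedded in a SQL string, a stray quote or brace is an easy mistake. A quick way to sanity-check the JSON before pasting it into the WITH clause (a convenience sketch, not a Feldera tool):

```python
import json

# The connector definition from the example above, as a standalone
# JSON string. json.loads raises a ValueError on malformed JSON.
connectors = """[{
  "name": "my_text",
  "transport": {
    "name": "nats_input",
    "config": {
      "connection_config": {"server_url": "nats://nats:4222"},
      "stream_name": "my_texts",
      "consumer_config": {"deliver_policy": "All"}
    }
  },
  "format": {"name": "json", "config": {"update_format": "raw"}}
}]"""

parsed = json.loads(connectors)
print(parsed[0]["transport"]["name"])  # nats_input
```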
Only receive new NATS messages
If you only want to receive messages published after the Feldera pipeline starts,
change deliver_policy to New.
CREATE TABLE raw_text (
unix BIGINT,
text STRING
) WITH (
'append_only' = 'true',
'connectors' = '[{
"name": "my_text",
"transport": {
"name": "nats_input",
"config": {
"connection_config": {
"server_url": "nats://nats:4222"
},
"stream_name": "my_texts",
"consumer_config": {
"deliver_policy": "New"
}
}
},
"format": {
"name": "json",
"config": {
"update_format": "raw"
}
}
}]'
);
CREATE MATERIALIZED VIEW summary as
SELECT
len(text) as text_length,
  (max(unix)/1e6)::TIMESTAMP as last_received,
count(*) as count
FROM raw_text
GROUP BY text_length
Filtering by subject
Use filter_subjects to consume only messages matching the subjects text.area.2 and text.*.3:
CREATE TABLE raw_text (
unix BIGINT,
text STRING
) WITH (
'append_only' = 'true',
'connectors' = '[{
"name": "my_text",
"transport": {
"name": "nats_input",
"config": {
"connection_config": {
"server_url": "nats://nats:4222"
},
"stream_name": "my_texts",
"consumer_config": {
"deliver_policy": "All",
"filter_subjects": ["text.area.2", "text.*.3"]
}
}
},
"format": {
"name": "json",
"config": {
"update_format": "raw"
}
}
}]'
);
CREATE MATERIALIZED VIEW summary as
SELECT
len(text) as text_length,
  (max(unix)/1e6)::TIMESTAMP as last_received,
count(*) as count
FROM raw_text
GROUP BY text_length
Replaying at original timing
You can use the "Original" replay policy to replay production traffic in a test environment with realistic timing:
CREATE TABLE raw_text (
unix BIGINT,
text STRING
) WITH (
'append_only' = 'true',
'connectors' = '[{
"name": "my_text",
"transport": {
"name": "nats_input",
"config": {
"connection_config": {
"server_url": "nats://nats:4222"
},
"stream_name": "my_texts",
"consumer_config": {
"deliver_policy": "All",
"replay_policy": "Original"
}
}
},
"format": {
"name": "json",
"config": {
"update_format": "raw"
}
}
}]'
);
CREATE MATERIALIZED VIEW summary as
SELECT
len(text) as text_length,
  (max(unix)/1e6)::TIMESTAMP as last_received,
count(*) as count
FROM raw_text
GROUP BY text_length
Additional resources
For more information, see:
- Top-level connector documentation
- Fault tolerance
- Data formats such as JSON and CSV
- NATS JetStream documentation
- NATS Ordered Consumer documentation