NATS input connector

Feldera can consume a stream of changes to a SQL table from NATS JetStream with the nats_input connector.

The NATS input connector supports exactly-once fault tolerance using JetStream's ordered pull consumer.

warning

NATS support is still experimental, and it may be substantially modified in the future.

How it works

The NATS input connector uses JetStream's ordered pull consumer, which provides:

  • Strict ordering: Messages delivered in exact stream order without gaps.
  • Automatic recovery: On gap detection, heartbeat loss, or deletion, the consumer automatically recreates itself and resumes from the last processed position.
  • Retry loop with health checks: On transient startup/runtime failures, the connector enters retry mode and periodically attempts to reconnect.
  • Exactly-once semantics: Combined with Feldera's checkpoint mechanism, ensures each message is processed exactly once.

NATS Input Connector Configuration

The connector configuration consists of three main sections:
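
For reference, the three sections assemble into a transport config like this (the server URL and stream name are placeholders):

"transport": {
  "name": "nats_input",
  "config": {
    "connection_config": {
      "server_url": "nats://localhost:4222"
    },
    "stream_name": "my_stream",
    "consumer_config": {
      "deliver_policy": "All"
    }
  }
}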

Connection Options

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| server_url | string | Yes | NATS server URL (e.g., nats://localhost:4222) |
| auth | object | No | Authentication configuration (see Authentication) |
| connection_timeout_secs | integer | No | Connection timeout in seconds: how long to wait when establishing the initial connection to the NATS server. Default: 10 |
| request_timeout_secs | integer | No | Request timeout in seconds: how long to wait for responses to requests. Default: 10 |
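
As a sketch, a connection_config that tightens both timeouts (the values here are illustrative, not recommendations):

"connection_config": {
  "server_url": "nats://localhost:4222",
  "connection_timeout_secs": 5,
  "request_timeout_secs": 5
}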

Stream Configuration

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| stream_name | string | Yes | The name of the NATS JetStream stream to consume from |
| inactivity_timeout_secs | integer | No | Maximum idle time while waiting for the next message before running a stream/server health check. Must be at least 1. Default: 60 |
| retry_interval_secs | integer | No | Delay between automatic retry attempts while the connector is in retry mode. Must be at least 1. Default: 5 |

Consumer Configuration

| Property | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | No | Consumer name for identification |
| description | string | No | Consumer description |
| filter_subjects | string list | No | Filter messages by subject(s). If empty, consumes all subjects in the stream |
| replay_policy | variant | No | Message replay speed: "Instant" (default, fast) or "Original" (rate-limited at original timing) |
| rate_limit | integer | No | Rate limit in bytes per second. Default: 0 (unlimited) |
| deliver_policy | variant | Yes | Starting point for reading from the stream (see Deliver Policy) |
| max_waiting | integer | No | Maximum outstanding pull requests. Default: 0 |
| metadata | map (string → string) | No | Consumer metadata key-value pairs |
| max_batch | integer | No | Maximum messages per batch |
| max_bytes | integer | No | Maximum bytes per batch |
| max_expires | duration | No | Maximum duration for pull requests |
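
As a sketch, a consumer_config that combines several of these fields (the subject names and limits are illustrative):

"consumer_config": {
  "deliver_policy": "All",
  "filter_subjects": ["orders.us.>", "orders.eu.>"],
  "max_batch": 500,
  "rate_limit": 1048576
}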

Deliver Policy

The deliver_policy field determines where in the stream to start consuming messages:

  • "All" - Start from the earliest available message in the stream
  • "Last" - Start from the last message in the stream
  • "New" - Start from new messages only (messages arriving after consumer creation)
  • "LastPerSubject" - Start from the last message for each subject in the stream (useful for KV-like workloads)
  • {"ByStartSequence": {"start_sequence": 100}} - Start from a specific sequence number
  • {"ByStartTime": {"start_time": "2024-01-01T12:00:00Z"}} - Start from messages at or after the specified timestamp (RFC 3339 format)
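
The variant forms take an object value. For example, to start from a known sequence number inside consumer_config:

"consumer_config": {
  "deliver_policy": { "ByStartSequence": { "start_sequence": 100 } }
}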

Replay Policy

The replay_policy field controls how fast messages are delivered to the consumer:

  • "Instant" (default) - Delivers messages as quickly as possible. Use for maximum throughput in production workloads.
  • "Original" - Delivers messages at the rate they were originally received, preserving the timing between messages. Useful for:
    • Replaying production traffic patterns in test/staging environments
    • Load testing with realistic timing
    • Debugging scenarios where message timing matters

If not specified, defaults to "Instant".

Retry, startup and replay behavior

The connector distinguishes between retryable and fatal errors:

  • Retryable errors (temporary network/server issues, missing stream during startup, transient message-stream failures, and temporary failures while fetching JetStream stream metadata used during startup, resume, or replay validation) move the connector into retry mode. It reports non-fatal endpoint errors and retries automatically every retry_interval_secs.
  • Fatal errors stop the connector and report a fatal endpoint error. This is used when checkpoint/replay metadata is incompatible with the current stream sequence space.

Before reading after startup or resume, the connector validates the checkpoint resume cursor against the stream's available sequence range. During replay, it validates that the requested replay range still exists.

Only transient I/O failures during these validation checks are retried. Once the connector successfully reads the stream metadata, logical validation failures remain fatal.
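
When diagnosing these validation failures, the stream's current state, including its first and last sequence numbers, can be inspected with the NATS CLI (the stream name here is illustrative):

nats stream info my_texts

A checkpoint resume cursor must fall within the reported sequence range for validation to succeed.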

Typical fatal scenarios include:

  • Stream deleted or recreated: The checkpoint references sequence numbers that no longer exist in the current stream. For example, the resume cursor is before the stream's earliest available sequence, or after the stream's latest sequence.
  • Stream purged: The stream exists but required replay messages have been removed. The connector detects that the requested replay range falls outside the available sequence range.
  • Stream emptied: The checkpoint says to resume from a specific sequence, but the stream now contains zero messages.
  • Unexpected sequence during replay: While replaying a checkpoint batch, the connector receives a message with a sequence number beyond the expected replay range, indicating that earlier messages may have been deleted mid-replay.

In all of these cases the connector fails fast with a fatal error instead of retrying forever, because the data needed to maintain exactly-once guarantees is permanently gone.

Recovery from fatal errors

To recover from a fatal error caused by stream data loss, you typically need to reset the pipeline's checkpoint state (e.g., by recreating the pipeline) so it starts fresh without referencing the now-invalid sequence numbers.

Authentication

The NATS connector currently supports credentials-based authentication through the auth object.

Credentials File Authentication

Use a credentials file containing JWT and NKey seed:

{
  "credentials": {
    "FromFile": "/path/to/credentials.creds"
  }
}

Or provide credentials directly as a string:

{
  "credentials": {
    "FromString": "-----BEGIN NATS USER JWT-----\n...\n------END NATS USER JWT------\n\n************************* IMPORTANT *************************\n..."
  }
}

note

Additional authentication methods (JWT, NKey, token, username/password) are defined in the configuration schema but not yet implemented. Only credentials-based authentication is currently supported.

tip

For production environments, it is strongly recommended to use secret references instead of hardcoding credentials in the configuration.

Setting up NATS JetStream

Before using the NATS input connector, you need a NATS server with JetStream enabled and a stream created.

Quickstart

The quickest way to start experimenting with Feldera and NATS is to use Docker Compose:

curl -L https://raw.githubusercontent.com/feldera/feldera/main/deploy/docker-compose.yml -o docker-compose.yml
docker compose --profile nats up

This starts the Feldera pipeline manager, a NATS server, and a NATS CLI container. Connect to the CLI container with:

docker compose exec nats-cli sh

You can then publish messages to the NATS server with the nats CLI.

Creating a Stream

From inside the CLI container, create a stream and publish test messages:

# Create a stream
nats stream add my_texts --subjects "text.>" --defaults

# Publish test messages
nats pub -J --count 100 text.area.1 '{"unix": {{UnixNano}}, "text": "{{Random 0 20}}"}'

Example usage

Basic example with raw JSON format

Create a NATS input connector that reads from the my_texts stream:

CREATE TABLE raw_text (
    unix BIGINT,
    text STRING
) WITH (
    'append_only' = 'true',
    'connectors' = '[{
        "name": "my_text",
        "transport": {
            "name": "nats_input",
            "config": {
                "connection_config": {
                    "server_url": "nats://nats:4222"
                },
                "stream_name": "my_texts",
                "consumer_config": {
                    "deliver_policy": "All"
                }
            }
        },
        "format": {
            "name": "json",
            "config": {
                "update_format": "raw"
            }
        }
    }]'
);

CREATE MATERIALIZED VIEW summary AS
SELECT
    len(text) AS text_length,
    (max(unix)/1e6)::TIMESTAMP AS last_received,
    count(*) AS count
FROM raw_text
GROUP BY text_length;

Only receive new NATS messages

If you only want to receive messages published after the Feldera pipeline starts, change deliver_policy to New.

CREATE TABLE raw_text (
    unix BIGINT,
    text STRING
) WITH (
    'append_only' = 'true',
    'connectors' = '[{
        "name": "my_text",
        "transport": {
            "name": "nats_input",
            "config": {
                "connection_config": {
                    "server_url": "nats://nats:4222"
                },
                "stream_name": "my_texts",
                "consumer_config": {
                    "deliver_policy": "New"
                }
            }
        },
        "format": {
            "name": "json",
            "config": {
                "update_format": "raw"
            }
        }
    }]'
);

CREATE MATERIALIZED VIEW summary AS
SELECT
    len(text) AS text_length,
    (max(unix)/1e6)::TIMESTAMP AS last_received,
    count(*) AS count
FROM raw_text
GROUP BY text_length;

Filtering by subject

Use filter_subjects to only consume messages from specific subjects text.area.2 and text.*.3:

CREATE TABLE raw_text (
    unix BIGINT,
    text STRING
) WITH (
    'append_only' = 'true',
    'connectors' = '[{
        "name": "my_text",
        "transport": {
            "name": "nats_input",
            "config": {
                "connection_config": {
                    "server_url": "nats://nats:4222"
                },
                "stream_name": "my_texts",
                "consumer_config": {
                    "deliver_policy": "All",
                    "filter_subjects": ["text.area.2", "text.*.3"]
                }
            }
        },
        "format": {
            "name": "json",
            "config": {
                "update_format": "raw"
            }
        }
    }]'
);

CREATE MATERIALIZED VIEW summary AS
SELECT
    len(text) AS text_length,
    (max(unix)/1e6)::TIMESTAMP AS last_received,
    count(*) AS count
FROM raw_text
GROUP BY text_length;

Replaying at original timing

You can use "Original" replay policy to replay production traffic in a test environment with realistic timing:

CREATE TABLE raw_text (
    unix BIGINT,
    text STRING
) WITH (
    'append_only' = 'true',
    'connectors' = '[{
        "name": "my_text",
        "transport": {
            "name": "nats_input",
            "config": {
                "connection_config": {
                    "server_url": "nats://nats:4222"
                },
                "stream_name": "my_texts",
                "consumer_config": {
                    "deliver_policy": "All",
                    "replay_policy": "Original"
                }
            }
        },
        "format": {
            "name": "json",
            "config": {
                "update_format": "raw"
            }
        }
    }]'
);

CREATE MATERIALIZED VIEW summary AS
SELECT
    len(text) AS text_length,
    (max(unix)/1e6)::TIMESTAMP AS last_received,
    count(*) AS count
FROM raw_text
GROUP BY text_length;

Additional resources

For more information, see: