
Efficient Bulk Data Processing using Transactions

warning

Transaction support is an experimental feature and may undergo significant changes, including non-backward-compatible modifications in future releases of Feldera.

Transactions enable Feldera pipelines to ingest and process large volumes of data atomically, as one logical unit of work rather than piece by piece. Use transactions to achieve:

  • Efficient backfill: ingest and process large historical datasets.

  • Atomicity: process multiple inputs atomically without emitting intermediate results.

Overview

By default, Feldera pipelines run in continuous mode: they ingest a chunk of data from input connectors, process it to completion, and produce updates to all views before processing the next chunk. This mode is optimal for low-latency incremental view maintenance, but it is less suited to bulk data ingestion.

A common scenario is when a pipeline must ingest a large volume of historical data accumulated over years before processing new real-time inputs. This is known as the backfill problem. While backfill can be performed in continuous mode, this is likely to lead to two issues:

  • Performance: computing all intermediate updates can be more expensive than computing the cumulative update. The exact performance difference depends on your SQL queries and can be very significant in some cases.

  • Atomicity: in continuous mode the pipeline produces a stream of intermediate updates, which often cancel each other out. Consider for instance a COUNT(*) query whose output changes any time new records are ingested. These updates expose intermediate computation results to downstream systems while placing excessive load on the data sinks.

Transactions solve these challenges by allowing input updates to be grouped into a single batch and processed atomically. In transactional mode, Feldera still executes SQL queries incrementally and produces a delta of changes to output views. The key difference is in who controls the batch size: instead of the engine automatically deciding chunk boundaries, the user explicitly defines the scope of each transaction.
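To make the lifecycle concrete, here is a minimal sketch that drives a single transaction over the REST endpoints documented later in this page (the host and pipeline name my_pipeline are placeholders):

```python
import requests

BASE = "http://localhost:8080/v0/pipelines/my_pipeline"

# Open a transaction: from this point on, the pipeline buffers inputs
# without emitting view updates.
txn_id = requests.post(f"{BASE}/start_transaction").json()["transaction_id"]

# ... ingest bulk data through the pipeline's input connectors ...

# Commit: the engine computes a single cumulative delta for every view.
requests.post(f"{BASE}/commit_transaction")
```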

The following table summarizes the two modes:

|                                     | Continuous Mode                                                                                        | Transactions                                                                                                                                                |
|-------------------------------------|--------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| When to use                         | Process real-time inputs with low latency                                                              | Efficiently process bulk data; process multiple updates atomically                                                                                          |
| When query evaluation is triggered  | When an input chunk is ingested                                                                        | On transaction commit                                                                                                                                       |
| Strong consistency                  | Yes: after processing an input chunk, the contents of all views match all the inputs received so far.  | Yes: after committing a transaction, the contents of all views match all the inputs received so far, including inputs received as part of the transaction.  |

Transaction API

Start a transaction

Use the start_transaction API to start a transaction. The API returns a transaction ID:

$ curl -X POST http://localhost:8080/v0/pipelines/my_pipeline/start_transaction

{"transaction_id":1}
  • The start_transaction call will fail if there is already a transaction in progress.

  • During a transaction, the pipeline ingests incoming data without producing output, performing only minimal processing such as resolving primary keys and indexing inputs.

  • Any inputs received by the pipeline between start_transaction and commit_transaction, along with data buffered by input connectors but not yet processed at the time start_transaction is called, are processed as part of the transaction.
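As an illustration, a minimal Python sketch that starts a transaction over the REST API and handles the case where one is already in progress (the pipeline name my_pipeline is a placeholder):

```python
import requests

resp = requests.post("http://localhost:8080/v0/pipelines/my_pipeline/start_transaction")
if resp.ok:
    # The response carries the new transaction ID, e.g. {"transaction_id": 1}.
    print(f"started transaction {resp.json()['transaction_id']}")
else:
    # start_transaction fails if a transaction is already in progress.
    print(f"failed to start transaction: {resp.status_code} {resp.text}")
```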

Commit a transaction

Once the pipeline has ingested all inputs that must be processed as part of the transaction, commit the transaction using the commit_transaction API.

$ curl -X POST http://localhost:8080/v0/pipelines/my_pipeline/commit_transaction
"Transaction commit initiated"
  • The commit_transaction REST API only initiates the commit; the commit itself may take some time to complete. Use the /stats endpoint to monitor transaction status.

  • The Python SDK and fda CLI provide both blocking and non-blocking variants of the commit operation. The blocking variant waits until the commit is complete before returning control to the caller.

  • During a transaction commit, the pipeline computes updates to all views in the program. Depending on the volume of ingested data and the complexity of the views, this process can take a significant amount of time. While the commit is in progress, the pipeline neither ingests new inputs nor produces outputs. To provide visibility, the pipeline periodically (every 10 seconds) logs the progress of the current commit operation.

  • Once the commit is complete, the pipeline outputs a batch of updates for every view in the program. These updates are processed by the output connectors, which send them to their associated data sinks.

When the transaction completes, the pipeline returns to continuous processing mode. The user can start a new transaction at any time.
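With the raw REST API, a blocking commit can be emulated by initiating the commit and then polling the /stats endpoint (described in the next section) until the pipeline reports that no transaction is active. A sketch:

```python
import time

import requests

BASE = "http://localhost:8080/v0/pipelines/my_pipeline"

# Initiate the commit; this call returns before the commit finishes.
requests.post(f"{BASE}/commit_transaction").raise_for_status()

# Wait for completion: the status transitions CommitInProgress -> NoTransaction.
while True:
    stats = requests.get(f"{BASE}/stats").json()
    if stats["global_metrics"]["transaction_status"] == "NoTransaction":
        break
    time.sleep(1)

print("commit complete")
```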

Monitoring transaction status

The user can monitor the pipeline's transaction status using the /stats endpoint. The status is one of:

| Status                | Description                                                                            |
|-----------------------|----------------------------------------------------------------------------------------|
| NoTransaction         | There is currently no active transaction. The pipeline is running in continuous mode.  |
| TransactionInProgress | There is an active transaction in progress.                                            |
| CommitInProgress      | The current transaction is being committed.                                            |

When the status is TransactionInProgress or CommitInProgress, the transaction_id attribute contains the current transaction ID.

$ curl -s http://localhost:8080/v0/pipelines/my_pipeline/stats | jq -r '.global_metrics.transaction_status'
TransactionInProgress

$ curl -s http://localhost:8080/v0/pipelines/my_pipeline/stats | jq -r '.global_metrics.transaction_id'
1
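The same information can be read programmatically. For instance, a small helper (the function name is illustrative) that extracts both fields from the documented global_metrics object:

```python
import requests

def transaction_status(pipeline: str, base: str = "http://localhost:8080") -> tuple[str, int | None]:
    """Return (transaction_status, transaction_id) from the pipeline's /stats endpoint.

    transaction_id is only meaningful while a transaction is in progress
    or being committed.
    """
    metrics = requests.get(f"{base}/v0/pipelines/{pipeline}/stats").json()["global_metrics"]
    return metrics["transaction_status"], metrics.get("transaction_id")
```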

Limitations

  • Concurrent transactions are not supported. At most one transaction can run at a time. All inputs ingested by the pipeline while the transaction is active are processed as part of the transaction.

  • Checkpointing is disabled during a transaction. A checkpoint initiated while a transaction is active is delayed until the transaction has committed.

  • A transaction currently cannot be aborted or rolled back. Let us know if this feature is important for your use case by leaving a comment.

Example

# Create a simple pipeline that copies all records in table `t` to view `v`.
$ echo 'create table t(x int); create materialized view v as select * from t;' | fda create transaction_test --stdin
Pipeline created successfully.

$ fda start transaction_test
Pipeline started successfully.

# Update the table.
$ fda query transaction_test "insert into t values(1)"
+-------+
| count |
+-------+
|     1 |
+-------+

# The change is instantly reflected in the output view.
$ fda query transaction_test "select * from v"
+---+
| x |
+---+
| 1 |
+---+

# Start a transaction.
$ fda start-transaction transaction_test
Transaction started successfully with ID: 1

# Insert more records.
$ fda query transaction_test "insert into t values(2)"
+-------+
| count |
+-------+
|     1 |
+-------+

$ fda query transaction_test "insert into t values(3)"
+-------+
| count |
+-------+
|     1 |
+-------+

# The view remains unmodified since the transaction has not been committed.
$ fda query transaction_test "select * from v"
+---+
| x |
+---+
| 1 |
+---+

# Commit the transaction.
$ fda commit-transaction transaction_test
Transaction committed successfully.

# Updates performed during the transaction are now propagated to all views.
$ fda query transaction_test "select * from v"
+---+
| x |
+---+
| 2 |
| 3 |
| 1 |
+---+