Delta Lake input connector
Delta Lake is an open-source storage framework for the Lakehouse architecture. It is typically used with the Apache Spark runtime. Data in a Delta Lake is organized in tables (called Delta Tables), stored in a file system or an object store such as AWS S3, Google GCS, or Azure Blob Storage. Like other Lakehouse-native storage formats, Delta Lake is optimized for both batch and stream processing, offering a bridge between the two worlds.
The Delta Lake input connector does not yet support fault tolerance.
Delta Lake input connector configuration
Required parameters
uri
- Table URI, e.g., "s3://feldera-fraud-detection-data/demographics_train"
mode
- Table read mode. Three options are available:
  snapshot
  - Read a snapshot of the table and stop.
  follow
  - Follow the changelog of the table, only ingesting changes (new and deleted rows).
  snapshot_and_follow
  - Read a snapshot of the table before switching to the follow mode. This mode implements the backfill pattern where we load historical data for the table before ingesting the stream of real-time updates.
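As an illustration of the two required parameters, a minimal connector configuration for the follow mode might look like the sketch below. It reuses the example table URI given above. The aws_skip_signature option is an assumption carried over from the public-bucket example in the Example usage section; whether this particular table can be read without credentials is not confirmed here.

```json
{
  "uri": "s3://feldera-fraud-detection-data/demographics_train",
  "mode": "follow",
  "aws_skip_signature": "true"
}
```

With this configuration the connector would skip the initial snapshot and ingest only the changes recorded in the table's changelog.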
Optional parameters
timestamp_column
- Table column that serves as an event timestamp. When this option is specified, and mode is one of snapshot or snapshot_and_follow, the snapshot of the table is ingested in the timestamp order. This setting is required for tables declared with the LATENESS attribute in Feldera SQL. It impacts the performance of the connector, since data must be sorted before pushing it to the pipeline; therefore it is not recommended to use this setting for tables without LATENESS.
snapshot_filter
- Optional row filter. This option is only valid when mode is set to snapshot or snapshot_and_follow. When specified, only rows that satisfy the filter condition are included in the snapshot. The condition must be a valid Spark SQL Boolean expression that can be used in the where clause of the select * from snapshot where .. query. This option can be used, for example, to specify the range of event times to include in the snapshot:
  ts BETWEEN '2005-01-01 00:00:00' AND '2010-12-31 23:59:59'
version
- Optional table version. When this option is set, the connector finds and opens the specified version of the table. In snapshot and snapshot_and_follow modes, it retrieves the snapshot of this version of the table. In follow and snapshot_and_follow modes, it follows transaction log records after this version.
  Note: at most one of the version and datetime options can be specified. When neither of the two options is specified, the latest committed version of the table is used.
datetime
- Optional timestamp for the snapshot in the ISO-8601/RFC-3339 format, e.g., "2024-12-09T16:09:53+00:00". When this option is set, the connector finds and opens the version of the table as of the specified point in time (based on the server time recorded in the transaction log, not the event time encoded in the data). In snapshot and snapshot_and_follow modes, it retrieves the snapshot of this version of the table. In follow and snapshot_and_follow modes, it follows transaction log records after this version.
  Note: at most one of the version and datetime options can be specified. When neither of the two options is specified, the latest committed version of the table is used.
Storage parameters
Along with the parameters mentioned above, there are additional configuration options for specific storage backends. Refer to backend-specific documentation for details.
Example usage
Create a Delta Lake input connector that reads a snapshot of a table from a public S3 bucket. Note the aws_skip_signature flag, required to read from the bucket without authentication. The snapshot will be sorted by the unix_time column.
CREATE TABLE INPUT (
  ... -- columns omitted
) WITH (
  'connectors' = '[
  {
    "transport": {
      "name": "delta_table_input",
      "config": {
        "uri": "s3://feldera-fraud-detection-data/transaction_train",
        "mode": "snapshot",
        "aws_skip_signature": "true",
        "timestamp_column": "unix_time"
      }
    }
  }]'
)
Read a full snapshot of version 10 of the table before ingesting the stream of changes for versions 11 onward. The initial snapshot will be sorted by the unix_time column. Here and below we only show the contents of the transport.config field of the connector.
{
  "uri": "s3://feldera-fraud-detection-data/transaction_infer",
  "mode": "snapshot_and_follow",
  "version": 10,
  "timestamp_column": "unix_time",
  "aws_skip_signature": "true"
}
Read a full snapshot of a Delta table using the specified AWS access key. Note that the aws_region parameter is required in this case, because the Delta Lake Rust library we use does not currently auto-detect the AWS region.
{
  "uri": "s3://feldera-fraud-detection-demo/transaction_train",
  "mode": "snapshot",
  "aws_access_key_id": "<AWS_ACCESS_KEY_ID>",
  "aws_secret_access_key": "<AWS_SECRET_ACCESS_KEY>",
  "aws_region": "us-east-1"
}