Delta Lake input connector

note

This page describes configuration options specific to the Delta Lake connector. See the top-level connector documentation for general information about configuring input and output connectors.

Delta Lake is a popular open table format based on Parquet files. It is typically used with the Apache Spark runtime. Data in a Delta Lake is organized in tables, stored in a file system or an object store such as AWS S3, Google GCS, or Azure Blob Storage.

The Delta Lake input connector supports checkpoint-and-resume and at-least-once fault tolerance, but not exactly-once fault tolerance.

Delta Lake input connector configuration

The connector supports the following configuration properties:

uri* (string)

Table URI, e.g., s3://feldera-fraud-detection-data/demographics_train. Supported URI schemes include:
  • AWS S3: s3://, s3a://
  • Azure Blob Storage: az://, adl://, azure://, abfs://, abfss://
  • Google Cloud Storage: gs://
  • Unity catalog: uc://

mode* (enum)

Table read mode. Four options are available:
  • snapshot - read a snapshot of the table and stop.
  • follow - follow the changelog of the table, only ingesting changes (new and deleted rows).
  • snapshot_and_follow - read a snapshot of the table before switching to follow mode. This mode implements the backfill pattern, where historical data is loaded before ingesting the stream of real-time updates.
  • cdc - Change-Data-Capture (CDC) mode. The table behaves as an append-only log where every row represents an insert or delete action. The order of actions is determined by the cdc_order_by property, and the type of each action is determined by the cdc_delete_filter property. In this mode, the connector does not read the initial snapshot of the table and follows the transaction log starting from the version of the table specified by the version or datetime property.

timestamp_column (string)

Table column that serves as an event timestamp. When this option is specified and mode is snapshot or snapshot_and_follow, table rows are ingested in timestamp order, respecting the LATENESS property of the column: each ingested row has a timestamp no more than LATENESS time units earlier than the most recent timestamp of any previously ingested row. See details below.
filter (string)

Optional row filter.

When specified, only rows that satisfy the filter condition are read from the delta table. The condition must be a valid SQL Boolean expression that can be used in the where clause of the select * from my_table where ... query.

snapshot_filter (string)

Optional snapshot filter.

This option is only valid when mode is set to snapshot or snapshot_and_follow. When specified, only rows that satisfy the filter condition are included in the snapshot.

The condition must be a valid SQL Boolean expression that can be used in the where clause of the select * from snapshot where ... query.

Unlike the filter option, which applies to all records retrieved from the table, this filter only applies to rows in the initial snapshot of the table. For instance, it can be used to specify the range of event times to include in the snapshot, e.g.: ts BETWEEN TIMESTAMP '2005-01-01 00:00:00' AND TIMESTAMP '2010-12-31 23:59:59'. This option can be used together with the filter option. During the initial snapshot, only rows that satisfy both filter and snapshot_filter are retrieved from the Delta table. When subsequently following changes in the transaction log (mode = snapshot_and_follow), all rows that meet the filter condition are ingested, regardless of snapshot_filter.
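
For illustration, here is a sketch of the transport.config fragment of a connector that combines both options; the table URI and the column names amt and ts are hypothetical:

{
    "uri": "s3://my-bucket/transactions",
    "mode": "snapshot_and_follow",
    "filter": "amt > 0",
    "snapshot_filter": "ts BETWEEN TIMESTAMP '2024-01-01 00:00:00' AND TIMESTAMP '2024-12-31 23:59:59'"
}

With this configuration, the initial snapshot contains only 2024 events with a positive amount, while the follow phase ingests every change with a positive amount, regardless of its timestamp.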

version, start_version (integer)

Optional table version. When this option is set, the connector finds and opens the specified version of the table. In snapshot and snapshot_and_follow modes, it retrieves the snapshot of this version of the table. In follow, snapshot_and_follow, and cdc modes, it follows transaction log records after this version.

Note: at most one of version and datetime options can be specified. When neither of the two options is specified, the latest committed version of the table is used.

datetime (string)

Optional timestamp for the snapshot in the ISO-8601/RFC-3339 format, e.g., "2024-12-09T16:09:53+00:00". When this option is set, the connector finds and opens the version of the table as of the specified point in time (based on the server time recorded in the transaction log, not the event time encoded in the data). In snapshot and snapshot_and_follow modes, it retrieves the snapshot of this version of the table. In follow, snapshot_and_follow, and cdc modes, it follows transaction log records after this version.

Note: at most one of version and datetime options can be specified. When neither of the two options is specified, the latest committed version of the table is used.
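
For example, a connector that reads the table as it existed at a specific point in time can set datetime instead of version. A sketch with a hypothetical URI (only the transport.config fragment is shown):

{
    "uri": "s3://my-bucket/transactions",
    "mode": "snapshot",
    "datetime": "2024-12-09T16:09:53+00:00"
}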

end_version (integer)

Optional final table version.

Valid only when the connector is configured in follow, snapshot_and_follow, or cdc mode.

When set, the connector will stop scanning the table’s transaction log after reaching this version or any greater version.

This bound is inclusive: if the specified version appears in the log, it will be processed before signaling end-of-input.
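
For instance, a follow-mode connector can be restricted to a bounded slice of the transaction log by combining version with end_version. A sketch with a hypothetical URI and version numbers (only the transport.config fragment is shown):

{
    "uri": "s3://my-bucket/transactions",
    "mode": "follow",
    "version": 10,
    "end_version": 20
}

This connector ingests changes committed in versions 11 through 20 and then signals end-of-input.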

cdc_delete_filter (string)

A predicate that determines whether the record represents a deletion.

This setting is only valid in the cdc mode. It specifies a predicate applied to each row in the Delta table to determine whether the row represents a deletion event. Its value must be a valid Boolean SQL expression that can be used in a query of the form SELECT * from <table> WHERE <cdc_delete_filter>.

cdc_order_by (string)

An expression that determines the ordering of updates in the Delta table.

This setting is only valid in the cdc mode. It specifies an expression evaluated over each row in the Delta table to determine the order in which updates in the table should be applied. Its value must be a valid SQL expression that can be used in a query of the form SELECT * from <table> ORDER BY <cdc_order_by>.
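
As an illustration, consider a CDC table in which an op column marks each row as an insert or delete and a seq_no column defines the order of changes. A configuration sketch with a hypothetical URI and column names (only the transport.config fragment is shown):

{
    "uri": "s3://my-bucket/transactions_cdc",
    "mode": "cdc",
    "cdc_delete_filter": "op = 'delete'",
    "cdc_order_by": "seq_no"
}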

num_parsers (string)

The number of parallel parsing tasks the connector uses to process data read from the table. Increasing this value can enhance performance by allowing more concurrent processing. Recommended range: 1–10. The default is 4.
skip_unused_columns (bool, default: false)

Don't read unused columns from the Delta table. When set to true, this option instructs the connector to avoid reading columns from the Delta table that are not used in any view definitions. To be skipped, the columns must be either nullable or have default values. This can improve ingestion performance, especially for wide tables.

Note: The simplest way to exclude unused columns is to omit them from the Feldera SQL table declaration. The connector never reads columns that aren't declared in the SQL schema. Additionally, the SQL compiler emits warnings for declared but unused columns—use these as a guide to optimize your schema.

max_concurrent_readers (integer, default: 6)

Maximum number of concurrent object store reads performed by all Delta Lake connectors.

This setting is used to limit the number of concurrent reads of the object store in a pipeline with a large number of Delta Lake connectors. When multiple connectors are simultaneously reading from the object store, this can lead to transport timeouts.

When enabled, this setting limits the number of concurrent reads across all connectors. This is a global setting that affects all Delta Lake connectors, and not just the connector where it is specified. It should therefore be used at most once in a pipeline. If multiple connectors specify this setting, they must all use the same value.

The default value is 6.
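
For example, in a pipeline with many Delta Lake connectors, a single connector can set this global limit. A sketch with a hypothetical URI and an illustrative limit (only the transport.config fragment is shown):

{
    "uri": "s3://my-bucket/table_1",
    "mode": "snapshot",
    "max_concurrent_readers": 2
}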

[*]: Required fields

Storage parameters

Along with the parameters listed above, there are additional configuration options for specific storage backends. These can be configured either as connector properties or as environment variables set inside the pipeline pod. If the same option is specified in both places, the connector property takes precedence.

The following configuration options are supported for the Amazon S3 object store. See the object_store documentation for additional details.

Each entry lists the connector property name(s), followed by the corresponding environment variable in parentheses:

  • aws_access_key_id, access_key_id (AWS_ACCESS_KEY_ID): AWS access key ID for authentication.
  • aws_secret_access_key, secret_access_key (AWS_SECRET_ACCESS_KEY): AWS secret access key for authentication.
  • aws_region, region (AWS_REGION): AWS default region for operations.
  • aws_endpoint_url, aws_endpoint, endpoint_url, endpoint (AWS_ENDPOINT): Custom S3 endpoint (e.g., for local or alternative services).
  • aws_session_token, aws_token, session_token, token (AWS_SESSION_TOKEN): Session token for temporary credentials.
  • aws_skip_signature, skip_signature (AWS_SKIP_SIGNATURE): If enabled, Amazon S3 will not fetch credentials and will not sign requests. This can be useful when interacting with public S3 buckets that deny authorized requests.
  • aws_imdsv1_fallback, imdsv1_fallback (AWS_IMDSV1_FALLBACK): Enable automatic fallback to IMDSv1 if the token endpoint returns a 403 error indicating that IMDSv2 is not supported.
  • aws_virtual_hosted_style_request, virtual_hosted_style_request (AWS_VIRTUAL_HOSTED_STYLE_REQUEST): Configures whether virtual-hosted-style requests are used. If "false" (default), path-style requests are used; if "true", virtual-hosted-style requests are used. If an endpoint is provided, it should be consistent with this setting, i.e., if virtual_hosted_style_request is true, the endpoint should include the bucket name.
  • aws_unsigned_payload, unsigned_payload (AWS_UNSIGNED_PAYLOAD): Avoid computing the payload checksum when calculating the signature.
  • aws_metadata_endpoint, metadata_endpoint (AWS_METADATA_ENDPOINT): Instance metadata endpoint.
  • aws_container_credentials_relative_uri, container_credentials_relative_uri (AWS_CONTAINER_CREDENTIALS_RELATIVE_URI): Container credentials relative URI when used in ECS.
  • aws_container_credentials_full_uri, container_credentials_full_uri (AWS_CONTAINER_CREDENTIALS_FULL_URI): Container credentials full URI when used in EKS.
  • aws_container_authorization_token_file, container_authorization_token_file (AWS_CONTAINER_AUTHORIZATION_TOKEN_FILE): Authorization token in plain text, used in EKS to authenticate with aws_container_credentials_full_uri.
  • aws_s3_express, s3_express (AWS_S3_EXPRESS): Enable support for S3 Express One Zone.
  • aws_request_payer, request_payer (AWS_REQUEST_PAYER): Enable support for S3 Requester Pays.
  • aws_allow_http, allow_http (AWS_ALLOW_HTTP): Set to "true" to permit HTTP (insecure) connections to S3.
  • aws_server_side_encryption (AWS_SERVER_SIDE_ENCRYPTION): Type of encryption to use. If set, must be one of "AES256" (SSE-S3), "aws:kms" (SSE-KMS), "aws:kms:dsse" (DSSE-KMS), or "sse-c".
  • aws_sse_kms_key_id (AWS_SSE_KMS_KEY_ID): The KMS key ID to use for server-side encryption. If set, aws_server_side_encryption must be "aws:kms" or "aws:kms:dsse".
  • aws_sse_bucket_key_enabled (AWS_SSE_BUCKET_KEY_ENABLED): If set to "true", use the bucket's default KMS key for server-side encryption; if set to "false", disable the use of the bucket's default KMS key.
  • aws_sse_customer_key_base64 (AWS_SSE_CUSTOMER_KEY_BASE64): The base64-encoded, 256-bit customer encryption key to use for server-side encryption. If set, aws_server_side_encryption must be "sse-c".

When using a Unity catalog URI (uc://), a typical connector config includes the unity_client_id, unity_client_secret, and unity_host options. You may also need to configure additional object store-specific properties; e.g., you may need to set aws_region when opening a Delta table stored in S3.

HTTP client configuration

Additional configuration options for the HTTP client used to access remote object stores:

  • allow_http: Allow non-TLS, i.e., non-HTTPS, connections.
  • allow_invalid_certificates: Skip certificate validation on HTTPS connections. This introduces significant vulnerabilities and should only be used as a last resort or for testing.
  • connect_timeout: Timeout for the connect phase of the client only. Format: <number><units>, e.g., 30s, 1.5m.
  • http1_only: Only use HTTP/1 connections.
  • http2_only: Only use HTTP/2 connections.
  • http2_keep_alive_interval: Interval at which HTTP/2 Ping frames are sent to keep a connection alive. Format: <number><units>, e.g., 30s, 1.5m.
  • http2_keep_alive_timeout: Timeout for receiving an acknowledgement of the keep-alive ping. Format: <number><units>, e.g., 30s, 1.5m.
  • http2_keep_alive_while_idle: Enable HTTP/2 keep-alive pings for idle connections.
  • http2_max_frame_size: Maximum frame size to use for HTTP/2.
  • pool_idle_timeout: The maximum idle timeout of the connection pool, i.e., the length of time an idle connection is kept alive. Format: <number><units>, e.g., 30s, 1.5m.
  • pool_max_idle_per_host: Maximum number of idle connections per host.
  • proxy_url: HTTP proxy to use for requests.
  • proxy_ca_certificate: PEM-formatted CA certificate for proxy connections.
  • proxy_excludes: List of hosts that bypass the proxy.
  • randomize_addresses: Randomize the order of addresses that DNS resolution yields. This spreads connections across more servers.
  • timeout: Request timeout, applied from when the request starts connecting until the response body has finished. Format: <number><units>, e.g., 30s, 1.5m.
  • user_agent: User-Agent header to be used by this client.
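
For example, assuming these options are set as top-level connector configuration properties alongside the storage options above, a sketch that raises the request timeout and routes traffic through a proxy might look as follows (the URI, timeout values, and proxy address are hypothetical; only the transport.config fragment is shown):

{
    "uri": "s3://my-bucket/transactions",
    "mode": "snapshot",
    "connect_timeout": "30s",
    "timeout": "120s",
    "proxy_url": "http://proxy.internal:3128"
}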

Data type mapping

The following list shows supported Delta Lake data types and the corresponding Feldera SQL types.

  • BIGINT: BIGINT
  • BINARY: VARBINARY
  • BOOLEAN: BOOLEAN
  • DATE: DATE
  • DOUBLE: DOUBLE
  • FLOAT: REAL
  • INT: INT
  • SMALLINT: SMALLINT
  • STRING: STRING
  • DECIMAL(P,S): DECIMAL(P,S). The largest supported precision P is 28.
  • TIMESTAMP, TIMESTAMP_NTZ: TIMESTAMP. Timestamp values are rounded to the nearest millisecond. Feldera currently does not support timestamps with time zones; when the Delta Lake TIMESTAMP type is used, time zone information is discarded.
  • TINYINT: TINYINT
  • MAP<K,V>: MAP<K,V>
  • ARRAY<T>: T ARRAY
  • STRUCT: ROW or user-defined type. Structs can be encoded as either anonymous ROW types or named user-defined structs.
  • VARIANT: VARIANT
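
As an illustration of this mapping, a hypothetical Feldera table declaration for a Delta table containing array, map, struct, and timestamp columns might look as follows (table and column names are made up, and no connector is attached):

CREATE TABLE events(
    id BIGINT,
    tags STRING ARRAY,                   -- Delta ARRAY<STRING>
    attributes MAP<VARCHAR, VARCHAR>,    -- Delta MAP<STRING, STRING>
    loc ROW(lat DOUBLE, lon DOUBLE),     -- Delta STRUCT; could also be a named user-defined type
    created_at TIMESTAMP                 -- Delta TIMESTAMP or TIMESTAMP_NTZ; time zone information is discarded
);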

Ingesting time series data from a Delta Lake

Feldera is optimized to efficiently process time series data by taking advantage of the fact that such data often arrives ordered by timestamp, i.e., every event has the same or larger timestamp than the previous event. In some cases, events can get reordered and delayed, but this delay is bounded, e.g., it may not exceed 1 hour. We refer to this bound as lateness and specify it by attaching the LATENESS attribute to the timestamp column of the table declaration. See our Time Series Analysis Guide for more details.

When reading from a Delta table that contains time series data, the user must ensure that the initial snapshot of the table is ingested respecting the LATENESS annotation, e.g., if the table contains one year's worth of data, and its lateness is equal to 1 month, then the connector must ingest all data for the first month before moving to the second month, and so on. If this requirement is violated, the pipeline will drop records that arrive more than LATENESS out of order.

This can be achieved using the timestamp_column property, which specifies the table column that serves as an event timestamp. When this property is set, and mode is one of snapshot or snapshot_and_follow, table rows are ingested in the timestamp order, respecting the LATENESS annotation on the column: each ingested row has a timestamp no more than LATENESS time units earlier than the most recent timestamp of any previously ingested row. The ingestion is performed by partitioning the table into timestamp ranges of width LATENESS and ingesting ranges one by one in increasing timestamp order.

Requirements:

  • The timestamp column must be of a supported type: integer, DATE, or TIMESTAMP.
  • The timestamp column must be declared with non-zero LATENESS.
  • LATENESS must be a valid constant expression in the DataFusion SQL dialect. The reason for this is that Feldera uses the Apache DataFusion engine to query Delta Lake. In practice, most valid Feldera SQL expressions are accepted by DataFusion.
  • For efficient ingest, the Delta table must be optimized for timestamp-based queries using partitioning, Z-ordering, or liquid clustering.

Note that the timestamp_column property only controls the initial table snapshot. When mode is set to follow or snapshot_and_follow and the connector is following the transaction log of the table, it ingests changes in the order they appear in the log. It is the responsibility of the application that writes to the table to ensure that changes it applies to the table respect the LATENESS annotations.

Example

The following table contains a timestamp column of type TIMESTAMP with LATENESS equal to INTERVAL 1 DAY. Assuming that the oldest timestamp in the table is 2024-01-01T00:00:00, the connector will fetch all records with timestamps from 2024-01-01, then all records for 2024-01-02, 2024-01-03, etc., until all records in the table have been ingested.

CREATE TABLE transaction(
    trans_date_trans_time TIMESTAMP NOT NULL LATENESS INTERVAL 1 day,
    cc_num BIGINT,
    merchant STRING,
    category STRING,
    amt DECIMAL(38, 2),
    trans_num STRING,
    unix_time BIGINT,
    merch_lat DOUBLE,
    merch_long DOUBLE,
    is_fraud BIGINT
) WITH (
    'connectors' = '[{
        "transport": {
            "name": "delta_table_input",
            "config": {
                "uri": "s3://feldera-fraud-detection-data/transaction_train",
                "mode": "snapshot",
                "aws_skip_signature": "true",
                "timestamp_column": "trans_date_trans_time"
            }
        }
    }]'
);

Additional examples

Example: Setting timestamp_column

Create a Delta Lake input connector to read a snapshot of a table from a public S3 bucket, using unix_time as the timestamp column. The column stores event time in seconds since the UNIX epoch and has lateness equal to 30 days (3600 seconds/hour * 24 hours/day * 30 days). Note the aws_skip_signature flag, which is required to read from the bucket without authentication.

CREATE TABLE transaction(
    trans_date_trans_time TIMESTAMP,
    cc_num BIGINT,
    merchant STRING,
    category STRING,
    amt DECIMAL(38, 2),
    trans_num STRING,
    unix_time BIGINT LATENESS 3600 * 24 * 30,
    merch_lat DOUBLE,
    merch_long DOUBLE,
    is_fraud BIGINT
) WITH (
    'connectors' = '[{
        "transport": {
            "name": "delta_table_input",
            "config": {
                "uri": "s3://feldera-fraud-detection-data/transaction_train",
                "mode": "snapshot",
                "aws_skip_signature": "true",
                "timestamp_column": "unix_time"
            }
        }
    }]'
);

Example: Using snapshot_and_follow mode

Read a full snapshot of version 10 of the table before ingesting the stream of changes for versions 11 onward. The initial snapshot will be sorted by the unix_time column. Here and below we only show the contents of the transport.config field of the connector.

{
"uri": "s3://feldera-fraud-detection-data/transaction_infer",
"mode": "snapshot_and_follow",
"version": 10,
"timestamp_column": "unix_time",
"aws_skip_signature": "true"
}

Example: Setting AWS credentials

Read a full snapshot of a Delta table using the specified AWS access key. Note that the aws_region parameter is required in this case, because the Delta Lake Rust library we use does not currently auto-detect the AWS region.

{
"uri": "s3://feldera-fraud-detection-demo/transaction_train",
"mode": "snapshot",
"aws_access_key_id": "<AWS_ACCESS_KEY_ID>",
"aws_secret_access_key": "<AWS_SECRET_ACCESS_KEY>",
"aws_region": "us-east-1"
}

Example: Unity catalog

Read a table snapshot via the Unity catalog.

{
"uri": "uc://feldera_experimental.default.people_2m",
"mode": "snapshot",
"unity_client_id": "<CLIENT_ID>",
"unity_client_secret": "<CLIENT_SECRET>",
"unity_host": "https://dbc-XXX-XXX.cloud.databricks.com",
"aws_region": "us-west-1"
}