
Apache Iceberg input connector

Note: This page describes configuration options specific to the Apache Iceberg connector. See the top-level connector documentation for general information about configuring input and output connectors.

Warning: Iceberg support is still experimental and may be substantially modified in the future.

The Iceberg input connector ingests data from an Apache Iceberg table into a Feldera pipeline. Currently, the connector supports only batch reads, which load a static snapshot of the table; it does not yet support ingesting incremental changes. Incremental ingestion is planned for future releases.

The connector works with REST and AWS Glue catalogs. It can also read a table directly, without a catalog, given the location of the table's metadata file. Supported storage systems include S3, GCS, and local file systems.

The Iceberg input connector does not yet support fault tolerance.

Configuration

| Property | Type | Description |
|---|---|---|
| `mode`* | enum | Table read mode. Currently, the only supported mode is `snapshot`, in which the connector reads a snapshot of the table and stops. |
| `snapshot_filter` | string | Optional row filter. When specified, only rows that satisfy the filter condition are included in the snapshot. The condition must be a valid SQL Boolean expression that can be used in the `WHERE` clause of the query `select * from snapshot where ...`. This option can be used to specify the range of event times to include in the snapshot, e.g., `ts BETWEEN TIMESTAMP '2005-01-01 00:00:00' AND TIMESTAMP '2010-12-31 23:59:59'`. |
| `snapshot_id` | integer | Optional table snapshot ID. When this option is set, the connector reads the specified snapshot of the table. Note: at most one of `snapshot_id` and `datetime` can be specified. When neither option is specified, the latest snapshot of the table is used. |
| `datetime` | string | Optional timestamp for the snapshot in ISO-8601/RFC-3339 format, e.g., `"2024-12-09T16:09:53+00:00"`. When this option is set, the connector reads the table as of the specified point in time (based on the snapshot commit time recorded in the table metadata, not the event time encoded in the data). Note: at most one of `snapshot_id` and `datetime` can be specified. When neither option is specified, the latest snapshot of the table is used. |
| `metadata_location` | string | Location of the table metadata JSON file. This property is used to access an Iceberg table directly, without a catalog. It is mutually exclusive with the `catalog_type` property. |
| `table_name` | string | Name of the Iceberg table within the catalog, in the `namespace.table` format. This option applies when an Iceberg catalog is configured using the `catalog_type` property. |
| `catalog_type` | enum | Type of the Iceberg catalog used to access the table. Supported options are `rest` and `glue`. This property is mutually exclusive with `metadata_location`. |

Fields marked with * are required.
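As a sketch of how the time-travel and filtering options can be combined, the following declaration reads the table as of a past point in time and keeps only rows whose event time falls within 2024. The REST catalog URI, the table name `iceberg_test.test_table`, and the columns are placeholders; the FileIO options for the storage system that holds the table data are omitted here and would need to be added.

```sql
-- Sketch only: catalog URI, table name, and columns are placeholders.
-- "datetime" selects the table state as of a commit time;
-- "snapshot_filter" then restricts rows by event time (column ts).
-- Single quotes inside the connector string are escaped by doubling them.
create table iceberg_table(
    id bigint,
    ts TIMESTAMP
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "rest",
                "rest.uri": "http://localhost:8181",
                "table_name": "iceberg_test.test_table",
                "datetime": "2024-12-09T16:09:53+00:00",
                "snapshot_filter": "ts BETWEEN TIMESTAMP ''2024-01-01 00:00:00'' AND TIMESTAMP ''2024-12-31 23:59:59''"
            }
        }
    }]'
);
```

Replacing `datetime` with `snapshot_id` pins an exact snapshot instead; the two options cannot be set together.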

REST catalog configuration

The following properties are used when catalog_type is set to rest to configure access to an Iceberg REST catalog.

| Property | Type | Description |
|---|---|---|
| `rest.uri`* | string | URI identifying the REST catalog server. |
| `rest.warehouse` | string | Default location for managed tables created by the catalog. |
| `rest.oauth2-server-uri` | string | Authentication URL to use for client credentials authentication (default: `uri` + `v1/oauth/tokens`). |
| `rest.credential` | string | Credential to use for the OAuth2 credential flow when initializing the catalog: a key and secret pair separated by `:` (the key is optional). |
| `rest.token` | string | Bearer token value to use for the `Authorization` header. |
| `rest.scope` | string | Desired scope of the requested security token (default: `catalog`). |
| `rest.prefix` | string | Customizes table storage paths. When combined with the `warehouse` property, the prefix determines how table data is organized within the storage. |
| `rest.audience` | string | Logical name of the target resource or service. |
| `rest.resource` | string | URI of the target resource or service. |
| `rest.headers` | [(string, string)] | Additional HTTP request headers added to each catalog REST API call. |

Fields marked with * are required when the `catalog_type` property is set to `rest`.
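The following is a minimal sketch of a REST catalog that requires OAuth2 client credentials. The host `catalog.example.com` and all `<...>` values are hypothetical placeholders. Because `rest.oauth2-server-uri` is not set, the default token endpoint (`uri` + `v1/oauth/tokens`) would be used; a pre-issued bearer token could instead be supplied through `rest.token`.

```sql
-- Sketch only: host name and credentials are hypothetical placeholders.
-- rest.credential has the form "<key>:<secret>" (the key part is optional).
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "rest",
                "table_name": "iceberg_test.test_table",
                "rest.uri": "https://catalog.example.com",
                "rest.warehouse": "s3://feldera-iceberg-test/",
                "rest.credential": "<CLIENT_ID>:<CLIENT_SECRET>",
                "rest.scope": "catalog",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```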

Glue catalog configuration

The following properties are used when catalog_type is set to glue to configure access to the AWS Glue catalog.

| Property | Type | Description |
|---|---|---|
| `glue.warehouse`* | string | Location for table metadata. Example: `s3://my-data-warehouse/tables/` |
| `glue.endpoint` | string | Alternative endpoint of the Glue service for the Glue catalog to access. Example: `https://glue.us-east-1.amazonaws.com` |
| `glue.access-key-id` | string | Access key ID used to access the Glue catalog. |
| `glue.secret-access-key` | string | Secret access key used to access the Glue catalog. |
| `glue.profile-name` | string | Profile used to access the Glue catalog. |
| `glue.region` | string | Region of the Glue catalog. |
| `glue.session-token` | string | Static session token used to access the Glue catalog. |
| `glue.id` | string | The 12-digit ID of the Glue catalog. |

Fields marked with * are required when the `catalog_type` property is set to `glue`.
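When the Glue catalog and the S3 bucket are accessed with temporary credentials, both the Glue and the S3 settings need a session token. The sketch below shows such a configuration that also pins the Glue catalog ID explicitly; all `<...>` values are placeholders, and the bucket and table names are reused from the examples on this page.

```sql
-- Sketch only: temporary (session) credentials for both Glue and S3.
-- All <...> values are placeholders.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "glue",
                "table_name": "iceberg_test.test_table",
                "glue.warehouse": "s3://feldera-iceberg-test/",
                "glue.id": "<GLUE_CATALOG_ID>",
                "glue.region": "us-east-1",
                "glue.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "glue.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "glue.session-token": "<AWS_SESSION_TOKEN>",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.session-token": "<AWS_SESSION_TOKEN>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```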

FileIO configuration

Iceberg uses the concept of a FileIO, a pluggable module for reading, writing, and deleting files. Feldera currently supports S3, GCS, and local file system FileIO implementations. The Iceberg connector selects the FileIO type based on the prefix of the Iceberg table location:

  • s3://, s3a:// - S3.
  • gs:// - Google Cloud Storage.
  • file:// or no prefix - local file system.

The S3 and GCS FileIO implementations require additional configuration options, documented below.

S3 FileIO configuration

| Property | Type | Description |
|---|---|---|
| `s3.access-key-id` | string | S3 access key ID. |
| `s3.secret-access-key` | string | S3 secret access key. |
| `s3.endpoint` | string | Alternative endpoint of the S3 service for the FileIO to access. Use this to access any S3-compatible object storage service with a different endpoint, or a private S3 endpoint in a virtual private cloud. |
| `s3.region` | string | S3 region. |
| `s3.session-token` | string | S3 session token. This is required when using temporary credentials. |
| `s3.allow-anonymous` | string | Set to `"true"` to skip signing requests (e.g., for public buckets). |
| `s3.disable-ec2-metadata` | string | Set to `"true"` to skip loading credentials from EC2 metadata (typically used in conjunction with `s3.allow-anonymous`). |
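The sketch below reads a table directly from an S3-compatible object store by overriding `s3.endpoint`. The endpoint `http://localhost:9000`, the bucket name `warehouse`, and the metadata file name are hypothetical placeholders; the metadata path must point at an actual `*.metadata.json` file of the table.

```sql
-- Sketch only: endpoint, bucket, and metadata file name are placeholders.
-- The s3:// prefix of metadata_location selects the S3 FileIO.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "s3://warehouse/test_table/metadata/<METADATA_FILE>.metadata.json",
                "s3.endpoint": "http://localhost:9000",
                "s3.access-key-id": "<ACCESS_KEY_ID>",
                "s3.secret-access-key": "<SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```

For a public bucket, the two credential properties can be replaced with `"s3.allow-anonymous": "true"` and `"s3.disable-ec2-metadata": "true"`.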

GCS FileIO configuration

| Property | Type | Description |
|---|---|---|
| `gcs.project-id` | string | Google Cloud project ID. |
| `gcs.service.path` | string | Google Cloud Storage endpoint. |
| `gcs.no-auth` | string | Set to `"true"` to allow unauthenticated requests. |
| `gcs.credentials-json` | string | Google Cloud Storage credentials JSON string, base64-encoded. |
| `gcs.oauth2.token` | string | String representation of the access token used for temporary access. |
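A minimal sketch of reading a table stored in Google Cloud Storage without a catalog follows. The bucket path, project ID, and metadata file name are hypothetical placeholders; `gcs.credentials-json` expects the service account credentials JSON encoded as base64, as described in the table above.

```sql
-- Sketch only: bucket, project, and credentials are placeholders.
-- The gs:// prefix of metadata_location selects the GCS FileIO.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "gs://my-bucket/warehouse/test_table/metadata/<METADATA_FILE>.metadata.json",
                "gcs.project-id": "<GCP_PROJECT_ID>",
                "gcs.credentials-json": "<BASE64_ENCODED_CREDENTIALS_JSON>"
            }
        }
    }]'
);
```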

Data type mapping

The following table lists supported Iceberg data types and corresponding Feldera types.

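| Iceberg type | Feldera SQL type | Comment |
|---|---|---|
| `boolean` | `BOOLEAN` | |
| `int` | `INT` | |
| `long` | `BIGINT` | |
| `float` | `REAL` | |
| `double` | `DOUBLE` | |
| `decimal(P,S)` | `DECIMAL(P, S)` | The largest supported precision `P` is 28. |
| `date` | `DATE` | |
| `time` | `TIME` | |
| `timestamp` | `TIMESTAMP` | Timestamp values are rounded to the nearest millisecond. |
| `timestamp_ns` | `TIMESTAMP` | Timestamp values are rounded to the nearest millisecond. |
| `string` | `STRING` | |
| `fixed(L)` | `BINARY(L)` | |
| `binary` | `VARBINARY` | |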

Types that are currently not supported include Iceberg's nested data types (structs, lists and maps), uuid, and timestamps with time zone.
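To illustrate the mapping, the sketch below declares a Feldera table for a hypothetical Iceberg schema (listed in the leading comment) that uses every supported type. The column names and metadata file path are placeholders, and it assumes that the connector matches Iceberg columns to Feldera columns by name.

```sql
-- Hypothetical Iceberg schema (assumption, for illustration only):
--   id long, qty int, price decimal(10,2), score double, ratio float,
--   active boolean, event_ts timestamp_ns, event_d date, event_t time,
--   digest fixed(16), payload binary, label string
create table typed_table(
    id BIGINT,
    qty INT,
    price DECIMAL(10, 2),   -- precision must not exceed 28
    score DOUBLE,
    ratio REAL,
    active BOOLEAN,
    event_ts TIMESTAMP,     -- nanosecond values rounded to milliseconds
    event_d DATE,
    event_t TIME,
    digest BINARY(16),
    payload VARBINARY,
    label STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "file:///tmp/warehouse/typed_table/metadata/<METADATA_FILE>.metadata.json"
            }
        }
    }]'
);
```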

Examples

Read an Iceberg table from S3 through the AWS Glue catalog

Create an Iceberg input connector that reads a snapshot of a table stored in an S3 bucket through the AWS Glue catalog. The connector configuration specifies separate AWS credentials (access key ID, secret access key, and region) for the AWS Glue catalog and for the S3 bucket that contains the table data. These credentials can be the same, when a single IAM identity is used for both services, or different, when separate IAM identities are used.

```sql
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "glue.warehouse": "s3://feldera-iceberg-test/",
                "catalog_type": "glue",
                "table_name": "iceberg_test.test_table",
                "glue.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "glue.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "glue.region": "us-east-1",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```

Read an Iceberg table from S3 through a REST catalog

Create an Iceberg input connector that reads a snapshot of a table stored in an S3 bucket through a REST catalog running at http://localhost:8181.

```sql
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "rest",
                "table_name": "iceberg_test.test_table",
                "rest.uri": "http://localhost:8181",
                "rest.warehouse": "s3://feldera-iceberg-test/",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```

Read an Iceberg table from the local file system

Read an Iceberg table directly from the local file system, without a catalog, using the specified snapshot ID. Only records with a timestamp of 2023-01-01 00:00:00 or later are ingested.

```sql
-- Single quotes inside the connector configuration string are escaped
-- by doubling them ('') in the snapshot_filter expression.
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "file:///tmp/warehouse/test_table/metadata/00001-26093ae9-b816-40ca-8ca4-05bd445a8a1d.metadata.json",
                "snapshot_id": 3325185130458326470,
                "snapshot_filter": "ts >= ''2023-01-01 00:00:00''"
            }
        }
    }]'
);
```