Apache Iceberg input connector
This page describes configuration options specific to the Apache Iceberg connector. See the top-level connector documentation for general information about configuring input and output connectors.
Iceberg support is still experimental, and it may be substantially modified in the future.
The Iceberg input connector enables data ingestion from an Apache Iceberg table into a Feldera pipeline. Currently, the connector supports batch reads, allowing users to load a static snapshot of the table. However, it does not yet support ingesting incremental changes. Incremental ingestion capabilities are planned for future releases.
The connector is compatible with REST and AWS Glue catalogs. It can also read a table directly, without a catalog, given the location of the table's metadata file. Supported storage systems include S3, Google Cloud Storage (GCS), and the local file system.
The Iceberg input connector does not yet support fault tolerance.
Configuration
Property | Type | Description |
---|---|---|
mode * | enum | Table read mode. Currently, the only supported mode is `snapshot`, in which the connector reads a snapshot of the table and stops. |
snapshot_filter | string | Optional row filter. When specified, only rows that satisfy the filter condition are included in the snapshot. The condition must be a valid SQL Boolean expression that can be used in the `WHERE` clause of a `SELECT * FROM <table> WHERE ...` query. This option can be used to specify the range of event times to include in the snapshot, e.g.: `ts >= '2023-01-01 00:00:00'`. |
snapshot_id | integer | Optional table snapshot ID. When this option is set, the connector reads the specified snapshot of the table. Note: at most one of `snapshot_id` and `datetime` can be specified. |
datetime | string | Optional timestamp for the snapshot in the ISO-8601/RFC-3339 format, e.g., "2024-12-09T16:09:53+00:00". When this option is set, the connector reads the version of the table as of the specified point in time (based on the server time recorded in the transaction log, not the event time encoded in the data). Note: at most one of `snapshot_id` and `datetime` can be specified. |
metadata_location | string | Location of the table metadata JSON file. This property is used to access an Iceberg table directly, without a catalog. It is mutually exclusive with the `catalog_type` property. |
table_name | string | Specifies the Iceberg table name within the catalog in the namespace.table format. This option is applicable when an Iceberg catalog is configured using the catalog_type property. |
catalog_type | enum | Type of the Iceberg catalog used to access the table. Supported options include `rest` and `glue`. This property is mutually exclusive with `metadata_location`. |
[*]: Required fields
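As a sketch of how the snapshot options combine, the following declaration reads the version of a table as of a point in time (`datetime`) and keeps only rows with `id > 100` (`snapshot_filter`). The table, its columns, the metadata path, and the `<METADATA_FILE>` placeholder are hypothetical. Because at most one of `snapshot_id` and `datetime` may be set, only `datetime` appears here.

```sql
-- Hypothetical table: columns are assumed to exist in the Iceberg table.
-- Reads the table version as of the given server time, then applies the row filter.
create table iceberg_table(
    id bigint,
    name STRING,
    ts TIMESTAMP
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "file:///tmp/warehouse/test_table/metadata/<METADATA_FILE>.metadata.json",
                "datetime": "2024-12-09T16:09:53+00:00",
                "snapshot_filter": "id > 100"
            }
        }
    }]'
);
```

Replacing `datetime` with `snapshot_id` pins an exact snapshot instead of a point in time.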
REST catalog configuration
The following properties configure access to an Iceberg REST catalog when `catalog_type` is set to `rest`.
Property | Type | Description |
---|---|---|
rest.uri * | string | URI identifying the REST catalog server |
rest.warehouse | string | The default location for managed tables created by the catalog. |
rest.oauth2-server-uri | string | Authentication URL to use for client credentials authentication (default: `uri` + `v1/oauth/tokens`). |
rest.credential | string | Credential to use for the OAuth2 credential flow when initializing the catalog: a key and secret pair separated by `:` (the key is optional). |
rest.token | string | Bearer token value to use for the `Authorization` header. |
rest.scope | string | Desired scope of the requested security token (default: catalog). |
rest.prefix | string | Customize table storage paths. When combined with the warehouse property, the prefix determines how table data is organized within the storage. |
rest.audience | string | Logical name of the target resource or service. |
rest.resource | string | URI of the target resource or service. |
rest.headers | [(string, string)] | Additional HTTP request headers added to each catalog REST API call. |
[*]: These fields are required when the `catalog_type` property is set to `rest`.
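The sketch below shows a REST catalog that requires OAuth2 client credentials, using `rest.credential` and `rest.scope` from the table above. The catalog URI `https://catalog.example.com`, the `<CLIENT_ID>:<CLIENT_SECRET>` pair, and the table schema are hypothetical, and the table data is assumed to live in S3. Since `rest.oauth2-server-uri` is omitted, the token endpoint falls back to the default described in the table.

```sql
-- Hypothetical REST catalog with OAuth2 client-credentials authentication.
-- Catalog credentials (rest.*) and storage credentials (s3.*) are configured separately.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "rest",
                "table_name": "iceberg_test.test_table",
                "rest.uri": "https://catalog.example.com",
                "rest.credential": "<CLIENT_ID>:<CLIENT_SECRET>",
                "rest.scope": "catalog",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```

If the catalog issues a long-lived token instead, `rest.token` can be set in place of `rest.credential`.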
Glue catalog configuration
The following properties configure access to the AWS Glue catalog when `catalog_type` is set to `glue`.
Property | Type | Description |
---|---|---|
glue.warehouse * | string | Location for table metadata. Example: s3://my-data-warehouse/tables/ |
glue.endpoint | string | Alternative endpoint of the Glue service for the Glue catalog to access. Example: https://glue.us-east-1.amazonaws.com |
glue.access-key-id | string | Access key id used to access the Glue catalog. |
glue.secret-access-key | string | Secret access key used to access the Glue catalog. |
glue.profile-name | string | Profile used to access the Glue catalog. |
glue.region | string | Region of the Glue catalog. |
glue.session-token | string | Static session token used to access the Glue catalog. |
glue.id | string | The 12-digit ID of the Glue catalog. |
[*]: These fields are required when the `catalog_type` property is set to `glue`.
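A minimal sketch, assuming temporary AWS credentials: the session token is passed both to the Glue catalog (`glue.session-token`) and to the S3 FileIO (`s3.session-token`), and the catalog is identified explicitly with `glue.id` and `glue.endpoint`. The bucket, catalog ID, credentials, and table schema are placeholders.

```sql
-- Hypothetical Glue catalog accessed with temporary (session-based) credentials.
-- <GLUE_CATALOG_ID> stands for the 12-digit Glue catalog ID.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "glue",
                "table_name": "iceberg_test.test_table",
                "glue.warehouse": "s3://feldera-iceberg-test/",
                "glue.id": "<GLUE_CATALOG_ID>",
                "glue.endpoint": "https://glue.us-east-1.amazonaws.com",
                "glue.region": "us-east-1",
                "glue.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "glue.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "glue.session-token": "<AWS_SESSION_TOKEN>",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.session-token": "<AWS_SESSION_TOKEN>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```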
FileIO configuration
Iceberg uses the concept of a FileIO, a pluggable module for reading, writing, and deleting files. Feldera currently supports S3, GCS, and file system-based FileIO implementations. The Iceberg connector determines the FileIO type from the prefix of the Iceberg table location:
- `s3://`, `s3a://`: S3.
- `gs://`: Google Cloud Storage.
- `file://` or no prefix: local file system.
The S3 and GCS FileIO implementations require additional configuration options, documented below.
S3 FileIO configuration
Property | Type | Description |
---|---|---|
s3.access-key-id | string | S3 access key id. |
s3.secret-access-key | string | S3 secret access key. |
s3.endpoint | string | Alternative endpoint of the S3 service for the FileIO to access. Use it to access an S3-compatible object storage service with a different endpoint, or a private S3 endpoint in a virtual private cloud. |
s3.region | string | S3 region. |
s3.session-token | string | S3 session token. This is required when using temporary credentials. |
s3.allow-anonymous | string | Set to "true" to skip signing requests (e.g., for public buckets). |
s3.disable-ec2-metadata | string | Set to "true" to skip loading the credential from EC2 metadata (typically used in conjunction with s3.allow-anonymous ). |
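The following sketch reads a table directly from an S3-compatible object store by overriding `s3.endpoint`. The endpoint `http://localhost:9000`, the bucket, the metadata file name, and the credentials are hypothetical; some S3-compatible services may need settings beyond the ones documented above.

```sql
-- Hypothetical S3-compatible store reachable at localhost:9000.
-- The s3:// prefix of metadata_location selects the S3 FileIO.
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "s3://<BUCKET>/warehouse/test_table/metadata/<METADATA_FILE>.metadata.json",
                "s3.endpoint": "http://localhost:9000",
                "s3.region": "us-east-1",
                "s3.access-key-id": "<ACCESS_KEY_ID>",
                "s3.secret-access-key": "<SECRET_ACCESS_KEY>"
            }
        }
    }]'
);
```

For a public bucket, the two key properties could instead be replaced with `"s3.allow-anonymous": "true"` and `"s3.disable-ec2-metadata": "true"`.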
GCS FileIO configuration
Property | Type | Description |
---|---|---|
gcs.project-id | string | Google Cloud Project ID. |
gcs.service.path | string | Google Cloud Storage endpoint. |
gcs.no-auth | string | Set to "true" to allow unauthenticated requests. |
gcs.credentials-json | string | Google Cloud Storage credentials JSON string, base64 encoded. |
gcs.oauth2.token | string | String representation of the access token used for temporary access. |
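As a sketch, a table stored in Google Cloud Storage can be read without a catalog by pointing `metadata_location` at a `gs://` path, which selects the GCS FileIO. The bucket, project ID, metadata file name, and the base64-encoded credentials placeholder are hypothetical.

```sql
-- Hypothetical GCS bucket and service-account credentials (base64-encoded JSON).
create table iceberg_table(
    id bigint,
    name STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "gs://<BUCKET>/warehouse/test_table/metadata/<METADATA_FILE>.metadata.json",
                "gcs.project-id": "<GCP_PROJECT_ID>",
                "gcs.credentials-json": "<BASE64_ENCODED_CREDENTIALS_JSON>"
            }
        }
    }]'
);
```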
Data type mapping
The following table lists supported Iceberg data types and corresponding Feldera types.
Iceberg type | Feldera SQL type | Comment |
---|---|---|
boolean | BOOLEAN | |
int | INT | |
long | BIGINT | |
float | REAL | |
double | DOUBLE | |
decimal(P,S) | DECIMAL(P, S) | The largest supported precision P is 28. |
date | DATE | |
time | TIME | |
timestamp | TIMESTAMP | Timestamp values are rounded to the nearest millisecond. |
timestamp_ns | TIMESTAMP | Timestamp values are rounded to the nearest millisecond. |
string | STRING | |
fixed(L) | BINARY(L) | |
binary | VARBINARY | |
Types that are currently not supported include Iceberg's nested data types (`struct`s, `list`s, and `map`s), `uuid`, and timestamps with time zone.
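To illustrate the mapping, suppose a hypothetical Iceberg table with the schema listed in the SQL comments below. A matching Feldera declaration might look like this, assuming the connector matches Feldera columns to Iceberg fields by name. All table, column, and path names are placeholders.

```sql
-- Hypothetical Iceberg schema and the corresponding Feldera types:
--   id         long            -> BIGINT
--   price      decimal(12, 2)  -> DECIMAL(12, 2)  (precision must not exceed 28)
--   quantity   int             -> INT
--   ratio      double          -> DOUBLE
--   active     boolean         -> BOOLEAN
--   created_at timestamp_ns    -> TIMESTAMP       (rounded to milliseconds)
--   order_date date            -> DATE
--   checksum   fixed(16)       -> BINARY(16)
--   payload    binary          -> VARBINARY
--   note       string          -> STRING
-- Assumption: columns are matched to Iceberg fields by name.
create table orders(
    id BIGINT,
    price DECIMAL(12, 2),
    quantity INT,
    ratio DOUBLE,
    active BOOLEAN,
    created_at TIMESTAMP,
    order_date DATE,
    checksum BINARY(16),
    payload VARBINARY,
    note STRING
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "file:///tmp/warehouse/orders/metadata/<METADATA_FILE>.metadata.json"
            }
        }
    }]'
);
```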
Examples
Read an Iceberg table from S3 through the AWS Glue catalog
Create an Iceberg input connector to read a snapshot of a table stored in an S3 bucket through the AWS Glue Catalog. Note that the connector configuration specifies separate AWS credentials — including the access key ID, secret access key, and region — for the AWS Glue Catalog and the S3 bucket containing the table data. These credentials can either be the same, when using a single IAM identity for both services, or different, when using separate IAM identities.
```sql
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "glue.warehouse": "s3://feldera-iceberg-test/",
                "catalog_type": "glue",
                "table_name": "iceberg_test.test_table",
                "glue.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "glue.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "glue.region": "us-east-1",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```
Read an Iceberg table from S3 through a REST catalog
Create an Iceberg input connector to read a snapshot of a table stored in an S3 bucket through a REST catalog running on http://localhost:8181.
```sql
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "catalog_type": "rest",
                "table_name": "iceberg_test.test_table",
                "rest.uri": "http://localhost:8181",
                "rest.warehouse": "s3://feldera-iceberg-test/",
                "s3.access-key-id": "<AWS_ACCESS_KEY_ID>",
                "s3.secret-access-key": "<AWS_SECRET_ACCESS_KEY>",
                "s3.region": "us-east-1"
            }
        }
    }]'
);
```
Read an Iceberg table from the local file system
Read a specific snapshot of an Iceberg table, identified by `snapshot_id`, directly from the local file system. Only records with a timestamp of `2023-01-01 00:00:00` or later are selected.
```sql
create table iceberg_table(
    id bigint,
    name STRING,
    b BOOLEAN,
    ts TIMESTAMP,
    dt DATE
) with (
    'materialized' = 'true',
    'connectors' = '[{
        "transport": {
            "name": "iceberg_input",
            "config": {
                "mode": "snapshot",
                "metadata_location": "file:///tmp/warehouse/test_table/metadata/00001-26093ae9-b816-40ca-8ca4-05bd445a8a1d.metadata.json",
                "snapshot_id": 3325185130458326470,
                "snapshot_filter": "ts >= ''2023-01-01 00:00:00''"
            }
        }
    }]'
);
```