Skip to main content

Part 3: Input and Output Connectors

In this part of the tutorial where we will

  • Learn to connect Feldera pipelines to external data sources and sinks.

  • Introduce the third key concept behind Feldera: computation over data in motion.

Computing on data in motion

A Feldera pipeline starts evaluating queries as soon as it receives the first input record and continues updating the results incrementally as more inputs arrive. This enables Feldera to operate over data en-route from a source to a destination. The destination, be it a database, a data lake, an ML model, or a real-time dashboard, receives the latest query results, and not the raw data. A Feldera pipeline can connect multiple heterogeneous sources to multiple destinations. In this part of the tutorial, we will build such a pipeline:

Real-time supply chain analytics

Step 1. Configure HTTPS GET connectors

An HTTPS GET connector retrieves data from a user-provided URL and pushes it to a SQL table. Let us start with adding some GET connectors to the pipeline we created in Part 1 of this tutorial. We uploaded minimal datasets for the three tables in this example to a public S3 bucket:

Modify your SQL table declarations adding the WITH clause with input connector configuration:

create table VENDOR (
id bigint not null primary key,
name varchar,
address varchar
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
},
"format": { "name": "json" }
}]');

create table PART (
id bigint not null primary key,
name varchar
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/part.json" }
},
"format": { "name": "json" }
}]');

create table PRICE (
part bigint not null,
vendor bigint not null,
price decimal
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/price.json" }
},
"format": { "name": "json" }
}]');

Select the PREFERRED_VENDOR view in the Change Stream tab and start the pipeline. The input connectors ingest data from S3 and push it to the pipeline. You should see the outputs appear in the Change Stream tab.

note

You must select the table or view you want to monitor in the Change Stream tab before starting the pipeline in order to observe all updates to the view from the start of the execution.

note

Feldera will soon support browsing and querying the entire contents of a table or view.

Step 2. Configure Kafka/Redpanda connectors

Next, we will add a pair of connectors to our pipeline to ingest changes to the PRICE table from a Kafka topic and output changes to the PREFERRED_VENDOR table to another Kafka topic.

Install Redpanda

To complete this part of the tutorial, you will need access to a Kafka cluster. For your convenience, the Feldera Docker Compose file contains instructions to bring up a local instance of Redpanda, a Kafka-compatible message queue. If you started Feldera in the demo mode (by supplying the --profile demo switch to docker compose) then the Redpanda container should already be running. Otherwise, you can start it using the following command:

curl -L https://github.com/feldera/feldera/releases/latest/download/docker-compose.yml | docker compose -f - up redpanda

Next, you will need to install rpk, the Redpanda CLI, by following the instructions on redpanda.com. On success, you will be able to retrieve the state of the Redpanda cluster using:

rpk -X brokers=127.0.0.1:19092 cluster metadata

Create input/output topics

Create a pair of Redpanda topics that will be used to send input updates to the PRICE table and receive output changes from the PREFERRED_VENDOR view.

rpk -X brokers=127.0.0.1:19092 topic create price preferred_vendor

Configure connectors

Modify the PRICE table adding a Kafka input connector to read from the price topic:

create table PRICE (
part bigint not null,
vendor bigint not null,
price decimal
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/price.json" }
},
"format": { "name": "json" }
},
{
"format": {"name": "json"},
"transport": {
"name": "kafka_input",
"config": {
"topics": ["price"],
"bootstrap.servers": "redpanda:9092",
"auto.offset.reset": "earliest"
}
}
}]');

This table now ingests data from two heterogeneous sources: an S3 bucket and a Kafka topic.

Add a Kafka connector to the PREFERRED_VENDOR view:

create view PREFERRED_VENDOR (
part_id,
part_name,
vendor_id,
vendor_name,
price
)
WITH (
'connectors' = '[{
"format": {"name": "json"},
"transport": {
"name": "kafka_output",
"config": {
"topic": "preferred_vendor",
"bootstrap.servers": "redpanda:9092"
}
}
}]'
)
as
select
PART.id as part_id,
PART.name as part_name,
VENDOR.id as vendor_id,
VENDOR.name as vendor_name,
PRICE.price
from
PRICE,
PART,
VENDOR,
LOW_PRICE
where
PRICE.price = LOW_PRICE.price AND
PRICE.part = LOW_PRICE.part AND
PART.id = PRICE.part AND
VENDOR.id = PRICE.vendor;

Here is the final version of the program with all connector:

Click to expand SQL code
create table VENDOR (
id bigint not null primary key,
name varchar,
address varchar
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/vendor.json"}
},
"format": { "name": "json" }
}]');

create table PART (
id bigint not null primary key,
name varchar
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/part.json" }
},
"format": { "name": "json" }
}]');

create table PRICE (
part bigint not null,
vendor bigint not null,
price decimal
) WITH ('connectors' = '[{
"transport": {
"name": "url_input", "config": {"path": "https://feldera-basics-tutorial.s3.amazonaws.com/price.json" }
},
"format": { "name": "json" }
},
{
"format": {"name": "json"},
"transport": {
"name": "kafka_input",
"config": {
"topics": ["price"],
"bootstrap.servers": "redpanda:9092",
"auto.offset.reset": "earliest"
}
}
}]');

-- Lowest available price for each part across all vendors.
create view LOW_PRICE (
part,
price
) as
select part, MIN(price) as price from PRICE group by part;

-- Lowest available price for each part along with part and vendor details.
create view PREFERRED_VENDOR (
part_id,
part_name,
vendor_id,
vendor_name,
price
)
WITH (
'connectors' = '[{
"format": {"name": "json"},
"transport": {
"name": "kafka_output",
"config": {
"topic": "preferred_vendor",
"bootstrap.servers": "redpanda:9092"
}
}
}]'
)
as
select
PART.id as part_id,
PART.name as part_name,
VENDOR.id as vendor_id,
VENDOR.name as vendor_name,
PRICE.price
from
PRICE,
PART,
VENDOR,
LOW_PRICE
where
PRICE.price = LOW_PRICE.price AND
PRICE.part = LOW_PRICE.part AND
PART.id = PRICE.part AND
VENDOR.id = PRICE.vendor;

Run the pipeline

Start the pipeline. The GET connectors instantly ingest input files from S3 and the output Redpanda connector writes computed view updates to the output topic. Use the Redpanda CLI to inspect the outputs:

rpk -X brokers=127.0.0.1:19092 topic consume preferred_vendor -f '%v'

which should output:

{"insert":{"PART_ID":1,"PART_NAME":"Flux Capacitor","VENDOR_ID":2,"VENDOR_NAME":"HyperDrive Innovations","PRICE":"10000"}}
{"insert":{"PART_ID":2,"PART_NAME":"Warp Core","VENDOR_ID":1,"VENDOR_NAME":"Gravitech Dynamics","PRICE":"15000"}}
{"insert":{"PART_ID":3,"PART_NAME":"Kyber Crystal","VENDOR_ID":3,"VENDOR_NAME":"DarkMatter Devices","PRICE":"9000"}}

In a different terminal, push input updates to the price topic using the JSON format we are already familiar with from Part2 of the tutorial:

echo '
{"delete": {"part": 1, "vendor": 2, "price": 10000}}
{"insert": {"part": 1, "vendor": 2, "price": 30000}}
{"delete": {"part": 2, "vendor": 1, "price": 15000}}
{"insert": {"part": 2, "vendor": 1, "price": 50000}}
{"insert": {"part": 1, "vendor": 3, "price": 20000}}
{"insert": {"part": 2, "vendor": 3, "price": 11000}}' | rpk -X brokers=127.0.0.1:19092 topic produce price -f '%v'

You should see the following new output updates in the preferred_vendor topic:

{"delete":{"PART_ID":1,"PART_NAME":"Flux Capacitor","VENDOR_ID":2,"VENDOR_NAME":"HyperDrive Innovations","PRICE":"10000"}}
{"insert":{"PART_ID":1,"PART_NAME":"Flux Capacitor","VENDOR_ID":3,"VENDOR_NAME":"DarkMatter Devices","PRICE":"20000"}}
{"delete":{"PART_ID":2,"PART_NAME":"Warp Core","VENDOR_ID":1,"VENDOR_NAME":"Gravitech Dynamics","PRICE":"15000"}}
{"insert":{"PART_ID":2,"PART_NAME":"Warp Core","VENDOR_ID":3,"VENDOR_NAME":"DarkMatter Devices","PRICE":"11000"}}

Takeaways

To summarize Part 3 of the tutorial,

  • A Feldera pipeline can ingest data from multiple sources and send outputs to multiple destinations using input and output connectors.

  • Combined with the incremental query evaluation mechanism, this enables Feldera to analyze data on the fly as it moves from sources to destinations, so that the destination receives up-to-date query results in real time.