Part 4. Sending Output to Multiple Destinations

Once data has been processed, the next step is to deliver it to downstream systems. Feldera supports this through output connectors, which continuously push the contents of views to a variety of destinations.

Common use cases include:

  • Writing aggregated and enriched data to PostgreSQL
  • Publishing real-time changes to a Kafka topic
  • Sending updates to web applications via HTTP

[Figure: architecture diagram showing output flowing to PostgreSQL, Kafka, and web apps]

PostgreSQL: Real-Time Materialized Views

Feldera can be used to incrementally maintain materialized views in PostgreSQL. Unlike traditional materialized views, which must be refreshed manually by running REFRESH MATERIALIZED VIEW, these views stay in sync with Feldera as changes happen.
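For contrast, keeping a traditional PostgreSQL materialized view current means re-running the refresh yourself. Below is a minimal sketch using the psycopg2 package; the package choice and the view name mv_q1 are assumptions for illustration:

# hypothetical: manually refreshing a traditional PostgreSQL materialized view
import psycopg2

conn = psycopg2.connect("postgres://postgres:password@localhost:5432/postgres")
conn.autocommit = True
with conn.cursor() as cur:
    # the materialized view serves stale data until this statement runs again
    cur.execute("REFRESH MATERIALIZED VIEW mv_q1;")  # mv_q1 is hypothetical
conn.close()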

Step 1: Create the Target Table in PostgreSQL

We connect to a PostgreSQL instance running locally:

psql postgres://postgres:password@localhost:5432/postgres

Create a table q1 that will receive output from the Feldera view:

-- PostgreSQL
CREATE TABLE q1 (
    l_returnflag CHAR(1) NOT NULL,
    l_linestatus CHAR(1) NOT NULL,
    sum_qty DECIMAL(15,2),
    sum_base_price DECIMAL(15,2),
    sum_disc_price DECIMAL(15,2),
    sum_charge DECIMAL(15,2),
    avg_qty DECIMAL(15,2),
    avg_price DECIMAL(15,2),
    avg_disc DECIMAL(15,2),
    count_order BIGINT,
    PRIMARY KEY (l_returnflag, l_linestatus)
);

Step 2: Define an Index on the Output View

In Feldera, create an index on the view's unique key columns so that the connector can apply updates and deletions to the correct rows in PostgreSQL:

-- Feldera SQL
create index q1_idx on q1 (l_returnflag, l_linestatus);
Important: this index needs to be defined after the view definition.

Step 3: Configure Output Connector in Feldera

Next, we define the output connector that sends updates from the view to PostgreSQL. See the PostgreSQL output connector docs for the full set of options.

-- Feldera SQL
create materialized view q1 with (
    'connectors' = '[{
        "index": "q1_idx",
        "transport": {
            "name": "postgres_output",
            "config": {
                "uri": "postgres://postgres:password@localhost:5432/postgres",
                "table": "q1"
            }
        }
    }]'
) as
select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
    l_shipdate <= date '1998-12-01' - interval '90' day
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;

Note the index attribute that links this PostgreSQL connector to the q1_idx index declared above. It enables the connector to group changes to the same unique key in the output stream, as described in the connector documentation.

Step 4: Verify the Output in PostgreSQL

Once the pipeline is running, Feldera will continuously update the view q1 and push its output to the target table defined in PostgreSQL. We can verify that data is flowing correctly by querying the target table:

SELECT * FROM q1;

Result:

 l_returnflag | l_linestatus |      sum_qty |    sum_base_price |   sum_disc_price |        sum_charge | avg_qty | avg_price | avg_disc | count_order
--------------+--------------+--------------+-------------------+------------------+-------------------+---------+-----------+----------+------------
 A            | F            | 3,774,200.00 |  5,320,753,880.69 | 5,054,096,266.68 |  5,256,751,331.45 |   25.53 | 36,002.12 |     0.05 |     147,790
 N            | F            |    95,257.00 |    133,737,795.84 |   127,132,372.65 |    132,286,291.23 |   25.30 | 35,521.32 |     0.04 |       3,765
 N            | O            | 7,459,297.00 | 10,512,270,008.90 | 9,986,238,338.38 | 10,385,578,376.59 |   25.54 | 36,000.92 |     0.05 |     292,000
 R            | F            | 3,785,523.00 |  5,337,950,526.47 | 5,071,818,532.94 |  5,274,405,503.05 |   25.52 | 35,994.02 |     0.04 |     148,301
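This check can also be scripted. Below is a minimal sketch using the psycopg2 package (the package choice is an assumption):

# minimal verification sketch, assuming the psycopg2 package
import psycopg2

conn = psycopg2.connect("postgres://postgres:password@localhost:5432/postgres")
with conn.cursor() as cur:
    cur.execute("SELECT count(*) FROM q1;")
    print("rows in q1:", cur.fetchone()[0])
conn.close()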

Kafka

A pipeline can have multiple output connectors sending data to a variety of destinations. These connectors can be attached to the same or different SQL views. For example, we can publish the output of q2 to a Kafka topic:

-- Feldera SQL
create materialized view q2 with (
    'connectors' = '[{
        "transport": {
            "name": "kafka_output",
            "config": {
                "bootstrap.servers": "localhost:9092",
                "topic": "q2_output"
            }
        },
        "format": {
            "name": "json",
            "config": {
                "update_format": "insert_delete",
                "array": false
            }
        }
    }]'
) as
select ...
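To confirm that changes are arriving on the topic, attach any Kafka consumer. Below is a minimal sketch using the kafka-python package; the package choice is an assumption, any Kafka client works:

# minimal consumer sketch, assuming the kafka-python package and the broker above
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "q2_output",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    # with update_format "insert_delete", each record is a change event,
    # e.g. {"insert": {...}} or {"delete": {...}}
    print(message.value)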

Web Apps / HTTP

Web applications and backend services can receive live updates from Feldera using the HTTP output connector. This enables applications to consume real-time data streams over standard HTTP. Feldera can stream updates as they happen without constant re-polling.

Streaming with curl

A live stream of updates to the view can be observed using the following curl command:

curl -i -X POST 'http://127.0.0.1:8080/v0/pipelines/batch/egress/q2?format=json'

As the contents of the view change, the changes will be streamed continuously.
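The same stream can also be consumed programmatically over plain HTTP, without the SDK. Below is a minimal sketch using Python's requests package (the package choice is an assumption), mirroring the curl call above:

# minimal streaming sketch, assuming the requests package
import requests

resp = requests.post(
    "http://127.0.0.1:8080/v0/pipelines/batch/egress/q2",
    params={"format": "json"},
    stream=True,
)
resp.raise_for_status()

for line in resp.iter_lines():
    if line:
        # each line is a JSON-encoded chunk of changes to the view
        print(line.decode("utf-8"))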

Streaming in Python

Feldera's Python SDK provides a high-level API to subscribe to view updates in Python applications.

The following snippet demonstrates how to register a callback that runs for every chunk of updates:

import pandas as pd

from feldera import Pipeline, FelderaClient

# define your callback to run on every chunk of data received;
# it takes two parameters: the chunk (DataFrame) and the sequence number
def callback(df: pd.DataFrame, seq_no: int):
    print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")

pipeline = Pipeline.get("batch", FelderaClient.localhost())

# register the callback for data received from the selected view
pipeline.foreach_chunk("q3", callback)

# run the pipeline
pipeline.start()

# wait for the pipeline to finish and shutdown
pipeline.wait_for_completion(shutdown=True)

This is useful for:

  • Updating live dashboards
  • Triggering side effects in response to new data, as in the sketch below
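For example, a hypothetical side-effect callback might fire an alert when a chunk contains an unusually large value; the column name and threshold below are illustrative:

# hypothetical side-effect callback; column name and threshold are illustrative
def alert_on_spike(df: pd.DataFrame, seq_no: int):
    if "count_order" in df.columns and (df["count_order"] > 100_000).any():
        print(f"alert: large order count in chunk {seq_no}")

pipeline.foreach_chunk("q1", alert_on_spike)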

Takeaways

  • Feldera can stream data directly to popular data sinks such as PostgreSQL, Kafka, Delta Lake, and Redis.
  • By connecting Feldera's incremental SQL engine to different sources and destinations, you can query data from multiple independent sources and materialize the results in multiple independent destinations in real time.