Examples
Connecting to Feldera Sandbox
Ensure that you have an API key to connect to Feldera Sandbox.
To get the key:
1. Log in to the Feldera Sandbox.
2. Click the button in the top right that says “Logged in”.
3. Click “Manage API keys”.
4. Generate a new API key.
5. Give it a name, and copy the API key.
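Rather than pasting the key into source code, one option is to read it from an environment variable. The sketch below assumes a variable named FELDERA_API_KEY; that name is chosen here for illustration and is not something the SDK is known to read on its own.

```python
import os


def load_api_key(var_name: str = "FELDERA_API_KEY") -> str:
    """Return the API key stored in the given environment variable.

    The variable name is a hypothetical convention for this example.
    """
    key = os.environ.get(var_name, "").strip()
    if not key:
        raise RuntimeError(f"Set {var_name} to the API key copied from the Sandbox")
    return key
```

The returned string can then be passed as the api_key argument when constructing the client.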
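from feldera import FelderaClient, PipelineBuilder
# api_key is the key copied in the steps above; name and sql are the
# pipeline's name and its SQL program, which you define yourself.
client = FelderaClient('https://try.feldera.com', api_key=api_key)
pipeline = PipelineBuilder(client, name, sql).create()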
Connecting to Feldera on localhost
from feldera import FelderaClient, PipelineBuilder
client = FelderaClient('http://localhost:8080', api_key=api_key)
pipeline = PipelineBuilder(client, name, sql).create()
Setting HTTP Connection Timeouts
To set a timeout for the HTTP connection, pass the timeout parameter to the FelderaClient constructor. If the Feldera backend server takes longer than the specified timeout to respond, a FelderaTimeoutError exception is raised. This example sets the timeout for each HTTP request to 10 seconds.
from feldera import FelderaClient, PipelineBuilder
client = FelderaClient("http://localhost:8080", api_key=api_key, timeout=10)
Note
This timeout applies to each individual HTTP request. It does not affect operations such as waiting for a pipeline to start, pause, resume, or shut down. To set a timeout for these state transitions, pass the timeout_s parameter to the respective methods.
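Because the timeout applies to each request separately, a caller may want to retry a request that timed out. The helper below is a generic sketch and not part of the Feldera SDK: call_with_retries and its parameters are hypothetical, and the exception types to catch are supplied by the caller (for example, the FelderaTimeoutError mentioned above).

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    delay_s: float = 1.0,
) -> T:
    """Call fn, retrying up to `attempts` times when it raises one of `retry_on`."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error to the caller
            time.sleep(delay_s)  # fixed pause before the next attempt
    raise AssertionError("unreachable")
```

Any zero-argument callable can be wrapped this way, such as a lambda that issues a single client request.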
Creating a Pipeline (OVERWRITING existing pipelines)
sql = """
CREATE TABLE students (
    name STRING,
    id INT
);
CREATE TABLE grades (
    student_id INT,
    science INT,
    maths INT,
    art INT
);
CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) AS average FROM students JOIN grades ON id = student_id ORDER BY average DESC;
"""
# This will shut down and overwrite any existing pipeline with the same name.
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
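To make the view's semantics concrete, the following plain-Python sketch computes the same kind of result on a few made-up rows without contacting Feldera. The sample data is invented. The sketch assumes that student ids are unique and that dividing an INT sum by 3 truncates to an integer, as integer division does in standard SQL; NULL handling is ignored.

```python
students = [("Alice", 1), ("Bob", 2)]        # (name, id)
grades = [(1, 90, 85, 70), (2, 60, 75, 80)]  # (student_id, science, maths, art)


def average_scores(students, grades):
    """Join on id = student_id and return (name, average) sorted by average, descending."""
    names_by_id = {sid: name for name, sid in students}
    rows = [
        (names_by_id[student_id], (science + maths + art) // 3)
        for student_id, science, maths, art in grades
        if student_id in names_by_id  # inner join: drop grades without a student
    ]
    return sorted(rows, key=lambda row: row[1], reverse=True)


print(average_scores(students, grades))  # [('Alice', 81), ('Bob', 71)]
```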
Starting a Pipeline
pipeline.start()
Using Pandas DataFrames
import pandas as pd
# populate pandas dataframes
df_students = pd.read_csv('students.csv')
df_grades = pd.read_csv('grades.csv')
# subscribe to listen to outputs from a view
out = pipeline.listen("average_scores")
pipeline.start()
# feed pandas dataframes as input
pipeline.input_pandas("students", df_students)
pipeline.input_pandas("grades", df_grades)
# wait for the pipeline to complete and shut down
pipeline.wait_for_completion(True)
# get the output of the view as a pandas dataframe
df = out.to_pandas()
# delete the pipeline
pipeline.delete()
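The snippet above expects students.csv and grades.csv to exist. A minimal way to produce them for a trial run is sketched below using only the standard library. The rows are invented, write_sample_csvs is a hypothetical helper, and the column names are assumed to match the columns of the tables defined earlier.

```python
import csv
from pathlib import Path


def write_sample_csvs(directory: str = ".") -> None:
    """Write students.csv and grades.csv with a header row and two sample rows each."""
    files = {
        "students.csv": [("name", "id"), ("Alice", 1), ("Bob", 2)],
        "grades.csv": [
            ("student_id", "science", "maths", "art"),
            (1, 90, 85, 70),
            (2, 60, 75, 80),
        ],
    }
    for filename, rows in files.items():
        # newline="" lets the csv module control line endings itself
        with open(Path(directory) / filename, "w", newline="") as f:
            csv.writer(f).writerows(rows)
```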
Executing ad-hoc SQL Queries
Ad-hoc SQL queries can be executed on running or paused pipelines. Ad-hoc queries provide a way to query the state of materialized views or tables.
For more information, refer to the docs at: https://docs.feldera.com/sql/ad-hoc
We provide the following methods to execute ad-hoc queries:
- Pipeline.execute(): Executes an ad-hoc query and discards the result. Useful for INSERT queries.
- Pipeline.query() (Lazy): Executes an ad-hoc query and returns a generator to iterate over the result.
- Pipeline.query_tabular() (Lazy): Executes an ad-hoc query and returns a generator that yields a string representing the query result in human-readable tabular format.
- Pipeline.query_parquet(): Executes an ad-hoc query and saves the result to the specified path as a parquet file.
# execute an `INSERT` ad-hoc SQL query
pipeline.execute("INSERT into students VALUES ('John', 1)")
# executing a `SELECT` ad-hoc SQL query
students = list(pipeline.query("SELECT * FROM students"))
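Because Pipeline.query() is lazy, rows can be consumed incrementally instead of being materialized with list(). The sketch below shows the pattern with a stand-in generator, fake_query, which is hypothetical and only mimics a lazy result. The assumption that each row is a dictionary keyed by column name is ours as well.

```python
from itertools import islice
from typing import Any, Dict, Iterator, List


def fake_query() -> Iterator[Dict[str, Any]]:
    """Hypothetical stand-in for a lazy query result; yields one row at a time."""
    for i in range(1, 1001):
        yield {"name": f"student_{i}", "id": i}


def first_rows(rows: Iterator[Dict[str, Any]], n: int) -> List[Dict[str, Any]]:
    """Take at most n rows without consuming the rest of the generator."""
    return list(islice(rows, n))


preview = first_rows(fake_query(), 3)
print([row["id"] for row in preview])  # [1, 2, 3]
```

With a real pipeline, the generator returned by pipeline.query(...) would take the place of fake_query().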
Iterating over Output Chunks
Use foreach_chunk() to process each chunk of data from a view or table.
It takes a callback and calls it on each chunk of received data.
import pandas as pd
# define your callback to run on every chunk of data received
# ensure that it takes two parameters, the chunk (DataFrame) and the sequence number
def callback(df: pd.DataFrame, seq_no: int):
    print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
# register the callback for data received from the selected view
pipeline.foreach_chunk("view_name", callback)
# run the pipeline
pipeline.start()
pipeline.input_pandas("table_name", df)
# wait for the pipeline to finish and shut down
pipeline.wait_for_completion(True)
pipeline.delete()
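A callback often needs to keep state between chunks, and a callable object is one way to do that. RowCounter below is a hypothetical helper, not part of the SDK. It relies only on len() of each chunk, which for a DataFrame is the row count, and it takes a lock on the assumption that the callback may be invoked from a background thread.

```python
import threading


class RowCounter:
    """Callable that records how many rows arrived in each chunk."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.rows_per_chunk = {}  # seq_no -> number of rows in that chunk

    def __call__(self, chunk, seq_no: int) -> None:
        with self._lock:
            self.rows_per_chunk[seq_no] = len(chunk)

    @property
    def total_rows(self) -> int:
        with self._lock:
            return sum(self.rows_per_chunk.values())
```

An instance can be registered in place of a plain function, for example by creating counter = RowCounter() and passing counter as the callback, then reading counter.total_rows once the pipeline has finished.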
Waiting for Completion
To wait (block) until the pipeline has completed, use Pipeline.wait_for_completion().
pipeline.wait_for_completion()
Optionally, to shut down the pipeline after completion:
pipeline.wait_for_completion(shutdown=True)
Warning
If the data source is streaming, this will block forever.
End-to-End Example with Kafka Sink
This example creates and runs a pipeline that uses Feldera’s internal data generator as its input and writes the output of a view to a Kafka sink.
from feldera import FelderaClient, PipelineBuilder
client = FelderaClient('http://localhost:8080')
sql = """
CREATE TABLE Stocks (
symbol VARCHAR NOT NULL,
price_time BIGINT NOT NULL, -- UNIX timestamp
price DOUBLE NOT NULL
) with (
'connectors' = '[{
"transport": {
"name": "datagen",
"config": {
"plan": [{
"limit": 5,
"rate": 1,
"fields": {
"symbol": { "values": ["AAPL", "GOOGL", "SPY", "NVDA"] },
"price": { "strategy": "uniform", "range": [100, 10000] }
}
}]
}
}
}]'
);
CREATE VIEW googl_stocks
WITH (
'connectors' = '[
{
"name": "kafka-3",
"transport": {
"name": "kafka_output",
"config": {
"bootstrap.servers": "localhost:9092",
"topic": "googl_stocks",
"auto.offset.reset": "earliest"
}
},
"format": {
"name": "json",
"config": {
"update_format": "insert_delete",
"array": false
}
}
}
]'
)
AS SELECT * FROM Stocks WHERE symbol = 'GOOGL';
"""
pipeline = PipelineBuilder(client, name="kafka_example", sql=sql).create_or_replace()
out = pipeline.listen("googl_stocks")
pipeline.start()
# important: `wait_for_completion` will block forever here
pipeline.wait_for_idle()
pipeline.shutdown()
df = out.to_pandas()
assert df.shape[0] != 0
pipeline.delete()
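With update_format set to insert_delete and array set to false, the sink is expected to emit one JSON object per update, each wrapping a row under an "insert" or "delete" key. The sketch below folds such messages into an in-memory copy of the view. The sample payloads are invented, apply_updates is a hypothetical helper, and the exact message shape is an assumption that should be checked against the connector documentation.

```python
import json
from collections import Counter


def apply_updates(lines):
    """Fold newline-delimited insert/delete messages into the list of current rows."""
    view = Counter()  # serialized row -> number of copies currently in the view
    for line in lines:
        line = line.strip()
        if not line:
            continue
        msg = json.loads(line)
        if "insert" in msg:
            view[json.dumps(msg["insert"], sort_keys=True)] += 1
        if "delete" in msg:
            view[json.dumps(msg["delete"], sort_keys=True)] -= 1
    # keep only rows that are still present after all updates
    return [json.loads(row) for row, count in view.items() for _ in range(max(count, 0))]


sample = [
    '{"insert": {"symbol": "GOOGL", "price_time": 1, "price": 150.0}}',
    '{"insert": {"symbol": "GOOGL", "price_time": 2, "price": 151.5}}',
    '{"delete": {"symbol": "GOOGL", "price_time": 1, "price": 150.0}}',
]
print(apply_updates(sample))
```

After the three sample messages, only the row with price_time 2 remains, because the first insert is cancelled by the matching delete.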
Specifying Data Sources / Sinks
To connect Feldera to various data sources or sinks, you can define them in the SQL code. Refer to the connector documentation at: https://docs.feldera.com/connectors/