Examples

Connecting to Feldera Sandbox

Ensure that you have an API key to connect to Feldera Sandbox.

To get the key:

  • Login to the Feldera Sandbox.

  • Click on the top right button that says: “Logged in”

  • Click on “Manage API keys”

  • Generate a new API key

  • Give it a name, and copy the API key

from feldera import FelderaClient, PipelineBuilder

client = FelderaClient('https://try.feldera.com', api_key=api_key)

pipeline = PipelineBuilder(client, name, sql).create()

Connecting to Feldera on localhost

from feldera import FelderaClient, PipelineBuilder

client = FelderaClient('http://localhost:8080', api_key=api_key)

pipeline = PipelineBuilder(client, name, sql).create()

Setting HTTP Connection Timeouts

To set a timeout for the HTTP connection, pass the timeout parameter to the .class:FelderaClient constructor. If the Feldera backend server takes longer than the specified timeout to respond, a .class:FelderaTimeoutError exception will be raised. This example sets the timeout for each HTTP request to 10 seconds.

from feldera import FelderaClient, PipelineBuilder

client = FelderaClient("http://localhost:8080", api_key=api_key, timeout=10)

Note

This is for an individual HTTP request, and does not affect things like waiting for a pipeline to start, pause, resume and shutdown. To set a timeout for these state transitions, set the parameter timeout_s in respective functions.

Creating a Pipeline (OVERWRITING existing pipelines)

sql = """
CREATE TABLE student (
    name STRING,
    id INT
);

CREATE TABLE grades (
    student_id INT,
    science INT,
    maths INT,
    art INT
);

CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC;
"""

# This will shutdown and overwrite any existing pipeline with the same name.
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()

Starting a Pipeline

pipeline.start()

Using Pandas DataFrames

# populate pandas dataframes
df_students = pd.read_csv('students.csv')
df_grades = pd.read_csv('grades.csv')

# subscribe to listen to outputs from a view
out = pipeline.listen("average_scores")

pipeline.start()

# feed pandas dataframes as input
pipeline.input_pandas("students", df_students)
pipeline.input_pandas("grades", df_grades)

# wait for the pipeline to complete and shutdown
pipeline.wait_for_completion(True)

# get the output of the view as a pandas dataframe
df = out.to_pandas()

# delete the pipeline
pipeline.delete()

Executing ad-hoc SQL Queries

Ad-hoc SQL queries can be executed on running or paused pipelines. Ad-hoc queries provide a way to query the state of materialized views or tables.

For more information, refer to the docs at: https://docs.feldera.com/sql/ad-hoc

We provide the following methods to execute ad-hoc queries:

  1. Pipeline.execute() - Execute an ad-hoc query and discard the result. Useful for INSERT queries.

  2. Pipeline.query() (Lazy) - Executes an ad-hoc query and returns a generator to iterate over the result.

  3. Pipeline.query_tabular() (Lazy) - Executes an ad-hoc query and returns a generator that yields a string representing the query result in human-readable tabular format.

  4. Pipeline.query_parquet() - Executes an ad-hoc query and saves the result to the specified path as a parquet file.

# execute an `INSERT` ad-hoc SQL query
pipeline.execute("INSERT into students VALUES ('John', 1)")

# executing a `SELECT` ad-hoc SQL query
students = list(pipeline.query("SELECT * FROM students"))

Iterating over Output Chunks

Use foreach_chunk() to process each chunk of data from a view or table. It takes a callback, and calls the callback on each chunk of received data.

# define your callback to run on every chunk of data received
# ensure that it takes two parameters, the chunk (DataFrame) and the sequence number
def callback(df: pd.DataFrame, seq_no: int):
    print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")

pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()

# register the callback for data received from the selected view
pipeline.foreach_chunk("view_name", callback)

# run the pipeline
pipeline.start()
pipeline.input_pandas("table_name", df)

# wait for the pipeline to finish and shutdown
pipeline.wait_for_completion(True)
pipeline.delete()

Waiting for Completion

To wait (block) till the pipeline has been completed, use Pipeline.wait_for_completion().

pipeline.wait_for_completion()

Optionally, to shutdown the pipeline after completion:

pipeline.wait_for_completion(shutdown=True)

Warning

If the data source is streaming, this will block forever.

End-to-End Example with Kafka Sink

This example shows creating and running a pipeline with Feldera’s internal data generator and writing to a Kafka sink.

from feldera import FelderaClient, PipelineBuilder

client = FelderaClient('http://localhost:8080')

sql = """
        CREATE TABLE Stocks (
        symbol VARCHAR NOT NULL,
        price_time BIGINT NOT NULL,  -- UNIX timestamp
        price DOUBLE NOT NULL
        ) with (
          'connectors' = '[{
            "transport": {
              "name": "datagen",
              "config": {
                "plan": [{
                    "limit": 5,
                    "rate": 1,
                    "fields": {
                        "symbol": { "values": ["AAPL", "GOOGL", "SPY", "NVDA"] },
                        "price": { "strategy": "uniform", "range": [100, 10000] }
                    }
                }]
              }
            }
          }]'
        );

        CREATE VIEW googl_stocks
        WITH (
            'connectors' = '[
                {
                    "name": "kafka-3",
                    "transport": {
                        "name": "kafka_output",
                        "config": {
                            "bootstrap.servers": "localhost:9092",
                            "topic": "googl_stocks",
                            "auto.offset.reset": "earliest"
                        }
                    },
                    "format": {
                        "name": "json",
                        "config": {
                            "update_format": "insert_delete",
                            "array": false
                        }
                    }
                }
            ]'
        )
        AS SELECT * FROM Stocks WHERE symbol = 'GOOGL';
        """

pipeline = PipelineBuilder(client, name="kafka_example", sql=sql).create_or_replace()

out = pipeline.listen("googl_stocks")
pipeline.start()

# important: `wait_for_completion` will block forever here
pipeline.wait_for_idle()
pipeline.shutdown()
df = out.to_pandas()
assert df.shape[0] != 0

pipeline.delete()

Specifying Data Sources / Sinks

To connect Feldera to various data sources or sinks, you can define them in the SQL code. Refer to the connector documentation at: https://docs.feldera.com/connectors/