Examples
~~~~~~~~

Connecting to Feldera Sandbox
=============================

Ensure that you have an API key to connect to the Feldera Sandbox. To get the key:

- Log in to the Feldera Sandbox.
- Click on the button in the top-right corner that says "Logged in".
- Click on "Manage API keys".
- Generate a new API key.
- Give it a name, and copy the API key.

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder

   client = FelderaClient('https://try.feldera.com', api_key=api_key)
   pipeline = PipelineBuilder(client, name, sql).create()

Connecting to Feldera on localhost
==================================

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder

   client = FelderaClient('http://127.0.0.1:8080', api_key=api_key)
   pipeline = PipelineBuilder(client, name, sql).create()

Setting HTTP Connection Timeouts
================================

To set a timeout for the HTTP connection, pass the ``timeout`` parameter to the
:class:`.FelderaClient` constructor. If the Feldera backend server takes longer
than the specified timeout to respond, a :class:`.FelderaTimeoutError` exception
will be raised. This example sets the timeout for each HTTP request to 10
seconds.

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder

   client = FelderaClient("http://127.0.0.1:8080", api_key=api_key, timeout=10)

.. note::

   This timeout applies to an individual HTTP request; it does not affect
   operations like waiting for a pipeline to start, pause, resume, or shut
   down. To set a timeout for these state transitions, set the ``timeout_s``
   parameter in the respective functions.

Creating a Pipeline (OVERWRITING existing pipelines)
====================================================

.. code-block:: python

   sql = """
   CREATE TABLE students (
       name STRING,
       id INT
   );

   CREATE TABLE grades (
       student_id INT,
       science INT,
       maths INT,
       art INT
   );

   CREATE VIEW average_scores AS
   SELECT name, ((science + maths + art) / 3) AS average
   FROM students JOIN grades ON id = student_id
   ORDER BY average DESC;
   """

   # This will shut down and overwrite any existing pipeline with the same name.
   pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()

Starting a Pipeline
===================

.. code-block:: python

   pipeline.start()

Using Pandas DataFrames
=======================

.. code-block:: python

   import pandas as pd

   # populate pandas dataframes
   df_students = pd.read_csv('students.csv')
   df_grades = pd.read_csv('grades.csv')

   # subscribe to the output of a view before starting the pipeline
   out = pipeline.listen("average_scores")

   pipeline.start()

   # feed pandas dataframes as input
   pipeline.input_pandas("students", df_students)
   pipeline.input_pandas("grades", df_grades)

   # wait for the pipeline to complete and shut down
   pipeline.wait_for_completion(True)

   # get the output of the view as a pandas dataframe
   df = out.to_pandas()

   # delete the pipeline
   pipeline.delete()

Executing ad-hoc SQL Queries
============================

Ad-hoc SQL queries can be executed on running or paused pipelines. Ad-hoc
queries provide a way to query the state of **materialized** views or tables.
For more information, refer to the docs at: https://docs.feldera.com/sql/ad-hoc

We provide the following methods to execute ad-hoc queries:

#. :meth:`.Pipeline.execute` - Executes an ad-hoc query and discards the
   result. Useful for ``INSERT`` queries.
#. :meth:`.Pipeline.query` **(Lazy)** - Executes an ad-hoc query and returns a
   generator to iterate over the result.
#. :meth:`.Pipeline.query_tabular` **(Lazy)** - Executes an ad-hoc query and
   returns a generator that yields strings representing the query result in a
   human-readable tabular format.
#. :meth:`.Pipeline.query_parquet` - Executes an ad-hoc query and saves the
   result to the specified path as a parquet file.

.. code-block:: python

   # execute an `INSERT` ad-hoc SQL query
   pipeline.execute("INSERT INTO students VALUES ('John', 1)")

   # execute a `SELECT` ad-hoc SQL query
   students = list(pipeline.query("SELECT * FROM students"))
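The last two methods follow the same pattern. Below is a minimal sketch of
:meth:`.Pipeline.query_tabular` and :meth:`.Pipeline.query_parquet`; the output
file name ``students.parquet`` is an illustrative placeholder, not an API
requirement.

.. code-block:: python

   # stream the result as human-readable text, one string at a time
   for chunk in pipeline.query_tabular("SELECT * FROM students"):
       print(chunk)

   # run the query and save the result to the given path as a parquet file
   pipeline.query_parquet("SELECT * FROM students", "students.parquet")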
Iterating over Output Chunks
============================

Use :meth:`.foreach_chunk` to process each chunk of data received from a view
or table. It takes a callback and invokes it on every chunk of received data.

.. code-block:: python

   # define your callback to run on every chunk of data received;
   # it must take two parameters: the chunk (DataFrame) and the sequence number
   def callback(df: pd.DataFrame, seq_no: int):
       print(f"\nSeq No: {seq_no}, DF size: {df.shape[0]}\n")

   pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()

   # register the callback for data received from the selected view
   pipeline.foreach_chunk("view_name", callback)

   # run the pipeline
   pipeline.start()
   pipeline.input_pandas("table_name", df)

   # wait for the pipeline to finish and shut down
   pipeline.wait_for_completion(True)
   pipeline.delete()

Waiting for Completion
======================

To wait (block) until the pipeline has completed, use
:meth:`.Pipeline.wait_for_completion`.

.. code-block:: python

   pipeline.wait_for_completion()

Optionally, to shut down the pipeline after completion:

.. code-block:: python

   pipeline.wait_for_completion(shutdown=True)

.. warning::

   If the data source is streaming, this will block forever.

End-to-End Example with Kafka Sink
==================================

This example creates and runs a pipeline that uses Feldera's internal data
generator as a source and writes the output to a Kafka sink.

.. code-block:: python

   from feldera import FelderaClient, PipelineBuilder

   client = FelderaClient('http://localhost:8080')

   sql = """
   CREATE TABLE Stocks (
       symbol VARCHAR NOT NULL,
       price_time BIGINT NOT NULL, -- UNIX timestamp
       price DOUBLE NOT NULL
   ) with (
       'connectors' = '[{
           "transport": {
               "name": "datagen",
               "config": {
                   "plan": [{
                       "limit": 5,
                       "rate": 1,
                       "fields": {
                           "symbol": { "values": ["AAPL", "GOOGL", "SPY", "NVDA"] },
                           "price": { "strategy": "uniform", "range": [100, 10000] }
                       }
                   }]
               }
           }
       }]'
   );

   CREATE VIEW googl_stocks WITH (
       'connectors' = '[
           {
               "name": "kafka-3",
               "transport": {
                   "name": "kafka_output",
                   "config": {
                       "bootstrap.servers": "localhost:9092",
                       "topic": "googl_stocks",
                       "auto.offset.reset": "earliest"
                   }
               },
               "format": {
                   "name": "json",
                   "config": {
                       "update_format": "insert_delete",
                       "array": false
                   }
               }
           }
       ]'
   ) AS SELECT * FROM Stocks WHERE symbol = 'GOOGL';
   """

   pipeline = PipelineBuilder(client, name="kafka_example", sql=sql).create_or_replace()

   out = pipeline.listen("googl_stocks")
   pipeline.start()

   # important: `wait_for_completion` would block forever here
   pipeline.wait_for_idle()
   pipeline.shutdown()

   df = out.to_pandas()
   assert df.shape[0] != 0

   pipeline.delete()

Specifying Data Sources / Sinks
===============================

To connect Feldera to various data sources or sinks, define them in the SQL
code, as sketched below. Refer to the connector documentation at:
https://docs.feldera.com/connectors/
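As a minimal sketch of the pattern (mirroring the Kafka sink example above),
the snippet below attaches a Kafka source to a table. The broker address and
the topic name ``prices`` are placeholder values for illustration; see the
linked documentation for the full set of connector configuration options.

.. code-block:: python

   sql = """
   CREATE TABLE prices (
       symbol VARCHAR NOT NULL,
       price DOUBLE NOT NULL
   ) with (
       'connectors' = '[{
           "transport": {
               "name": "kafka_input",
               "config": {
                   "bootstrap.servers": "localhost:9092",
                   "topics": ["prices"],
                   "auto.offset.reset": "earliest"
               }
           },
           "format": {
               "name": "json",
               "config": {
                   "update_format": "insert_delete",
                   "array": false
               }
           }
       }]'
   );
   """

   pipeline = PipelineBuilder(client, name="kafka_source_example", sql=sql).create_or_replace()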