Pandas Compatibility
Feldera aims to be as compatible with Pandas as possible. However, some SQL types have only limited support in Pandas.
Columns with the following SQL types will be converted to the corresponding Pandas types:
SQL Type | Pandas Type |
---|---|
BOOLEAN | bool |
TINYINT | Int8 |
SMALLINT | Int16 |
INTEGER | Int32 |
BIGINT | Int64 |
REAL | Float32 |
DOUBLE | Float64 |
DECIMAL | decimal.Decimal |
CHAR | str |
VARCHAR | str |
DATE | datetime64[ns] |
TIMESTAMP | datetime64[ns] |
TIME | timedelta64[ns] |
INTERVAL | timedelta64[ns] |
ARRAY | object |
BINARY | object |
VARBINARY | object |
STRUCT | object |
MAP | object |
Note
The “object” type in Pandas is dynamic and can hold any type of data. As a result, primitive values nested inside ARRAY, BINARY, VARBINARY, STRUCT, and MAP columns may be represented differently from the same values in a standalone column.
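The mapping can be illustrated with plain pandas, without a running pipeline. The sketch below hand-builds a DataFrame that uses some of the dtypes from the table. The column names and values are hypothetical, and this is not output captured from Feldera; it only shows what these dtypes look like and how a value nested in an “object” column differs from a value in a typed column.

```python
import decimal

import pandas as pd

# Hypothetical columns, each using a dtype listed in the table above.
df = pd.DataFrame(
    {
        # BOOLEAN -> bool
        "is_active": pd.Series([True, False], dtype="bool"),
        # INTEGER -> Int32 (nullable extension dtype, so a missing value is allowed)
        "id": pd.Series([1, None], dtype="Int32"),
        # DOUBLE -> Float64 (nullable extension dtype)
        "score": pd.Series([0.5, None], dtype="Float64"),
        # DECIMAL -> decimal.Decimal values held in an object column
        "price": pd.Series(
            [decimal.Decimal("9.99"), decimal.Decimal("12.50")], dtype="object"
        ),
        # DATE / TIMESTAMP -> datetime64[ns]
        "joined": pd.to_datetime(["2024-01-15", "2024-02-01"]).astype("datetime64[ns]"),
        # TIME / INTERVAL -> timedelta64[ns]
        "shift": pd.to_timedelta(["08:30:00", "17:45:00"]).astype("timedelta64[ns]"),
        # ARRAY -> object column holding Python lists
        "tags": pd.Series([[1, 2], [3]], dtype="object"),
    }
)

print(df.dtypes)

# An element nested in an object column is a plain Python value,
# not a scalar of a pandas nullable dtype such as Int32.
first_tag = df["tags"].iloc[0][0]
print(type(first_tag))
```

The nullable `Int*` and `Float*` dtypes represent a missing value as `pd.NA`, which is why the `id` and `score` columns above can hold `None` without being converted to another dtype.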
Using Pandas DataFrames as Input / Output
You can use Pipeline.input_pandas() to insert records from a DataFrame into a Feldera table.
Use Pipeline.listen() to subscribe to updates to a view in the form of a stream of DataFrames.
To ensure that all data is received, start listening before calling Pipeline.start().
from feldera import FelderaClient, PipelineBuilder
import pandas as pd
# SQL program: two input tables and a view that joins them
sql = """
CREATE TABLE students (
    name STRING,
    id INT
);
CREATE TABLE grades (
    student_id INT,
    science INT,
    maths INT,
    art INT
);
CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) as average FROM students JOIN grades on id = student_id ORDER BY average DESC;
"""
# Create a client
client = FelderaClient("https://try.feldera.com", api_key="YOUR_API_KEY")
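# Create (or replace) a pipeline named "notebook" from the SQL program
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()
# Load the input data; the CSV columns should match the table columns
df_students = pd.read_csv('students.csv')
df_grades = pd.read_csv('grades.csv')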
# listen for the output of the view here in the notebook
# you do not need to call this if you are forwarding the data to a sink
out = pipeline.listen("average_scores")
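# start the pipeline only after the listener has been registered
pipeline.start()
# push the DataFrames into the corresponding tables
pipeline.input_pandas("students", df_students)
pipeline.input_pandas("grades", df_grades)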
# wait for the pipeline to complete
# note that if the source is a stream, this will run indefinitely
pipeline.wait_for_completion(True)
df = out.to_pandas()
# see the result
print(df)
pipeline.delete()
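To sanity-check the DataFrame returned by the view, the same computation can be approximated locally in plain pandas. The sketch below makes two assumptions: the sample rows are hypothetical stand-ins for students.csv and grades.csv, and dividing INT operands in the view truncates like integer division, which is why it uses `//`. It does not call Feldera.

```python
import pandas as pd

# Hypothetical sample data standing in for students.csv and grades.csv
df_students = pd.DataFrame({"name": ["Alice", "Bob", "Carol"], "id": [1, 2, 3]})
df_grades = pd.DataFrame(
    {
        "student_id": [1, 2, 3],
        "science": [90, 70, 85],
        "maths": [80, 75, 95],
        "art": [70, 65, 90],
    }
)

# Equivalent of: students JOIN grades ON id = student_id
joined = df_students.merge(df_grades, left_on="id", right_on="student_id")

# Equivalent of: (science + maths + art) / 3, assuming integer division
joined["average"] = (joined["science"] + joined["maths"] + joined["art"]) // 3

# Equivalent of: SELECT name, average ... ORDER BY average DESC
expected = (
    joined[["name", "average"]]
    .sort_values("average", ascending=False)
    .reset_index(drop=True)
)
print(expected)
```

Comparing `expected` against the `name` and `average` columns of the view's DataFrame is a quick way to confirm that the input data arrived as intended.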