Pandas Compatibility

Feldera tries to be compatible with the Pandas as much as possible. However, some types in SQL have limited support in Pandas.

Columns with the following SQL types will be converted to the corresponding Pandas types:

SQL Type

Pandas Type

BOOLEAN

bool

TINYINT

Int8

SMALLINT

Int16

INTEGER

Int32

BIGINT

Int64

REAL

Float32

DOUBLE

Float64

DECIMAL

decimal.Decimal

CHAR

str

VARCHAR

str

DATE

datetime64[ns]

TIMESTAMP

datetime64[ns]

TIME

timedelta64[ns]

INTERVAL

timedelta64[ns]

ARRAY

object

BINARY

object

VARBINARY

object

STRUCT

object

MAP

object

Note

Please note that the “object” type in Pandas is dynamic and can hold any type of data. Therefore, the representation of primitive types in arrays, binary, struct, and map types may be different to their representation as a standalone column.

Using Pandas DataFrames as Input / Output

You can use Pipeline.input_pandas() to insert records from a DataFrame to a Feldera table.

Use Pipeline.listen() to subscribe to updates to a view in the form of a stream of DataFrames. To ensure all data is received start listening before calling Pipeline.start().

from feldera import FelderaClient, PipelineBuilder
import pandas as pd

sql = f"""
CREATE TABLE students (
    name STRING,
    id INT
);

CREATE TABLE grades (
    student_id INT,
    science INT,
    maths INT,
    art INT
);

CREATE VIEW average_scores AS SELECT name, ((science + maths + art) / 3) as average FROM {TBL_NAMES[0]} JOIN {TBL_NAMES[1]} on id = student_id ORDER BY average DESC;
"""

# Create a client
client = FelderaClient("https://try.feldera.com", api_key="YOUR_API_KEY")
pipeline = PipelineBuilder(client, name="notebook", sql=sql).create_or_replace()

df_students = pd.read_csv('students.csv')
df_grades = pd.read_csv('grades.csv')

# listen for the output of the view here in the notebook
# you do not need to call this if you are forwarding the data to a sink
out = pipeline.listen("average_scores")

pipeline.start()
pipeline.input_pandas("students", df_students)
pipeline.input_pandas("grades", df_grades)

# wait for the pipeline to complete
# note that if the source is a stream, this will run indefinitely
pipeline.wait_for_completion(True)
df = out.to_pandas()

# see the result
print(df)

pipeline.delete()