
User-Defined Functions

The SQL statement CREATE FUNCTION can be used to declare new functions. Functions can be implemented either in SQL or in Rust.

User-defined functions written in SQL

The following example shows a user-defined function:

CREATE FUNCTION contains_number(str VARCHAR NOT NULL, value INTEGER)
RETURNS BOOLEAN NOT NULL
AS (str LIKE ('%' || CAST(value AS VARCHAR) || '%'));

CREATE VIEW V0 AS SELECT contains_number(CAST('YES: 10 NO:5' AS VARCHAR), 5);

User-defined functions written in Rust

Experimental feature

Rust UDF support is currently experimental and may undergo significant changes, including non-backward-compatible modifications, in future releases of Feldera.

danger
  • Feldera's incremental query engine assumes that all computations are deterministic. Using a non-deterministic UDF is likely to result in incorrect outputs. The SQL compiler cannot verify that Rust functions meet this requirement. Therefore, it is the responsibility of the UDF developer to ensure that their UDFs are deterministic, i.e., the function's return value must depend solely on its arguments and not on clocks, random number generators, external data sources, or other sources of non-determinism.

    We are considering adding support for non-deterministic UDFs in the future. If your use case requires a non-deterministic UDF, please share your feedback by commenting on the associated GitHub issue.

  • UDFs are compiled into native binary code and executed directly within the address space of the pipeline. Therefore, only trusted code should be included in UDFs. UDFs must not panic or invoke undefined behavior: this can lead to crashes, memory corruption, or unpredictable behavior within the pipeline. UDFs should handle all runtime errors gracefully by returning a Result::Err() (see below). This ensures that errors are propagated in a controlled manner and can be handled by the calling code.

Feldera supports UDFs implemented in Rust. To define a UDF, users declare the function in SQL without providing a body. The corresponding Rust implementation is supplied in a separate Rust file, which contains the logic for all UDFs declared within the SQL program. A complete set of files that defines the pipeline in this case is:

File          Description
program.sql   The SQL code, including table, view, and user-defined function declarations.
stubs.rs      Auto-generated by the SQL compiler; contains Rust stubs for UDFs (see below).
udf.rs        User-provided implementation of UDFs in Rust.
udf.toml      User-provided list of external crates required by the code in udf.rs. This list is appended to the [dependencies] section of the Cargo.toml file.

We recommend using the Web Console to develop and debug Rust UDFs, as described below. The Web Console code editor has a separate tab for each of these files, and supports an interactive development workflow. Once ready, the creation and deployment of the pipeline can be automated with scripts that make use of the Feldera CLI, the Python API, or the REST API. These scripts and the SQL/Rust code can then be maintained with the usual practices such as version control.

Step 1: Declare a function in the SQL program using CREATE FUNCTION

Declare a SQL function without a body:

-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;

Step 2: Locate the auto-generated Rust prototype

When the SQL compiler encounters such a function, it generates a Rust stub for it in the stubs.rs file. The stub is an implementation of the UDF in Rust that simply invokes another function with the same name and signature exported from the udf.rs module:

/* stubs.rs */
...

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    udf::base64(
        s)
}

The udf::base64 function does not exist yet, so you will see a Rust compiler error similar to this:

error[E0425]: cannot find function `base64` in module `udf`
  --> project01926e00-f404-7813-b65c-d66ac9959998/src/stubs.rs:11:10
   |
11 |     udf::base64(
   |          ^^^^^^ not found in `udf`

Step 3: Implement the UDF in Rust

Copy the declaration from stubs.rs to udf.rs and replace its body with the actual implementation:

/* udf.rs */
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| SqlString::from_ref(BASE64_STANDARD.encode(v.as_slice()))))
}

The use feldera_sqllib::* directive imports the definitions of the Rust types that the compiler uses to implement some of the SQL datatypes. The next section explains what these types are.

If your UDF uses external crates, list these external dependencies in udf.toml. The contents of this file are appended to the [dependencies] section of the generated Cargo.toml file. For this example, udf.toml contains a single dependency:

base64 = "0.22.1"

Step 4: Deploy the UDF

Using the Feldera CLI:

fda create udf_fda_test --udf-rs udf.rs --udf-toml udf.toml program.sql

Using the Feldera Python SDK:

from feldera import FelderaClient, PipelineBuilder

# SQL program.
sql = """
-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;

CREATE TABLE binary_t(
    b VARBINARY
);

CREATE MATERIALIZED VIEW base64_v AS
SELECT
    base64(b) as text
FROM
    binary_t;
"""

# UDF implementation in Rust.
udf_rust = """
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| SqlString::from_ref(BASE64_STANDARD.encode(v.as_slice()))))
}"""

# External Rust dependencies.
udf_toml = """
base64 = "0.22.1"
"""

# Create a pipeline using the above SQL, Rust, and TOML code.
feldera = FelderaClient("http://127.0.0.1:8080")
pipeline = PipelineBuilder(
    feldera, name="udf_test", sql=sql, udf_rust=udf_rust, udf_toml=udf_toml
).create_or_replace()

pipeline.start()

next(pipeline.query("insert into binary_t values(X'0123456789ABCDEF')"))
output = pipeline.query("select * from base64_v")

assert list(output) == [{'text': 'ASNFZ4mrze8='}]

pipeline.shutdown()
pipeline.delete()

Using the REST API:

#!/bin/bash

echo "
-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;

CREATE TABLE binary_t(
    b VARBINARY
);

CREATE MATERIALIZED VIEW base64_v AS
SELECT
    base64(b) as text
FROM
    binary_t;
" > program.sql

echo "
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| SqlString::from_ref(BASE64_STANDARD.encode(v.as_slice()))))
}" > udf.rs

echo "
base64 = \"0.22.1\"
" > udf.toml


curl -i -X PUT http://127.0.0.1:8080/v0/pipelines/udf_api_test \
  -H 'Content-Type: application/json' \
  -d "$(jq -Rsn \
    --rawfile sql program.sql \
    --rawfile udf_rust udf.rs \
    --rawfile udf_toml udf.toml \
    '{
      name: "udf_api_test",
      description: "Create a UDF using REST API",
      program_code: $sql,
      runtime_config: {},
      program_config: {},
      udf_rust: $udf_rust,
      udf_toml: $udf_toml
    }')"

SQL type representation in Rust

Experimental feature

The following table shows the Rust representation of standard SQL data types. A nullable SQL type is represented by the corresponding Rust Option<> type. Notice that some of these types are not standard Rust types, but are defined in the feldera-sqllib crate, which is part of the Feldera SQL runtime.

SQL                                          Rust
BOOLEAN                                      bool
TINYINT                                      i8
SMALLINT                                     i16
INT                                          i32
BIGINT                                       i64
TINYINT UNSIGNED                             u8
SMALLINT UNSIGNED                            u16
INT UNSIGNED                                 u32
BIGINT UNSIGNED                              u64
DECIMAL(p, s)                                feldera_sqllib::SqlDecimal<P, S>
REAL                                         feldera_sqllib::F32
DOUBLE                                       feldera_sqllib::F64
CHAR, CHAR(n)                                feldera_sqllib::SqlString
VARCHAR, VARCHAR(n)                          feldera_sqllib::SqlString
BINARY, BINARY(n), VARBINARY, VARBINARY(n)   feldera_sqllib::ByteArray
NULL                                         ()
INTERVAL                                     feldera_sqllib::ShortInterval, feldera_sqllib::LongInterval
TIME                                         feldera_sqllib::Time
TIMESTAMP                                    feldera_sqllib::Timestamp
DATE                                         feldera_sqllib::Date
T ARRAY                                      feldera_sqllib::Array<T>
MAP<K, V>                                    feldera_sqllib::Map<K, V>
UUID                                         feldera_sqllib::Uuid
VARIANT                                      feldera_sqllib::Variant
ROW                                          TupN
User-defined struct type                     TupN

Multiple SQL types may be represented by the same Rust type. For example, CHAR, CHAR(n), VARCHAR(n), and VARCHAR are all represented by the SqlString type.
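
For instance, a UDF declared with one CHAR and one VARCHAR argument receives both as Option<SqlString> values. The following minimal sketch assumes a hypothetical declaration CREATE FUNCTION first_non_null(a CHAR(10), b VARCHAR) RETURNS VARCHAR:

use feldera_sqllib::*;

// Hypothetical UDF: returns the first non-NULL argument.
// Both the CHAR(10) and the VARCHAR argument arrive as Option<SqlString>.
pub fn first_non_null(a: Option<SqlString>, b: Option<SqlString>)
    -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(a.or(b))
}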

The SQL family of INTERVAL types translates to one of two Rust types: ShortInterval (representing intervals from days to seconds), and LongInterval (representing intervals from years to months). (Our dialect of SQL does not allow mixing the two kinds of intervals in a single expression.)

Currently feldera_sqllib::Map is defined as type Map<K, V> = Arc<BTreeMap<K, V>>, and feldera_sqllib::Array is defined as type Array<T> = Arc<Vec<T>>. feldera_sqllib::SqlString is currently a thin wrapper around the ArcStr type from the arcstr crate.
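
As another illustration, here is a minimal sketch of a UDF operating on an array. It assumes a hypothetical declaration CREATE FUNCTION array_total(a INT ARRAY) RETURNS BIGINT and assumes that the nullable INT elements are represented as Option<i32>, following the table above:

use feldera_sqllib::*;

// Hypothetical UDF: sums the non-NULL elements of an INT ARRAY.
// Since Array<T> is currently Arc<Vec<T>>, the vector can be iterated directly.
pub fn array_total(a: Option<Array<Option<i32>>>) -> Result<Option<i64>, Box<dyn std::error::Error>> {
    Ok(a.map(|arr| arr.iter().map(|x| x.unwrap_or(0) as i64).sum()))
}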

A ROW type with N fields is represented by a Rust TupN datatype. A user-defined structure type is also represented by a TupN tuple type. These tuple types can be imported from the current crate.

Here are some examples using ROW and user-defined types:

CREATE TYPE X AS (x int);
CREATE FUNCTION f(arg X) RETURNS X;
CREATE FUNCTION g(x int NOT NULL) RETURNS ROW(a INT, b INT) NOT NULL;
CREATE VIEW V AS SELECT f(X(1)), g(2).a;

And here is a possible implementation of the user-defined functions f and g in Rust:

use crate::{Tup1, Tup2};
use feldera_sqllib::*;
pub fn f(x: Option<Tup1<Option<i32>>>) ->
    Result<Option<Tup1<Option<i32>>>, Box<dyn std::error::Error>> {
    match x {
        None => Ok(None),
        Some(x) => match x.0 {
            None => Ok(Some(Tup1::new(None))),
            Some(x) => Ok(Some(Tup1::new(Some(x + 1)))),
        }
    }
}

pub fn g(x: i32) -> Result<Tup2<i32, i32>, Box<dyn std::error::Error>> {
    Ok(Tup2::new(x - 1, x + 1))
}

Return types

In Rust, the function must always return the type Result<T, Box<dyn std::error::Error>>, where T is the Rust equivalent of the SQL function's declared return type. The function should return an Err only when it fails at runtime with a fatal condition, e.g., an array index out of bounds or an arithmetic overflow.
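
For example, here is a minimal sketch of a division UDF that reports division by zero (or overflow) as an error instead of panicking; the declaration CREATE FUNCTION checked_div(x BIGINT NOT NULL, y BIGINT NOT NULL) RETURNS BIGINT NOT NULL and the function name are hypothetical:

// Hypothetical UDF: integer division that reports fatal runtime conditions via Err.
pub fn checked_div(x: i64, y: i64) -> Result<i64, Box<dyn std::error::Error>> {
    // Division by zero (or i64::MIN / -1 overflow) is reported as an error
    // so it can be handled by the calling code instead of crashing the pipeline.
    x.checked_div(y)
        .ok_or_else(|| format!("invalid division: {} / {}", x, y).into())
}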

Developing complex UDFs

While many useful UDFs can be implemented with just a few lines of Rust, some may require more complex code that is easier to develop using a full-featured Rust IDE. To support the development of such UDFs, we made the feldera-sqllib crate available on crates.io. In order to implement a complex Rust UDF (or a library of UDFs) using a Rust IDE:

  • Create a new Rust crate to serve as the container for your UDFs.
  • Add feldera-sqllib as a dependency to Cargo.toml (use the crate version that matches the version of Feldera you are working with).
  • Implement and test your UDFs within this crate.
  • Copy the final Rust code and dependencies to the Feldera Web Console.

If your UDFs require a larger Rust project with multiple modules, we recommend encapsulating most of the UDF logic in a separate crate, which can be hosted on crates.io or GitHub. Import this crate into your Feldera pipeline via the udf.toml file, and keep only thin wrapper functions that call the crate's API in udf.rs, as sketched below.
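
Here is a minimal sketch of such a wrapper, assuming a hypothetical crate my_udf_lib that exposes a fingerprint function taking a byte slice and returning a String, and a hypothetical declaration CREATE FUNCTION fingerprint(data VARBINARY) RETURNS VARCHAR. The crate is listed in udf.toml:

my_udf_lib = "0.1"   # hypothetical crate

and udf.rs contains only the wrapper:

use feldera_sqllib::*;

// Thin wrapper: all real logic lives in the external my_udf_lib crate (hypothetical).
pub fn fingerprint(data: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(data.map(|v| SqlString::from_ref(my_udf_lib::fingerprint(v.as_slice()))))
}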

Limitations

  • Currently the compiler inserts only limited implicit casts for the function arguments and the function result in the SQL program. For example, a call such as CONTAINS_NUMBER('2010-10-20', '5') will fail at SQL compilation time because the first argument has type CHAR(10) instead of VARCHAR, and the second argument has type CHAR(1) instead of INTEGER. This can be avoided by calling the function with explicit casts: CONTAINS_NUMBER(CAST('2010-10-20' AS VARCHAR), CAST('5' AS INTEGER)).

  • User-defined functions cannot have names identical to standard SQL library function names.

  • Polymorphic functions are not supported. For example, in SQL the addition operator works on any numeric type; such an operation cannot be implemented as a single user-defined function.

User-defined aggregates

Experimental feature

Rust UDA support is currently experimental and may undergo significant changes, including non-backward-compatible modifications, in future releases of Feldera.

The SQL statement CREATE AGGREGATE can be used to extend the set of aggregate functions supported by Feldera SQL with user-defined aggregation functions. Such functions need to be implemented in Rust. The argument and result of CREATE AGGREGATE must be nullable.

Creating user-defined linear aggregate functions

In general, an aggregate function A is linear if it has the following property: A(C1 UNION C2) = A(C1) + A(C2), where C1 and C2 are arbitrary collections. The type of the result produced by the function is expected to have a + operation, which must be commutative and associative. For example, COUNT and SUM are linear, while MAX is not.

To implement a user-defined linear aggregate function, the user has to define three Rust items:

  • a type for the accumulator; the type name is obtained from the aggregate function name with the suffix _accumulator_type.

    The actual arithmetic is performed using values of this type. The accumulator type is thus required to implement several traits defined in the DBSP core library:

    • DBWeight, which is a combination of DBData and MonoidValue. DBData allows accumulators to be stored in relations that may be spilled to disk; among other traits, it requires Ord and ArchivedDBData. MonoidValue essentially requires the traits Zero, HasZero, and Add (and variants such as AddByRef).

    • MulByRef, which allows accumulator values to be multiplied by integer (signed and unsigned) weights. Negative weights are used when elements are removed from collections; weights larger than 1 are used when multiple copies of an element are updated in one operation. Currently the Weight type is i64.

  • a function which converts a collection value into an accumulator value. The function's name is obtained from the aggregate function name with the suffix _map.

  • a function to perform additional post-processing on the aggregation result, returning the expected result. The function's name is obtained from the aggregate function name with the suffix _post.

This use of the post-processing function enables linear implementations for aggregates such as "average", where the aggregation produces a sum and a count, while the post-processing step produces the actual result by computing sum/count.
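
As a minimal sketch of this convention, consider a hypothetical declaration CREATE LINEAR AGGREGATE my_avg(value BIGINT) RETURNS BIGINT. Assuming that Tup2 provides the required trait implementations when both of its fields do, an average aggregate could be implemented as follows:

use crate::Tup2;

// Accumulator: (sum of non-NULL values, count of non-NULL values).
pub type my_avg_accumulator_type = Tup2<i64, i64>;

// Convert a single input value into an accumulator value.
pub fn my_avg_map(val: Option<i64>) -> my_avg_accumulator_type {
    match val {
        None => Tup2::new(0, 0),
        Some(v) => Tup2::new(v, 1),
    }
}

// Post-process the accumulated (sum, count) pair into the final result.
pub fn my_avg_post(acc: my_avg_accumulator_type) -> Option<i64> {
    if acc.1 == 0 {
        None
    } else {
        Some(acc.0 / acc.1)
    }
}

The complete example below follows the same pattern for a 128-bit sum.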

If the Add operation of the accumulator type is not associative and commutative, the results produced by the program are undefined. This requirement can be subtle; for example, floating-point addition is not associative, so computing an aggregate like SUM over floating-point values using standard floating-point arithmetic may produce incorrect results or even runtime crashes.

Example user-defined aggregate

We show how to build a user-defined linear aggregate that computes the sum of 128-bit values. We use the SQL type BINARY(16) to represent 128-bit numbers. The first step is to declare the user-defined aggregate function in SQL:

CREATE LINEAR AGGREGATE i128_sum(value BINARY(16)) RETURNS BINARY(16);

Notice that the argument and result types are both nullable. We can then use the user-defined aggregate in a SQL program:

CREATE TABLE T(value BINARY(16));

CREATE MATERIALIZED VIEW V0 AS SELECT i128_sum(value) FROM T;

In SQL the SUM function is polymorphic: it works for any numeric SQL type. Currently user-defined aggregate functions cannot be polymorphic, so one needs to define a separate user-defined aggregate function for each value type; moreover, the SUM function name cannot be reused for a user-defined aggregate.

The user needs to add the following two dependencies to the udf.toml file:

i256 = { version = "0.2.2", features = ["num-traits"] }
num-traits = "0.2.19"

We use the I256 type from the i256 Rust crate for 256-bit arithmetic. In our implementation we wrap this type in I256Wrapper, for which we implement the required traits. Most of the code is devoted to this task and is relatively straightforward.

For our example the accumulator type that the user has to define is named i128_sum_accumulator_type. In our implementation the accumulator is a tuple with 3 fields:

  • the partial sum computed, stored in an I256 value

  • the count of non-null elements in the collection encountered

  • the total count of elements in the collection

The user would add the following implementation to the udf.rs file:

use i256::I256;
use feldera_sqllib::*;
use crate::{AddAssignByRef, AddByRef, HasZero, MulByRef, SizeOf, Tup3};
use derive_more::Add;
use num_traits::Zero;
use rkyv::Fallible;
use std::ops::{Add, AddAssign};

#[derive(Add, Clone, Debug, Default, PartialOrd, Ord, Eq, PartialEq, Hash)]
pub struct I256Wrapper {
    pub data: I256,
}

impl SizeOf for I256Wrapper {
    fn size_of_children(&self, context: &mut size_of::Context) {}
}

impl From<[u8; 32]> for I256Wrapper {
    fn from(value: [u8; 32]) -> Self {
        Self { data: I256::from_be_bytes(value) }
    }
}

impl From<&[u8]> for I256Wrapper {
    fn from(value: &[u8]) -> Self {
        let mut padded = [0u8; 32];
        // If the original value is negative, pad with the sign.
        if value[0] & 0x80 != 0 {
            padded.fill(0xff);
        }
        let len = value.len();
        if len > 32 {
            panic!("Slice larger than target");
        }
        padded[32 - len..].copy_from_slice(&value[..len]);
        Self { data: I256::from_be_bytes(padded) }
    }
}

impl MulByRef<Weight> for I256Wrapper {
    type Output = Self;

    fn mul_by_ref(&self, other: &Weight) -> Self::Output {
        println!("Mul {:?} by {}", self, other);
        Self {
            data: self.data.checked_mul_i64(*other)
                .expect("Overflow during multiplication"),
        }
    }
}

impl HasZero for I256Wrapper {
    fn zero() -> Self {
        Self { data: I256::zero() }
    }

    fn is_zero(&self) -> bool {
        self.data.is_zero()
    }
}

impl AddByRef for I256Wrapper {
    fn add_by_ref(&self, other: &Self) -> Self {
        Self { data: self.data.add(other.data) }
    }
}

impl AddAssignByRef<Self> for I256Wrapper {
    fn add_assign_by_ref(&mut self, other: &Self) {
        self.data += other.data
    }
}

#[repr(C)]
#[derive(Debug, Copy, Clone, PartialOrd, Ord, Eq, PartialEq)]
pub struct ArchivedI256Wrapper {
    pub bytes: [u8; 32],
}

impl rkyv::Archive for I256Wrapper {
    type Archived = ArchivedI256Wrapper;
    type Resolver = ();

    #[inline]
    unsafe fn resolve(&self, pos: usize, _: Self::Resolver, out: *mut Self::Archived) {
        out.write(ArchivedI256Wrapper {
            bytes: self.data.to_be_bytes(),
        });
    }
}

impl<S: Fallible + ?Sized> rkyv::Serialize<S> for I256Wrapper {
    #[inline]
    fn serialize(&self, serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        Ok(())
    }
}

impl<D: Fallible + ?Sized> rkyv::Deserialize<I256Wrapper, D> for ArchivedI256Wrapper {
    #[inline]
    fn deserialize(&self, _: &mut D) -> Result<I256Wrapper, D::Error> {
        Ok(I256Wrapper::from(self.bytes))
    }
}

pub type i128_sum_accumulator_type = Tup3<I256Wrapper, i64, i64>;

pub fn i128_sum_map(val: Option<ByteArray>) -> i128_sum_accumulator_type {
    match val {
        None => Tup3::new(I256Wrapper::zero(), 0, 1),
        Some(val) => Tup3::new(
            I256Wrapper::from(val.as_slice()),
            1,
            1,
        ),
    }
}

pub fn i128_sum_post(val: i128_sum_accumulator_type) -> Option<ByteArray> {
    if val.1 == 0 {
        None
    } else {
        // Check for overflow
        if val.0.data < I256::from(i128::MIN) || val.0.data > I256::from(i128::MAX) {
            panic!("Result of aggregation {} does not fit in 128 bits", val.0.data);
        }
        Some(ByteArray::new(&val.0.data.to_be_bytes()[16..]))
    }
}

The two functions needed to implement the aggregation are i128_sum_map and i128_sum_post.

i128_sum_map converts a BINARY(16) value into an accumulator value. Notice that in the SQL runtime library BINARY(16) is implemented as a ByteArray.

i128_sum_post converts the accumulator value into the expected result type BINARY(16).

We use the Tup3 type from our SQL runtime library. This type implements Add and the other required operations when all of its fields do. Addition of Tup3 values is performed field-wise, and the zero value of a Tup3 is the tuple whose fields are all zero.

Creating user-defined non-linear aggregate functions

Currently user-defined non-linear aggregation functions are not supported. These may be added in the future.