User-Defined Functions
The SQL statement CREATE FUNCTION can be used to declare new functions. Functions can be implemented either in SQL or in Rust.
User-defined functions written in SQL
The following example shows a user-defined function:
```sql
CREATE FUNCTION contains_number(str VARCHAR NOT NULL, value INTEGER)
RETURNS BOOLEAN NOT NULL
AS (str LIKE ('%' || CAST(value AS VARCHAR) || '%'));

CREATE VIEW V0 AS SELECT contains_number(CAST('YES: 10 NO:5' AS VARCHAR), 5);
```
User-defined functions written in Rust
Rust UDF support is currently experimental and may undergo significant changes, including non-backward-compatible modifications, in future releases of Feldera.
- Feldera's incremental query engine assumes that all computations are deterministic. Using a non-deterministic UDF is likely to result in incorrect outputs. The SQL compiler cannot verify that Rust functions meet this requirement. Therefore, it is the responsibility of the UDF developer to ensure that their UDFs are deterministic, i.e., the function's return value must depend solely on its arguments and not on clocks, random number generators, external data sources, or other sources of non-determinism.
  We are considering adding support for non-deterministic UDFs in the future. If your use case requires a non-deterministic UDF, please share your feedback by commenting on the associated GitHub issue.
- UDFs are compiled into native binary code and executed directly within the address space of the pipeline. Therefore, only trusted code should be included in UDFs. UDFs should not panic or invoke undefined behavior, since either can lead to crashes, memory corruption, or unpredictable behavior within the pipeline. UDFs should handle all runtime errors gracefully by returning a Result::Err() (see below). This ensures that errors are propagated in a controlled manner and can be handled by the calling code.
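Both requirements can be met with ordinary Rust. The following sketch shows a hypothetical UDF, sample_bucket, that assigns a key to one of n buckets, for example to sample rows. Its pseudo-randomness is derived only from its arguments, through a hand-written 64-bit FNV-1a hash rather than std's DefaultHasher (whose algorithm the standard library documentation says may change between Rust releases), and an invalid bucket count is reported as an Err instead of panicking on a division by zero. String stands in for feldera_sqllib::SqlString so that the snippet compiles on its own; the SQL declaration in the comment is likewise an assumption.

```rust
use std::error::Error;

/// 64-bit FNV-1a: a fixed, documented algorithm, so the same bytes hash
/// to the same value on every machine, in every run, and in every build.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Hypothetical UDF, declared in SQL as
///   CREATE FUNCTION sample_bucket(key VARCHAR, n INTEGER) RETURNS INTEGER;
/// `String` stands in for `feldera_sqllib::SqlString` (assumption).
pub fn sample_bucket(key: Option<String>, n: Option<i32>) -> Result<Option<i32>, Box<dyn Error>> {
    match (key, n) {
        // A NULL argument yields NULL.
        (None, _) | (_, None) => Ok(None),
        // Report a bad bucket count as an error instead of panicking on `% 0`.
        (Some(_), Some(n)) if n <= 0 => {
            Err(format!("bucket count must be positive, got {n}").into())
        }
        // The result depends only on the arguments: no clocks, no RNG, no I/O.
        (Some(key), Some(n)) => {
            let bucket = fnv1a_64(key.as_bytes()) % (n as u64);
            Ok(Some(bucket as i32))
        }
    }
}
```

Calling the function twice with the same arguments always yields the same bucket, which is exactly the property the incremental engine relies on.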
Feldera supports UDFs implemented in Rust. To define a UDF, users declare the function in SQL without providing a body. The corresponding Rust implementation is supplied in a separate Rust file, which contains the logic for all UDFs declared within the SQL program. A complete set of files that defines the pipeline in this case is:
File | Description |
---|---|
program.sql | The SQL code, including table, view, and user-defined function declarations. |
stubs.rs | Auto-generated by the SQL compiler; contains Rust stubs for UDFs (see below). |
udf.rs | User-provided implementation of UDFs in Rust. |
udf.toml | User-provided list of external crates required by the code in udf.rs. This list is appended to the [dependencies] section of the Cargo.toml file. |
We recommend using the Web Console to develop and debug Rust UDFs, as described below. The Web Console code editor has a separate tab for each of these files, and supports an interactive development workflow. Once ready, the creation and deployment of the pipeline can be automated with scripts that make use of the Feldera CLI, the Python API, or the REST API. These scripts and the SQL/Rust code can then be maintained with the usual practices such as version control.
Step 1: Declare a function in the SQL program using CREATE FUNCTION
Declare a SQL function without a body:
```sql
-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;
```
Step 2: Locate the auto-generated Rust prototype
When the SQL compiler encounters such a function, it generates a Rust stub for it in the stubs.rs file. The stub is an implementation of the UDF in Rust that simply invokes another function with the same name and signature exported from the udf.rs module:
```rust
/* stubs.rs */
...
pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    udf::base64(
        s)
}
```
The udf::base64 function does not exist yet, so you will see a Rust compiler error similar to this:
```text
error[E0425]: cannot find function `base64` in module `udf`
  --> project01926e00-f404-7813-b65c-d66ac9959998/src/stubs.rs:11:10
   |
11 |     udf::base64(
   |          ^^^^^^ not found in `udf`
```
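The relationship between the two files can be reproduced in a single Rust file, which shows why the error disappears as soon as udf.rs exports a function with the matching name and signature. In this sketch the modules udf and stubs stand in for udf.rs and the generated stubs.rs, Vec<u8> stands in for ByteArray, and byte_len is a hypothetical UDF used only for illustration.

```rust
// Stands in for udf.rs: the user-provided implementation.
mod udf {
    use std::error::Error;

    /// Hypothetical UDF: CREATE FUNCTION byte_len(s VARBINARY) RETURNS BIGINT;
    /// Vec<u8> stands in for ByteArray.
    pub fn byte_len(s: Option<Vec<u8>>) -> Result<Option<i64>, Box<dyn Error>> {
        Ok(s.map(|v| v.len() as i64))
    }
}

// Stands in for the generated stubs.rs: it only forwards the call to `udf`.
// Removing `udf::byte_len` above reproduces error E0425.
mod stubs {
    use std::error::Error;

    pub fn byte_len(s: Option<Vec<u8>>) -> Result<Option<i64>, Box<dyn Error>> {
        super::udf::byte_len(s)
    }
}
```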
Step 3: Implement the UDF in Rust
Copy the declaration from stubs.rs to udf.rs and replace its body with the actual implementation:
```rust
/* udf.rs */
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    // Encode the bytes, then wrap the resulting &str in a SqlString.
    Ok(s.map(|v| SqlString::from_ref(&BASE64_STANDARD.encode(v.as_slice()))))
}
```
The use feldera_sqllib::* directive imports the definitions of the Rust types that the compiler uses to implement some of the SQL datatypes. The next section explains what these types are.
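Because a UDF is an ordinary Rust function, its logic can also be exercised outside a pipeline. The sketch below replaces the base64 crate with a small hand-written encoder for the standard alphabet (RFC 4648) and uses Vec<u8> and String in place of ByteArray and SqlString; both substitutions are assumptions made only so that the snippet compiles without feldera_sqllib. It produces the same encoding, 'ASNFZ4mrze8=', that the Python SDK example in Step 4 expects for X'0123456789ABCDEF'.

```rust
/// Standard base64 alphabet (RFC 4648, section 4).
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Minimal base64 encoder with `=` padding; a stand-in for `BASE64_STANDARD.encode`.
fn encode_base64(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into the low 24 bits of `n`, zero-filling missing bytes.
        let b0 = u32::from(chunk[0]);
        let b1 = u32::from(chunk.get(1).copied().unwrap_or(0));
        let b2 = u32::from(chunk.get(2).copied().unwrap_or(0));
        let n = (b0 << 16) | (b1 << 8) | b2;
        // Emit one character per 6-bit group; pad with '=' when the chunk is short.
        out.push(ALPHABET[((n >> 18) & 63) as usize] as char);
        out.push(ALPHABET[((n >> 12) & 63) as usize] as char);
        out.push(if chunk.len() > 1 { ALPHABET[((n >> 6) & 63) as usize] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[(n & 63) as usize] as char } else { '=' });
    }
    out
}

/// Same shape as the UDF in udf.rs, with std types standing in for ByteArray / SqlString.
pub fn base64(s: Option<Vec<u8>>) -> Result<Option<String>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| encode_base64(&v)))
}
```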
If your UDF uses external crates, list these external dependencies in udf.toml. The contents of this file are appended to the [dependencies] section of the generated Cargo.toml file:
```toml
base64 = "0.22.1"
```
Step 4: Deploy the UDF
Using the Feldera CLI:
```shell
fda create udf_fda_test --udf-rs udf.rs --udf-toml udf.toml program.sql
```
Using the Feldera Python SDK
The following example creates a pipeline with a Rust UDF using the Feldera Python SDK:
```python
from feldera import FelderaClient, PipelineBuilder

# SQL program.
sql = """
-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;

CREATE TABLE binary_t(
    b VARBINARY
);

CREATE MATERIALIZED VIEW base64_v AS
SELECT
    base64(b) as text
FROM
    binary_t;
"""

# UDF implementation in Rust (VARCHAR maps to SqlString).
udf_rust = """
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| SqlString::from_ref(&BASE64_STANDARD.encode(v.as_slice()))))
}"""

# External Rust dependencies.
udf_toml = """
base64 = "0.22.1"
"""

# Create a pipeline using the above SQL, Rust, and TOML code.
feldera = FelderaClient("http://127.0.0.1:8080")
pipeline = PipelineBuilder(
    feldera, name="udf_test", sql=sql, udf_rust=udf_rust, udf_toml=udf_toml
).create_or_replace()

pipeline.start()

# Insert a row, then read back the encoded value from the view.
next(pipeline.query("insert into binary_t values(X'0123456789ABCDEF')"))
output = pipeline.query("select * from base64_v")
assert list(output) == [{'text': 'ASNFZ4mrze8='}]

pipeline.shutdown()
pipeline.delete()
```
Using the REST API
The following script creates a pipeline with a Rust UDF using the Feldera REST API:
```bash
#!/bin/bash

# Write the SQL program, the Rust UDF implementation, and its dependencies to files.
echo "
-- UDF which encodes text in base64.
CREATE FUNCTION base64(s VARBINARY) RETURNS VARCHAR;

CREATE TABLE binary_t(
    b VARBINARY
);

CREATE MATERIALIZED VIEW base64_v AS
SELECT
    base64(b) as text
FROM
    binary_t;
" > program.sql

echo "
use feldera_sqllib::*;
use base64::prelude::*;

pub fn base64(s: Option<ByteArray>) -> Result<Option<SqlString>, Box<dyn std::error::Error>> {
    Ok(s.map(|v| SqlString::from_ref(&BASE64_STANDARD.encode(v.as_slice()))))
}" > udf.rs

echo "
base64 = \"0.22.1\"
" > udf.toml

# Create the pipeline, passing the file contents as JSON string fields.
curl -i -X PUT http://127.0.0.1:8080/v0/pipelines/udf_api_test \
  -H 'Content-Type: application/json' \
  -d "$(jq -Rsn \
    --rawfile sql program.sql \
    --rawfile udf_rust udf.rs \
    --rawfile udf_toml udf.toml \
    '{
      name: "udf_api_test",
      description: "Create a UDF using REST API",
      program_code: $sql,
      runtime_config: {},
      program_config: {},
      udf_rust: $udf_rust,
      udf_toml: $udf_toml
    }')"
```
SQL type representation in Rust
The following table shows the Rust representation of standard SQL data types. A nullable SQL type is represented by the corresponding Rust Option<T> type. Notice that some of these types are not standard Rust types, but are defined in the feldera-sqllib crate, which is part of the Feldera SQL runtime.
SQL | Rust |
---|---|
BOOLEAN | bool |
TINYINT | i8 |
SMALLINT | i16 |
INT | i32 |
BIGINT | i64 |
TINYINT UNSIGNED | u8 |
SMALLINT UNSIGNED | u16 |
INT UNSIGNED | u32 |
BIGINT UNSIGNED | u64 |
DECIMAL(p, s) | feldera_sqllib::SqlDecimal<P, S> |
REAL | feldera_sqllib::F32 |
DOUBLE | feldera_sqllib::F64 |
CHAR, CHAR(n) | feldera_sqllib::SqlString |
VARCHAR, VARCHAR(n) | feldera_sqllib::SqlString |
BINARY, BINARY(n), VARBINARY, VARBINARY(n) | feldera_sqllib::ByteArray |
NULL | () |
INTERVAL | feldera_sqllib::ShortInterval, feldera_sqllib::LongInterval |
TIME | feldera_sqllib::Time |
TIMESTAMP | feldera_sqllib::Timestamp |
DATE | feldera_sqllib::Date |
T ARRAY | feldera_sqllib::Array<T> |
MAP<K, V> | feldera_sqllib::Map<K, V> |
UUID | feldera_sqllib::Uuid |
VARIANT | feldera_sqllib::Variant |
ROW | TupN |
User-defined struct type | TupN |
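Nullability determines whether the Option wrapper appears in the signature. As a sketch, here are two hypothetical declarations of an increment function, one with nullable and one with NOT NULL types, and Rust implementations that follow the table above. Both use checked_add, so an overflow is reported as an Err rather than a panic.

```rust
use std::error::Error;

// Hypothetical: CREATE FUNCTION inc(x INT) RETURNS INT;
// A nullable INT becomes Option<i32>; SQL NULL is None.
pub fn inc(x: Option<i32>) -> Result<Option<i32>, Box<dyn Error>> {
    match x {
        None => Ok(None),
        Some(v) => v.checked_add(1).map(Some).ok_or_else(|| "integer overflow in inc".into()),
    }
}

// Hypothetical: CREATE FUNCTION inc_nn(x INT NOT NULL) RETURNS INT NOT NULL;
// A NOT NULL type maps directly to the Rust type, with no Option wrapper.
pub fn inc_nn(x: i32) -> Result<i32, Box<dyn Error>> {
    x.checked_add(1).ok_or_else(|| "integer overflow in inc_nn".into())
}
```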
Multiple SQL types may be represented by the same Rust type. For example, CHAR, CHAR(n), VARCHAR(n), and VARCHAR are all represented by the SqlString type.
The SQL family of INTERVAL types translates to one of two Rust types: ShortInterval (representing intervals from days to seconds), and LongInterval (representing intervals from years to months). (Our dialect of SQL does not allow mixing the two kinds of intervals in a single expression.)
Currently, feldera_sqllib::Map is defined as type Map<K, V> = Arc<BTreeMap<K, V>>, feldera_sqllib::Array is defined as type Array<T> = Arc<Vec<T>>, and feldera_sqllib::SqlString is a thin wrapper type around the ArcStr type from the arcstr crate.
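Because these containers are reference-counted, a UDF can read an argument without copying it, but it cannot mutate it through the shared Arc; returning a modified container means building a new one. The sketch below defines local aliases mirroring the definitions just described (an assumption made so the snippet compiles without feldera_sqllib; real code would use feldera_sqllib::Map and feldera_sqllib::Array) and two hypothetical UDFs: map_get_or, which looks up a key and falls back to a default, and array_append, which builds a new array. String stands in for SqlString, and element nullability is ignored for brevity.

```rust
use std::collections::BTreeMap;
use std::error::Error;
use std::sync::Arc;

// Local aliases mirroring the documented representation (assumption):
// in udf.rs you would use feldera_sqllib::Map and feldera_sqllib::Array instead.
type Map<K, V> = Arc<BTreeMap<K, V>>;
type Array<T> = Arc<Vec<T>>;

/// Hypothetical UDF: look up `key`, falling back to `fallback` when it is absent.
/// Reading through the Arc borrows the shared map; nothing is copied.
pub fn map_get_or(
    m: Option<Map<String, i64>>,
    key: Option<String>,
    fallback: i64,
) -> Result<Option<i64>, Box<dyn Error>> {
    match (m, key) {
        (Some(m), Some(key)) => Ok(Some(m.get(&key).copied().unwrap_or(fallback))),
        _ => Ok(None),
    }
}

/// Hypothetical UDF: a new array with `x` appended.
/// The input may be shared with other rows, so it is copied rather than mutated.
pub fn array_append(a: Option<Array<i32>>, x: i32) -> Result<Option<Array<i32>>, Box<dyn Error>> {
    Ok(a.map(|a| {
        let mut v = a.to_vec();
        v.push(x);
        Arc::new(v)
    }))
}
```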
A ROW type with N fields is represented by a Rust TupN datatype (for example, Tup2 for a row with two fields). A user-defined structure type is also represented by a TupN tuple type. These tuple types can be imported from the current crate.
Here are some examples using ROW and user-defined types:
```sql
CREATE TYPE X AS (x int);
CREATE FUNCTION f(arg X) RETURNS X;
CREATE FUNCTION g(x int NOT NULL) RETURNS ROW(a INT, b INT) NOT NULL;
CREATE VIEW V AS SELECT f(X(1)), g(2).a;
```
And here is a possible implementation of the user-defined functions f and g in Rust:
```rust
use crate::{Tup1, Tup2};
use feldera_sqllib::*;

pub fn f(x: Option<Tup1<Option<i32>>>) ->
    Result<Option<Tup1<Option<i32>>>, Box<dyn std::error::Error>> {
    match x {
        None => Ok(None),
        Some(x) => match x.0 {
            None => Ok(Some(Tup1::new(None))),
            Some(x) => Ok(Some(Tup1::new(Some(x + 1)))),
        },
    }
}

pub fn g(x: i32) -> Result<Tup2<i32, i32>, Box<dyn std::error::Error>> {
    Ok(Tup2::new(x - 1, x + 1))
}
```
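The nested Option handling in f can also be written more compactly. The sketch below defines a minimal stand-in for the generated Tup1 type (a public field .0 and a new constructor, which is all the example above relies on; the real type implements many more traits) and replaces x + 1 with checked_add, so that an overflow becomes an Err instead of a panic.

```rust
use std::error::Error;

/// Minimal stand-in for the generated Tup1 (assumption: public field `.0` and `new`).
#[derive(Debug, Clone, PartialEq)]
pub struct Tup1<T0>(pub T0);

impl<T0> Tup1<T0> {
    pub fn new(t0: T0) -> Self {
        Tup1(t0)
    }
}

/// Same contract as `f` above: NULL struct -> NULL; NULL field -> struct with a NULL field.
pub fn f(x: Option<Tup1<Option<i32>>>) -> Result<Option<Tup1<Option<i32>>>, Box<dyn Error>> {
    match x {
        None => Ok(None),
        Some(t) => {
            // Option<Result<i32, &str>> -> Result<Option<i32>, &str>; `?` boxes the error.
            let field = t.0.map(|v| v.checked_add(1).ok_or("integer overflow in f")).transpose()?;
            Ok(Some(Tup1::new(field)))
        }
    }
}
```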
Return types
In the Rust implementation, the function always has to return the type Result<T, Box<dyn std::error::Error>>, where T is the Rust equivalent of the expected return type of the SQL function. The Rust function should return an Err only when the function fails at runtime with a fatal condition, e.g., an array index out of bounds or an arithmetic overflow.
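To illustrate the distinction, the hypothetical UDF below returns the element at a given position of an array. A NULL argument is not a failure and simply yields Ok(None), while a position outside the array is treated as a fatal condition and reported as an Err. The 1-based positions and the Arc<Vec<i32>> stand-in for feldera_sqllib::Array<i32> are assumptions of this sketch.

```rust
use std::convert::TryFrom;
use std::error::Error;
use std::sync::Arc;

/// Hypothetical UDF: the element at 1-based position `pos`.
/// Arc<Vec<i32>> stands in for feldera_sqllib::Array<i32> (assumption).
pub fn element_at(arr: Option<Arc<Vec<i32>>>, pos: Option<i32>) -> Result<Option<i32>, Box<dyn Error>> {
    let (arr, pos) = match (arr, pos) {
        (Some(a), Some(p)) => (a, p),
        // A NULL argument is not a failure: the result is simply NULL.
        _ => return Ok(None),
    };
    // Positions outside 1..=len (including 0 and negatives) are fatal: return Err.
    let idx = usize::try_from(pos)
        .ok()
        .and_then(|p| p.checked_sub(1))
        .filter(|&i| i < arr.len())
        .ok_or_else(|| format!("position {} is out of bounds for an array of length {}", pos, arr.len()))?;
    Ok(Some(arr[idx]))
}
```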
Developing complex UDFs
While many useful UDFs can be implemented with just a few lines of Rust, some may require more complex code that is easier to develop using a full-featured Rust IDE. To support the development of such UDFs, we made the feldera-sqllib crate available on crates.io.
In order to implement a complex Rust UDF (or a library of UDFs) using a Rust IDE:
- Create a new Rust crate to serve as the container for your UDFs.
- Add feldera-sqllib as a dependency to Cargo.toml (use the crate version that matches the version of Feldera you are working with).
- Implement and test your UDFs within this crate.
- Copy the final Rust code and dependencies to the Feldera Web Console.
If your UDFs require a larger Rust project with multiple modules, we recommend encapsulating the majority of the UDF logic in a crate. This crate can be hosted on crates.io or GitHub. Import this crate into your Feldera pipeline via the udf.toml file, and include in udf.rs only wrapper functions that call the API of this crate.
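A sketch of this split follows. A local module, text_stats, simulates the external crate (in practice it would be a dependency listed in udf.toml); it exposes a plain Rust API with no notion of SQL NULL. The code that would live in udf.rs is a thin wrapper that maps NULL to None and library errors to Err. The names, the 1 MiB input limit, and the String stand-in for SqlString are all hypothetical.

```rust
use std::convert::TryFrom;
use std::error::Error;

/// Stand-in for an external crate that holds the real logic (hypothetical).
/// Its API knows nothing about SQL NULL or Box<dyn Error>.
mod text_stats {
    /// Number of whitespace-separated words; rejects inputs above an arbitrary size limit.
    pub fn word_count(s: &str) -> Result<u64, String> {
        if s.len() > (1 << 20) {
            return Err(format!("input of {} bytes exceeds the 1 MiB limit", s.len()));
        }
        Ok(s.split_whitespace().count() as u64)
    }
}

/// Thin wrapper that would live in udf.rs, for a hypothetical
///   CREATE FUNCTION word_count(s VARCHAR) RETURNS BIGINT;
/// String stands in for SqlString.
pub fn word_count(s: Option<String>) -> Result<Option<i64>, Box<dyn Error>> {
    match s {
        None => Ok(None),
        Some(s) => {
            // Library errors (String) and conversion errors are both boxed by `?`.
            let n = text_stats::word_count(&s)?;
            Ok(Some(i64::try_from(n)?))
        }
    }
}
```

Keeping the wrapper this thin means the bulk of the code can be unit-tested in the external crate with ordinary cargo tooling.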
Limitations
- Currently the compiler inserts only limited implicit casts for function arguments and function results in the SQL program. For example, a call such as CONTAINS_NUMBER('2010-10-20', '5') will fail at SQL compilation time because the first argument has type CHAR(10) instead of VARCHAR, and the second argument has type CHAR(1) instead of INTEGER. This can be avoided by calling the function with explicit casts: CONTAINS_NUMBER(CAST('2010-10-20' AS VARCHAR), CAST('5' AS INTEGER)).
- User-defined functions cannot have the same names as standard SQL library functions.
- Polymorphic functions are not supported. For example, in SQL the addition operation works on any numeric type; such an operation cannot be implemented as a single user-defined function.
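One way to live with the lack of polymorphism is to generate a separate monomorphic UDF for each SQL type from a single definition. The sketch below uses a macro_rules! macro to produce hypothetical add_int and add_bigint functions; each would still need its own CREATE FUNCTION declaration, and the names deliberately avoid built-in SQL function names.

```rust
use std::error::Error;

/// Expands to one non-generic UDF for the given Rust integer type.
/// Each generated function still needs its own CREATE FUNCTION declaration.
macro_rules! checked_add_udf {
    ($name:ident, $t:ty) => {
        pub fn $name(a: Option<$t>, b: Option<$t>) -> Result<Option<$t>, Box<dyn Error>> {
            match (a, b) {
                // Overflow is reported as an error rather than a panic.
                (Some(a), Some(b)) => a
                    .checked_add(b)
                    .map(Some)
                    .ok_or_else(|| format!("integer overflow in {}", stringify!($name)).into()),
                // Any NULL argument yields NULL.
                _ => Ok(None),
            }
        }
    };
}

// Hypothetical: CREATE FUNCTION add_int(a INT, b INT) RETURNS INT;
checked_add_udf!(add_int, i32);
// Hypothetical: CREATE FUNCTION add_bigint(a BIGINT, b BIGINT) RETURNS BIGINT;
checked_add_udf!(add_bigint, i64);
```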
User-defined aggregates
Rust UDA support is currently experimental and may undergo significant changes, including non-backward-compatible modifications, in future releases of Feldera.
The SQL statement CREATE AGGREGATE can be used to extend the set of aggregate functions supported by Feldera SQL with user-defined aggregation functions. Such functions need to be implemented in Rust. The argument and result of CREATE AGGREGATE must be nullable.
Creating user-defined linear aggregate functions
In general, a function A is linear if it has the following property: A(C1 UNION ALL C2) = A(C1) + A(C2), where C1 and C2 are arbitrary collections (multisets, so the union preserves duplicates). The type of the result produced by the function is expected to have a + operation, which must be commutative and associative.
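The property can be checked on a small example. In this sketch a collection is a Vec, so duplicates are kept (multiset semantics), and concatenation combines two collections. SUM and COUNT satisfy the equation and AVG does not, but the pair (sum, count) does, which is what makes the post-processing approach described below work.

```rust
/// Collections are modeled as slices of i64 (multisets: duplicates are kept).
fn sum(c: &[i64]) -> i64 {
    c.iter().sum()
}

fn count(c: &[i64]) -> i64 {
    c.len() as i64
}

/// AVG on its own is not linear...
fn avg(c: &[i64]) -> f64 {
    sum(c) as f64 / count(c) as f64
}

/// ...but the (sum, count) pair is: it combines component-wise.
fn sum_count(c: &[i64]) -> (i64, i64) {
    (sum(c), count(c))
}

/// Multiset union (UNION ALL): concatenation, keeping duplicates.
fn union_all(c1: &[i64], c2: &[i64]) -> Vec<i64> {
    let mut u = c1.to_vec();
    u.extend_from_slice(c2);
    u
}
```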
To implement a user-defined linear aggregate function, the user has to define 3 Rust objects:
- a type for the accumulator; the type name is obtained from the aggregate function name with the suffix _accumulator_type. The actual arithmetic is performed using values of this type. The accumulator type is thus required to implement several traits defined in the DBSP core library:
  - DBWeight, which is a combination of DBData and MonoidValue. DBData allows accumulators to be stored in relations, which may be spilled to disk; among other traits, it requires Ord and ArchivedDBData. MonoidValue essentially requires the traits Zero, HasZero, and Add (and variants such as AddByRef).
  - MulByRef, which allows accumulator values to be multiplied by integer (signed and unsigned) weights. Negative weights are used when elements are removed from collections; weights larger than 1 are used when multiple copies of an element are updated in one operation. Currently the Weight type is i64.
- a function that converts a collection value into an accumulator value. The function's name is obtained from the aggregate function name with the suffix _map.
- a function that performs additional post-processing on the aggregation result, returning the expected result. The function's name is obtained from the aggregate function name with the suffix _post.
This use of the post-processing function enables linear implementations for aggregates such as "average", where the aggregation produces a sum and a count, while the post-processing step produces the actual result by computing sum/count.
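The following std-only model makes the roles of the pieces concrete for an average. It is not Feldera's API: the accumulator is a plain (sum, count) tuple, avg_map and avg_post are hypothetical names that follow the _map / _post convention, and integer division is used for brevity. Each weighted update is mapped to an accumulator, scaled by its i64 weight (the role MulByRef plays), and added to the running total (the role of Add); the post-processing function runs only on the final total.

```rust
/// Accumulator for a hypothetical `avg` aggregate: (sum of values, number of values).
type AvgAcc = (i64, i64);

/// `_map`: one input value -> one accumulator value.
fn avg_map(v: i64) -> AvgAcc {
    (v, 1)
}

/// Plays the role of MulByRef: scale an accumulator by an i64 weight.
/// A weight of -1 retracts a row; a weight of 2 inserts two copies.
fn mul_weight(acc: AvgAcc, w: i64) -> AvgAcc {
    (acc.0 * w, acc.1 * w)
}

/// Plays the role of Add: combine accumulators component-wise.
fn add(a: AvgAcc, b: AvgAcc) -> AvgAcc {
    (a.0 + b.0, a.1 + b.1)
}

/// `_post`: turn the final accumulator into the SQL result (NULL for an empty group).
/// Integer division is used here for brevity.
fn avg_post(acc: AvgAcc) -> Option<i64> {
    if acc.1 == 0 { None } else { Some(acc.0 / acc.1) }
}

/// Aggregate a batch of (value, weight) updates, starting from the zero accumulator.
fn aggregate(updates: &[(i64, i64)]) -> AvgAcc {
    updates
        .iter()
        .fold((0, 0), |acc, &(v, w)| add(acc, mul_weight(avg_map(v), w)))
}
```

Because only sums and counts are stored, a retraction is just another weighted update; no earlier input has to be revisited.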
If the Add operation for the accumulator type is not commutative and associative (so the aggregate is not truly linear), the results produced by the program are undefined. This requirement can be subtle; for example, floating-point addition is not associative, and thus computing an aggregate like SUM on floating-point values using standard floating-point arithmetic will produce incorrect results or runtime crashes.
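A short check shows the problem: regrouping the same three floating-point additions changes the result, so a SUM over f64 values could depend on the order in which updates are combined. One common workaround, shown here as a general technique rather than a Feldera feature, is to accumulate in a fixed-point integer representation (here, millionths stored in an i64), whose addition is exactly associative and commutative.

```rust
/// The same three f64 values, summed with two different groupings.
fn left_then_right(a: f64, b: f64, c: f64) -> f64 {
    (a + b) + c
}

fn right_then_left(a: f64, b: f64, c: f64) -> f64 {
    a + (b + c)
}

/// Fixed-point alternative: represent amounts as integer millionths.
fn to_micros(x: f64) -> i64 {
    (x * 1_000_000.0).round() as i64
}
```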
Example user-defined aggregate
We show an example of building a user-defined linear aggregate that computes the sum of 128-bit values. We use the SQL type BINARY(16) to represent 128-bit numbers. The first step is to declare the user-defined aggregate function in SQL:
```sql
CREATE LINEAR AGGREGATE i128_sum(value BINARY(16)) RETURNS BINARY(16);
```
Notice that the types of the argument and the result are both nullable. We can then use the user-defined aggregate in a SQL program:
```sql
CREATE TABLE T(value BINARY(16));
CREATE MATERIALIZED VIEW V0 AS SELECT i128_sum(value) FROM T;
```
In SQL the SUM function is polymorphic, since it works for any numeric SQL type. Currently user-defined aggregate functions cannot be polymorphic, so one needs to define a new user-defined aggregate function for each type of values; moreover, the SUM function name cannot be used for a user-defined aggregate.
The user needs to add the following 2 dependencies to the udf.toml file:
```toml
i256 = { version = "0.2.2", features = ["num-traits"] }
num-traits = "0.2.19"
```
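We will use the i256 Rust crate for 256-bit arithmetic: accumulating into 256 bits leaves ample headroom for intermediate sums of 128-bit values, and i128_sum_post checks that the final result fits back into 128 bits. In our implementation we wrap the crate's I256 type into the type I256Wrapper, for which we implement the required traits. Most of the code is devoted to this task, and is relatively straightforward.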
For our example, the accumulator type that the user has to define is named i128_sum_accumulator_type. In our implementation the accumulator is a tuple with 3 fields:
- the partial sum computed so far, stored in an I256 value
- the number of non-null elements encountered in the collection
- the total number of elements in the collection
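Tracking the non-null count matters because SUM over a group that contains only NULLs must return NULL, and a group can return to that state after retractions. The following std-only model mirrors this accumulator and the map and post functions shown next, with two substitutions so that it runs on its own: i64 inputs instead of BINARY(16), and an i128 partial sum instead of I256 (the same wider-accumulator idea). Like the original, the post function panics if the result does not fit.

```rust
use std::convert::TryFrom;

/// (partial sum, number of non-null values, total number of values),
/// mirroring i128_sum_accumulator_type with i128 in place of I256.
type Acc = (i128, i64, i64);

/// Mirrors i128_sum_map: a NULL only increments the total count.
fn sum_map(v: Option<i64>) -> Acc {
    match v {
        None => (0, 0, 1),
        Some(v) => (i128::from(v), 1, 1),
    }
}

/// One weighted update: scale the mapped value by `weight` (MulByRef), then add (Add).
fn apply(acc: Acc, v: Option<i64>, weight: i64) -> Acc {
    let m = sum_map(v);
    (
        acc.0 + m.0 * i128::from(weight),
        acc.1 + m.1 * weight,
        acc.2 + m.2 * weight,
    )
}

/// Mirrors i128_sum_post: no non-null values means NULL; like the original,
/// it panics if the sum does not fit in the narrower result type.
fn sum_post(acc: Acc) -> Option<i64> {
    if acc.1 == 0 {
        None
    } else {
        Some(i64::try_from(acc.0).expect("sum does not fit in 64 bits"))
    }
}
```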
The user would add the following implementation to the udf.rs file:
```rust
use i256::I256;
use feldera_sqllib::*;
use crate::{AddAssignByRef, AddByRef, HasZero, MulByRef, SizeOf, Tup3};
use derive_more::Add;
use num_traits::Zero;
use rkyv::Fallible;
use std::ops::{Add, AddAssign};

#[derive(Add, Clone, Debug, Default, PartialOrd, Ord, Eq, PartialEq, Hash)]
pub struct I256Wrapper {
    pub data: I256,
}

impl SizeOf for I256Wrapper {
    // I256 owns no heap memory, so there are no children to account for.
    fn size_of_children(&self, _context: &mut size_of::Context) {}
}

impl From<[u8; 32]> for I256Wrapper {
    fn from(value: [u8; 32]) -> Self {
        Self { data: I256::from_be_bytes(value) }
    }
}

impl From<&[u8]> for I256Wrapper {
    fn from(value: &[u8]) -> Self {
        let len = value.len();
        if len > 32 {
            panic!("Slice larger than target");
        }
        let mut padded = [0u8; 32];
        // If the original value is negative, sign-extend by padding with 0xff.
        if value[0] & 0x80 != 0 {
            padded.fill(0xff);
        }
        padded[32 - len..].copy_from_slice(value);
        Self { data: I256::from_be_bytes(padded) }
    }
}

impl MulByRef<Weight> for I256Wrapper {
    type Output = Self;

    fn mul_by_ref(&self, other: &Weight) -> Self::Output {
        Self {
            data: self
                .data
                .checked_mul_i64(*other)
                .expect("Overflow during multiplication"),
        }
    }
}

impl HasZero for I256Wrapper {
    fn zero() -> Self {
        Self { data: I256::zero() }
    }

    fn is_zero(&self) -> bool {
        self.data.is_zero()
    }
}

impl AddByRef for I256Wrapper {
    fn add_by_ref(&self, other: &Self) -> Self {
        Self { data: self.data.add(other.data) }
    }
}

impl AddAssignByRef<Self> for I256Wrapper {
    fn add_assign_by_ref(&mut self, other: &Self) {
        self.data += other.data
    }
}

// Archived (serialized) form of I256Wrapper: the value's 32 big-endian bytes.
#[repr(C)]
#[derive(Debug, Copy, Clone, PartialOrd, Ord, Eq, PartialEq)]
pub struct ArchivedI256Wrapper {
    pub bytes: [u8; 32],
}

impl rkyv::Archive for I256Wrapper {
    type Archived = ArchivedI256Wrapper;
    type Resolver = ();

    #[inline]
    unsafe fn resolve(&self, _pos: usize, _: Self::Resolver, out: *mut Self::Archived) {
        out.write(ArchivedI256Wrapper {
            bytes: self.data.to_be_bytes(),
        });
    }
}

impl<S: Fallible + ?Sized> rkyv::Serialize<S> for I256Wrapper {
    #[inline]
    fn serialize(&self, _serializer: &mut S) -> Result<Self::Resolver, S::Error> {
        Ok(())
    }
}

impl<D: Fallible + ?Sized> rkyv::Deserialize<I256Wrapper, D> for ArchivedI256Wrapper {
    #[inline]
    fn deserialize(&self, _: &mut D) -> Result<I256Wrapper, D::Error> {
        Ok(I256Wrapper::from(self.bytes))
    }
}

// Accumulator: (partial sum, number of non-null values, total number of values).
pub type i128_sum_accumulator_type = Tup3<I256Wrapper, i64, i64>;

pub fn i128_sum_map(val: Option<ByteArray>) -> i128_sum_accumulator_type {
    match val {
        None => Tup3::new(I256Wrapper::zero(), 0, 1),
        Some(val) => Tup3::new(I256Wrapper::from(val.as_slice()), 1, 1),
    }
}

pub fn i128_sum_post(val: i128_sum_accumulator_type) -> Option<ByteArray> {
    if val.1 == 0 {
        // No non-null values: the SQL result is NULL.
        None
    } else {
        // Check that the 256-bit sum fits in 128 bits.
        if val.0.data < I256::from(i128::MIN) || val.0.data > I256::from(i128::MAX) {
            panic!("Result of aggregation {} does not fit in 128 bits", val.0.data);
        }
        // Keep the low 16 bytes of the big-endian representation.
        Some(ByteArray::new(&val.0.data.to_be_bytes()[16..]))
    }
}
```
The two functions needed to implement the aggregation are i128_sum_map and i128_sum_post.
i128_sum_map converts a BINARY(16) value into an accumulator value. Notice that in the SQL runtime library BINARY(16) is implemented as a ByteArray.
i128_sum_post converts the accumulator value into the expected result type BINARY(16).
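The byte-level conversions can be checked with the standard library alone, since i128 provides the same big-endian byte conversions. The sketch below reimplements the sign extension performed by From<&[u8]> for I256Wrapper (returning None instead of panicking on empty or oversized input, a deliberate deviation) and the truncation to the last 16 bytes performed by i128_sum_post, then checks that the round trip preserves positive and negative values, including i128::MIN and i128::MAX.

```rust
/// Sign-extend a big-endian two's-complement value to 32 bytes, as
/// `From<&[u8]> for I256Wrapper` does. Returns None for empty or oversized input.
fn sign_extend_be(value: &[u8]) -> Option<[u8; 32]> {
    if value.is_empty() || value.len() > 32 {
        return None;
    }
    // Fill with 0xff if the most significant bit of the input is set, else 0x00.
    let fill = if value[0] & 0x80 != 0 { 0xff } else { 0x00 };
    let mut padded = [fill; 32];
    padded[32 - value.len()..].copy_from_slice(value);
    Some(padded)
}

/// Keep the low 16 bytes, as `i128_sum_post` does with `to_be_bytes()[16..]`.
fn truncate_to_16(bytes: &[u8; 32]) -> [u8; 16] {
    let mut out = [0u8; 16];
    out.copy_from_slice(&bytes[16..]);
    out
}
```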
We use the Tup3 type from our SQL runtime library. This type implements Add and other required operations if all of its fields do. The addition of Tup3 values is done field-wise, and the zero value of Tup3 (provided by the Zero trait) is a tuple with all fields zero.
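The behavior described for Tup3 can be sketched with a minimal stand-in type. This is not the runtime's definition (which also implements the DBSP traits listed earlier); it only shows field-wise addition via std::ops::Add and an all-zero identity built from Default.

```rust
use std::ops::Add;

/// Minimal stand-in for the runtime's Tup3 (assumption: three public fields).
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Tup3<T0, T1, T2>(pub T0, pub T1, pub T2);

/// Field-wise addition, available whenever every field type implements Add.
impl<T0: Add<Output = T0>, T1: Add<Output = T1>, T2: Add<Output = T2>> Add for Tup3<T0, T1, T2> {
    type Output = Self;

    fn add(self, other: Self) -> Self {
        Tup3(self.0 + other.0, self.1 + other.1, self.2 + other.2)
    }
}

impl<T0: Default, T1: Default, T2: Default> Tup3<T0, T1, T2> {
    /// The identity element: every field at its zero value (here, via Default).
    pub fn zero() -> Self {
        Self::default()
    }
}
```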
Creating user-defined non-linear aggregate functions
Currently user-defined non-linear aggregation functions are not supported. These may be added in the future.