
# Using the SQL Compiler directly

## SQL programs

The compiler, which runs internally within Feldera, must be given the table definitions first, followed by the view definitions, e.g.:

```sql
-- define Person table
CREATE TABLE Person
(
    name    VARCHAR NOT NULL,
    age     INT,
    present BOOLEAN
);
CREATE VIEW Adult AS SELECT Person.name FROM Person WHERE Person.age > 18;
```

The compiler generates a Rust function which implements the query: given the input data, it produces the output data.

The compiler can also generate a function which will incrementally maintain the view Adult when presented with changes to table Person:

```
                                       table changes
                                             |
                                             V
tables ----> SQL-to-DBSP compiler ----> DBSP circuit
views                                        |
                                             V
                                       view changes
```

## Command-line options

The compiler is invoked using a Linux shell script called sql-to-dbsp, residing in the SQL-compiler directory. Here is an example:

```
$ ./sql-to-dbsp -h
Usage: sql-to-dbsp [options] Input file to compile
  Options:
    --alltables
      Generate an input for each CREATE TABLE, even if the table is not used
      by any view
      Default: false
    --handles
      Use handles (true) or Catalog (false) in the emitted Rust code
      Default: false
    -h, --help, -?
      Show this message and exit
    --ignoreOrder
      Ignore ORDER BY clauses at the end
      Default: false
    --jdbcSource
      Connection string to a database that contains table metadata
      Default: <empty string>
    --lenient
      Lenient SQL validation.  If true it allows duplicate column names in a
      view
      Default: false
    --nowstream
      Implement NOW as a stream (true) or as an internal operator (false)
      Default: false
    --outputsAreSets
      Ensure that outputs never contain duplicates
      Default: false
    --streaming
      Compiling a streaming program, where only inserts are allowed
      Default: false
    --udf
      Specify a Rust file containing implementations of user-defined functions
      Default: <empty string>
    --unquotedCasing
      How unquoted identifiers are treated.  Choices are: 'upper', 'lower',
      'unchanged'
      Default: upper
    --no-restrict-io
      Do not restrict the types of columns allowed in tables and views
      Default: false
    -O
      Optimization level (0, 1, or 2)
      Default: 2
    -T
      Specify logging level for a class (can be repeated)
      Syntax: -Tkey=value
      Default: {}
    -d
      SQL syntax dialect used
      Default: ORACLE
      Possible Values: [BIG_QUERY, ORACLE, MYSQL, MYSQL_ANSI, SQL_SERVER, JAVA]
    -f
      Name of function to generate
      Default: circuit
    -i
      Generate an incremental circuit
      Default: false
    -je
      Emit error messages as a JSON array to stderr
      Default: false
    -jpg
      Emit a jpg image of the circuit instead of Rust
      Default: false
    -js
      Emit a JSON file containing the schema of all views and tables involved
    -o
      Output file; stdout if null
      Default: <empty string>
    -png
      Emit a png image of the circuit instead of Rust
      Default: false
    -q
      Quiet: do not print warnings
      Default: false
    -v
      Output verbosity
      Default: 0
```

Here is a description of the non-obvious command-line options:

--handles: The generated Rust code can expose the input tables and output views in two ways: through explicit handles, or through a Catalog object. The catalog allows one to retrieve handles by name, but offers only untyped handles, which require a serializer (for output handles) or a deserializer (for input handles) to transmit data. The handles API gives access to typed stream handles, which allow data insertion and retrieval without serialization/deserialization. Both styles are demonstrated in the examples below.

--ignoreOrder: ORDER BY clauses are not naturally incrementalizable. This flag directs the compiler to ignore ORDER BY clauses that occur last in a view definition (thus producing unsorted outputs). It does not affect ORDER BY clauses in an OVER clause, or ORDER BY clauses followed by LIMIT clauses. Using this flag is recommended together with the -i flag, which incrementalizes the compiler output.
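
For example (a sketch, reusing the table T that appears in the examples below), with --ignoreOrder the trailing ORDER BY in this view is dropped:

```sql
-- With --ignoreOrder the final ORDER BY is ignored,
-- so the output of V is not sorted.
CREATE VIEW V AS SELECT T.COL1 FROM T ORDER BY T.COL1;
```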

--lenient: Some SQL queries generate output views having multiple columns with the same name. Such views can cause problems with other tools that interface with the compiler outputs. By default the compiler will emit an error when given such views. For example, the following definition:

```sql
CREATE VIEW V AS SELECT T.COL2, S.COL2 FROM T, S;
```

will create a view with two columns named COL2. The workaround is to explicitly name the view columns, e.g.:

```sql
CREATE VIEW V AS SELECT T.COL2 AS TCOL2, S.COL2 AS SCOL2 FROM T, S;
```

With the --lenient flag the compiler only emits warnings and compiles such programs anyway.

--outputsAreSets: SQL queries can produce outputs that contain duplicates, but such outputs are rarely useful in practice. Using this flag will ensure that each output VIEW does not contain duplicates. This can also be ensured by using a SELECT DISTINCT statement in the view definition. An example query that can produce duplicates is:

```sql
CREATE VIEW V AS SELECT T.COL1 FROM T;
```
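
The duplicates can instead be eliminated in the query itself, using SELECT DISTINCT:

```sql
-- DISTINCT guarantees that the output of V is a set.
CREATE VIEW V AS SELECT DISTINCT T.COL1 FROM T;
```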

-O: Sets the optimization level. A higher value enables more optimizations.

-d: Sets the lexical rules used. SQL dialects differ in the rules for allowed identifiers, identifier quoting, conversion to uppercase, and case sensitivity of identifiers.
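
For example, the dialects differ in how identifiers are quoted (a sketch based on Apache Calcite's lexical conventions):

```sql
-- Accepted with -d ORACLE, where identifiers are quoted with double quotes;
-- the MYSQL dialect would write the same identifier as `MyView`.
CREATE VIEW "MyView" AS SELECT T.COL1 FROM T;
```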

--streaming: Equivalent to adding the following property to all program tables: 'appendOnly' = 'true'.
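
For example, under this flag a table declared as follows behaves as if the property had been attached explicitly (a minimal sketch, assuming the WITH ('key' = 'value') table-property syntax):

```sql
-- Marking a single table as append-only, equivalent to compiling
-- the whole program with --streaming.
CREATE TABLE Person (
    name VARCHAR NOT NULL,
    age INT,
    present BOOLEAN
) WITH ('appendOnly' = 'true');
```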

--nowstream: If this flag is set to true, the NOW() function is implemented in a special way, as an input table, which allows deterministic testing.

When a program uses the NOW function, the following input table is automatically injected by the compiler:

```sql
CREATE TABLE NOW(now TIMESTAMP NOT NULL LATENESS INTERVAL 0 SECONDS);
```

All invocations of the NOW() function within the program will produce the last value inserted in this table.

This table is not populated automatically; the user is responsible for supplying its data. At every step of the circuit the user has to insert a new value into this table, and each value must be larger than the previous one.
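
For example, a view such as the following (a sketch; the table events and its column ts are hypothetical and not part of the program above) reads its notion of the current time from the injected NOW table:

```sql
-- Every NOW() call evaluates to the last timestamp
-- inserted into the NOW table.
CREATE VIEW recent_events AS
SELECT * FROM events
WHERE ts >= NOW() - INTERVAL '1' HOUR;
```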

## Example: Compiling a SQL program to Rust

The following command line compiles a script called x.sql and writes the result to the file lib.rs:

```
$ ./sql-to-dbsp x.sql --handles -o ../temp/src/lib.rs
```

Let's assume we are compiling a file containing the program in the example above.

In the generated program every CREATE TABLE is translated to an input, and every CREATE VIEW is translated to an output. The result will look like this (note that the compiler output changes as the compiler implementation evolves; this code is shown for illustrative purposes only):

```
$ cat ../temp/src/lib.rs
// Automatically-generated file

[...boring stuff removed...]

fn circuit(workers: usize) -> Result<(DBSPHandle, (CollectionHandle<Tup3<String, Option<i32>, Option<bool>>, Weight>, OutputHandle<OrdZSet<Tup1<String>, Weight>>, )), DBSPError> {
    let (circuit, streams) = Runtime::init_circuit(workers, |circuit| {
        // CREATE TABLE `PERSON` (`NAME` VARCHAR NOT NULL, `AGE` INTEGER, `PRESENT` BOOLEAN)
        #[derive(Clone, Debug, Eq, PartialEq)]
        struct r#PERSON_0 {
            r#field: String,
            r#field_0: Option<i32>,
            r#field_1: Option<bool>,
        }
        impl From<PERSON_0> for Tup3<String, Option<i32>, Option<bool>> {
            fn from(table: r#PERSON_0) -> Self {
                Tup3::new(table.r#field, table.r#field_0, table.r#field_1)
            }
        }
        impl From<Tup3<String, Option<i32>, Option<bool>>> for r#PERSON_0 {
            fn from(tuple: Tup3<String, Option<i32>, Option<bool>>) -> Self {
                Self {
                    r#field: tuple.0,
                    r#field_0: tuple.1,
                    r#field_1: tuple.2,
                }
            }
        }
        deserialize_table_record!(PERSON_0["PERSON", 3] {
            (r#field, "NAME", false, String, None),
            (r#field_0, "AGE", false, Option<i32>, Some(None)),
            (r#field_1, "PRESENT", false, Option<bool>, Some(None))
        });
        serialize_table_record!(PERSON_0[3] {
            r#field["NAME"]: String,
            r#field_0["AGE"]: Option<i32>,
            r#field_1["PRESENT"]: Option<bool>
        });
        // DBSPSourceMultisetOperator 312(32)
        // CREATE TABLE `PERSON` (`NAME` VARCHAR NOT NULL, `AGE` INTEGER, `PRESENT` BOOLEAN)
        let (PERSON, handlePERSON) =
            circuit.add_input_zset::<Tup3<String, Option<i32>, Option<bool>>, Weight>();

        // rel#36:LogicalFilter.(input=LogicalTableScan#1,condition=>($1, 18))
        // DBSPFilterOperator 332(57)
        let stream3: Stream<_, OrdZSet<Tup3<String, Option<i32>, Option<bool>>, Weight>> =
            PERSON.filter(move |t: &Tup3<String, Option<i32>, Option<bool>>| -> bool {
                wrap_bool(gt_i32N_i32((*t).1, 18i32))
            });
        // rel#38:LogicalProject.(input=LogicalFilter#36,inputs=0)
        // DBSPMapOperator 350(79)
        let stream4: Stream<_, OrdZSet<Tup1<String>, Weight>> =
            stream3.map(move |t: &Tup3<String, Option<i32>, Option<bool>>| -> Tup1<String> {
                Tup1::new((*t).0.clone())
            });
        // CREATE VIEW `ADULT` AS
        // SELECT `PERSON`.`NAME`
        // FROM `schema`.`PERSON` AS `PERSON`
        // WHERE `PERSON`.`AGE` > 18
        #[derive(Clone, Debug, Eq, PartialEq)]
        struct r#ADULT_0 {
            r#field: String,
        }
        impl From<ADULT_0> for Tup1<String> {
            fn from(table: r#ADULT_0) -> Self {
                Tup1::new(table.r#field)
            }
        }
        impl From<Tup1<String>> for r#ADULT_0 {
            fn from(tuple: Tup1<String>) -> Self {
                Self {
                    r#field: tuple.0,
                }
            }
        }
        deserialize_table_record!(ADULT_0["ADULT", 1] {
            (r#field, "NAME", false, String, None)
        });
        serialize_table_record!(ADULT_0[1] {
            r#field["NAME"]: String
        });
        let handleADULT = stream4.output();

        Ok((handlePERSON, handleADULT))
    })?;
    Ok((circuit, streams))
}
```

You can compile the generated Rust code:

```
$ cd ../temp
$ cargo build
```

The generated file contains a Rust function called circuit (you can change its name using the compiler option -f). Calling circuit will return an executable DBSP circuit handle and a tuple containing a handle for each input and output stream, in the order in which they are declared in the SQL program.

## Executing the produced circuit

We can write a unit test to exercise this circuit:

```rust
#[test]
pub fn test() {
    let (mut circuit, (person, adult)) = circuit(2).unwrap();
    // Feed two input records to the circuit.
    // First input has a count of "1"
    person.push(("Bob".to_string(), Some(12), Some(true)).into(), 1);
    // Second input has a count of "2"
    person.push(("Tom".to_string(), Some(20), Some(false)).into(), 2);
    // Execute the circuit on these inputs
    circuit.step().unwrap();
    // Read the produced output
    let out = adult.consolidate();
    // Print the produced output
    println!("{}", out);
}
```

The unit test can be exercised with cargo test -- --nocapture.

This will print the output as:

```
layer:
    ("Tom",) -> 2
```

## Serialization and Deserialization

In general a circuit will connect with external data sources, and thus will need to convert the data to/from other representations. Here is an example using the Catalog circuit API. We compile the same program as before, with different command-line flags:

```
$ ./sql-to-dbsp x.sql -i -o ../temp/src/lib.rs
```

This time we are producing an incremental version of the circuit. The difference between this circuit and the previous one is as follows:

- For the non-incremental circuit, every time we supply an input value, the table PERSON is cleared and filled with the supplied value. The circuit.step() function computes the contents of the output view ADULT. Reading the output handle gives us the entire contents of this view.

- For the incremental circuit, the PERSON table is initially empty. Every time we supply an input, it is added to the table. (Input encoding formats such as JSON can also specify deletions from the table.) The circuit.step() function computes the changes to the output view ADULT. Reading the output handle gives us the latest changes to the contents of this view. Formats such as CSV cannot represent deletions, so they may be insufficient for representing changes; in the following example we supply only insertions using CSV, but the output is received in a JSON format, which can describe deletions too.

We exercise this circuit by inserting data using a CSV format:

```rust
#[test]
pub fn test() {
    use dbsp_adapters::{CircuitCatalog, RecordFormat};

    let (mut circuit, catalog) = circuit(2)
        .expect("Failed to build circuit");
    let persons = catalog
        .input_collection_handle(&SqlIdentifier::from("PERSON"))
        .expect("Failed to get input collection handle");
    let mut persons_stream = persons
        .configure_deserializer(RecordFormat::Csv)
        .expect("Failed to configure deserializer");
    persons_stream
        .insert(b"Bob,12,true")
        .expect("Failed to insert data");
    persons_stream
        .insert(b"Tom,20,false")
        .expect("Failed to insert data");
    persons_stream
        .insert(b"Tom,20,false")
        .expect("Failed to insert data"); // Insert twice
    persons_stream.flush();
    // Execute the circuit on these inputs
    circuit
        .step()
        .unwrap();

    let adult = &catalog
        .output_handles(&SqlIdentifier::from("ADULT"))
        .expect("Failed to get output collection handles")
        .delta_handle;

    // Read the produced output
    let out = adult.consolidate();
    // Print the produced output
    println!("{:?}", out);
}
```

## Obtaining the schema information from the compiler

The -js compiler flag is followed by a file name. If the flag is supplied, the compiler writes information about the input tables and output views, as JSON, to the supplied file. The invocation could look like this (a sketch; schema.json is an arbitrary output file name):
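
```
$ ./sql-to-dbsp x.sql -js schema.json -o ../temp/src/lib.rs
```

Here is an example of the JSON generated for the following program: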

```sql
CREATE TABLE T (
  COL1 INT NOT NULL
, COL2 DOUBLE NOT NULL
, COL3 VARCHAR(3) NOT NULL PRIMARY KEY
, COL4 VARCHAR(3) ARRAY
);

CREATE VIEW V AS SELECT COL1 AS xCol FROM T;
CREATE VIEW V1 (yCol) AS SELECT COL1 FROM T;
```

Output:

```json
{
  "inputs" : [ {
    "name" : "T",
    "fields" : [ {
      "name" : "COL1",
      "case_sensitive" : false,
      "columntype" : {
        "type" : "INTEGER",
        "nullable" : false
      }
    }, {
      "name" : "COL2",
      "case_sensitive" : false,
      "columntype" : {
        "type" : "DOUBLE",
        "nullable" : false
      }
    }, {
      "name" : "COL3",
      "case_sensitive" : false,
      "columntype" : {
        "type" : "VARCHAR",
        "nullable" : false,
        "precision" : 3
      }
    }, {
      "name" : "COL4",
      "case_sensitive" : false,
      "columntype" : {
        "type" : "ARRAY",
        "nullable" : true,
        "component" : {
          "type" : "VARCHAR",
          "nullable" : false,
          "precision" : 3
        }
      }
    } ],
    "primary_key" : [ "COL3" ]
  } ],
  "outputs" : [ {
    "name" : "V",
    "fields" : [ {
      "name" : "xCol",
      "case_sensitive" : false,
      "columntype" : {
        "type" : "INTEGER",
        "nullable" : false
      }
    } ]
  }, {
    "name" : "V1",
    "fields" : [ {
      "name" : "yCol",
      "case_sensitive" : true,
      "columntype" : {
        "type" : "INTEGER",
        "nullable" : false
      }
    } ]
  } ]
}
```