import unittest
from feldera import PipelineBuilder, Pipeline
from tests import TEST_CLIENT
[docs]
class NegativeCompilationTests(unittest.TestCase):
[docs]
def test_sql_error(self):
pipeline_name = "sql_error"
sql = """
CREATE TABLE student(
id INT,
name STRING
);
CREATE VIEW s AS SELECT * FROM blah;
"""
expected = f"""
Pipeline {pipeline_name} failed to compile:
Compilation error
From line 5, column 32 to line 5, column 35: Object 'blah' not found
Code snippet:
5|CREATE VIEW s AS SELECT * FROM blah;
^^^^""".strip()
with self.assertRaises(Exception) as err:
PipelineBuilder(
TEST_CLIENT, name=pipeline_name, sql=sql
).create_or_replace()
got_err: str = err.exception.args[0].strip()
assert expected == got_err
pipeline = Pipeline.get("sql_error", TEST_CLIENT)
pipeline.clear_storage()
[docs]
def test_rust_error(self):
pipeline_name = "rust_error"
sql = ""
with self.assertRaises(Exception) as err:
PipelineBuilder(
TEST_CLIENT, name=pipeline_name, sql=sql, udf_rust="Davy Jones"
).create_or_replace()
assert "Davy Jones" in err.exception.args[0].strip()
[docs]
def test_program_error0(self):
sql = "create taabl;"
name = "test_program_error0"
try:
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
except Exception:
pass
pipeline = Pipeline.get(name, TEST_CLIENT)
err = pipeline.program_error()
assert err["sql_compilation"] != 0
pipeline.stop(force=True)
pipeline.clear_storage()
[docs]
def test_program_error1(self):
sql = ""
name = "test_program_error1"
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
pipeline = Pipeline.get(name, TEST_CLIENT)
err = pipeline.program_error()
assert err["sql_compilation"]["exit_code"] == 0
assert err["rust_compilation"]["exit_code"] == 0
pipeline.stop(force=True)
pipeline.clear_storage()
[docs]
def test_errors0(self):
sql = "SELECT invalid"
name = "test_errors0"
try:
_ = PipelineBuilder(TEST_CLIENT, name, sql).create_or_replace()
except Exception:
pass
pipeline = Pipeline.get(name, TEST_CLIENT)
assert pipeline.errors()[0]["sql_compilation"]["exit_code"] != 0
[docs]
def test_initialization_error(self):
sql = """
CREATE TABLE t0 (
c0 INT NOT NULL
) with (
'connectors' = '[{
"transport": {
"name": "datagen",
"config": {
"plan": [{
"fields": {
"c1": { "strategy": "uniform", "range": [100, 10000] }
}
}]
}
}
}]'
);
"""
pipeline = PipelineBuilder(
TEST_CLIENT, name="test_initialization_error", sql=sql
).create_or_replace()
with self.assertRaises(RuntimeError) as err:
pipeline.start()
pipeline.stop(force=True)
pipeline.clear_storage()
got_err: str = err.exception.args[0].strip()
assert "Unable to START the pipeline" in got_err
if __name__ == "__main__":
unittest.main()