"""Pipeline CRUD tests (module ``tests.platform.test_pipeline_crud``)."""

from .helper import (
    API_PREFIX,
    HTTPStatus,
    get,
    post_json,
    put_json,
    patch_json,
    delete,
    wait_for_program_success,
    gen_pipeline_name,
    cleanup_pipeline,
)

# Field selector constants

# Keys compared against a pipeline object when the `selector` query parameter
# is omitted or set to "all" (see test_pipeline_get_selector).
PIPELINE_FIELD_SELECTOR_ALL_FIELDS = [
    # Identity and metadata
    "id", "name", "description", "created_at", "version", "platform_version",
    # Configuration and sources
    "runtime_config", "program_code", "udf_rust", "udf_toml", "program_config",
    # Program fields
    "program_version", "program_status", "program_status_since",
    "program_error", "program_info",
    # Deployment fields
    "deployment_status", "deployment_status_since",
    "deployment_desired_status", "deployment_error",
    # Remaining fields
    "refresh_version", "storage_status",
]

# Keys compared against a pipeline object when the `selector` query parameter
# is "status" (see test_pipeline_get_selector).
PIPELINE_FIELD_SELECTOR_STATUS_FIELDS = [
    # Identity and metadata
    "id", "name", "description", "created_at", "version", "platform_version",
    # Program fields
    "program_version", "program_status", "program_status_since",
    # Deployment fields
    "deployment_status", "deployment_status_since",
    "deployment_desired_status", "deployment_error",
    # Remaining fields
    "refresh_version", "storage_status",
]


@gen_pipeline_name
def test_pipeline_post(pipeline_name):
    """Check POST /pipelines with malformed, minimal, and complete bodies.

    Three malformed requests (empty body, name missing, program code missing)
    must be answered with 400 BAD_REQUEST. Three well-formed requests must be
    answered with 201 CREATED and echo the submitted fields back.

    Args:
        pipeline_name: Base name from which the per-case pipeline names are
            derived (presumably injected by ``gen_pipeline_name`` -- confirm
            in the helper module).
    """
    endpoint = f"{API_PREFIX}/pipelines"
    name_min = pipeline_name + "-min"
    name_sql2 = pipeline_name + "-sql2"
    name_all = pipeline_name + "-all"
    name_missing = pipeline_name + "-missing"
    for name in (name_min, name_sql2, name_all, name_missing):
        cleanup_pipeline(name)

    # Empty body
    r = post_json(endpoint, {})
    assert r.status_code == HTTPStatus.BAD_REQUEST

    # Name missing.
    # The key must be spelled "program_code" (underscore) like everywhere else
    # in this file; a misspelled key would make the body invalid for a reason
    # other than the missing name, so the case would not test what it claims.
    r = post_json(endpoint, {"program_code": ""})
    assert r.status_code == HTTPStatus.BAD_REQUEST

    # Program SQL code missing
    r = post_json(endpoint, {"name": name_missing})
    assert r.status_code == HTTPStatus.BAD_REQUEST

    # Minimum body
    r = post_json(
        endpoint,
        {
            "name": name_min,
            "program_code": "",
        },
    )
    assert r.status_code == HTTPStatus.CREATED
    pipeline = r.json()
    assert pipeline["name"] == name_min
    assert pipeline["description"] == ""
    assert isinstance(pipeline["runtime_config"], dict)
    assert pipeline["program_code"] == ""
    assert pipeline.get("udf_rust", "") == ""
    assert pipeline.get("udf_toml", "") == ""
    assert isinstance(pipeline["program_config"], dict)

    # Body with (invalid) SQL
    r = post_json(
        endpoint,
        {
            "name": name_sql2,
            "program_code": "sql-2",
        },
    )
    assert r.status_code == HTTPStatus.CREATED
    pipeline = r.json()
    assert pipeline["name"] == name_sql2
    assert pipeline["program_code"] == "sql-2"
    assert pipeline["udf_rust"] == ""
    assert pipeline["udf_toml"] == ""

    # All fields
    r = post_json(
        endpoint,
        {
            "name": name_all,
            "description": "description-3",
            "runtime_config": {"workers": 123},
            "program_code": "sql-3",
            "udf_rust": "rust-3",
            "udf_toml": "toml-3",
            "program_config": {"profile": "dev"},
        },
    )
    assert r.status_code == HTTPStatus.CREATED
    pipeline = r.json()
    assert pipeline["name"] == name_all
    assert pipeline["description"] == "description-3"
    assert pipeline["runtime_config"]["workers"] == 123
    assert pipeline["program_code"] == "sql-3"
    assert pipeline["udf_rust"] == "rust-3"
    assert pipeline["udf_toml"] == "toml-3"
    assert pipeline["program_config"]["profile"] == "dev"
@gen_pipeline_name
def test_pipeline_get(pipeline_name):
    """Check GET on the pipeline collection and on individual pipelines.

    Creates two pipelines one after the other and verifies that each shows up
    in the collection listing, that fetching an unknown name yields 404, and
    that the first pipeline's representation is unchanged after the second
    one has been created.

    Args:
        pipeline_name: Base name; "-1" and "-2" are appended to obtain the two
            pipeline names used by this test.
    """
    collection_url = f"{API_PREFIX}/pipelines"
    name1 = pipeline_name + "-1"
    name2 = pipeline_name + "-2"
    cleanup_pipeline(name1)
    cleanup_pipeline(name2)

    # An unknown pipeline name is reported as not found
    r = get(f"{API_PREFIX}/pipelines/nonexistent-pg")
    assert r.status_code == HTTPStatus.NOT_FOUND

    # The collection can be listed before anything is created here
    # (only the status code is checked, not the contents)
    r = get(collection_url)
    assert r.status_code == HTTPStatus.OK

    # First pipeline: create it
    sql1 = "CREATE TABLE t1(c1 INT);"
    r = post_json(collection_url, {"name": name1, "program_code": sql1})
    assert r.status_code == HTTPStatus.CREATED

    # ... it must now appear in the listing
    r = get(collection_url)
    assert r.status_code == HTTPStatus.OK
    listing_after_first = r.json()
    assert any(entry["name"] == name1 for entry in listing_after_first)

    # ... and be retrievable by name; keep a snapshot for later comparison
    r = get(f"{API_PREFIX}/pipelines/{name1}")
    assert r.status_code == HTTPStatus.OK
    first_snapshot = r.json()

    # Second pipeline: create it
    sql2 = "CREATE TABLE t2(c2 INT);"
    r = post_json(collection_url, {"name": name2, "program_code": sql2})
    assert r.status_code == HTTPStatus.CREATED

    # Both pipelines must now appear in the listing
    r = get(collection_url)
    assert r.status_code == HTTPStatus.OK
    listing_after_second = r.json()
    assert any(entry["name"] == name1 for entry in listing_after_second)
    assert any(entry["name"] == name2 for entry in listing_after_second)

    # The first pipeline is unaffected by the creation of the second
    r = get(f"{API_PREFIX}/pipelines/{name1}")
    assert r.status_code == HTTPStatus.OK
    assert r.json() == first_snapshot

    # The second pipeline carries the name and SQL it was created with
    r = get(f"{API_PREFIX}/pipelines/{name2}")
    assert r.status_code == HTTPStatus.OK
    second = r.json()
    assert second["name"] == name2
    assert second["program_code"] == sql2
@gen_pipeline_name
def test_pipeline_get_selector(pipeline_name):
    """Check which keys GET returns for each value of the ``selector`` parameter.

    Both the collection endpoint and the single-pipeline endpoint are queried
    without a selector, with ``selector=all`` and with ``selector=status``;
    the key set of the returned pipeline object must equal the matching
    expected-fields list exactly.

    Args:
        pipeline_name: Name of the pipeline created and inspected by the test.
    """
    r = post_json(
        f"{API_PREFIX}/pipelines",
        {"name": pipeline_name, "program_code": "CREATE TABLE t1(c1 INT);"},
    )
    assert r.status_code == HTTPStatus.CREATED

    selector_cases = [
        ("", PIPELINE_FIELD_SELECTOR_ALL_FIELDS),
        ("all", PIPELINE_FIELD_SELECTOR_ALL_FIELDS),
        ("status", PIPELINE_FIELD_SELECTOR_STATUS_FIELDS),
    ]
    for base in [f"{API_PREFIX}/pipelines", f"{API_PREFIX}/pipelines/{pipeline_name}"]:
        for selector_value, expected_fields in selector_cases:
            # An empty selector means "send no query parameter at all"
            endpoint = f"{base}?selector={selector_value}" if selector_value else base
            r = get(endpoint)
            assert r.status_code == HTTPStatus.OK, endpoint
            val = r.json()
            if isinstance(val, list):
                # Collection response: pick out this test's pipeline. A default
                # is passed to next() so that a missing pipeline fails with a
                # readable assertion instead of a bare StopIteration.
                obj = next((o for o in val if o["name"] == pipeline_name), None)
                assert obj is not None, (
                    f"pipeline {pipeline_name!r} not found in response of {endpoint}"
                )
            else:
                obj = val
            assert sorted(expected_fields) == sorted(obj.keys()), endpoint
@gen_pipeline_name
def test_pipeline_create_compile_delete(pipeline_name):
    """Create a pipeline, wait for program version 1 to succeed, then delete it.

    After the DELETE returns 200 OK, a GET on the same name must yield 404.

    Args:
        pipeline_name: Name of the pipeline created and deleted by the test.
    """
    pipeline_url = f"{API_PREFIX}/pipelines/{pipeline_name}"
    creation_body = {
        "name": pipeline_name,
        "description": "desc",
        "runtime_config": {},
        "program_code": "CREATE TABLE t1(c1 INTEGER);",
        "program_config": {},
    }

    created = post_json(f"{API_PREFIX}/pipelines", creation_body)
    assert created.status_code == HTTPStatus.CREATED
    wait_for_program_success(pipeline_name, expected_program_version=1)

    # Remove the pipeline ...
    deleted = delete(pipeline_url)
    assert deleted.status_code == HTTPStatus.OK

    # ... and make sure it can no longer be fetched
    fetched = get(pipeline_url)
    assert fetched.status_code == HTTPStatus.NOT_FOUND
@gen_pipeline_name
def test_pipeline_name_conflict(pipeline_name):
    """Check that re-using an existing pipeline name is answered with 409 CONFLICT.

    The second request differs from the first in description and program code
    but carries the same name.

    Args:
        pipeline_name: Name used for both creation requests.
    """
    collection_url = f"{API_PREFIX}/pipelines"
    original = {
        "name": pipeline_name,
        "description": "desc",
        "runtime_config": {},
        "program_code": "CREATE TABLE t1(c1 INTEGER);",
        "program_config": {},
    }
    first = post_json(collection_url, original)
    assert first.status_code == HTTPStatus.CREATED

    # Same name, different content: must be rejected as a conflict
    duplicate = {
        **original,
        "description": "different",
        "program_code": "CREATE TABLE t2(c2 VARCHAR);",
    }
    second = post_json(collection_url, duplicate)
    assert second.status_code == HTTPStatus.CONFLICT
def test_pipeline_name_invalid():
    """Check that creation requests with an invalid name get 400 BAD_REQUEST.

    Three names are tried in turn: an empty one, a 101-character one
    ("too long"), and one containing a ``%`` character. Every other field of
    the request body is the same across the three requests.
    """
    invalid_names = [
        ("empty", ""),
        ("too long", "a" * 101),
        ("invalid characters", "%abc"),
    ]
    for label, name in invalid_names:
        r = post_json(
            f"{API_PREFIX}/pipelines",
            {
                "name": name,
                "description": "",
                "runtime_config": {},
                "program_code": "",
                "program_config": {},
            },
        )
        # TODO check that there's enough information to figure out why the request is bad
        # The message identifies which of the looped cases failed.
        assert r.status_code == HTTPStatus.BAD_REQUEST, (
            f"{label} pipeline name was not rejected"
        )
@gen_pipeline_name
def test_refresh_version(pipeline_name):
    """Track ``refresh_version`` across creation, compilation, and a code patch.

    The values asserted are: 1 right after creation, 3 once program version 1
    has succeeded, 4 right after patching ``program_code``, and 6 once program
    version 2 has succeeded.

    Args:
        pipeline_name: Name of the pipeline created and patched by the test.
    """
    pipeline_url = f"{API_PREFIX}/pipelines/{pipeline_name}"

    # Creation
    r = post_json(
        f"{API_PREFIX}/pipelines",
        {"name": pipeline_name, "program_code": ""},
    )
    assert r.status_code == HTTPStatus.CREATED
    created = r.json()
    assert created["refresh_version"] == 1

    # First successful compilation
    wait_for_program_success(pipeline_name, expected_program_version=1)
    after_first_compile = get(pipeline_url).json()
    assert after_first_compile["refresh_version"] == 3

    # Patching program_code
    r = patch_json(
        pipeline_url,
        {"program_code": "CREATE TABLE t1 ( v1 INT );"},
    )
    assert r.status_code == HTTPStatus.OK
    patched = r.json()
    assert patched["refresh_version"] == 4

    # Second successful compilation
    wait_for_program_success(pipeline_name, expected_program_version=2)
    after_second_compile = get(pipeline_url).json()
    assert after_second_compile["refresh_version"] == 6
@gen_pipeline_name
def test_pipeline_connector_endpoint_naming(pipeline_name):
    """Check the keys under which connectors are reported in ``program_info``.

    The program declares three tables and three materialized views whose
    connector lists mix named and unnamed entries. After compilation the keys
    of ``program_info["input_connectors"]`` and
    ``program_info["output_connectors"]`` must be ``<relation>.<name>`` for
    named connectors and ``<relation>.unnamed-<index>`` for unnamed ones,
    where the index is the connector's position in its list.

    Args:
        pipeline_name: Name of the pipeline created via PUT by the test.
    """
    # Adjacent string literals are joined by the parser into one SQL string.
    sql = (
        " CREATE TABLE t1 (i1 BIGINT) WITH ( 'connectors' = '[ "
        '{ "name": "abc", "transport": { "name": "datagen", "config": {} } }, '
        '{ "transport": { "name": "datagen", "config": {} } }, '
        '{ "name": "def", "transport": { "name": "datagen", "config": {} } }, '
        '{ "transport": { "name": "datagen", "config": {} } } '
        "]' ); "
        "CREATE TABLE t2 (i1 BIGINT) WITH ( 'connectors' = '[ "
        '{ "name": "c1", "transport": { "name": "datagen", "config": {} } } '
        "]' ); "
        "CREATE TABLE t3 (i1 BIGINT) WITH ( 'connectors' = '[ "
        '{ "transport": { "name": "datagen", "config": {} } } '
        "]' ); "
        "CREATE MATERIALIZED VIEW v1 WITH ( 'connectors' = '[ "
        '{ "transport": { "name": "kafka_output", "config": { "topic": "p1" } } }, '
        '{ "name": "c1", "transport": { "name": "kafka_output", "config": { "topic": "p1" } } }, '
        '{ "transport": { "name": "kafka_output", "config": { "topic": "p1" } } }, '
        '{ "name": "c3", "transport": { "name": "kafka_output", "config": { "topic": "p1" } } } '
        "]' ) AS ( SELECT * FROM t1 ); "
        "CREATE MATERIALIZED VIEW v2 WITH ( 'connectors' = '[ "
        '{ "name": "c1", "transport": { "name": "kafka_output", "config": { "topic": "p1" } } } '
        "]' ) AS ( SELECT * FROM t2 ); "
        "CREATE MATERIALIZED VIEW v3 WITH ( 'connectors' = '[ "
        '{ "transport": { "name": "kafka_output", "config": { "topic": "p1" } } } '
        "]' ) AS ( SELECT * FROM t3 ); "
    )
    pipeline_url = f"{API_PREFIX}/pipelines/{pipeline_name}"

    r = put_json(pipeline_url, {"name": pipeline_name, "program_code": sql})
    assert r.status_code in (HTTPStatus.OK, HTTPStatus.CREATED)
    wait_for_program_success(pipeline_name, expected_program_version=1)

    program_info = get(pipeline_url).json()["program_info"]
    input_connectors = sorted(program_info["input_connectors"].keys())
    output_connectors = sorted(program_info["output_connectors"].keys())

    expected_inputs = [
        "t1.abc",
        "t1.def",
        "t1.unnamed-1",
        "t1.unnamed-3",
        "t2.c1",
        "t3.unnamed-0",
    ]
    expected_outputs = [
        "v1.c1",
        "v1.c3",
        "v1.unnamed-0",
        "v1.unnamed-2",
        "v2.c1",
        "v3.unnamed-0",
    ]
    assert input_connectors == expected_inputs
    assert output_connectors == expected_outputs