# Source code for tests.platform.test_ingress_formats

# TODO: these tests should be part of runtime tests

import json
import time
from http import HTTPStatus
from urllib.parse import quote

from .helper import (
    post_json,
    http_request,
    wait_for_program_success,
    api_url,
    start_pipeline,
    gen_pipeline_name,
    adhoc_query_json,
)


def create_pipeline(name: str, sql: str):
    r = post_json(api_url("/pipelines"), {"name": name, "program_code": sql})
    assert r.status_code == HTTPStatus.CREATED, r.text
    wait_for_program_success(name, 1)

def _ingress(
    pipeline: str,
    table: str,
    body: str,
    *,
    format: str = "json",
    update_format: str = "raw",
    array: bool = False,
    content_type: str | None = None,
):
    params = [f"format={format}", f"update_format={update_format}"]
    if array:
        params.append("array=true")
    path = api_url(
        f"/pipelines/{pipeline}/ingress/{table}"
        + ("?" + "&".join(params) if params else "")
    )
    headers = {}
    if content_type:
        headers["Content-Type"] = content_type
    else:
        headers["Content-Type"] = (
            "application/json" if format == "json" else "text/plain"
        )
    return http_request("POST", path, data=body.encode("utf-8"), headers=headers)


def _change_stream_start(pipeline: str, object_name: str):
    # object_name may already have quotes around it; if so, percent-encode them.
    if object_name.startswith('"') and object_name.endswith('"'):
        encoded = quote(object_name, safe="")
    else:
        encoded = object_name
    path = api_url(
        f"/pipelines/{pipeline}/egress/{encoded}?format=json&backpressure=true"
    )
    r = http_request("POST", path, stream=True)
    return r


def _read_json_events(resp, expected_count: int, timeout_s: float = 10.0):
    """
    Read expected_count JSON events from a streaming response.
    """
    events = []
    start = time.time()
    for line in resp.iter_lines():
        if not line:
            continue
        try:
            # Each line is a JSON object whose "json_data" field holds a list
            # of change events; guard against the field being absent.
            for data in json.loads(line.decode("utf-8")).get("json_data") or []:
                events.append(data)
        except Exception as e:  # noqa: BLE001
            raise AssertionError(f"Invalid JSON line: {line!r} ({e})")
        if len(events) >= expected_count:
            break
        if time.time() - start > timeout_s:
            raise TimeoutError(
                f"Timeout reading events (wanted {expected_count}, got {len(events)})"
            )
    return events
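
# For reference, each non-empty line of the change stream consumed above is a
# JSON object carrying a batch of events under "json_data". A hedged sketch of
# the line shape these helpers assume (any field other than "json_data" is
# illustrative, not confirmed by this file):
#
#   {"json_data": [{"insert": {"id": 1}}, {"delete": {"id": 2}}]}
#
# Both _read_json_events and JsonLineReader.read_events (below) flatten these
# batches into a single list of {"insert": ...} / {"delete": ...} events.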

class JsonLineReader:
    """Incrementally read change events from a streaming HTTP response."""

    def __init__(self, resp):
        self.resp = resp
        self._iter = resp.iter_lines()

    def read_events(self, n, timeout_s=10.0):
        events, start = [], time.time()
        while len(events) < n:
            try:
                line = next(self._iter)
            except StopIteration:
                # Server closed the stream.
                break
            if not line:
                if time.time() - start > timeout_s:
                    raise TimeoutError(
                        f"Timeout waiting for {n} events, got {len(events)}"
                    )
                continue
            try:
                payload = json.loads(line.decode("utf-8"))
                for data in payload.get("json_data") or []:
                    events.append(data)
                    if len(events) >= n:
                        break
            except Exception as e:
                raise AssertionError(f"Invalid JSON line: {line!r} ({e})")
        return events
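
# Minimal usage sketch (mirroring test_duplicate_outputs below): keep a single
# reader per change stream so consecutive read_events() calls continue from
# where the previous one stopped instead of re-reading the stream:
#
#   stream = _change_stream_start(pipeline_name, "V1")
#   reader = JsonLineReader(stream)
#   first = reader.read_events(2)   # events from the first ingress batch
#   second = reader.read_events(2)  # events from the next batch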

@gen_pipeline_name
def test_json_ingress(pipeline_name):
    """
    Exercise raw inserts, insert_delete format, array format, parse errors,
    Debezium updates, and CSV ingestion with a parse error.
    """
    sql = (
        "CREATE TABLE t1(c1 integer, c2 bool, c3 varchar) "
        "WITH ('materialized' = 'true'); "
        "CREATE MATERIALIZED VIEW v1 AS SELECT * FROM t1;"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)

    # Raw format (missing some fields)
    r = _ingress(
        pipeline_name,
        "T1",
        '{"c1":10,"c2":true}\n{"c1":20,"c3":"foo"}',
        format="json",
        update_format="raw",
    )
    assert r.status_code == HTTPStatus.OK, r.text
    got = adhoc_query_json(pipeline_name, "select * from t1 order by c1, c2, c3")
    assert got == [
        {"c1": 10, "c2": True, "c3": None},
        {"c1": 20, "c2": None, "c3": "foo"},
    ]

    # insert_delete format (delete and new insert)
    r = _ingress(
        pipeline_name,
        "t1",
        '{"delete":{"c1":10,"c2":true}}\n{"insert":{"c1":30,"c3":"bar"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by c1, c2, c3")
    assert got == [
        {"c1": 20, "c2": None, "c3": "foo"},
        {"c1": 30, "c2": None, "c3": "bar"},
    ]

    # Insert via JSON array style
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":[40,true,"buzz"]}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK

    # Use an array of updates instead of newline-delimited JSON
    r = _ingress(
        pipeline_name,
        "T1",
        '[{"delete":[40,true,"buzz"]},{"insert":[50,true,""]}]',
        format="json",
        update_format="insert_delete",
        array=True,
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from T1 order by c1, c2, c3")
    # Expect rows 20, 30, 50
    assert got == [
        {"c1": 20, "c2": None, "c3": "foo"},
        {"c1": 30, "c2": None, "c3": "bar"},
        {"c1": 50, "c2": True, "c3": ""},
    ]

    # Trigger parse errors with array=true (some invalid types)
    bad_payload = (
        '[{"insert":[35,true,""]},'
        '{"delete":[40,"foo","buzz"]},'
        '{"insert":[true,true,""]}]'
    )
    r = _ingress(
        pipeline_name,
        "T1",
        bad_payload,
        format="json",
        update_format="insert_delete",
        array=True,
    )
    assert r.status_code == HTTPStatus.BAD_REQUEST, r.text
    assert "Errors parsing input data (2 errors)" in r.text
    # Even records that parse successfully are not ingested when the
    # request as a whole fails in array format.
    got = adhoc_query_json(pipeline_name, "select * from T1 order by c1, c2, c3")
    # Still expect rows 20, 30, 50
    assert got == [
        {"c1": 20, "c2": None, "c3": "foo"},
        {"c1": 30, "c2": None, "c3": "bar"},
        {"c1": 50, "c2": True, "c3": ""},
    ]

    # Debezium CDC style ('u' update)
    r = _ingress(
        pipeline_name,
        "T1",
        '{"payload":{"op":"u","before":[50,true,""],"after":[60,true,"hello"]}}',
        format="json",
        update_format="debezium",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by c1, c2, c3")
    assert got[-1] == {"c1": 60, "c2": True, "c3": "hello"}

    # CSV with a parse error in the second row (the second record is
    # invalid, but the other two should get ingested).
    csv_body = "15,true,foo\nnot_a_number,true,ΑαΒβΓγΔδ\n16,false,unicode🚲"
    r = _ingress(
        pipeline_name,
        "t1",
        csv_body,
        format="csv",
        update_format="raw",
        content_type="text/csv",
    )
    # Expect BAD_REQUEST due to the parse error, but the first and last rows ingested
    assert r.status_code == HTTPStatus.BAD_REQUEST, r.text
    assert "Errors parsing input data (1 errors)" in r.text
    got = adhoc_query_json(pipeline_name, "select * from t1 order by c1, c2, c3")
    # Verify that 15 and 16 are present along with the earlier rows (20, 30, 60, ...)
    assert any(row["c1"] == 15 for row in got)
    assert any(row["c1"] == 16 for row in got)

@gen_pipeline_name
def test_map_column(pipeline_name):
    """
    Table with a column of type MAP.
    """
    sql = (
        "CREATE TABLE t1(c1 integer, c2 bool, c3 MAP<varchar,varchar>) "
        "WITH ('materialized'='true'); CREATE VIEW v1 AS SELECT * FROM t1;"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)
    r = _ingress(
        pipeline_name,
        "T1",
        '{"c1":10,"c2":true,"c3":{"foo":"1","bar":"2"}}\n{"c1":20}',
        format="json",
        update_format="raw",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by c1")
    assert got == [
        {"c1": 10, "c2": True, "c3": {"bar": "2", "foo": "1"}},
        {"c1": 20, "c2": None, "c3": None},
    ]

@gen_pipeline_name
def test_parse_datetime(pipeline_name):
    sql = "CREATE TABLE t1(t TIME, ts TIMESTAMP, d DATE) WITH ('materialized'='true');"
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)
    r = _ingress(
        pipeline_name,
        "t1",
        '{"t":"13:22:00","ts":"2021-05-20 12:12:33","d":"2021-05-20"}\n'
        '{"t":" 11:12:33.483221092 ","ts":" 2024-02-25 12:12:33 ","d":" 2024-02-25 "}',
        format="json",
        update_format="raw",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by t, ts, d")
    # Surrounding whitespace in the input should have been trimmed;
    # compare the normalized time strings.
    assert any(row["t"] == "11:12:33.483221092" for row in got)
    assert any(row["t"] == "13:22:00" for row in got)

@gen_pipeline_name
def test_quoted_columns(pipeline_name):
    sql = (
        'CREATE TABLE t1("c1" integer not null,"C2" bool not null,"😁❤" varchar not null,'
        "\"αβγ\" boolean not null, ΔΘ boolean not null) WITH ('materialized'='true');"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)
    # The unquoted ΔΘ column appears as lowercase δθ in JSON input and output.
    r = _ingress(
        pipeline_name,
        "T1",
        '{"c1":10,"C2":true,"😁❤":"foo","αβγ":true,"δθ":false}',
        format="json",
        update_format="raw",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, 'select * from t1 order by "c1"')
    assert got == [{"c1": 10, "C2": True, "😁❤": "foo", "αβγ": True, "δθ": False}]

@gen_pipeline_name
def test_primary_keys(pipeline_name):
    """
    Port of primary_keys: test insert/update/delete semantics with a primary key.
    """
    sql = (
        "CREATE TABLE t1(id bigint not null, s varchar not null, primary key(id)) "
        "WITH ('materialized'='true');"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)

    # Insert two rows
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":{"id":1,"s":"1"}}\n{"insert":{"id":2,"s":"2"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by id")
    assert got == [{"id": 1, "s": "1"}, {"id": 2, "s": "2"}]

    # Modify: insert (overwriting id=1) and update id=2
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":{"id":1,"s":"1-modified"}}\n{"update":{"id":2,"s":"2-modified"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by id")
    assert got == [
        {"id": 1, "s": "1-modified"},
        {"id": 2, "s": "2-modified"},
    ]

    # Delete id=2
    r = _ingress(
        pipeline_name,
        "T1",
        '{"delete":{"id":2}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    got = adhoc_query_json(pipeline_name, "select * from t1 order by id")
    assert got == [{"id": 1, "s": "1-modified"}]

@gen_pipeline_name
def test_case_sensitive_tables(pipeline_name):
    """
    - Distinguish between quoted and unquoted identifiers.
    - Validate streaming outputs for two views.
    """
    sql = (
        'CREATE TABLE "TaBle1"(id bigint not null);'
        "CREATE TABLE table1(id bigint);"
        'CREATE MATERIALIZED VIEW "V1" AS SELECT * FROM "TaBle1";'
        'CREATE MATERIALIZED VIEW "v1" AS SELECT * FROM table1;'
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)

    stream_v1 = _change_stream_start(pipeline_name, '"V1"')
    stream_v1_lower = _change_stream_start(pipeline_name, '"v1"')

    # Ingest into quoted "TaBle1"
    r = _ingress(
        pipeline_name,
        quote('"TaBle1"', safe=""),
        '{"insert":{"id":1}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK

    # Ingest into unquoted table1
    r = _ingress(
        pipeline_name,
        "table1",
        '{"insert":{"id":2}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK

    ev_v1 = _read_json_events(stream_v1, 1)
    ev_v1_lower = _read_json_events(stream_v1_lower, 1)
    assert ev_v1 == [{"insert": {"id": 1}}]
    assert ev_v1_lower == [{"insert": {"id": 2}}]

    # Validate that ad hoc queries respect identifier case
    q1 = adhoc_query_json(pipeline_name, 'select * from "V1"')
    q2 = adhoc_query_json(pipeline_name, "select * from v1")
    assert q1 == [{"id": 1}]
    assert q2 == [{"id": 2}]

@gen_pipeline_name
def test_duplicate_outputs(pipeline_name):
    """
    Multiple inserts producing duplicate output values.
    """
    sql = (
        "CREATE TABLE t1(id bigint not null, s varchar not null); "
        "CREATE VIEW v1 AS SELECT s FROM t1;"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)

    stream = _change_stream_start(pipeline_name, "V1")
    reader = JsonLineReader(stream)

    # First batch
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":{"id":1,"s":"1"}}\n{"insert":{"id":2,"s":"2"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    evs = reader.read_events(2)
    assert evs == [{"insert": {"s": "1"}}, {"insert": {"s": "2"}}]

    # Second batch
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":{"id":3,"s":"3"}}\n{"insert":{"id":4,"s":"4"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    evs = reader.read_events(2)
    assert evs == [{"insert": {"s": "3"}}, {"insert": {"s": "4"}}]

    # Duplicates
    r = _ingress(
        pipeline_name,
        "T1",
        '{"insert":{"id":5,"s":"1"}}\n{"insert":{"id":6,"s":"2"}}',
        format="json",
        update_format="insert_delete",
    )
    assert r.status_code == HTTPStatus.OK
    evs = reader.read_events(2)
    assert evs == [{"insert": {"s": "1"}}, {"insert": {"s": "2"}}]

@gen_pipeline_name
def test_upsert(pipeline_name):
    """
    - Insert several rows with a composite primary key.
    - Perform updates/inserts overwriting existing rows.
    - Perform no-op updates and deletes of non-existing keys.
    """
    sql = (
        "CREATE TABLE t1("
        "id1 bigint not null,"
        "id2 bigint not null,"
        "str1 varchar not null,"
        "str2 varchar,"
        "int1 bigint not null,"
        "int2 bigint,"
        "primary key(id1,id2)) "
        "WITH ('materialized'='true');"
    )
    create_pipeline(pipeline_name, sql)
    start_pipeline(pipeline_name)

    stream = _change_stream_start(pipeline_name, "T1")
    reader = JsonLineReader(stream)

    # Initial inserts (array=true)
    r = _ingress(
        pipeline_name,
        "T1",
        '[{"insert":{"id1":1,"id2":1,"str1":"1","int1":1}},'
        '{"insert":{"id1":2,"id2":1,"str1":"1","int1":1}},'
        '{"insert":{"id1":3,"id2":1,"str1":"1","int1":1}}]',
        format="json",
        update_format="insert_delete",
        array=True,
    )
    assert r.status_code == HTTPStatus.OK
    evs = reader.read_events(3)
    assert evs == [
        {"insert": {"id1": 1, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
        {"insert": {"id1": 2, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
        {"insert": {"id1": 3, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
    ]

    # Mixed updates
    r = _ingress(
        pipeline_name,
        "T1",
        '[{"update":{"id1":1,"id2":1,"str1":"2"}},'
        '{"update":{"id1":2,"id2":1,"str2":"foo"}},'
        '{"insert":{"id1":3,"id2":1,"str1":"1","str2":"2","int1":3,"int2":33}}]',
        format="json",
        update_format="insert_delete",
        array=True,
    )
    assert r.status_code == HTTPStatus.OK
    evs = reader.read_events(6)
    assert evs == [
        {"delete": {"id1": 1, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
        {"delete": {"id1": 2, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
        {"delete": {"id1": 3, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
        {"insert": {"id1": 1, "id2": 1, "str1": "2", "str2": None, "int1": 1, "int2": None}},
        {"insert": {"id1": 2, "id2": 1, "str1": "1", "str2": "foo", "int1": 1, "int2": None}},
        {"insert": {"id1": 3, "id2": 1, "str1": "1", "str2": "2", "int1": 3, "int2": 33}},
    ]

    # No-op / mixed operations (some won't generate output)
    r = _ingress(
        pipeline_name,
        "T1",
        '[{"update":{"id1":1,"id2":1}},'
        '{"update":{"id1":2,"id2":1,"str2":null}},'
        '{"delete":{"id1":3,"id2":1}},'
        '{"delete":{"id1":4,"id2":1}},'
        '{"update":{"id1":4,"id2":1,"int1":0,"str1":""}}]',
        format="json",
        update_format="insert_delete",
        array=True,
    )
    assert r.status_code == HTTPStatus.OK
    # Expect 3 events: a delete of (id1=2, id2=1) with the old str2="foo",
    # a delete of (id1=3, id2=1), and a re-insert of (id1=2, id2=1) with
    # str2 set to null.
    evs = reader.read_events(3)
    assert evs == [
        {"delete": {"id1": 2, "id2": 1, "str1": "1", "str2": "foo", "int1": 1, "int2": None}},
        {"delete": {"id1": 3, "id2": 1, "str1": "1", "str2": "2", "int1": 3, "int2": 33}},
        {"insert": {"id1": 2, "id2": 1, "str1": "1", "str2": None, "int1": 1, "int2": None}},
    ]