Source code for tests.runtime.test_adhoc_queries

import json
import os
import tempfile
from pathlib import Path

from tests import TEST_CLIENT
from tests.shared_test_pipeline import SharedTestPipeline, sql


class TestAdhocQueries(SharedTestPipeline):
    """Ad-hoc query tests that run against the pipeline exposed as ``self.p``.

    Each test declares the SQL objects it needs through the ``@sql``
    decorator imported from ``tests.shared_test_pipeline``.

    NOTE(review): ``test_pipeline_adhoc_query`` queries ``"TaBle1"``, which is
    only declared by ``test_pipeline_adhoc_query_empty``; presumably the
    decorator merges all declarations into one shared pipeline -- confirm in
    ``tests.shared_test_pipeline``.
    """

    @property
    def pipeline(self):
        """Return the pipeline object stored on ``self.p``."""
        return self.p

    def _count(self, sql: str) -> int:
        """Run a counting query and return the count found in its first row.

        Args:
            sql: The ad-hoc query text. (The name shadows the module-level
                ``sql`` decorator inside this method only.)

        Returns:
            ``0`` when the query yields no rows. Otherwise the value stored
            under the first of the keys ``"c"``, ``"count"`` or ``"count(*)"``
            present in the first row, falling back to that row's first value.
        """
        rows = list(self.pipeline.query(sql))
        if not rows:
            return 0
        first_row = rows[0]
        for key in ("c", "count", "count(*)"):
            if key in first_row:
                return first_row[key]
        # Fallback: no known alias is present, so use the first column.
        return next(iter(first_row.values()))

    @sql(
        " CREATE TABLE not_materialized(id bigint not null);"
        " CREATE TABLE t1 ( id INT NOT NULL, dt DATE NOT NULL, uid UUID NOT NULL )"
        " WITH ( 'materialized' = 'true', 'connectors' = '"
        '[{ "transport": { "name": "datagen", "config": { "plan": [{ "limit": 5 }] } } }]'
        "' );"
        " CREATE TABLE t2 ( id INT NOT NULL, st VARCHAR NOT NULL )"
        " WITH ( 'materialized' = 'true', 'connectors' = '"
        '[{ "transport": { "name": "datagen", "config": { "plan": [{ "limit": 5 }] } } }]'
        "' );"
        " CREATE MATERIALIZED VIEW joined AS ("
        " SELECT t1.dt AS c1, t2.st AS c2, t1.uid as c3 FROM t1, t2 WHERE t1.id = t2.id );"
        " CREATE MATERIALIZED VIEW view_of_not_materialized AS ("
        " SELECT * FROM not_materialized ); "
    )
    def test_pipeline_adhoc_query(self):
        """Exercise ad-hoc queries in text, JSON and Parquet form.

        Covers: equivalence of a view query and its inlined definition in all
        three output formats, a case-sensitive quoted table name, error
        reporting for an unknown table, a division by zero and a
        non-materialized table, and ``INSERT`` statements issued as ad-hoc
        queries.
        """
        self.pipeline.start()

        ADHOC_SQL_A = "SELECT * FROM joined"
        ADHOC_SQL_B = (
            "SELECT t1.dt AS c1, t2.st AS c2, t1.uid as c3 "
            "FROM t1, t2 WHERE t1.id = t2.id"
        )
        ADHOC_SQL_INSERT = (
            "INSERT INTO t1 VALUES "
            "(99,'2020-01-01','c32d330f-5757-4ada-bcf6-1fac2d54e37f'),"
            "(100,'2020-01-01','00000000-0000-0000-0000-000000000000')"
        )

        # Baseline row count
        assert self._count("SELECT COUNT(*) AS c FROM t1") == 5

        # Compare logically equivalent queries in text + json formats.
        # Text format via TEST_CLIENT (ASCII table)
        text_a = "\n".join(TEST_CLIENT.query_as_text(self.pipeline.name, ADHOC_SQL_A))
        text_b = "\n".join(TEST_CLIENT.query_as_text(self.pipeline.name, ADHOC_SQL_B))

        def _extract_table_lines(txt: str):
            """Return the set of non-blank lines, minus borders and header."""
            return {
                ln.strip()
                for ln in txt.splitlines()
                if ln.strip() and not ln.startswith(("+", "| c1"))
            }

        assert _extract_table_lines(text_a) == _extract_table_lines(text_b)

        # JSON lines (unordered compare)
        json_a = list(TEST_CLIENT.query_as_json(self.pipeline.name, ADHOC_SQL_A))
        json_b = list(TEST_CLIENT.query_as_json(self.pipeline.name, ADHOC_SQL_B))

        def norm(rows):
            """Serialize rows canonically and sort them for comparison."""
            return sorted(json.dumps(r, sort_keys=True) for r in rows)

        assert norm(json_a) == norm(json_b)

        # Parquet comparison with ORDER BY to establish deterministic ordering.
        # The client is handed a path WITHOUT the ".parquet" suffix and the
        # result is read back WITH it, exactly as before. A temporary
        # directory removes every produced file on exit, whatever its final
        # name, so no manual (and error-swallowing) cleanup is needed.
        with tempfile.TemporaryDirectory(prefix="adhoc_parquet_") as tmp_dir:
            out_dir = Path(tmp_dir)
            TEST_CLIENT.query_as_parquet(
                self.pipeline.name,
                f"{ADHOC_SQL_A} ORDER BY c1,c2,c3",
                str(out_dir / "adhoc_a"),
            )
            TEST_CLIENT.query_as_parquet(
                self.pipeline.name,
                f"{ADHOC_SQL_B} ORDER BY c1,c2,c3",
                str(out_dir / "adhoc_b"),
            )
            b1 = (out_dir / "adhoc_a.parquet").read_bytes()
            b2 = (out_dir / "adhoc_b.parquet").read_bytes()
            assert b1, "Parquet output empty"
            assert b1 == b2, "Parquet outputs differ for equivalent queries"

        # Case-sensitive quoted table
        quoted_rows = list(self.pipeline.query('SELECT * FROM "TaBle1"'))
        assert quoted_rows == [], "Expected quoted TaBle1 to be empty initially"

        # Invalid table -> expect error
        invalid_rows = None
        try:
            invalid_rows = list(self.pipeline.query("SELECT * FROM invalid_table"))
        except Exception:
            # Any exception is the expected outcome here; the concrete type
            # raised by the client is not visible from this module.
            invalid_ok = True
        else:
            invalid_ok = False
        assert invalid_ok, (
            f"Expected querying invalid_table to raise, got rows: {invalid_rows!r}"
        )

        # Expression runtime error (division by zero) - expect textual ERROR
        error_text = "\n".join(
            TEST_CLIENT.query_as_text(self.pipeline.name, "SELECT 1/0")
        )
        assert "ERROR" in error_text.upper()

        # Non-materialized table direct access should fail
        res = list(self.pipeline.query("SELECT * FROM not_materialized"))
        assert res and "error" in res[0]

        # INSERT into materialized t1 using JSON query endpoint (to retrieve count row)
        insert_resp = list(
            TEST_CLIENT.query_as_json(self.pipeline.name, ADHOC_SQL_INSERT)
        )
        assert insert_resp and insert_resp[0].get("count") == 2
        assert self._count("SELECT COUNT(*) AS c FROM t1") == 7

        # Non-materialized table via its materialized view
        assert self._count("SELECT COUNT(*) AS c FROM view_of_not_materialized") == 0
        ins_nm = list(
            TEST_CLIENT.query_as_json(
                self.pipeline.name, "INSERT INTO not_materialized VALUES (99),(100)"
            )
        )
        assert ins_nm and ins_nm[0].get("count") == 2
        assert self._count("SELECT COUNT(*) AS c FROM view_of_not_materialized") == 2

    @sql(
        """CREATE TABLE "TaBle1"(id bigint not null) WITH ('materialized' = 'true');"""
    )
    def test_pipeline_adhoc_query_empty(self):
        """Check that ``COUNT(*)`` over the empty table ``"TaBle1"`` is zero."""
        self.pipeline.start()
        rows = list(self.pipeline.query('SELECT COUNT(*) AS c FROM "TaBle1"'))
        # Guard the index below so an empty result fails with a clear message
        # instead of an IndexError.
        assert rows, 'COUNT(*) query on "TaBle1" returned no rows'
        c = rows[0].get("c")
        assert c == 0, f'Expected "TaBle1" to be empty, got c={c!r} (rows={rows!r})'