# Source code for tests.platform.test_metrics_logs

import json
import time
from http import HTTPStatus
from urllib.parse import quote_plus

from .helper import (
    get,
    post_json,
    post_no_body,
    http_request,
    wait_for_program_success,
    api_url,
    start_pipeline,
    stop_pipeline,
    clear_pipeline,
    pause_pipeline,
    gen_pipeline_name,
)


def _ingest_lines(name: str, table: str, body: str):
    """Push newline-delimited rows into `table` via the pipeline's ingress endpoint.

    Sends `body` as `text/plain` and asserts the server accepted it
    (200 OK or 202 Accepted). Returns the HTTP response for further inspection.
    """
    resp = http_request(
        "POST",
        api_url(f"/pipelines/{name}/ingress/{table}"),
        headers={"Content-Type": "text/plain"},
        data=body.encode("utf-8"),
    )
    accepted = (HTTPStatus.OK, HTTPStatus.ACCEPTED)
    assert resp.status_code in accepted, (resp.status_code, resp.text)
    return resp


def _adhoc_count(name: str, table: str) -> int:
    """Return COUNT(*) of `table` using the pipeline's ad-hoc query endpoint.

    Returns -1 on a non-OK HTTP response and 0 on an empty response body;
    otherwise parses the first JSON line and returns its "c" field (0 if falsy).
    """
    encoded_sql = quote_plus(f"SELECT COUNT(*) AS c FROM {table}")
    resp = get(api_url(f"/pipelines/{name}/query?sql={encoded_sql}&format=json"))
    if resp.status_code != HTTPStatus.OK:
        return -1
    body = resp.text.strip()
    if not body:
        return 0
    # The endpoint streams one JSON object per line; only the first is needed.
    first_line, _, _ = body.partition("\n")
    record = json.loads(first_line)
    return record.get("c") or 0


@gen_pipeline_name
def test_pipeline_metrics(pipeline_name):
    """
    Tests that circuit metrics can be retrieved from the pipeline.
    """
    resp = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": ""})
    assert resp.status_code == HTTPStatus.CREATED
    wait_for_program_success(pipeline_name, 1)
    pause_pipeline(pipeline_name)

    # Default format
    resp_default = get(api_url(f"/pipelines/{pipeline_name}/metrics"))
    assert resp_default.status_code == HTTPStatus.OK
    text_default = resp_default.text

    # Explicit Prometheus format
    resp_prom = get(api_url(f"/pipelines/{pipeline_name}/metrics?format=prometheus"))
    assert resp_prom.status_code == HTTPStatus.OK
    text_prom = resp_prom.text

    # JSON format
    resp_json = get(api_url(f"/pipelines/{pipeline_name}/metrics?format=json"))
    assert resp_json.status_code == HTTPStatus.OK
    parsed_json = json.loads(resp_json.text)
    assert isinstance(parsed_json, list), "Expected JSON metrics array"

    # An unknown format must be rejected
    resp_bad = get(api_url(f"/pipelines/{pipeline_name}/metrics?format=does-not-exist"))
    assert resp_bad.status_code == HTTPStatus.BAD_REQUEST

    # Minimal content checks: the counter shows up in every representation
    assert "# TYPE records_processed_total counter" in text_default
    assert "# TYPE records_processed_total counter" in text_prom
    assert any(m.get("key") == "records_processed_total" for m in parsed_json), (
        "records_processed_total missing in JSON metrics"
    )
@gen_pipeline_name
def test_pipeline_stats(pipeline_name):
    """
    Tests retrieving pipeline statistics via `/stats`.
    """
    # Datagen connector produces exactly 5 rows into t1; v1 mirrors t1.
    sql = """
    CREATE TABLE t1(c1 INT) WITH (
        'materialized'='true',
        'connectors'='[{
            "transport":{
                "name":"datagen",
                "config":{"plan":[{"limit":5,"rate":1000}]}
            }
        }]'
    );
    CREATE MATERIALIZED VIEW v1 AS SELECT * FROM t1;
    """.strip()
    resp = post_json(api_url("/pipelines"), {"name": pipeline_name, "program_code": sql})
    assert resp.status_code == HTTPStatus.CREATED, resp.text
    wait_for_program_success(pipeline_name, 1)
    start_pipeline(pipeline_name)

    # Create output connector on v1 (egress)
    resp_out = post_no_body(api_url(f"/pipelines/{pipeline_name}/egress/v1"), stream=True)
    assert resp_out.status_code == HTTPStatus.OK, (resp_out.status_code, resp_out.text)

    # Wait for datagen completion
    time.sleep(3)
    deadline = time.time() + 10
    while time.time() < deadline:
        if _adhoc_count(pipeline_name, "t1") == 5:
            break
        time.sleep(1)
    assert _adhoc_count(pipeline_name, "t1") == 5, "Did not ingest expected 5 rows"

    resp_stats = get(api_url(f"/pipelines/{pipeline_name}/stats"))
    assert resp_stats.status_code == HTTPStatus.OK, (resp_stats.status_code, resp_stats.text)
    stats = resp_stats.json()
    assert sorted(stats.keys()) == ["global_metrics", "inputs", "outputs", "suspend_error"]

    # Global metrics: everything ingested, nothing buffered
    gm = stats["global_metrics"]
    assert gm.get("state") == "Running"
    assert gm.get("total_input_records") == 5
    assert gm.get("total_processed_records") == 5
    assert gm.get("pipeline_complete")
    assert gm.get("buffered_input_records") == 0
    assert gm.get("buffered_input_bytes") == 0

    # Single input endpoint (the datagen connector on t1)
    inputs = stats["inputs"]
    assert isinstance(inputs, list) and len(inputs) == 1
    inp = inputs[0]
    assert inp.get("config", {}).get("stream") == "t1"
    in_metrics = inp.get("metrics", {})
    assert in_metrics.get("buffered_bytes") == 0
    assert in_metrics.get("buffered_records") == 0
    assert in_metrics.get("end_of_input")
    assert in_metrics.get("num_parse_errors") == 0
    assert in_metrics.get("num_transport_errors") == 0
    assert in_metrics.get("total_bytes") == 40
    assert in_metrics.get("total_records") == 5

    # Single output endpoint (the egress stream on v1)
    outputs = stats["outputs"]
    assert isinstance(outputs, list) and len(outputs) == 1
    out = outputs[0]
    assert out.get("config", {}).get("stream") == "v1"
    assert out.get("metrics", {}).get("total_processed_input_records") == 5

    # /time_series: at least two samples, last one reflecting all 5 records
    resp_ts = get(api_url(f"/pipelines/{pipeline_name}/time_series"))
    assert resp_ts.status_code == HTTPStatus.OK, resp_ts.text
    samples = resp_ts.json().get("samples") or []
    assert len(samples) > 1, f"Expected >=2 samples, got {len(samples)}"
    assert samples[-1].get("r") == 5
@gen_pipeline_name
def test_pipeline_logs(pipeline_name):
    """
    - Logs 404 before pipeline creation.
    - Create pipeline; poll until logs return 200.
    - Pause / start / stop / clear transitions keep logs accessible (200).
    - After delete, logs eventually return 404 again.
    """
    # 404 before creation
    resp = get(api_url(f"/pipelines/{pipeline_name}/logs"))
    assert resp.status_code == HTTPStatus.NOT_FOUND

    # Create pipeline
    resp = post_json(
        api_url("/pipelines"),
        {
            "name": pipeline_name,
            "program_code": "CREATE TABLE t1(c1 INTEGER) WITH ('materialized'='true');",
        },
    )
    assert resp.status_code == HTTPStatus.CREATED, resp.text
    wait_for_program_success(pipeline_name, 1)

    # Poll for logs availability; while/else fires only if the deadline passes
    deadline = time.time() + 30
    while time.time() < deadline:
        resp = get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True)
        if resp.status_code == HTTPStatus.OK:
            break
        elif resp.status_code == HTTPStatus.NOT_FOUND:
            time.sleep(0.5)
            continue
        else:
            raise AssertionError(
                f"Unexpected status while waiting for logs: {resp.status_code} {resp.text}"
            )
    else:
        raise TimeoutError("Logs did not become available in time")

    # Pause pipeline
    pause_pipeline(pipeline_name)
    assert (
        get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code
        == HTTPStatus.OK
    )

    # Start pipeline
    start_pipeline(pipeline_name)
    assert (
        get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code
        == HTTPStatus.OK
    )

    # Stop force
    stop_pipeline(pipeline_name, force=True)
    assert (
        get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code
        == HTTPStatus.OK
    )

    # Clear storage; logs should remain accessible
    clear_pipeline(pipeline_name)
    assert (
        get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True).status_code
        == HTTPStatus.OK
    )

    # Delete pipeline
    resp_del = http_request("DELETE", api_url(f"/pipelines/{pipeline_name}"))
    assert resp_del.status_code in (HTTPStatus.OK, HTTPStatus.ACCEPTED), (
        resp_del.status_code,
        resp_del.text,
    )

    # Poll until logs become unavailable (404)
    deadline = time.time() + 30
    while time.time() < deadline:
        resp = get(api_url(f"/pipelines/{pipeline_name}/logs"), stream=True)
        if resp.status_code == HTTPStatus.NOT_FOUND:
            break
        elif resp.status_code == HTTPStatus.OK:
            time.sleep(0.5)
            continue
        else:
            raise AssertionError(
                f"Unexpected status while waiting for logs to disappear: {resp.status_code} {resp.text}"
            )
    else:
        raise TimeoutError("Logs did not become unavailable after deletion")