tests.platform package

Submodules

tests.platform.conftest module

tests.platform.conftest.fixture_pipeline_name(request)[source]

tests.platform.helper module

Helper utilities for platform tests.

Provides:

  • Lightweight REST wrappers (no SDK abstraction where raw status codes matter)

  • Polling helpers (compilation, generic condition)

  • Simple selector/object helpers

No automatic cleanup; pipelines are left in place for inspection after failures.
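As a rough usage sketch (not taken from the test suite itself), a test built on these helpers typically creates a pipeline, waits for compilation, and asserts on raw status codes. The /v0/pipelines path and request body fields below are assumptions for illustration.

    from tests.platform.helper import (
        HTTPStatus, gen_pipeline_name, post_json, wait_for_program_success,
    )

    @gen_pipeline_name
    def test_compiles_simple_program(pipeline_name):
        # Pipeline creation; the path and body fields are illustrative assumptions.
        resp = post_json("/v0/pipelines", {
            "name": pipeline_name,
            "program_code": "CREATE TABLE t(id INT);",
        })
        assert resp.status_code == HTTPStatus.CREATED
        # Block until the program compiles; raises on compile error or timeout.
        wait_for_program_success(pipeline_name, expected_program_version=1)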

class tests.platform.helper.HTTPStatus(value, names=<not given>, *values, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

HTTP status codes and reason phrases

Status codes from the following RFCs are all observed:

  • RFC 7231: Hypertext Transfer Protocol (HTTP/1.1), obsoletes 2616

  • RFC 6585: Additional HTTP Status Codes

  • RFC 3229: Delta encoding in HTTP

  • RFC 4918: HTTP Extensions for WebDAV, obsoletes 2518

  • RFC 5842: Binding Extensions to WebDAV

  • RFC 7238: Permanent Redirect

  • RFC 2295: Transparent Content Negotiation in HTTP

  • RFC 2774: An HTTP Extension Framework

  • RFC 7725: An HTTP Status Code to Report Legal Obstacles

  • RFC 7540: Hypertext Transfer Protocol Version 2 (HTTP/2)

  • RFC 2324: Hyper Text Coffee Pot Control Protocol (HTCPCP/1.0)

  • RFC 8297: An HTTP Status Code for Indicating Hints

  • RFC 8470: Using Early Data in HTTP

ACCEPTED = 202
ALREADY_REPORTED = 208
BAD_GATEWAY = 502
BAD_REQUEST = 400
CONFLICT = 409
CONTINUE = 100
CREATED = 201
EARLY_HINTS = 103
EXPECTATION_FAILED = 417
FAILED_DEPENDENCY = 424
FORBIDDEN = 403
FOUND = 302
GATEWAY_TIMEOUT = 504
GONE = 410
HTTP_VERSION_NOT_SUPPORTED = 505
IM_A_TEAPOT = 418
IM_USED = 226
INSUFFICIENT_STORAGE = 507
INTERNAL_SERVER_ERROR = 500
LENGTH_REQUIRED = 411
LOCKED = 423
LOOP_DETECTED = 508
METHOD_NOT_ALLOWED = 405
MISDIRECTED_REQUEST = 421
MOVED_PERMANENTLY = 301
MULTIPLE_CHOICES = 300
MULTI_STATUS = 207
NETWORK_AUTHENTICATION_REQUIRED = 511
NON_AUTHORITATIVE_INFORMATION = 203
NOT_ACCEPTABLE = 406
NOT_EXTENDED = 510
NOT_FOUND = 404
NOT_IMPLEMENTED = 501
NOT_MODIFIED = 304
NO_CONTENT = 204
OK = 200
PARTIAL_CONTENT = 206
PAYMENT_REQUIRED = 402
PERMANENT_REDIRECT = 308
PRECONDITION_FAILED = 412
PRECONDITION_REQUIRED = 428
PROCESSING = 102
PROXY_AUTHENTICATION_REQUIRED = 407
REQUESTED_RANGE_NOT_SATISFIABLE = 416
REQUEST_ENTITY_TOO_LARGE = 413
REQUEST_HEADER_FIELDS_TOO_LARGE = 431
REQUEST_TIMEOUT = 408
REQUEST_URI_TOO_LONG = 414
RESET_CONTENT = 205
SEE_OTHER = 303
SERVICE_UNAVAILABLE = 503
SWITCHING_PROTOCOLS = 101
TEMPORARY_REDIRECT = 307
TOO_EARLY = 425
TOO_MANY_REQUESTS = 429
UNAUTHORIZED = 401
UNPROCESSABLE_ENTITY = 422
UNSUPPORTED_MEDIA_TYPE = 415
UPGRADE_REQUIRED = 426
USE_PROXY = 305
VARIANT_ALSO_NEGOTIATES = 506
property is_client_error
property is_informational
property is_redirection
property is_server_error
property is_success
tests.platform.helper.connector_action(pipeline: str, table: str, connector: str, action: str)[source]
tests.platform.helper.delete(path: str, **kw) Response[source]
tests.platform.helper.extract_object_by_name(collection: Iterable[Dict[str, Any]], name: str) Dict[str, Any][source]
tests.platform.helper.gen_pipeline_name(func)[source]

Decorator for pytest functions that automatically generates a unique pipeline name. The decorated function will receive a ‘pipeline_name’ parameter.

After the test completes, the decorator attempts to delete the pipeline, ignoring any errors.
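A minimal sketch of how such a decorator could be assembled from the other helpers in this module; the delete path is an assumption, and the real implementation may differ in how it handles failed tests.

    import functools

    def gen_pipeline_name(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Derive a unique name from the test function's own name.
            name = unique_pipeline_name(func.__name__)
            result = func(*args, pipeline_name=name, **kwargs)
            # Best-effort cleanup after a successful run; a failed test leaves
            # the pipeline in place for inspection. Errors here are ignored.
            try:
                delete(f"/v0/pipelines/{name}")  # path is an assumption
            except Exception:
                pass
            return result
        return wrapper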

tests.platform.helper.get(path: str, **kw) Response[source]
tests.platform.helper.http_request(method: str, path: str, **kwargs) Response[source]

Low-level request wrapper (no retries). Raises only on network errors.
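The wrappers in this module are thin by design. A sketch of the shape they usually take, assuming the requests library and a base URL read from an environment variable (both are assumptions, not confirmed by this documentation):

    import os
    import requests
    from requests import Response

    # Assumed configuration knob; the real helper may resolve the URL differently.
    API_BASE = os.environ.get("FELDERA_API_BASE", "http://localhost:8080")

    def http_request(method: str, path: str, **kwargs) -> Response:
        # No retries: callers assert directly on raw status codes, and only
        # network-level failures raise.
        return requests.request(method, API_BASE + path, **kwargs)

    def get(path: str, **kw) -> Response:
        return http_request("GET", path, **kw)

    def post_json(path: str, body, **kw) -> Response:
        return http_request("POST", path, json=body, **kw)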

tests.platform.helper.patch_json(path: str, body: Dict[str, Any], **kw) Response[source]
tests.platform.helper.post_json(path: str, body: Dict[str, Any], **kw) Response[source]
tests.platform.helper.put_json(path: str, body: Dict[str, Any], **kw) Response[source]
tests.platform.helper.unique_pipeline_name(base_name: str) str[source]

In CI, multiple test runs can target the same Feldera instance. To keep the pipeline names they use unique, the base name is suffixed with the first 5 characters of the commit SHA, or with ‘local’ when not running in CI.
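A sketch of that naming scheme, assuming the commit SHA is exposed through an environment variable (the variable name and the separator are assumptions):

    import os

    def unique_pipeline_name(base_name: str) -> str:
        # CI_COMMIT_SHA is an assumed variable name; CI systems differ.
        sha = os.environ.get("CI_COMMIT_SHA", "")
        suffix = sha[:5] if sha else "local"
        return f"{base_name}-{suffix}"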

tests.platform.helper.wait_for_condition(description: str, predicate, timeout_s: float = 30.0, sleep_s: float = 0.2) None[source]

Generic polling helper. predicate is a callable (sync or async) that returns a truthy value once the condition is met.
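A minimal sketch of the polling loop, handling both plain and coroutine-returning predicates; the real helper may differ in how it reports progress and timeouts.

    import asyncio
    import inspect
    import time

    def wait_for_condition(description, predicate, timeout_s=30.0, sleep_s=0.2):
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            result = predicate()
            # Async predicates return a coroutine; run it to completion.
            if inspect.iscoroutine(result):
                result = asyncio.run(result)
            if result:
                return
            time.sleep(sleep_s)
        raise TimeoutError(f"timed out after {timeout_s}s waiting for: {description}")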

tests.platform.helper.wait_for_program_success(pipeline_name: str, expected_program_version: int, timeout_s: float = 1800.0, sleep_s: float = 0.5) WaitResult[source]

Poll until the pipeline’s program_status is Success and program_version >= expected_program_version.

Mirrors semantics of the Rust wait_for_compiled_program helper.

Returns a WaitResult. Raises AssertionError on compile error, TimeoutError on timeout.
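A sketch of the polling logic built on the get helper. The /v0/pipelines/{name} path is an assumption, the field names come from this docstring, and a plain dict stands in for the undocumented WaitResult.

    import time

    def wait_for_program_success(pipeline_name, expected_program_version,
                                 timeout_s=1800.0, sleep_s=0.5):
        deadline = time.monotonic() + timeout_s
        while time.monotonic() < deadline:
            desc = get(f"/v0/pipelines/{pipeline_name}").json()
            status = desc.get("program_status")
            # Fail fast on a compile error instead of waiting out the timeout.
            if "error" in str(status).lower():
                raise AssertionError(f"{pipeline_name}: compilation failed: {status}")
            if status == "Success" and desc.get("program_version", 0) >= expected_program_version:
                return desc  # the real helper wraps this in a WaitResult
            time.sleep(sleep_s)
        raise TimeoutError(f"{pipeline_name}: program not compiled within {timeout_s}s")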

tests.platform.negative_test module

class tests.platform.negative_test.NegativeCompilationTests(methodName='runTest')[source]

Bases: TestCase

test_errors0()[source]
test_initialization_error()[source]
test_program_error0()[source]
test_program_error1()[source]
test_rust_error()[source]
test_sql_error()[source]

tests.platform.test_checkpoint_suspend module

tests.platform.test_checkpoint_sync module

tests.platform.test_cluster_health module

tests.platform.test_cluster_health.test_cluster_health_check()[source]

Poll the cluster health endpoint until both runner and compiler are healthy.

  • Repeatedly query the cluster health endpoint.

  • If both runner and compiler report healthy=true, expect HTTP 200 and stop.

  • Otherwise expect HTTP 503 while waiting.

  • Timeout after 300 seconds.

tests.platform.test_cluster_health.test_health_check()[source]

Poll the /healthz endpoint until it reports overall healthy or the poll times out.

Success condition:

status code 200 and body == {"status": "healthy"}

Acceptable transient condition (database not ready yet):

status code 500 and body == {"status": "unhealthy: unable to reach database (see logs for further details)"}
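A sketch of this check using the get and wait_for_condition helpers; the /healthz path and bodies come from the description above, while the timeout value is an assumption.

    from tests.platform.helper import HTTPStatus, get, wait_for_condition

    def check_healthz():
        def healthy():
            resp = get("/healthz")
            if resp.status_code == HTTPStatus.OK:
                assert resp.json() == {"status": "healthy"}
                return True
            # Transient state while the database is still coming up.
            assert resp.status_code == HTTPStatus.INTERNAL_SERVER_ERROR
            return False

        # The 300 s bound is an assumption; the docstring does not state one.
        wait_for_condition("/healthz reports healthy", healthy, timeout_s=300.0)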

tests.platform.test_completion_tokens module

tests.platform.test_completion_tokens.test_completion_tokens(pipeline_name)[source]
  • Pipeline without output connectors

  • Ingest many single-row JSON events, each returning a completion token

  • Poll completion_status for each token

  • Validate the row becomes visible exactly once
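The flow in the bullets above boils down to: ingest a row, keep the completion token from the response, and poll until it reports completion. The endpoint paths and response field names in this sketch are hypothetical stand-ins, shown only to illustrate the pattern.

    from tests.platform.helper import HTTPStatus, get, post_json, wait_for_condition

    def ingest_and_wait(pipeline_name, row):
        # Hypothetical ingress path and token/status field names.
        resp = post_json(f"/v0/pipelines/{pipeline_name}/ingress/t1", row)
        assert resp.status_code == HTTPStatus.OK
        token = resp.json()["token"]

        def completed():
            status = get(f"/v0/pipelines/{pipeline_name}/completion_status",
                         params={"token": token})
            assert status.status_code == HTTPStatus.OK
            return status.json().get("status") == "complete"

        wait_for_condition(f"completion of token {token}", completed)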

tests.platform.test_completion_tokens.test_completion_tokens_with_outputs(pipeline_name)[source]
  • Pipeline with multiple file_output connectors on materialized views.

  • Ingest multiple records, verify completion tokens, and validate counts.

  • Start a paused datagen input connector, obtain a completion token through its endpoint, wait for completion, and validate resulting counts.

tests.platform.test_ingress_formats module

class tests.platform.test_ingress_formats.JsonLineReader(resp)[source]

Bases: object

read_events(n, timeout_s=10.0)[source]
tests.platform.test_ingress_formats.create_pipeline(name: str, sql: str)[source]
tests.platform.test_ingress_formats.test_case_sensitive_tables(pipeline_name)[source]
  • Distinguish between quoted and unquoted identifiers.

  • Validate streaming outputs for two views.

tests.platform.test_ingress_formats.test_duplicate_outputs(pipeline_name)[source]

Multiple inserts producing duplicate output values.

tests.platform.test_ingress_formats.test_json_ingress(pipeline_name)[source]

Exercise raw inserts, the insert_delete format, the array format, parse errors, a Debezium update, and CSV ingestion with a parse error.
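For orientation, the insert_delete format referred to here wraps each row in an insert or delete object, one JSON value per line. The payload below is illustrative only; the table name, ingress path, and query parameters are assumptions.

    from tests.platform.helper import HTTPStatus, http_request

    def ingest_insert_delete(pipeline_name):
        # One JSON value per line; each wraps the row in "insert" or "delete".
        payload = (
            '{"insert": {"id": 1, "name": "alice"}}\n'
            '{"delete": {"id": 2, "name": "bob"}}\n'
        )
        # Table name, ingress path, and query parameters are assumptions.
        resp = http_request(
            "POST",
            f"/v0/pipelines/{pipeline_name}/ingress/t1"
            "?format=json&update_format=insert_delete",
            data=payload,
        )
        assert resp.status_code == HTTPStatus.OK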

tests.platform.test_ingress_formats.test_map_column(pipeline_name)[source]

Table with a column of type MAP.

tests.platform.test_ingress_formats.test_parse_datetime(pipeline_name)[source]
tests.platform.test_ingress_formats.test_primary_keys(pipeline_name)[source]

Port of primary_keys: test insert/update/delete semantics with a primary key.

tests.platform.test_ingress_formats.test_quoted_columns(pipeline_name)[source]
tests.platform.test_ingress_formats.test_upsert(pipeline_name)[source]
  • Insert several rows with composite PK.

  • Perform updates/inserts overwriting existing rows.

  • Perform no-op updates and deletes of non-existing keys.

tests.platform.test_metrics_logs module

tests.platform.test_metrics_logs.test_pipeline_logs(pipeline_name)[source]
  • Logs 404 before pipeline creation.

  • Create pipeline; poll until logs return 200.

  • Pause / start / stop / clear transitions keep logs accessible (200).

  • After delete, logs eventually return 404 again.

tests.platform.test_metrics_logs.test_pipeline_metrics(pipeline_name)[source]

Tests that circuit metrics can be retrieved from the pipeline.

tests.platform.test_metrics_logs.test_pipeline_stats(pipeline_name)[source]

Tests retrieving pipeline statistics via /stats.

tests.platform.test_orchestration module

tests.platform.test_orchestration.test_pipeline_orchestration_basic(pipeline_name)[source]

Tests pipeline orchestration, i.e., starting and pausing the pipeline itself as well as its connectors individually. Also covers basic data processing and the handling of case sensitivity and special characters.

tests.platform.test_orchestration.test_pipeline_orchestration_errors(pipeline_name)[source]

Port of the Rust pipeline_orchestration_errors test: validate return codes for valid and invalid pipeline and connector actions.

tests.platform.test_orchestration.test_pipeline_orchestration_scenarios(pipeline_name)[source]

Tests that after each orchestration scenario, consisting of various start and pause steps, the effects (i.e., pipeline and connector state) are as expected.

tests.platform.test_pipeline_builder module

class tests.platform.test_pipeline_builder.TestPipelineBuilder(methodName='runTest')[source]

Bases: TestCase

test_connector_orchestration()[source]

tests.platform.test_pipeline_configs module

tests.platform.test_pipeline_configs.test_pipeline_program_config(pipeline_name)[source]

Tests that the pipeline program configuration is validated and stored correctly, and that patching works on the field as a whole.

tests.platform.test_pipeline_configs.test_pipeline_runtime_config(pipeline_name)[source]

Tests that the pipeline runtime configuration is validated and stored correctly, and that patching works on the field as a whole.
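A hedged sketch of the whole-field patching these two tests verify, built on post_json and patch_json; the endpoint path and configuration fields are assumptions.

    from tests.platform.helper import HTTPStatus, patch_json, post_json

    def check_runtime_config_patch(pipeline_name):
        # Endpoint path and configuration fields are assumptions.
        created = post_json("/v0/pipelines", {
            "name": pipeline_name,
            "program_code": "CREATE TABLE t(id INT);",
            "runtime_config": {"workers": 4},
        })
        assert created.status_code == HTTPStatus.CREATED

        # PATCH is expected to replace runtime_config as a whole field.
        patched = patch_json(f"/v0/pipelines/{pipeline_name}",
                             {"runtime_config": {"workers": 8}})
        assert patched.status_code == HTTPStatus.OK
        assert patched.json()["runtime_config"]["workers"] == 8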

tests.platform.test_pipeline_crud module

tests.platform.test_pipeline_crud.test_pipeline_connector_endpoint_naming(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_pipeline_create_compile_delete(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_pipeline_get(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_pipeline_get_selector(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_pipeline_name_conflict(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_pipeline_name_invalid()[source]
tests.platform.test_pipeline_crud.test_pipeline_post(pipeline_name)[source]
tests.platform.test_pipeline_crud.test_refresh_version(pipeline_name)[source]

tests.platform.test_pipeline_lifecycle module

tests.platform.test_shared_pipeline module

tests.platform.test_shared_pipeline_stress module

class tests.platform.test_shared_pipeline_stress.TestPipeline(methodName='runTest')[source]

Bases: SharedTestPipeline

test_create_pipeline()[source]

CREATE TABLE tbl(id INT) WITH ('materialized' = 'true'); CREATE MATERIALIZED VIEW v0 AS SELECT * FROM tbl;

test_get_pipeline_logs()[source]

Module contents

Platform tests package.

This file makes the tests.platform directory a proper Python package so that relative imports like from .helper import … work when running pytest with PYTHONPATH pointing at the repository python directory.