Source code for tests.platform.test_update_runtime

import unittest

from feldera.pipeline_builder import PipelineBuilder
from feldera.runtime_config import RuntimeConfig
from feldera.testutils import unique_pipeline_name
from tests import TEST_CLIENT
from feldera.enums import PipelineStatus


class TestPipeline(unittest.TestCase):
    """Test case covering the /update_runtime API of a pipeline."""

    def test_update_runtime(self):
        """
        Exercise the /update_runtime API.

        The platform version recorded on a pipeline is expected to move to the
        current version only on demand (through a call to /update_runtime) or
        as a result of updating the pipeline code.
        """
        # Placeholder version string used to mark the pipeline as having been
        # compiled by an older platform.
        stale_version = "0.0.1"

        name = unique_pipeline_name("test_update_runtime")
        base_sql = """
        CREATE TABLE tbl(id INT) WITH ('materialized' = 'true');
        CREATE MATERIALIZED VIEW v0 AS SELECT * FROM tbl;
        """
        runtime_config = RuntimeConfig(
            dev_tweaks={"backfill_avoidance": True},
            logging="debug",
        )
        builder = PipelineBuilder(
            TEST_CLIENT,
            name,
            sql=base_sql,
            runtime_config=runtime_config,
        )
        pipe = builder.create_or_replace()

        def start_and_check_running():
            # Start the pipeline and verify it reports the RUNNING status.
            pipe.start()
            assert pipe.status() == PipelineStatus.RUNNING

        def pin_stale_version():
            # Force the stale platform version and verify it was recorded.
            pipe.testing_force_update_platform_version(stale_version)
            assert pipe.platform_version() == stale_version

        # Run the pipeline once so that it is compiled; otherwise the compiler
        # would overwrite the platform version we force below.
        start_and_check_running()
        pipe.stop(force=True)

        pin_stale_version()

        # A pipeline compiled by a previous platform version must start
        # without triggering recompilation, i.e. keep its recorded version.
        start_and_check_running()
        assert pipe.platform_version() == stale_version

        # While the pipeline is running, update_runtime must be rejected.
        self.assertRaises(Exception, pipe.update_runtime)

        pipe.stop(force=True)

        # Once the pipeline is stopped, update_runtime must succeed and move
        # the pipeline off the stale version.
        pipe.update_runtime()
        assert pipe.platform_version() != stale_version

        start_and_check_running()
        pipe.stop(force=True)

        # Changing the program code must update the platform version
        # automatically, without an explicit update_runtime call.
        pin_stale_version()

        extended_sql = base_sql + "\nCREATE MATERIALIZED VIEW v1 AS SELECT * FROM tbl;"
        pipe.modify(sql=extended_sql)

        start_and_check_running()
        assert pipe.platform_version() != stale_version
        pipe.stop(force=True)
# Run the tests in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()