Source code for tests.shared_test_pipeline

import unittest
from feldera.testutils import unique_pipeline_name
from tests import TEST_CLIENT
from feldera import PipelineBuilder, Pipeline


def sql(text_or_iterable):
    """Build a decorator that records SQL on a test method.

    The given value (a string, or a list/tuple of strings) is stored
    unchanged as the ``SQL`` attribute of the decorated function. The
    function itself is returned; no wrapper is created around it.
    """

    def decorator(fn):
        # Tag the function in place so the SQL can be read back later via
        # ``fn.SQL``.
        setattr(fn, "SQL", text_or_iterable)
        return fn

    return decorator
class SharedTestPipeline(unittest.TestCase):
    """Base ``unittest.TestCase`` whose tests share one combined SQL program.

    ``setUpClass`` gathers the SQL of every ``test_*`` method (its ``SQL``
    attribute, falling back to its docstring) into ``cls.ddl``. Each test then
    runs against its own freshly created pipeline built from that combined
    program, available as ``self.pipeline``.
    """

    @staticmethod
    def _ddl_fragments(ddl):
        """Return the stripped, non-empty SQL fragments contained in *ddl*.

        *ddl* may be ``None``/empty, a single string, or an iterable of
        strings (the ``sql`` decorator accepts a list/tuple of strings).
        """
        if not ddl:
            return []
        if isinstance(ddl, str):
            ddl = (ddl,)
        # Whitespace-only fragments carry no SQL, so they are dropped.
        return [text for text in (part.strip() for part in ddl) if text]

    @classmethod
    def setUpClass(cls):
        """Collect the DDL of all ``test_*`` methods and create the class pipeline.

        Sets ``cls._ddls`` (unique fragments in discovery order),
        ``cls.client`` and ``cls.pipeline_name``. If the class has no
        ``_pipeline`` attribute yet, also sets ``cls.ddl`` (fragments joined
        by newlines) and ``cls._pipeline``.
        """
        cls._ddls = []
        cls.client = TEST_CLIENT
        cls.pipeline_name = unique_pipeline_name(cls.__name__)

        for attr in dir(cls):
            if not attr.startswith("test_"):
                continue
            func = getattr(cls, attr)

            # Tests flagged with ``_enterprise_only`` contribute no DDL when
            # the connected instance is not an enterprise edition.
            is_enterprise_only = getattr(func, "_enterprise_only", False)
            if (
                is_enterprise_only
                and not cls.client.get_config().edition.is_enterprise()
            ):
                continue

            # Explicit ``SQL`` attribute wins; otherwise the docstring is
            # treated as the test's SQL.
            ddl = getattr(func, "SQL", getattr(func, "__doc__", None))
            # ``SQL`` may be a list/tuple of statements, so normalise before
            # stripping and de-duplicate each fragment individually.
            for fragment in cls._ddl_fragments(ddl):
                if fragment not in cls._ddls:
                    cls._ddls.append(fragment)

        if not hasattr(cls, "_pipeline"):
            cls.ddl = "\n".join(cls._ddls)
            cls._pipeline = PipelineBuilder(
                cls.client,
                cls.pipeline_name,
                sql=cls.ddl,
            ).create_or_replace()

    def setUp(self):
        """Create (or replace) a pipeline for this test from the combined DDL."""
        p = PipelineBuilder(
            self.client, unique_pipeline_name(self._testMethodName), sql=self.ddl
        ).create_or_replace()
        self.p = p

    def tearDown(self):
        """Force-stop this test's pipeline and clear its storage."""
        self.p.stop(force=True)
        self.p.clear_storage()

    @property
    def pipeline(self) -> Pipeline:
        """The pipeline created for the current test in ``setUp``."""
        return self.p

    def new_pipeline_with_suffix(self, suffix: str) -> Pipeline:
        """Create (or replace) an extra pipeline for the current test.

        The pipeline uses the same combined DDL; its name is derived from the
        test method name followed by ``_`` and *suffix*. The caller is
        responsible for it: ``tearDown`` only handles ``self.p``.
        """
        return PipelineBuilder(
            self.client,
            unique_pipeline_name(f"{self._testMethodName}_{suffix}"),
            sql=self.ddl,
        ).create_or_replace()