Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions tests/functional/adapter/streaming_tables/test_st_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
from dbt.adapters.base.relation import BaseRelation
from dbt.tests import util
from dbt.tests.adapter.materialized_view.files import MY_SEED, MY_TABLE, MY_VIEW
from dbt_common.contracts.config.materialization import OnConfigurationChangeOption

from dbt.adapters.databricks.relation import DatabricksRelationType
from dbt.adapters.databricks.relation_configs.streaming_table import StreamingTableConfig
from tests.functional.adapter.streaming_tables import fixtures


Expand Down Expand Up @@ -300,6 +302,55 @@ def test_create_with_liquid_clustering_config(self, project):
assert relation_type == "streaming_table"


@pytest.mark.dlt
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
class TestStreamingTableLiquidClusteringChanges:
"""Test liquid clustering changes on existing streaming tables (#1329)."""

@pytest.fixture(scope="class")
def seeds(self):
return {"my_seed.csv": MY_SEED}

@pytest.fixture(scope="class")
def models(self):
yield {
"liquid_clustered_st.sql": fixtures.liquid_clustered_st,
"schema.yml": fixtures.liquid_clustered_st_schema_v1,
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"on_configuration_change": OnConfigurationChangeOption.Apply.value}}

@pytest.fixture(scope="class")
def liquid_clustered_st(self, project) -> BaseRelation:
return project.adapter.Relation.create(
identifier="liquid_clustered_st",
schema=project.test_schema,
database=project.database,
type=DatabricksRelationType.StreamingTable,
)

@pytest.fixture(scope="class", autouse=True)
def setup(self, project, liquid_clustered_st):
util.run_dbt(["seed"])
util.run_dbt(["run", "--models", liquid_clustered_st.identifier, "--full-refresh"])

yield

project.run_sql(f"drop schema if exists {project.test_schema} cascade")

def test_liquid_clustering_change_is_applied(self, project, liquid_clustered_st):
"""Changing liquid_clustered_by on an existing ST should apply via ALTER."""
util.write_file(fixtures.liquid_clustered_st_schema_v2, "models", "schema.yml")
util.run_dbt(["run", "--models", liquid_clustered_st.identifier])

with util.get_connection(project.adapter):
config = project.adapter.get_relation_config(liquid_clustered_st)
assert isinstance(config, StreamingTableConfig)
assert config.config["liquid_clustering"].cluster_by == ["id", "value"]


@pytest.mark.dlt
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
class TestStreamingTablesFromFiles(TestStreamingTablesMixin):
Expand Down
Loading