Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions sqlmesh/core/snapshot/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2378,14 +2378,19 @@ def migrate(
def delete(self, name: str, **kwargs: t.Any) -> None:
cascade = kwargs.pop("cascade", False)
try:
self.adapter.drop_view(name, cascade=cascade)
# Some engines (e.g., RisingWave) don’t fail when dropping a materialized view with a DROP VIEW statement,
# because views and materialized views don’t share the same namespace. Therefore, we should not ignore if the
# view doesn't exist and let the exception handler attempt to drop the materialized view.
self.adapter.drop_view(name, cascade=cascade, ignore_if_not_exists=False)
except Exception:
logger.debug(
"Failed to drop view '%s'. Trying to drop the materialized view instead",
name,
exc_info=True,
)
self.adapter.drop_view(name, materialized=True, cascade=cascade)
self.adapter.drop_view(
name, materialized=True, cascade=cascade, ignore_if_not_exists=True
)
logger.info("Dropped view '%s'", name)

def _is_materialized_view(self, model: Model) -> bool:
Expand Down
58 changes: 58 additions & 0 deletions tests/core/test_snapshot_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,64 @@ def create_and_cleanup(name: str, dev_table_only: bool):
)


def test_cleanup_view(adapter_mock, make_snapshot):
evaluator = SnapshotEvaluator(adapter_mock)

model = SqlModel(
name="catalog.test_schema.test_model",
kind=ViewKind(materialized=False),
query=parse_one("SELECT a FROM tbl"),
)

snapshot = make_snapshot(model)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True)

evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env"))
evaluator.cleanup([SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)])

adapter_mock.get_data_object.assert_not_called()
adapter_mock.drop_view.assert_called_once_with(
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
cascade=True,
ignore_if_not_exists=False,
)


def test_cleanup_materialized_view(adapter_mock, make_snapshot):
evaluator = SnapshotEvaluator(adapter_mock)

model = SqlModel(
name="catalog.test_schema.test_model",
kind=ViewKind(materialized=True),
query=parse_one("SELECT a FROM tbl"),
)

snapshot = make_snapshot(model)
snapshot.categorize_as(SnapshotChangeCategory.BREAKING, forward_only=True)

adapter_mock.drop_view.side_effect = [RuntimeError("failed to drop view"), None]

evaluator.promote([snapshot], EnvironmentNamingInfo(name="test_env"))
evaluator.cleanup([SnapshotTableCleanupTask(snapshot=snapshot.table_info, dev_table_only=True)])

adapter_mock.get_data_object.assert_not_called()
adapter_mock.drop_view.assert_has_calls(
[
call(
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
cascade=True,
ignore_if_not_exists=False,
),
call(
f"catalog.sqlmesh__test_schema.test_schema__test_model__{snapshot.fingerprint.to_version()}__dev",
materialized=True,
cascade=True,
ignore_if_not_exists=True,
),
]
)


def test_cleanup_fails(adapter_mock, make_snapshot):
adapter_mock.drop_table.side_effect = RuntimeError("test_error")

Expand Down