From d28cbb20275799628b3f7db99065d88ffc6de418 Mon Sep 17 00:00:00 2001 From: Iaroslav Zeigerman Date: Fri, 22 Aug 2025 15:32:19 -0700 Subject: [PATCH] Fix: Check if the target table exists when determining the value of the is_incremental flag --- sqlmesh/core/snapshot/evaluator.py | 14 +++++++-- sqlmesh/dbt/builtin.py | 6 ++++ sqlmesh/dbt/seed.py | 1 + tests/core/test_integration.py | 29 +++++++++++++++++++ tests/dbt/test_config.py | 9 +++--- tests/dbt/test_manifest.py | 16 +++++----- tests/dbt/test_transformation.py | 23 +++++++++++++++ tests/fixtures/dbt/sushi_test/config.py | 9 +++--- tests/fixtures/dbt/sushi_test/dbt_project.yml | 1 - .../models/waiter_revenue_by_day.sql | 6 +--- 10 files changed, 89 insertions(+), 25 deletions(-) diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 6baa440ba6..82924e4c3a 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -773,7 +773,8 @@ def _evaluate_snapshot( allow_destructive_snapshots=allow_destructive_snapshots, allow_additive_snapshots=allow_additive_snapshots, ) - common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING + runtime_stage = RuntimeStage.EVALUATING + target_table_exists = True elif model.annotated or model.is_seed or model.kind.is_scd_type_2: self._execute_create( snapshot=snapshot, @@ -785,7 +786,14 @@ def _evaluate_snapshot( dry_run=False, run_pre_post_statements=False, ) - common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING + runtime_stage = RuntimeStage.EVALUATING + target_table_exists = True + + evaluate_render_kwargs = { + **common_render_kwargs, + "runtime_stage": runtime_stage, + "snapshot_table_exists": target_table_exists, + } wap_id: t.Optional[str] = None if snapshot.is_materialized and ( @@ -801,7 +809,7 @@ def _evaluate_snapshot( execution_time=execution_time, snapshot=snapshot, snapshots=snapshots, - render_kwargs=common_render_kwargs, + render_kwargs=evaluate_render_kwargs, 
create_render_kwargs=create_render_kwargs, rendered_physical_properties=rendered_physical_properties, deployability_index=deployability_index, diff --git a/sqlmesh/dbt/builtin.py b/sqlmesh/dbt/builtin.py index 70e1b10099..4b564eb781 100644 --- a/sqlmesh/dbt/builtin.py +++ b/sqlmesh/dbt/builtin.py @@ -407,6 +407,12 @@ def create_builtin_globals( else snapshot.dev_intervals ) is_incremental = bool(intervals) + + snapshot_table_exists = jinja_globals.get("snapshot_table_exists") + if is_incremental and snapshot_table_exists is not None: + # If we know the information about table existence, we can use it to correctly + # set the flag + is_incremental &= snapshot_table_exists else: is_incremental = False builtin_globals["is_incremental"] = lambda: is_incremental diff --git a/sqlmesh/dbt/seed.py b/sqlmesh/dbt/seed.py index 10e98cf93c..882c240289 100644 --- a/sqlmesh/dbt/seed.py +++ b/sqlmesh/dbt/seed.py @@ -86,6 +86,7 @@ def to_sqlmesh( dialect=self.dialect(context), audit_definitions=audit_definitions, virtual_environment_mode=virtual_environment_mode, + start=self.start or context.sqlmesh_config.model_defaults.start, **kwargs, ) diff --git a/tests/core/test_integration.py b/tests/core/test_integration.py index dec7309591..f80c42f579 100644 --- a/tests/core/test_integration.py +++ b/tests/core/test_integration.py @@ -2037,6 +2037,35 @@ def test_dbt_select_star_is_directly_modified(sushi_test_dbt_context: Context): assert plan.snapshots[snapshot_b_id].change_category == SnapshotChangeCategory.NON_BREAKING +@time_machine.travel("2023-01-08 15:00:00 UTC") +def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context): + context = sushi_test_dbt_context + + model = context.get_model("sushi.waiter_revenue_by_day_v2") + model = model.copy(update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"}) + context.upsert_model(model) + + context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True) + + snapshot = 
context.get_snapshot("sushi.waiter_revenue_by_day_v2") + assert snapshot + + # Manually drop the table + context.engine_adapter.drop_table(snapshot.table_name()) + + context.snapshot_evaluator.evaluate( + snapshot, + start="2023-01-01", + end="2023-01-08", + execution_time="2023-01-08 15:00:00", + snapshots={s.name: s for s in context.snapshots.values()}, + deployability_index=DeployabilityIndex.all_deployable(), + ) + + # Make sure the table was recreated + assert context.engine_adapter.table_exists(snapshot.table_name()) + + def test_model_attr(sushi_test_dbt_context: Context, assert_exp_eq): context = sushi_test_dbt_context model = context.get_model("sushi.top_waiters") diff --git a/tests/dbt/test_config.py b/tests/dbt/test_config.py index 99426ebb97..695c745c1d 100644 --- a/tests/dbt/test_config.py +++ b/tests/dbt/test_config.py @@ -354,7 +354,6 @@ def test_variables(assert_exp_eq, sushi_test_project): # Finally, check that variable scoping & overwriting (some_var) works as expected expected_sushi_variables = { - "start": "Jan 1 2022", "yet_another_var": 1, "top_waiters:limit": 10, "top_waiters:revenue": "revenue", @@ -379,7 +378,6 @@ def test_variables(assert_exp_eq, sushi_test_project): "yet_another_var": 5, "customers:bla": False, "customers:customer_id": "customer_id", - "start": "Jan 1 2022", } assert sushi_test_project.packages["sushi"].variables == expected_sushi_variables @@ -1006,8 +1004,11 @@ def test_db_type_to_quote_policy(): def test_variable_override(): project_root = "tests/fixtures/dbt/sushi_test" project = Project.load( - DbtContext(project_root=Path(project_root)), - variables={"yet_another_var": 2, "start": "2021-01-01"}, + DbtContext( + project_root=Path(project_root), + sqlmesh_config=Config(model_defaults=ModelDefaultsConfig(start="2021-01-01")), + ), + variables={"yet_another_var": 2}, ) assert project.packages["sushi"].variables["yet_another_var"] == 2 diff --git a/tests/dbt/test_manifest.py b/tests/dbt/test_manifest.py index 
2bed6acb55..efbd2687fd 100644 --- a/tests/dbt/test_manifest.py +++ b/tests/dbt/test_manifest.py @@ -4,6 +4,7 @@ import pytest +from sqlmesh.core.config import ModelDefaultsConfig from sqlmesh.dbt.basemodel import Dependencies from sqlmesh.dbt.context import DbtContext from sqlmesh.dbt.manifest import ManifestHelper @@ -24,7 +25,7 @@ def test_manifest_helper(caplog): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) models = helper.models() @@ -135,7 +136,7 @@ def test_tests_referencing_disabled_models(): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) assert "disabled_model" not in helper.models() @@ -151,7 +152,7 @@ def test_call_cache(): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) unused = "0000" @@ -172,7 +173,7 @@ def test_variable_override(): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) assert helper.models()["top_waiters"].limit_value == 10 @@ -181,7 +182,8 @@ def test_variable_override(): project_path, "sushi", profile.target, - variable_overrides={"top_waiters:limit": 1, "start": "2020-01-01"}, + variable_overrides={"top_waiters:limit": 1}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) assert helper.models()["top_waiters"].limit_value == 1 @@ -196,7 +198,7 @@ def test_source_meta_external_location(): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) sources = helper.sources() @@ -229,7 +231,7 @@ def test_top_level_dbt_adapter_macros(): project_path, "sushi", profile.target, - variable_overrides={"start": "2020-01-01"}, + 
model_defaults=ModelDefaultsConfig(start="2020-01-01"), ) # Adapter macros must be marked as top-level diff --git a/tests/dbt/test_transformation.py b/tests/dbt/test_transformation.py index 1bcc3081f7..baef96eb6d 100644 --- a/tests/dbt/test_transformation.py +++ b/tests/dbt/test_transformation.py @@ -1346,6 +1346,29 @@ def test_is_incremental(sushi_test_project: Project, assert_exp_eq, mocker): 'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")', ) + # If the snapshot_table_exists flag was set to False, intervals should be ignored + assert_exp_eq( + model_config.to_sqlmesh(context) + .render_query_or_raise(snapshot=snapshot, snapshot_table_exists=False) + .sql(), + 'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"', + ) + + # If the snapshot_table_exists flag was set to True, intervals should be taken into account + assert_exp_eq( + model_config.to_sqlmesh(context) + .render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True) + .sql(), + 'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")', + ) + snapshot.intervals = [] + assert_exp_eq( + model_config.to_sqlmesh(context) + .render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True) + .sql(), + 'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"', + ) + @pytest.mark.xdist_group("dbt_manifest") def test_is_incremental_non_incremental_model(sushi_test_project: Project, assert_exp_eq, mocker): diff --git a/tests/fixtures/dbt/sushi_test/config.py b/tests/fixtures/dbt/sushi_test/config.py index d82291f793..83118b02cf 100644 --- a/tests/fixtures/dbt/sushi_test/config.py +++ b/tests/fixtures/dbt/sushi_test/config.py @@ -3,11 +3,9 @@ from sqlmesh.core.config import ModelDefaultsConfig from sqlmesh.dbt.loader import sqlmesh_config -variables = {"start": "Jan 1 2022"} - config = sqlmesh_config( - Path(__file__).parent, variables=variables, model_defaults=ModelDefaultsConfig(dialect="duckdb") + Path(__file__).parent, 
model_defaults=ModelDefaultsConfig(dialect="duckdb", start="Jan 1 2022") ) @@ -15,6 +13,7 @@ test_config_with_normalization_strategy = sqlmesh_config( Path(__file__).parent, - variables=variables, - model_defaults=ModelDefaultsConfig(dialect="duckdb,normalization_strategy=LOWERCASE"), + model_defaults=ModelDefaultsConfig( + dialect="duckdb,normalization_strategy=LOWERCASE", start="Jan 1 2022" + ), ) diff --git a/tests/fixtures/dbt/sushi_test/dbt_project.yml b/tests/fixtures/dbt/sushi_test/dbt_project.yml index c86057c928..ecd060159b 100644 --- a/tests/fixtures/dbt/sushi_test/dbt_project.yml +++ b/tests/fixtures/dbt/sushi_test/dbt_project.yml @@ -20,7 +20,6 @@ clean-targets: # directories to be removed by `dbt clean` # Full documentation: https://docs.getdbt.com/docs/configuring-models models: - +start: "{{ var('start') }}" sushi: +materialized: table +pre-hook: diff --git a/tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql b/tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql index 317cc87e68..5eeb0002e0 100644 --- a/tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql +++ b/tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql @@ -30,11 +30,7 @@ LEFT JOIN {{ source('streaming', 'items') }} AS i ON oi.item_id = i.id AND oi.ds = i.ds {% if is_incremental() %} WHERE - o.ds > (select max(ds) from {{ this }}) -{% endif %} -{% if sqlmesh_incremental is defined %} - WHERE - o.ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}' + o.ds > (select CAST(max(ds) AS DATE) from {{ this }}) {% endif %} GROUP BY o.waiter_id,