Skip to content

Commit 394cc70

Browse files
committed
Fix: Check if the target table exists when determining the value of the is_incremental flag
1 parent 3d144c5 commit 394cc70

File tree

10 files changed

+89
-25
lines changed

10 files changed

+89
-25
lines changed

sqlmesh/core/snapshot/evaluator.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -773,7 +773,8 @@ def _evaluate_snapshot(
773773
allow_destructive_snapshots=allow_destructive_snapshots,
774774
allow_additive_snapshots=allow_additive_snapshots,
775775
)
776-
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
776+
runtime_stage = RuntimeStage.EVALUATING
777+
target_table_exists = True
777778
elif model.annotated or model.is_seed or model.kind.is_scd_type_2:
778779
self._execute_create(
779780
snapshot=snapshot,
@@ -785,7 +786,14 @@ def _evaluate_snapshot(
785786
dry_run=False,
786787
run_pre_post_statements=False,
787788
)
788-
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
789+
runtime_stage = RuntimeStage.EVALUATING
790+
target_table_exists = True
791+
792+
evaluate_render_kwargs = {
793+
**common_render_kwargs,
794+
"runtime_stage": runtime_stage,
795+
"snapshot_table_exists": target_table_exists,
796+
}
789797

790798
wap_id: t.Optional[str] = None
791799
if snapshot.is_materialized and (
@@ -801,7 +809,7 @@ def _evaluate_snapshot(
801809
execution_time=execution_time,
802810
snapshot=snapshot,
803811
snapshots=snapshots,
804-
render_kwargs=common_render_kwargs,
812+
render_kwargs=evaluate_render_kwargs,
805813
create_render_kwargs=create_render_kwargs,
806814
rendered_physical_properties=rendered_physical_properties,
807815
deployability_index=deployability_index,

sqlmesh/dbt/builtin.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,12 @@ def create_builtin_globals(
407407
else snapshot.dev_intervals
408408
)
409409
is_incremental = bool(intervals)
410+
411+
snapshot_table_exists = jinja_globals.get("snapshot_table_exists")
412+
if is_incremental and snapshot_table_exists is not None:
413+
# If we know the information about table existence, we can use it to correctly
414+
# set the flag
415+
is_incremental &= snapshot_table_exists
410416
else:
411417
is_incremental = False
412418
builtin_globals["is_incremental"] = lambda: is_incremental

sqlmesh/dbt/seed.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def to_sqlmesh(
8686
dialect=self.dialect(context),
8787
audit_definitions=audit_definitions,
8888
virtual_environment_mode=virtual_environment_mode,
89+
start=self.start or context.sqlmesh_config.model_defaults.start,
8990
**kwargs,
9091
)
9192

tests/core/test_integration.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,6 +2037,35 @@ def test_dbt_select_star_is_directly_modified(sushi_test_dbt_context: Context):
20372037
assert plan.snapshots[snapshot_b_id].change_category == SnapshotChangeCategory.NON_BREAKING
20382038

20392039

2040+
@time_machine.travel("2023-01-08 15:00:00 UTC")
2041+
def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context):
2042+
context = sushi_test_dbt_context
2043+
2044+
model = context.get_model("sushi.waiter_revenue_by_day_v2")
2045+
model = model.copy(update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"})
2046+
context.upsert_model(model)
2047+
2048+
context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)
2049+
2050+
snapshot = context.get_snapshot("sushi.waiter_revenue_by_day_v2")
2051+
assert snapshot
2052+
2053+
# Manually drop the table
2054+
context.engine_adapter.drop_table(snapshot.table_name())
2055+
2056+
context.snapshot_evaluator.evaluate(
2057+
snapshot,
2058+
start="2023-01-01",
2059+
end="2023-01-08",
2060+
execution_time="2023-01-08 15:00:00",
2061+
snapshots={s.name: s for s in context.snapshots.values()},
2062+
deployability_index=DeployabilityIndex.all_deployable(),
2063+
)
2064+
2065+
# Make sure the table was recreated
2066+
assert context.engine_adapter.table_exists(snapshot.table_name())
2067+
2068+
20402069
def test_model_attr(sushi_test_dbt_context: Context, assert_exp_eq):
20412070
context = sushi_test_dbt_context
20422071
model = context.get_model("sushi.top_waiters")

tests/dbt/test_config.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,6 @@ def test_variables(assert_exp_eq, sushi_test_project):
354354

355355
# Finally, check that variable scoping & overwriting (some_var) works as expected
356356
expected_sushi_variables = {
357-
"start": "Jan 1 2022",
358357
"yet_another_var": 1,
359358
"top_waiters:limit": 10,
360359
"top_waiters:revenue": "revenue",
@@ -378,7 +377,6 @@ def test_variables(assert_exp_eq, sushi_test_project):
378377
"yet_another_var": 1,
379378
"customers:bla": False,
380379
"customers:customer_id": "customer_id",
381-
"start": "Jan 1 2022",
382380
"top_waiters:limit": 10,
383381
"top_waiters:revenue": "revenue",
384382
"customers:boo": ["a", "b"],
@@ -1018,8 +1016,11 @@ def test_db_type_to_quote_policy():
10181016
def test_variable_override():
10191017
project_root = "tests/fixtures/dbt/sushi_test"
10201018
project = Project.load(
1021-
DbtContext(project_root=Path(project_root)),
1022-
variables={"yet_another_var": 2, "start": "2021-01-01"},
1019+
DbtContext(
1020+
project_root=Path(project_root),
1021+
sqlmesh_config=Config(model_defaults=ModelDefaultsConfig(start="2021-01-01")),
1022+
),
1023+
variables={"yet_another_var": 2},
10231024
)
10241025
assert project.packages["sushi"].variables["yet_another_var"] == 2
10251026

tests/dbt/test_manifest.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import pytest
66

7+
from sqlmesh.core.config import ModelDefaultsConfig
78
from sqlmesh.dbt.basemodel import Dependencies
89
from sqlmesh.dbt.context import DbtContext
910
from sqlmesh.dbt.manifest import ManifestHelper
@@ -24,7 +25,7 @@ def test_manifest_helper(caplog):
2425
project_path,
2526
"sushi",
2627
profile.target,
27-
variable_overrides={"start": "2020-01-01"},
28+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
2829
)
2930

3031
models = helper.models()
@@ -133,7 +134,7 @@ def test_tests_referencing_disabled_models():
133134
project_path,
134135
"sushi",
135136
profile.target,
136-
variable_overrides={"start": "2020-01-01"},
137+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
137138
)
138139

139140
assert "disabled_model" not in helper.models()
@@ -149,7 +150,7 @@ def test_call_cache():
149150
project_path,
150151
"sushi",
151152
profile.target,
152-
variable_overrides={"start": "2020-01-01"},
153+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
153154
)
154155

155156
unused = "0000"
@@ -170,7 +171,7 @@ def test_variable_override():
170171
project_path,
171172
"sushi",
172173
profile.target,
173-
variable_overrides={"start": "2020-01-01"},
174+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
174175
)
175176
assert helper.models()["top_waiters"].limit_value == 10
176177

@@ -179,7 +180,8 @@ def test_variable_override():
179180
project_path,
180181
"sushi",
181182
profile.target,
182-
variable_overrides={"top_waiters:limit": 1, "start": "2020-01-01"},
183+
variable_overrides={"top_waiters:limit": 1},
184+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
183185
)
184186
assert helper.models()["top_waiters"].limit_value == 1
185187

@@ -194,7 +196,7 @@ def test_source_meta_external_location():
194196
project_path,
195197
"sushi",
196198
profile.target,
197-
variable_overrides={"start": "2020-01-01"},
199+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
198200
)
199201

200202
sources = helper.sources()
@@ -227,7 +229,7 @@ def test_top_level_dbt_adapter_macros():
227229
project_path,
228230
"sushi",
229231
profile.target,
230-
variable_overrides={"start": "2020-01-01"},
232+
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
231233
)
232234

233235
# Adapter macros must be marked as top-level

tests/dbt/test_transformation.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1344,6 +1344,29 @@ def test_is_incremental(sushi_test_project: Project, assert_exp_eq, mocker):
13441344
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
13451345
)
13461346

1347+
# If the snapshot_table_exists flag was set to False, intervals should be ignored
1348+
assert_exp_eq(
1349+
model_config.to_sqlmesh(context)
1350+
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=False)
1351+
.sql(),
1352+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
1353+
)
1354+
1355+
# If the snapshot_table_exists flag was set to True, intervals should be taken into account
1356+
assert_exp_eq(
1357+
model_config.to_sqlmesh(context)
1358+
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True)
1359+
.sql(),
1360+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
1361+
)
1362+
snapshot.intervals = []
1363+
assert_exp_eq(
1364+
model_config.to_sqlmesh(context)
1365+
.render_query_or_raise(snaspshot=snapshot, snapshot_table_exists=True)
1366+
.sql(),
1367+
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
1368+
)
1369+
13471370

13481371
@pytest.mark.xdist_group("dbt_manifest")
13491372
def test_is_incremental_non_incremental_model(sushi_test_project: Project, assert_exp_eq, mocker):

tests/fixtures/dbt/sushi_test/config.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@
33
from sqlmesh.core.config import ModelDefaultsConfig
44
from sqlmesh.dbt.loader import sqlmesh_config
55

6-
variables = {"start": "Jan 1 2022"}
7-
86

97
config = sqlmesh_config(
10-
Path(__file__).parent, variables=variables, model_defaults=ModelDefaultsConfig(dialect="duckdb")
8+
Path(__file__).parent, model_defaults=ModelDefaultsConfig(dialect="duckdb", start="Jan 1 2022")
119
)
1210

1311

1412
test_config = config
1513

1614
test_config_with_normalization_strategy = sqlmesh_config(
1715
Path(__file__).parent,
18-
variables=variables,
19-
model_defaults=ModelDefaultsConfig(dialect="duckdb,normalization_strategy=LOWERCASE"),
16+
model_defaults=ModelDefaultsConfig(
17+
dialect="duckdb,normalization_strategy=LOWERCASE", start="Jan 1 2022"
18+
),
2019
)

tests/fixtures/dbt/sushi_test/dbt_project.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ clean-targets: # directories to be removed by `dbt clean`
2020
# Full documentation: https://docs.getdbt.com/docs/configuring-models
2121

2222
models:
23-
+start: "{{ var('start') }}"
2423
sushi:
2524
+materialized: table
2625
+pre-hook:

tests/fixtures/dbt/sushi_test/models/waiter_revenue_by_day.sql

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ LEFT JOIN {{ source('streaming', 'items') }} AS i
2929
ON oi.item_id = i.id AND oi.ds = i.ds
3030
{% if is_incremental() %}
3131
WHERE
32-
o.ds > (select max(ds) from {{ this }})
33-
{% endif %}
34-
{% if sqlmesh_incremental is defined %}
35-
WHERE
36-
o.ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
32+
o.ds > (select CAST(max(ds) AS DATE) from {{ this }})
3733
{% endif %}
3834
GROUP BY
3935
o.waiter_id,

0 commit comments

Comments
 (0)