14 changes: 11 additions & 3 deletions sqlmesh/core/snapshot/evaluator.py
@@ -773,7 +773,8 @@ def _evaluate_snapshot(
allow_destructive_snapshots=allow_destructive_snapshots,
allow_additive_snapshots=allow_additive_snapshots,
)
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
runtime_stage = RuntimeStage.EVALUATING
target_table_exists = True
elif model.annotated or model.is_seed or model.kind.is_scd_type_2:
self._execute_create(
snapshot=snapshot,
@@ -785,7 +786,14 @@
dry_run=False,
run_pre_post_statements=False,
)
common_render_kwargs["runtime_stage"] = RuntimeStage.EVALUATING
runtime_stage = RuntimeStage.EVALUATING
target_table_exists = True

evaluate_render_kwargs = {
**common_render_kwargs,
"runtime_stage": runtime_stage,
"snapshot_table_exists": target_table_exists,
}

wap_id: t.Optional[str] = None
if snapshot.is_materialized and (
@@ -801,7 +809,7 @@
execution_time=execution_time,
snapshot=snapshot,
snapshots=snapshots,
render_kwargs=common_render_kwargs,
render_kwargs=evaluate_render_kwargs,
create_render_kwargs=create_render_kwargs,
rendered_physical_properties=rendered_physical_properties,
deployability_index=deployability_index,
6 changes: 6 additions & 0 deletions sqlmesh/dbt/builtin.py
@@ -407,6 +407,12 @@ def create_builtin_globals(
else snapshot.dev_intervals
)
is_incremental = bool(intervals)

snapshot_table_exists = jinja_globals.get("snapshot_table_exists")
if is_incremental and snapshot_table_exists is not None:
# If we know whether the target table exists, we can use that knowledge
# to set the flag correctly
is_incremental &= snapshot_table_exists
else:
is_incremental = False
builtin_globals["is_incremental"] = lambda: is_incremental
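
To make the intended behaviour concrete, here is a minimal standalone sketch (simplified names, not the actual SQLMesh API) of the truth table this change implies for dbt's is_incremental():

import typing as t

def resolve_is_incremental(
    intervals: t.List[t.Tuple[str, str]], snapshot_table_exists: t.Optional[bool]
) -> bool:
    # Mirrors the logic added above: start from interval-based detection, then let a
    # known table state override it. A dropped physical table means the model must be
    # rebuilt from scratch rather than run incrementally.
    is_incremental = bool(intervals)
    if is_incremental and snapshot_table_exists is not None:
        is_incremental &= snapshot_table_exists
    return is_incremental

assert resolve_is_incremental([], None) is False                                # no intervals: full refresh
assert resolve_is_incremental([("2023-01-01", "2023-01-02")], None) is True     # existence unknown: trust intervals
assert resolve_is_incremental([("2023-01-01", "2023-01-02")], False) is False   # table dropped: full refresh
assert resolve_is_incremental([("2023-01-01", "2023-01-02")], True) is True     # table present: incremental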
1 change: 1 addition & 0 deletions sqlmesh/dbt/seed.py
@@ -86,6 +86,7 @@ def to_sqlmesh(
dialect=self.dialect(context),
audit_definitions=audit_definitions,
virtual_environment_mode=virtual_environment_mode,
start=self.start or context.sqlmesh_config.model_defaults.start,
**kwargs,
)

29 changes: 29 additions & 0 deletions tests/core/test_integration.py
@@ -2037,6 +2037,35 @@ def test_dbt_select_star_is_directly_modified(sushi_test_dbt_context: Context):
assert plan.snapshots[snapshot_b_id].change_category == SnapshotChangeCategory.NON_BREAKING


@time_machine.travel("2023-01-08 15:00:00 UTC")
def test_dbt_is_incremental_table_is_missing(sushi_test_dbt_context: Context):
context = sushi_test_dbt_context

model = context.get_model("sushi.waiter_revenue_by_day_v2")
model = model.copy(update={"kind": IncrementalUnmanagedKind(), "start": "2023-01-01"})
context.upsert_model(model)

context.plan("prod", auto_apply=True, no_prompts=True, skip_tests=True)

snapshot = context.get_snapshot("sushi.waiter_revenue_by_day_v2")
assert snapshot

# Manually drop the table
context.engine_adapter.drop_table(snapshot.table_name())

context.snapshot_evaluator.evaluate(
snapshot,
start="2023-01-01",
end="2023-01-08",
execution_time="2023-01-08 15:00:00",
snapshots={s.name: s for s in context.snapshots.values()},
deployability_index=DeployabilityIndex.all_deployable(),
)

# Make sure the table was recreated
assert context.engine_adapter.table_exists(snapshot.table_name())


def test_model_attr(sushi_test_dbt_context: Context, assert_exp_eq):
context = sushi_test_dbt_context
model = context.get_model("sushi.top_waiters")
9 changes: 5 additions & 4 deletions tests/dbt/test_config.py
@@ -354,7 +354,6 @@ def test_variables(assert_exp_eq, sushi_test_project):

# Finally, check that variable scoping & overwriting (some_var) works as expected
expected_sushi_variables = {
"start": "Jan 1 2022",
"yet_another_var": 1,
"top_waiters:limit": 10,
"top_waiters:revenue": "revenue",
@@ -379,7 +378,6 @@
"yet_another_var": 5,
"customers:bla": False,
"customers:customer_id": "customer_id",
"start": "Jan 1 2022",
}

assert sushi_test_project.packages["sushi"].variables == expected_sushi_variables
@@ -1006,8 +1004,11 @@ def test_db_type_to_quote_policy():
def test_variable_override():
project_root = "tests/fixtures/dbt/sushi_test"
project = Project.load(
DbtContext(project_root=Path(project_root)),
variables={"yet_another_var": 2, "start": "2021-01-01"},
DbtContext(
project_root=Path(project_root),
sqlmesh_config=Config(model_defaults=ModelDefaultsConfig(start="2021-01-01")),
),
variables={"yet_another_var": 2},
)
assert project.packages["sushi"].variables["yet_another_var"] == 2

16 changes: 9 additions & 7 deletions tests/dbt/test_manifest.py
@@ -4,6 +4,7 @@

import pytest

from sqlmesh.core.config import ModelDefaultsConfig
from sqlmesh.dbt.basemodel import Dependencies
from sqlmesh.dbt.context import DbtContext
from sqlmesh.dbt.manifest import ManifestHelper
@@ -24,7 +25,7 @@ def test_manifest_helper(caplog):
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)

models = helper.models()
@@ -135,7 +136,7 @@ def test_tests_referencing_disabled_models():
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)

assert "disabled_model" not in helper.models()
@@ -151,7 +152,7 @@ def test_call_cache():
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)

unused = "0000"
@@ -172,7 +173,7 @@ def test_variable_override():
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)
assert helper.models()["top_waiters"].limit_value == 10

@@ -181,7 +182,8 @@ def test_variable_override():
project_path,
"sushi",
profile.target,
variable_overrides={"top_waiters:limit": 1, "start": "2020-01-01"},
variable_overrides={"top_waiters:limit": 1},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)
assert helper.models()["top_waiters"].limit_value == 1

@@ -196,7 +198,7 @@ def test_source_meta_external_location():
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)

sources = helper.sources()
@@ -229,7 +231,7 @@ def test_top_level_dbt_adapter_macros():
project_path,
"sushi",
profile.target,
variable_overrides={"start": "2020-01-01"},
model_defaults=ModelDefaultsConfig(start="2020-01-01"),
)

# Adapter macros must be marked as top-level
23 changes: 23 additions & 0 deletions tests/dbt/test_transformation.py
@@ -1346,6 +1346,29 @@ def test_is_incremental(sushi_test_project: Project, assert_exp_eq, mocker):
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
)

# If the snapshot_table_exists flag is set to False, intervals should be ignored
assert_exp_eq(
model_config.to_sqlmesh(context)
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=False)
.sql(),
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
)

# If the snapshot_table_exists flag is set to True, intervals should be taken into account
assert_exp_eq(
model_config.to_sqlmesh(context)
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True)
.sql(),
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a" WHERE "ds" > (SELECT MAX("ds") FROM "model" AS "model")',
)
snapshot.intervals = []
assert_exp_eq(
model_config.to_sqlmesh(context)
.render_query_or_raise(snapshot=snapshot, snapshot_table_exists=True)
.sql(),
'SELECT 1 AS "one" FROM "tbl_a" AS "tbl_a"',
)


@pytest.mark.xdist_group("dbt_manifest")
def test_is_incremental_non_incremental_model(sushi_test_project: Project, assert_exp_eq, mocker):
9 changes: 4 additions & 5 deletions tests/fixtures/dbt/sushi_test/config.py
@@ -3,18 +3,17 @@
from sqlmesh.core.config import ModelDefaultsConfig
from sqlmesh.dbt.loader import sqlmesh_config

variables = {"start": "Jan 1 2022"}


config = sqlmesh_config(
Path(__file__).parent, variables=variables, model_defaults=ModelDefaultsConfig(dialect="duckdb")
Path(__file__).parent, model_defaults=ModelDefaultsConfig(dialect="duckdb", start="Jan 1 2022")
)


test_config = config

test_config_with_normalization_strategy = sqlmesh_config(
Path(__file__).parent,
variables=variables,
model_defaults=ModelDefaultsConfig(dialect="duckdb,normalization_strategy=LOWERCASE"),
model_defaults=ModelDefaultsConfig(
dialect="duckdb,normalization_strategy=LOWERCASE", start="Jan 1 2022"
),
)
1 change: 0 additions & 1 deletion tests/fixtures/dbt/sushi_test/dbt_project.yml
@@ -20,7 +20,6 @@ clean-targets: # directories to be removed by `dbt clean`
# Full documentation: https://docs.getdbt.com/docs/configuring-models

models:
+start: "{{ var('start') }}"
sushi:
+materialized: table
+pre-hook:
@@ -30,11 +30,7 @@ LEFT JOIN {{ source('streaming', 'items') }} AS i
ON oi.item_id = i.id AND oi.ds = i.ds
{% if is_incremental() %}
WHERE
o.ds > (select max(ds) from {{ this }})
{% endif %}
{% if sqlmesh_incremental is defined %}
WHERE
o.ds BETWEEN '{{ start_ds }}' AND '{{ end_ds }}'
o.ds > (select CAST(max(ds) AS DATE) from {{ this }})
{% endif %}
GROUP BY
o.waiter_id,
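
For illustration, a rough rendering of the fixture's incremental guard with plain Jinja2 (not the dbt/SQLMesh renderer; the query is abbreviated and the names are hypothetical) shows how the filter disappears when is_incremental() reports False, e.g. after the physical table has been dropped:

from jinja2 import Template

template = Template(
    """SELECT o.waiter_id, o.ds
FROM orders AS o
{% if is_incremental() %}
WHERE o.ds > (select CAST(max(ds) AS DATE) from {{ this }})
{% endif %}"""
)

# Table missing or no processed intervals: is_incremental() is False, so no WHERE clause.
print(template.render(is_incremental=lambda: False, this="model"))

# Table present and intervals recorded: the incremental filter is kept.
print(template.render(is_incremental=lambda: True, this="model"))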