Merged

31 commits
c9991a5
Add rows tracking
treysp Aug 14, 2025
dba8206
Add flag for supporting row tracking to engine adapters
treysp Aug 14, 2025
296c25c
Run cloud tests in CI
treysp Aug 14, 2025
ed6a564
Add engine support flags
treysp Aug 14, 2025
700f006
Use threading.local() instead of locks
treysp Aug 15, 2025
8436adf
Move all tracking into snapshot evaluator, remove seed tracker class
treysp Aug 15, 2025
a87df2a
Remove 'processed' and 'inserted' from console output
treysp Aug 18, 2025
f7c6866
Add BQ support and track bytes processed
treysp Aug 18, 2025
4df2e32
Remove seed tracking, have snapshot evaluator own tracker instance
treysp Aug 18, 2025
8c2e184
Move tracker class into snapshot module
treysp Aug 18, 2025
ff7f096
Fix circular import
treysp Aug 18, 2025
fe8adba
Handle snowflake lack of CTAS tracking
treysp Aug 19, 2025
bb4b016
Fix tests and snowflake regex
treysp Aug 19, 2025
e4e30f0
Change tracking arg name to track_rows_processed
treysp Aug 19, 2025
6b0932a
Report 0 rows correctly
treysp Aug 19, 2025
55c5ffc
Add databricks support
treysp Aug 19, 2025
9e3f2aa
Remove time travel test for cloud engines, handle pyspark DFs in dbx
treysp Aug 20, 2025
d45b197
Fix rebase
treysp Aug 20, 2025
3e17479
Seeds are now handled in evaluator
treysp Aug 20, 2025
46717a0
Fix rebase
treysp Aug 20, 2025
f227e5a
Handle snowflake table already exists
treysp Aug 21, 2025
775eadf
Query info schema for snowflake CTAS num rows
treysp Aug 21, 2025
81e21cd
Remove databricks, snowflake metadata calls
treysp Aug 22, 2025
7f5f301
Add snowflake test
treysp Aug 22, 2025
175012d
Tidy up
treysp Aug 22, 2025
d947fec
PR feedback
treysp Aug 25, 2025
2f6f11a
Remove humanize functions
treysp Aug 25, 2025
0920f39
Make tracking fully instance-based by passing to engine adapter
treysp Aug 26, 2025
8e8ddab
Fix snapshot evaluator tests
treysp Aug 26, 2025
61161ca
Retain previous with_settings settings
treysp Aug 26, 2025
61c7c41
Use humanize for integer/bytes abbreviation
treysp Aug 26, 2025
4 changes: 2 additions & 2 deletions .circleci/continue_config.yml
@@ -239,7 +239,7 @@ jobs:
      - checkout
      - run:
          name: Install OS-level dependencies
-          command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
+          command: ./.circleci/install-prerequisites.sh "<< parameters.engine >>"
      - run:
          name: Generate database name
          command: |
@@ -307,7 +307,7 @@ workflows:
          - redshift
          - bigquery
          - clickhouse-cloud
-          - athena
+          - athena
          - fabric
          - gcp-postgres
      filters:
8 changes: 8 additions & 0 deletions docs/integrations/engines/snowflake.md
@@ -250,6 +250,14 @@ And confirm that our schemas and objects exist in the Snowflake catalog:

Congratulations - your SQLMesh project is up and running on Snowflake!

### Where are the row counts?

SQLMesh reports the number of rows processed by each model in its `plan` and `run` terminal output.

However, due to a limitation in the Snowflake Python connector, row counts cannot be determined for `CREATE TABLE AS` statements, so SQLMesh does not report them for model kinds that create tables this way, such as `FULL` models.

Learn more about the connector limitation [on GitHub](https://github.com/snowflakedb/snowflake-connector-python/issues/645).
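As an illustration of the limitation, here is a minimal sketch using the `snowflake-connector-python` API (connection parameters are placeholders, and the exact `rowcount` value returned for DDL varies by connector version):

```python
import snowflake.connector

# Placeholder credentials -- substitute your own account details.
conn = snowflake.connector.connect(account="...", user="...", password="...")
cur = conn.cursor()

# CTAS returns a status row ("Table T successfully created."), so rowcount
# does not reflect how many rows were written to the new table.
cur.execute("CREATE TABLE t AS SELECT * FROM src")
print(cur.rowcount)

# Plain DML does report the number of affected rows.
cur.execute("INSERT INTO t SELECT * FROM src")
print(cur.rowcount)
```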

## Local/Built-in Scheduler
**Engine Adapter Type**: `snowflake`

125 changes: 101 additions & 24 deletions sqlmesh/core/console.py
@@ -39,6 +39,7 @@
    SnapshotInfoLike,
)
from sqlmesh.core.snapshot.definition import Interval, Intervals, SnapshotTableInfo
+from sqlmesh.core.snapshot.execution_tracker import QueryExecutionStats
from sqlmesh.core.test import ModelTest
from sqlmesh.utils import rich as srich
from sqlmesh.utils import Verbosity
@@ -439,6 +440,7 @@ def update_snapshot_evaluation_progress(
        num_audits_passed: int,
        num_audits_failed: int,
        audit_only: bool = False,
+        execution_stats: t.Optional[QueryExecutionStats] = None,
        auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
    ) -> None:
        """Updates the snapshot evaluation progress."""
@@ -587,6 +589,7 @@ def update_snapshot_evaluation_progress(
        num_audits_passed: int,
        num_audits_failed: int,
        audit_only: bool = False,
+        execution_stats: t.Optional[QueryExecutionStats] = None,
        auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
    ) -> None:
        pass
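The diff does not show `QueryExecutionStats` itself (it lives in `sqlmesh/core/snapshot/execution_tracker.py`, added elsewhere in this PR). A minimal sketch of the two attributes the console code reads, with assumed types and defaults:

```python
import typing as t
from dataclasses import dataclass


# Hypothetical sketch -- only the attributes referenced by console.py are shown.
@dataclass
class QueryExecutionStats:
    total_rows_processed: t.Optional[int] = None  # rows written/affected, when the engine reports them
    total_bytes_processed: t.Optional[int] = None  # bytes scanned/processed (e.g. BigQuery), when available
```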
@@ -1032,7 +1035,9 @@ def start_evaluation_progress(

        # determine column widths
        self.evaluation_column_widths["annotation"] = (
-            _calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
+            _calculate_annotation_str_len(
+                batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
+            )
            + 3  # brackets and opening escape backslash
        )
        self.evaluation_column_widths["name"] = max(
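The string literal passed as the new third argument acts as a width template: its length reserves column space for the widest execution-stats suffix an annotation can carry. A quick check of the reserved width:

```python
# The template mirrors the widest expected suffix appended to an annotation.
template = " (123.4m rows, 123.4 KiB)"
print(len(template))  # 25 columns reserved for execution stats
```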
@@ -1077,6 +1082,7 @@ def update_snapshot_evaluation_progress(
        num_audits_passed: int,
        num_audits_failed: int,
        audit_only: bool = False,
+        execution_stats: t.Optional[QueryExecutionStats] = None,
        auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
    ) -> None:
        """Update the snapshot evaluation progress."""
@@ -1097,7 +1103,7 @@
        ).ljust(self.evaluation_column_widths["name"])

        annotation = _create_evaluation_model_annotation(
-            snapshot, _format_evaluation_model_interval(snapshot, interval)
+            snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
        )
        audits_str = ""
        if num_audits_passed:
@@ -3668,6 +3674,7 @@ def update_snapshot_evaluation_progress(
        num_audits_passed: int,
        num_audits_failed: int,
        audit_only: bool = False,
+        execution_stats: t.Optional[QueryExecutionStats] = None,
        auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
    ) -> None:
        view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
@@ -3838,6 +3845,7 @@ def update_snapshot_evaluation_progress(
        num_audits_passed: int,
        num_audits_failed: int,
        audit_only: bool = False,
+        execution_stats: t.Optional[QueryExecutionStats] = None,
        auto_restatement_triggers: t.Optional[t.List[SnapshotId]] = None,
    ) -> None:
        message = f"Evaluated {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
@@ -4169,33 +4177,63 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
    return ""


-def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
+def _create_evaluation_model_annotation(
+    snapshot: Snapshot,
+    interval_info: t.Optional[str],
+    execution_stats: t.Optional[QueryExecutionStats],
+) -> str:
+    annotation = None
+    execution_stats_str = ""
+    if execution_stats:
+        rows_processed = execution_stats.total_rows_processed
+        execution_stats_str += (
+            f"{_abbreviate_integer_count(rows_processed)} row{'s' if rows_processed > 1 else ''}"
+            if rows_processed
+            else ""
+        )
+
+        bytes_processed = execution_stats.total_bytes_processed
+        execution_stats_str += (
+            f"{', ' if execution_stats_str else ''}{_format_bytes(bytes_processed)}"
+            if bytes_processed
+            else ""
+        )
+        execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
+
    if snapshot.is_audit:
-        return "run standalone audit"
-    if snapshot.is_model and snapshot.model.kind.is_external:
-        return "run external audits"
-    if snapshot.model.kind.is_seed:
-        return "insert seed file"
-    if snapshot.model.kind.is_full:
-        return "full refresh"
-    if snapshot.model.kind.is_view:
-        return "recreate view"
-    if snapshot.model.kind.is_incremental_by_unique_key:
-        return "insert/update rows"
-    if snapshot.model.kind.is_incremental_by_partition:
-        return "insert partitions"
-
-    return interval_info if interval_info else ""
-
-
-def _calculate_interval_str_len(snapshot: Snapshot, intervals: t.List[Interval]) -> int:
+        annotation = "run standalone audit"
+    if snapshot.is_model:
+        if snapshot.model.kind.is_external:
+            annotation = "run external audits"
+        if snapshot.model.kind.is_view:
+            annotation = "recreate view"
+        if snapshot.model.kind.is_seed:
+            annotation = f"insert seed file{execution_stats_str}"
+        if snapshot.model.kind.is_full:
+            annotation = f"full refresh{execution_stats_str}"
+        if snapshot.model.kind.is_incremental_by_unique_key:
+            annotation = f"insert/update rows{execution_stats_str}"
+        if snapshot.model.kind.is_incremental_by_partition:
+            annotation = f"insert partitions{execution_stats_str}"
+
+    if annotation:
+        return annotation
+
+    return f"{interval_info}{execution_stats_str}" if interval_info else ""
+
+
+def _calculate_interval_str_len(
+    snapshot: Snapshot,
+    intervals: t.List[Interval],
+    execution_stats: t.Optional[QueryExecutionStats] = None,
+) -> int:
    interval_str_len = 0
    for interval in intervals:
        interval_str_len = max(
            interval_str_len,
            len(
                _create_evaluation_model_annotation(
-                    snapshot, _format_evaluation_model_interval(snapshot, interval)
+                    snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
                )
            ),
        )
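For illustration, the annotations the rewritten function would produce for a `FULL` model, using made-up stats values (not captured SQLMesh output):

```python
# Assumed values; traces _create_evaluation_model_annotation by hand.
stats = QueryExecutionStats(total_rows_processed=1_234_000, total_bytes_processed=567 * 1024**2)

# With stats available the annotation becomes:
#   "full refresh (1.2m rows, 567.0 MiB)"
# and when the engine reports nothing it falls back to plain:
#   "full refresh"
```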
@@ -4248,13 +4286,52 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:


def _calculate_annotation_str_len(
-    batched_intervals: t.Dict[Snapshot, t.List[Interval]], audit_padding: int = 0
+    batched_intervals: t.Dict[Snapshot, t.List[Interval]],
+    audit_padding: int = 0,
+    execution_stats_len: int = 0,
) -> int:
    annotation_str_len = 0
    for snapshot, intervals in batched_intervals.items():
        annotation_str_len = max(
            annotation_str_len,
            _calculate_interval_str_len(snapshot, intervals)
-            + _calculate_audit_str_len(snapshot, audit_padding),
+            + _calculate_audit_str_len(snapshot, audit_padding)
+            + execution_stats_len,
        )
    return annotation_str_len


# Convert a number of bytes to a human-readable string
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L165
def _format_bytes(num_bytes: t.Optional[int]) -> str:
    if num_bytes and num_bytes >= 0:
        if num_bytes < 1024:
            return f"{num_bytes} bytes"

        num_bytes_float = float(num_bytes) / 1024.0
        for unit in ["KiB", "MiB", "GiB", "TiB", "PiB"]:
            if num_bytes_float < 1024.0:
                return f"{num_bytes_float:3.1f} {unit}"
            num_bytes_float /= 1024.0

        num_bytes_float *= 1024.0  # undo last division in loop
        return f"{num_bytes_float:3.1f} {unit}"
    return ""


# Abbreviate an integer count. Example: 1,000,000,000 -> 1.0b
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L178
def _abbreviate_integer_count(count: t.Optional[int]) -> str:
    if count and count >= 0:
        if count < 1000:
            return str(count)

        count_float = float(count) / 1000.0
        for unit in ["k", "m", "b", "t"]:
            if count_float < 1000.0:
                return f"{count_float:3.1f}{unit}".strip()
            count_float /= 1000.0

        count_float *= 1000.0  # undo last division in loop
        return f"{count_float:3.1f}{unit}".strip()
    return ""
1 change: 1 addition & 0 deletions sqlmesh/core/engine_adapter/athena.py
@@ -45,6 +45,7 @@ class AthenaEngineAdapter(PandasNativeFetchDFSupportMixin, RowDiffMixin):
    # >>> self._execute('/* test */ DESCRIBE foo')
    # pyathena.error.OperationalError: FAILED: ParseException line 1:0 cannot recognize input near '/' '*' 'test'
    ATTACH_CORRELATION_ID = False
+    SUPPORTS_QUERY_EXECUTION_TRACKING = True
    SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["DATABASE", "SCHEMA"]

    def __init__(
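`SUPPORTS_QUERY_EXECUTION_TRACKING` follows the adapter capability-flag pattern: each engine opts in at the class level and callers check the flag before collecting stats. A hypothetical sketch of that pattern (only the flag name comes from this PR; the surrounding names are invented for illustration):

```python
import typing as t


class EngineAdapter:
    SUPPORTS_QUERY_EXECUTION_TRACKING = False  # conservative default for all engines


class AthenaEngineAdapter(EngineAdapter):
    SUPPORTS_QUERY_EXECUTION_TRACKING = True  # Athena can report per-query stats


def maybe_collect_stats(adapter: EngineAdapter) -> t.Optional[dict]:
    # Callers consult the class flag instead of probing the engine at runtime.
    if not adapter.SUPPORTS_QUERY_EXECUTION_TRACKING:
        return None
    return {"rows": 0, "bytes": 0}  # placeholder for real cursor/job metadata
```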