Skip to content

Commit 29f0c27

Browse files
committed
Add BQ support and track bytes processed
1 parent 275016a commit 29f0c27

File tree

19 files changed

+188
-105
lines changed

19 files changed

+188
-105
lines changed

sqlmesh/core/console.py

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from sqlmesh.core.environment import EnvironmentNamingInfo, EnvironmentSummary
3232
from sqlmesh.core.linter.rule import RuleViolation
3333
from sqlmesh.core.model import Model
34+
from sqlmesh.core.execution_tracker import QueryExecutionStats
3435
from sqlmesh.core.snapshot import (
3536
Snapshot,
3637
SnapshotChangeCategory,
@@ -428,7 +429,7 @@ def update_snapshot_evaluation_progress(
428429
num_audits_passed: int,
429430
num_audits_failed: int,
430431
audit_only: bool = False,
431-
rows_processed: t.Optional[int] = None,
432+
execution_stats: t.Optional[QueryExecutionStats] = None,
432433
) -> None:
433434
"""Updates the snapshot evaluation progress."""
434435

@@ -576,7 +577,7 @@ def update_snapshot_evaluation_progress(
576577
num_audits_passed: int,
577578
num_audits_failed: int,
578579
audit_only: bool = False,
579-
rows_processed: t.Optional[int] = None,
580+
execution_stats: t.Optional[QueryExecutionStats] = None,
580581
) -> None:
581582
pass
582583

@@ -1014,7 +1015,7 @@ def start_evaluation_progress(
10141015
# determine column widths
10151016
self.evaluation_column_widths["annotation"] = (
10161017
_calculate_annotation_str_len(
1017-
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1018+
batched_intervals, self.AUDIT_PADDING, len(" (123.4m rows, 123.4 KiB)")
10181019
)
10191020
+ 3 # brackets and opening escape backslash
10201021
)
@@ -1060,7 +1061,7 @@ def update_snapshot_evaluation_progress(
10601061
num_audits_passed: int,
10611062
num_audits_failed: int,
10621063
audit_only: bool = False,
1063-
rows_processed: t.Optional[int] = None,
1064+
execution_stats: t.Optional[QueryExecutionStats] = None,
10641065
) -> None:
10651066
"""Update the snapshot evaluation progress."""
10661067
if (
@@ -1080,7 +1081,7 @@ def update_snapshot_evaluation_progress(
10801081
).ljust(self.evaluation_column_widths["name"])
10811082

10821083
annotation = _create_evaluation_model_annotation(
1083-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
1084+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
10841085
)
10851086
audits_str = ""
10861087
if num_audits_passed:
@@ -3644,7 +3645,7 @@ def update_snapshot_evaluation_progress(
36443645
num_audits_passed: int,
36453646
num_audits_failed: int,
36463647
audit_only: bool = False,
3647-
rows_processed: t.Optional[int] = None,
3648+
execution_stats: t.Optional[QueryExecutionStats] = None,
36483649
) -> None:
36493650
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36503651

@@ -3814,7 +3815,7 @@ def update_snapshot_evaluation_progress(
38143815
num_audits_passed: int,
38153816
num_audits_failed: int,
38163817
audit_only: bool = False,
3817-
rows_processed: t.Optional[int] = None,
3818+
execution_stats: t.Optional[QueryExecutionStats] = None,
38183819
) -> None:
38193820
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38203821

@@ -4145,11 +4146,27 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41454146

41464147

41474148
def _create_evaluation_model_annotation(
4148-
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4149+
snapshot: Snapshot,
4150+
interval_info: t.Optional[str],
4151+
execution_stats: t.Optional[QueryExecutionStats],
41494152
) -> str:
41504153
annotation = None
4151-
num_rows_processed = str(rows_processed) if rows_processed else ""
4152-
rows_processed_str = f" ({num_rows_processed} rows)" if num_rows_processed else ""
4154+
execution_stats_str = ""
4155+
if execution_stats:
4156+
rows_processed = execution_stats.total_rows_processed
4157+
execution_stats_str += (
4158+
f"{_abbreviate_integer_count(rows_processed)} row{'s' if rows_processed > 1 else ''}"
4159+
if rows_processed
4160+
else ""
4161+
)
4162+
4163+
bytes_processed = execution_stats.total_bytes_processed
4164+
execution_stats_str += (
4165+
f"{', ' if execution_stats_str else ''}{_format_bytes(bytes_processed)}"
4166+
if bytes_processed
4167+
else ""
4168+
)
4169+
execution_stats_str = f" ({execution_stats_str})" if execution_stats_str else ""
41534170

41544171
if snapshot.is_audit:
41554172
annotation = "run standalone audit"
@@ -4159,30 +4176,32 @@ def _create_evaluation_model_annotation(
41594176
if snapshot.model.kind.is_view:
41604177
annotation = "recreate view"
41614178
if snapshot.model.kind.is_seed:
4162-
annotation = f"insert seed file{rows_processed_str}"
4179+
annotation = f"insert seed file{execution_stats_str}"
41634180
if snapshot.model.kind.is_full:
4164-
annotation = f"full refresh{rows_processed_str}"
4181+
annotation = f"full refresh{execution_stats_str}"
41654182
if snapshot.model.kind.is_incremental_by_unique_key:
4166-
annotation = f"insert/update rows{rows_processed_str}"
4183+
annotation = f"insert/update rows{execution_stats_str}"
41674184
if snapshot.model.kind.is_incremental_by_partition:
4168-
annotation = f"insert partitions{rows_processed_str}"
4185+
annotation = f"insert partitions{execution_stats_str}"
41694186

41704187
if annotation:
41714188
return annotation
41724189

4173-
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4190+
return f"{interval_info}{execution_stats_str}" if interval_info else ""
41744191

41754192

41764193
def _calculate_interval_str_len(
4177-
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4194+
snapshot: Snapshot,
4195+
intervals: t.List[Interval],
4196+
execution_stats: t.Optional[QueryExecutionStats] = None,
41784197
) -> int:
41794198
interval_str_len = 0
41804199
for interval in intervals:
41814200
interval_str_len = max(
41824201
interval_str_len,
41834202
len(
41844203
_create_evaluation_model_annotation(
4185-
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
4204+
snapshot, _format_evaluation_model_interval(snapshot, interval), execution_stats
41864205
)
41874206
),
41884207
)
@@ -4237,14 +4256,50 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42374256
def _calculate_annotation_str_len(
42384257
batched_intervals: t.Dict[Snapshot, t.List[Interval]],
42394258
audit_padding: int = 0,
4240-
rows_processed_len: int = 0,
4259+
execution_stats_len: int = 0,
42414260
) -> int:
42424261
annotation_str_len = 0
42434262
for snapshot, intervals in batched_intervals.items():
42444263
annotation_str_len = max(
42454264
annotation_str_len,
42464265
_calculate_interval_str_len(snapshot, intervals)
42474266
+ _calculate_audit_str_len(snapshot, audit_padding)
4248-
+ rows_processed_len,
4267+
+ execution_stats_len,
42494268
)
42504269
return annotation_str_len
4270+
4271+
4272+
# Convert number of bytes to a human-readable string
4273+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L165
4274+
def _format_bytes(num_bytes: t.Optional[int]) -> str:
4275+
if num_bytes and num_bytes > 0:
4276+
if num_bytes < 1024:
4277+
return f"{num_bytes} Bytes"
4278+
4279+
num_bytes_float = float(num_bytes) / 1024.0
4280+
for unit in ["KiB", "MiB", "GiB", "TiB", "PiB"]:
4281+
if num_bytes_float < 1024.0:
4282+
return f"{num_bytes_float:3.1f} {unit}"
4283+
num_bytes_float /= 1024.0
4284+
4285+
num_bytes_float *= 1024.0 # undo last division in loop
4286+
return f"{num_bytes_float:3.1f} {unit}"
4287+
return ""
4288+
4289+
4290+
# Abbreviate integer count. Example: 1,000,000,000 -> 1b
4291+
# https://github.com/dbt-labs/dbt-adapters/blob/34fd178539dcb6f82e18e738adc03de7784c032f/dbt-bigquery/src/dbt/adapters/bigquery/connections.py#L178
4292+
def _abbreviate_integer_count(count: t.Optional[int]) -> str:
4293+
if count and count > 0:
4294+
if count < 1000:
4295+
return str(count)
4296+
4297+
count_float = float(count) / 1000.0
4298+
for unit in ["k", "m", "b", "t"]:
4299+
if count_float < 1000.0:
4300+
return f"{count_float:3.1f}{unit}".strip()
4301+
count_float /= 1000.0
4302+
4303+
count_float *= 1000.0 # undo last division in loop
4304+
return f"{count_float:3.1f}{unit}".strip()
4305+
return ""

sqlmesh/core/engine_adapter/base.py

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -830,7 +830,7 @@ def _create_table_from_source_queries(
830830
table_description: t.Optional[str] = None,
831831
column_descriptions: t.Optional[t.Dict[str, str]] = None,
832832
table_kind: t.Optional[str] = None,
833-
track_row_count: bool = True,
833+
track_execution_stats: bool = True,
834834
**kwargs: t.Any,
835835
) -> None:
836836
table = exp.to_table(table_name)
@@ -876,15 +876,15 @@ def _create_table_from_source_queries(
876876
replace=replace,
877877
table_description=table_description,
878878
table_kind=table_kind,
879-
track_row_count=track_row_count,
879+
track_execution_stats=track_execution_stats,
880880
**kwargs,
881881
)
882882
else:
883883
self._insert_append_query(
884884
table_name,
885885
query,
886886
target_columns_to_types or self.columns(table),
887-
track_row_count=track_row_count,
887+
track_execution_stats=track_execution_stats,
888888
)
889889

890890
# Register comments with commands if the engine supports comments and we weren't able to
@@ -908,7 +908,7 @@ def _create_table(
908908
table_description: t.Optional[str] = None,
909909
column_descriptions: t.Optional[t.Dict[str, str]] = None,
910910
table_kind: t.Optional[str] = None,
911-
track_row_count: bool = True,
911+
track_execution_stats: bool = True,
912912
**kwargs: t.Any,
913913
) -> None:
914914
self.execute(
@@ -926,7 +926,7 @@ def _create_table(
926926
table_kind=table_kind,
927927
**kwargs,
928928
),
929-
track_row_count=track_row_count,
929+
track_execution_stats=track_execution_stats,
930930
)
931931

932932
def _build_create_table_exp(
@@ -1401,7 +1401,7 @@ def insert_append(
14011401
table_name: TableName,
14021402
query_or_df: QueryOrDF,
14031403
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1404-
track_row_count: bool = True,
1404+
track_execution_stats: bool = True,
14051405
source_columns: t.Optional[t.List[str]] = None,
14061406
) -> None:
14071407
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1411,22 +1411,25 @@ def insert_append(
14111411
source_columns=source_columns,
14121412
)
14131413
self._insert_append_source_queries(
1414-
table_name, source_queries, target_columns_to_types, track_row_count
1414+
table_name, source_queries, target_columns_to_types, track_execution_stats
14151415
)
14161416

14171417
def _insert_append_source_queries(
14181418
self,
14191419
table_name: TableName,
14201420
source_queries: t.List[SourceQuery],
14211421
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1422-
track_row_count: bool = True,
1422+
track_execution_stats: bool = True,
14231423
) -> None:
14241424
with self.transaction(condition=len(source_queries) > 0):
14251425
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14261426
for source_query in source_queries:
14271427
with source_query as query:
14281428
self._insert_append_query(
1429-
table_name, query, target_columns_to_types, track_row_count=track_row_count
1429+
table_name,
1430+
query,
1431+
target_columns_to_types,
1432+
track_execution_stats=track_execution_stats,
14301433
)
14311434

14321435
def _insert_append_query(
@@ -1435,13 +1438,13 @@ def _insert_append_query(
14351438
query: Query,
14361439
target_columns_to_types: t.Dict[str, exp.DataType],
14371440
order_projections: bool = True,
1438-
track_row_count: bool = True,
1441+
track_execution_stats: bool = True,
14391442
) -> None:
14401443
if order_projections:
14411444
query = self._order_projections_and_filter(query, target_columns_to_types)
14421445
self.execute(
14431446
exp.insert(query, table_name, columns=list(target_columns_to_types)),
1444-
track_row_count=track_row_count,
1447+
track_execution_stats=track_execution_stats,
14451448
)
14461449

14471450
def insert_overwrite_by_partition(
@@ -1584,7 +1587,7 @@ def _insert_overwrite_by_condition(
15841587
)
15851588
if insert_overwrite_strategy.is_replace_where:
15861589
insert_exp.set("where", where or exp.true())
1587-
self.execute(insert_exp, track_row_count=True)
1590+
self.execute(insert_exp, track_execution_stats=True)
15881591

15891592
def update_table(
15901593
self,
@@ -1605,7 +1608,9 @@ def _merge(
16051608
using = exp.alias_(
16061609
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
16071610
)
1608-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
1611+
self.execute(
1612+
exp.Merge(this=this, using=using, on=on, whens=whens), track_execution_stats=True
1613+
)
16091614

16101615
def scd_type_2_by_time(
16111616
self,
@@ -2354,7 +2359,7 @@ def execute(
23542359
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23552360
ignore_unsupported_errors: bool = False,
23562361
quote_identifiers: bool = True,
2357-
track_row_count: bool = False,
2362+
track_execution_stats: bool = False,
23582363
**kwargs: t.Any,
23592364
) -> None:
23602365
"""Execute a sql query."""
@@ -2376,7 +2381,7 @@ def execute(
23762381
expression=e if isinstance(e, exp.Expression) else None,
23772382
quote_identifiers=quote_identifiers,
23782383
)
2379-
self._execute(sql, track_row_count, **kwargs)
2384+
self._execute(sql, track_execution_stats, **kwargs)
23802385

23812386
def _attach_correlation_id(self, sql: str) -> str:
23822387
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2401,12 +2406,12 @@ def _log_sql(
24012406

24022407
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
24032408

2404-
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
2409+
def _execute(self, sql: str, track_execution_stats: bool = False, **kwargs: t.Any) -> None:
24052410
self.cursor.execute(sql, **kwargs)
24062411

24072412
if (
24082413
self.SUPPORTS_QUERY_EXECUTION_TRACKING
2409-
and track_row_count
2414+
and track_execution_stats
24102415
and QueryExecutionTracker.is_tracking()
24112416
):
24122417
rowcount_raw = getattr(self.cursor, "rowcount", None)
@@ -2417,7 +2422,7 @@ def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) ->
24172422
except (TypeError, ValueError):
24182423
pass
24192424

2420-
QueryExecutionTracker.record_execution(sql, rowcount)
2425+
QueryExecutionTracker.record_execution(sql, rowcount, None)
24212426

24222427
@contextlib.contextmanager
24232428
def temp_table(
@@ -2463,7 +2468,7 @@ def temp_table(
24632468
exists=True,
24642469
table_description=None,
24652470
column_descriptions=None,
2466-
track_row_count=False,
2471+
track_execution_stats=False,
24672472
**kwargs,
24682473
)
24692474

@@ -2715,7 +2720,7 @@ def _replace_by_key(
27152720
insert_statement.set("where", delete_filter)
27162721
insert_statement.set("this", exp.to_table(target_table))
27172722

2718-
self.execute(insert_statement, track_row_count=True)
2723+
self.execute(insert_statement, track_execution_stats=True)
27192724
finally:
27202725
self.drop_table(temp_table)
27212726

0 commit comments

Comments
 (0)