Skip to content

Commit 8b49154

Browse files
committed
Add rows tracking
1 parent 5e59d18 commit 8b49154

File tree

20 files changed

+451
-82
lines changed

20 files changed

+451
-82
lines changed

sqlmesh/core/console.py

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
rows_processed: t.Optional[int] = None,
431432
) -> None:
432433
"""Updates the snapshot evaluation progress."""
433434

@@ -575,6 +576,7 @@ def update_snapshot_evaluation_progress(
575576
num_audits_passed: int,
576577
num_audits_failed: int,
577578
audit_only: bool = False,
579+
rows_processed: t.Optional[int] = None,
578580
) -> None:
579581
pass
580582

@@ -1011,7 +1013,9 @@ def start_evaluation_progress(
10111013

10121014
# determine column widths
10131015
self.evaluation_column_widths["annotation"] = (
1014-
_calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
1016+
_calculate_annotation_str_len(
1017+
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1018+
)
10151019
+ 3 # brackets and opening escape backslash
10161020
)
10171021
self.evaluation_column_widths["name"] = max(
@@ -1056,6 +1060,7 @@ def update_snapshot_evaluation_progress(
10561060
num_audits_passed: int,
10571061
num_audits_failed: int,
10581062
audit_only: bool = False,
1063+
rows_processed: t.Optional[int] = None,
10591064
) -> None:
10601065
"""Update the snapshot evaluation progress."""
10611066
if (
@@ -1075,7 +1080,7 @@ def update_snapshot_evaluation_progress(
10751080
).ljust(self.evaluation_column_widths["name"])
10761081

10771082
annotation = _create_evaluation_model_annotation(
1078-
snapshot, _format_evaluation_model_interval(snapshot, interval)
1083+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
10791084
)
10801085
audits_str = ""
10811086
if num_audits_passed:
@@ -3639,6 +3644,7 @@ def update_snapshot_evaluation_progress(
36393644
num_audits_passed: int,
36403645
num_audits_failed: int,
36413646
audit_only: bool = False,
3647+
rows_processed: t.Optional[int] = None,
36423648
) -> None:
36433649
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36443650

@@ -3808,6 +3814,7 @@ def update_snapshot_evaluation_progress(
38083814
num_audits_passed: int,
38093815
num_audits_failed: int,
38103816
audit_only: bool = False,
3817+
rows_processed: t.Optional[int] = None,
38113818
) -> None:
38123819
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38133820

@@ -3988,7 +3995,8 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
39883995
self._write(f"Join On: {keys}")
39893996

39903997

3991-
# Default to the no-op console so that merely importing this module never writes to the
# terminal; a concrete console can be installed afterwards with set_console().
_CONSOLE: Console = NoopConsole()
39924000

39934001

39944002
def set_console(console: Console) -> None:
@@ -4135,33 +4143,49 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41354143
return ""
41364144

41374145

4138-
def _create_evaluation_model_annotation(snapshot: Snapshot, interval_info: t.Optional[str]) -> str:
4146+
def _create_evaluation_model_annotation(
4147+
snapshot: Snapshot, interval_info: t.Optional[str], rows_processed: t.Optional[int]
4148+
) -> str:
4149+
annotation = None
4150+
num_rows_processed = str(rows_processed) if rows_processed else ""
4151+
rows_processed_str = f" ({num_rows_processed} rows processed)" if num_rows_processed else ""
4152+
41394153
if snapshot.is_audit:
4140-
return "run standalone audit"
4141-
if snapshot.is_model and snapshot.model.kind.is_external:
4142-
return "run external audits"
4143-
if snapshot.model.kind.is_seed:
4144-
return "insert seed file"
4145-
if snapshot.model.kind.is_full:
4146-
return "full refresh"
4147-
if snapshot.model.kind.is_view:
4148-
return "recreate view"
4149-
if snapshot.model.kind.is_incremental_by_unique_key:
4150-
return "insert/update rows"
4151-
if snapshot.model.kind.is_incremental_by_partition:
4152-
return "insert partitions"
4153-
4154-
return interval_info if interval_info else ""
4155-
4156-
4157-
def _calculate_interval_str_len(snapshot: Snapshot, intervals: t.List[Interval]) -> int:
4154+
annotation = "run standalone audit"
4155+
if snapshot.is_model:
4156+
if snapshot.model.kind.is_external:
4157+
annotation = "run external audits"
4158+
if snapshot.model.kind.is_view:
4159+
annotation = "recreate view"
4160+
if snapshot.model.kind.is_seed:
4161+
# no "processed" for seeds
4162+
seed_num_rows_inserted = (
4163+
f" ({num_rows_processed} rows inserted)" if num_rows_processed else ""
4164+
)
4165+
annotation = f"insert seed file{seed_num_rows_inserted}"
4166+
if snapshot.model.kind.is_full:
4167+
annotation = f"full refresh{rows_processed_str}"
4168+
if snapshot.model.kind.is_incremental_by_unique_key:
4169+
annotation = f"insert/update rows{rows_processed_str}"
4170+
if snapshot.model.kind.is_incremental_by_partition:
4171+
annotation = f"insert partitions{rows_processed_str}"
4172+
4173+
if annotation:
4174+
return annotation
4175+
4176+
return f"{interval_info}{rows_processed_str}" if interval_info else ""
4177+
4178+
4179+
def _calculate_interval_str_len(
4180+
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4181+
) -> int:
41584182
interval_str_len = 0
41594183
for interval in intervals:
41604184
interval_str_len = max(
41614185
interval_str_len,
41624186
len(
41634187
_create_evaluation_model_annotation(
4164-
snapshot, _format_evaluation_model_interval(snapshot, interval)
4188+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
41654189
)
41664190
),
41674191
)
@@ -4214,13 +4238,16 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42144238

42154239

42164240
def _calculate_annotation_str_len(
    batched_intervals: t.Dict[Snapshot, t.List[Interval]],
    audit_padding: int = 0,
    rows_processed_len: int = 0,
) -> int:
    """Return the widest annotation width needed across all batched snapshots.

    For each snapshot the width is the sum of its interval-annotation length, its
    audit-string length and ``rows_processed_len``.

    Args:
        batched_intervals: Mapping of snapshot to the intervals being evaluated for it.
        audit_padding: Passed through to ``_calculate_audit_str_len``.
        rows_processed_len: Extra width added to every snapshot's total.

    Returns:
        The largest per-snapshot width, never less than 0.
    """
    widths = [
        _calculate_interval_str_len(snapshot, snapshot_intervals)
        + _calculate_audit_str_len(snapshot, audit_padding)
        + rows_processed_len
        for snapshot, snapshot_intervals in batched_intervals.items()
    ]
    # Seeding with 0 keeps the result non-negative and covers an empty mapping.
    return max([0, *widths])

sqlmesh/core/engine_adapter/base.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from sqlmesh.core.model.kind import TimeColumn
4242
from sqlmesh.core.schema_diff import SchemaDiffer
43+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
4344
from sqlmesh.utils import (
4445
CorrelationId,
4546
columns_to_types_all_known,
@@ -828,6 +829,7 @@ def _create_table_from_source_queries(
828829
table_description: t.Optional[str] = None,
829830
column_descriptions: t.Optional[t.Dict[str, str]] = None,
830831
table_kind: t.Optional[str] = None,
832+
track_row_count: bool = True,
831833
**kwargs: t.Any,
832834
) -> None:
833835
table = exp.to_table(table_name)
@@ -873,11 +875,15 @@ def _create_table_from_source_queries(
873875
replace=replace,
874876
table_description=table_description,
875877
table_kind=table_kind,
878+
track_row_count=track_row_count,
876879
**kwargs,
877880
)
878881
else:
879882
self._insert_append_query(
880-
table_name, query, target_columns_to_types or self.columns(table)
883+
table_name,
884+
query,
885+
target_columns_to_types or self.columns(table),
886+
track_row_count=track_row_count,
881887
)
882888

883889
# Register comments with commands if the engine supports comments and we weren't able to
@@ -901,6 +907,7 @@ def _create_table(
901907
table_description: t.Optional[str] = None,
902908
column_descriptions: t.Optional[t.Dict[str, str]] = None,
903909
table_kind: t.Optional[str] = None,
910+
track_row_count: bool = True,
904911
**kwargs: t.Any,
905912
) -> None:
906913
self.execute(
@@ -917,7 +924,8 @@ def _create_table(
917924
),
918925
table_kind=table_kind,
919926
**kwargs,
920-
)
927+
),
928+
track_row_count=track_row_count,
921929
)
922930

923931
def _build_create_table_exp(
@@ -1392,6 +1400,7 @@ def insert_append(
13921400
table_name: TableName,
13931401
query_or_df: QueryOrDF,
13941402
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1403+
track_row_count: bool = True,
13951404
source_columns: t.Optional[t.List[str]] = None,
13961405
) -> None:
13971406
source_queries, target_columns_to_types = self._get_source_queries_and_columns_to_types(
@@ -1400,30 +1409,39 @@ def insert_append(
14001409
target_table=table_name,
14011410
source_columns=source_columns,
14021411
)
1403-
self._insert_append_source_queries(table_name, source_queries, target_columns_to_types)
1412+
self._insert_append_source_queries(
1413+
table_name, source_queries, target_columns_to_types, track_row_count
1414+
)
14041415

14051416
def _insert_append_source_queries(
14061417
self,
14071418
table_name: TableName,
14081419
source_queries: t.List[SourceQuery],
14091420
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1421+
track_row_count: bool = True,
14101422
) -> None:
14111423
with self.transaction(condition=len(source_queries) > 0):
14121424
target_columns_to_types = target_columns_to_types or self.columns(table_name)
14131425
for source_query in source_queries:
14141426
with source_query as query:
1415-
self._insert_append_query(table_name, query, target_columns_to_types)
1427+
self._insert_append_query(
1428+
table_name, query, target_columns_to_types, track_row_count=track_row_count
1429+
)
14161430

14171431
def _insert_append_query(
    self,
    table_name: TableName,
    query: Query,
    target_columns_to_types: t.Dict[str, exp.DataType],
    order_projections: bool = True,
    track_row_count: bool = True,
) -> None:
    """Execute an INSERT of ``query`` into ``table_name``.

    Args:
        table_name: The table receiving the rows.
        query: The query producing the rows to insert.
        target_columns_to_types: Target column mapping; its keys become the INSERT
            column list.
        order_projections: When True the query is first passed through
            ``self._order_projections_and_filter`` with the target columns.
        track_row_count: Forwarded to ``self.execute``.
    """
    prepared_query = (
        self._order_projections_and_filter(query, target_columns_to_types)
        if order_projections
        else query
    )
    insert_expression = exp.insert(
        prepared_query, table_name, columns=list(target_columns_to_types)
    )
    self.execute(insert_expression, track_row_count=track_row_count)
14271445

14281446
def insert_overwrite_by_partition(
14291447
self,
@@ -1565,7 +1583,7 @@ def _insert_overwrite_by_condition(
15651583
)
15661584
if insert_overwrite_strategy.is_replace_where:
15671585
insert_exp.set("where", where or exp.true())
1568-
self.execute(insert_exp)
1586+
self.execute(insert_exp, track_row_count=True)
15691587

15701588
def update_table(
15711589
self,
@@ -1586,7 +1604,7 @@ def _merge(
15861604
using = exp.alias_(
15871605
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
15881606
)
1589-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens))
1607+
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
15901608

15911609
def scd_type_2_by_time(
15921610
self,
@@ -2335,6 +2353,7 @@ def execute(
23352353
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
23362354
ignore_unsupported_errors: bool = False,
23372355
quote_identifiers: bool = True,
2356+
track_row_count: bool = False,
23382357
**kwargs: t.Any,
23392358
) -> None:
23402359
"""Execute a sql query."""
@@ -2356,7 +2375,7 @@ def execute(
23562375
expression=e if isinstance(e, exp.Expression) else None,
23572376
quote_identifiers=quote_identifiers,
23582377
)
2359-
self._execute(sql, **kwargs)
2378+
self._execute(sql, track_row_count, **kwargs)
23602379

23612380
def _attach_correlation_id(self, sql: str) -> str:
23622381
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2381,9 +2400,20 @@ def _log_sql(
23812400

23822401
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
23832402

2384-
def _execute(self, sql: str, **kwargs: t.Any) -> None:
2403+
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
23852404
self.cursor.execute(sql, **kwargs)
23862405

2406+
if track_row_count:
2407+
rowcount_raw = getattr(self.cursor, "rowcount", None)
2408+
rowcount = None
2409+
if rowcount_raw is not None:
2410+
try:
2411+
rowcount = int(rowcount_raw)
2412+
except (TypeError, ValueError):
2413+
pass
2414+
2415+
track_execution_record(sql, rowcount)
2416+
23872417
@contextlib.contextmanager
23882418
def temp_table(
23892419
self,
@@ -2428,6 +2458,7 @@ def temp_table(
24282458
exists=True,
24292459
table_description=None,
24302460
column_descriptions=None,
2461+
track_row_count=False,
24312462
**kwargs,
24322463
)
24332464

@@ -2679,7 +2710,7 @@ def _replace_by_key(
26792710
insert_statement.set("where", delete_filter)
26802711
insert_statement.set("this", exp.to_table(target_table))
26812712

2682-
self.execute(insert_statement)
2713+
self.execute(insert_statement, track_row_count=True)
26832714
finally:
26842715
self.drop_table(temp_table)
26852716

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
SourceQuery,
2121
set_catalog,
2222
)
23+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
2324
from sqlmesh.core.node import IntervalUnit
2425
from sqlmesh.core.schema_diff import SchemaDiffer
2526
from sqlmesh.utils import optional_import, get_source_columns_to_types
@@ -1049,6 +1050,7 @@ def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any)
10491050
def _execute(
10501051
self,
10511052
sql: str,
1053+
track_row_count: bool = False,
10521054
**kwargs: t.Any,
10531055
) -> None:
10541056
"""Execute a sql query."""
@@ -1094,6 +1096,9 @@ def _execute(
10941096
self.cursor._set_rowcount(query_results)
10951097
self.cursor._set_description(query_results.schema)
10961098

1099+
if track_row_count:
1100+
track_execution_record(sql, query_results.total_rows)
1101+
10971102
def _get_data_objects(
10981103
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
10991104
) -> t.List[DataObject]:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ def _insert_overwrite_by_condition(
294294
)
295295

296296
try:
297-
self.execute(existing_records_insert_exp)
297+
self.execute(existing_records_insert_exp, track_row_count=True)
298298
finally:
299299
if table_partition_exp:
300300
self.drop_table(partitions_temp_table_name)
@@ -489,6 +489,7 @@ def _create_table(
489489
table_description: t.Optional[str] = None,
490490
column_descriptions: t.Optional[t.Dict[str, str]] = None,
491491
table_kind: t.Optional[str] = None,
492+
track_row_count: bool = True,
492493
**kwargs: t.Any,
493494
) -> None:
494495
"""Creates a table in the database.
@@ -525,6 +526,7 @@ def _create_table(
525526
column_descriptions,
526527
table_kind,
527528
empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
529+
track_row_count=track_row_count,
528530
**kwargs,
529531
)
530532

0 commit comments

Comments
 (0)