Skip to content

Commit 7c46f70

Browse files
committed
Add rows tracking
1 parent 4d943a6 commit 7c46f70

File tree

20 files changed

+466
-96
lines changed

20 files changed

+466
-96
lines changed

sqlmesh/core/console.py

Lines changed: 52 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -428,6 +428,7 @@ def update_snapshot_evaluation_progress(
428428
num_audits_passed: int,
429429
num_audits_failed: int,
430430
audit_only: bool = False,
431+
rows_processed: t.Optional[int] = None,
431432
) -> None:
432433
"""Updates the snapshot evaluation progress."""
433434

@@ -575,6 +576,7 @@ def update_snapshot_evaluation_progress(
575576
num_audits_passed: int,
576577
num_audits_failed: int,
577578
audit_only: bool = False,
579+
rows_processed: t.Optional[int] = None,
578580
) -> None:
579581
pass
580582

@@ -1011,7 +1013,9 @@ def start_evaluation_progress(
10111013

10121014
# determine column widths
10131015
self.evaluation_column_widths["annotation"] = (
1014-
_calculate_annotation_str_len(batched_intervals, self.AUDIT_PADDING)
1016+
_calculate_annotation_str_len(
1017+
batched_intervals, self.AUDIT_PADDING, len(" (XXXXXX rows processed)")
1018+
)
10151019
+ 3 # brackets and opening escape backslash
10161020
)
10171021
self.evaluation_column_widths["name"] = max(
@@ -1056,6 +1060,7 @@ def update_snapshot_evaluation_progress(
10561060
num_audits_passed: int,
10571061
num_audits_failed: int,
10581062
audit_only: bool = False,
1063+
rows_processed: t.Optional[int] = None,
10591064
) -> None:
10601065
"""Update the snapshot evaluation progress."""
10611066
if (
@@ -1075,7 +1080,7 @@ def update_snapshot_evaluation_progress(
10751080
).ljust(self.evaluation_column_widths["name"])
10761081

10771082
annotation = _create_evaluation_model_annotation(
1078-
snapshot, _format_evaluation_model_interval(snapshot, interval)
1083+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
10791084
)
10801085
audits_str = ""
10811086
if num_audits_passed:
@@ -3639,6 +3644,7 @@ def update_snapshot_evaluation_progress(
36393644
num_audits_passed: int,
36403645
num_audits_failed: int,
36413646
audit_only: bool = False,
3647+
rows_processed: t.Optional[int] = None,
36423648
) -> None:
36433649
view_name, loaded_batches = self.evaluation_batch_progress[snapshot.snapshot_id]
36443650

@@ -3808,6 +3814,7 @@ def update_snapshot_evaluation_progress(
38083814
num_audits_passed: int,
38093815
num_audits_failed: int,
38103816
audit_only: bool = False,
3817+
rows_processed: t.Optional[int] = None,
38113818
) -> None:
38123819
message = f"Evaluating {snapshot.name} | batch={batch_idx} | duration={duration_ms}ms | num_audits_passed={num_audits_passed} | num_audits_failed={num_audits_failed}"
38133820

@@ -3988,7 +3995,8 @@ def show_table_diff_summary(self, table_diff: TableDiff) -> None:
39883995
self._write(f"Join On: {keys}")
39893996

39903997

3991-
# Module-level default console, used until set_console() is called with another one.
# The default stays NoopConsole: instantiating TerminalConsole here would build a
# terminal-backed console as an import side effect for every consumer of this module.
_CONSOLE: Console = NoopConsole()
39924000

39934001

39944002
def set_console(console: Console) -> None:
@@ -4135,33 +4143,49 @@ def _format_evaluation_model_interval(snapshot: Snapshot, interval: Interval) ->
41354143
return ""
41364144

41374145

4138-
def _create_evaluation_model_annotation(
    snapshot: "Snapshot",
    interval_info: t.Optional[str],
    rows_processed: t.Optional[int] = None,
) -> str:
    """Build the annotation text shown for a snapshot in the evaluation progress output.

    Args:
        snapshot: The snapshot being evaluated.
        interval_info: Pre-formatted interval text, used when no kind-specific label applies.
        rows_processed: Row count reported for the evaluation. ``None``, ``0`` and negative
            values produce no row-count suffix. Defaults to ``None`` so existing
            two-argument callers keep working.

    Returns:
        A kind-specific label (with a row-count suffix for seed, full, incremental-by-unique-key
        and incremental-by-partition models), otherwise ``interval_info`` plus the suffix, or an
        empty string when ``interval_info`` is empty.
    """
    # Only a positive count is displayed: DB-API drivers may report -1 for "unknown", and
    # the original behavior already hid a count of 0.
    has_row_count = rows_processed is not None and rows_processed > 0
    row_noun = "row" if rows_processed == 1 else "rows"
    processed_suffix = f" ({rows_processed} {row_noun} processed)" if has_row_count else ""
    # Seeds are loaded rather than computed, so they report rows as "inserted".
    inserted_suffix = f" ({rows_processed} {row_noun} inserted)" if has_row_count else ""

    annotation = None
    if snapshot.is_audit:
        annotation = "run standalone audit"
    if snapshot.is_model:
        kind = snapshot.model.kind
        # Independent checks on purpose: if several match, the last one wins.
        if kind.is_external:
            annotation = "run external audits"
        if kind.is_view:
            annotation = "recreate view"
        if kind.is_seed:
            annotation = f"insert seed file{inserted_suffix}"
        if kind.is_full:
            annotation = f"full refresh{processed_suffix}"
        if kind.is_incremental_by_unique_key:
            annotation = f"insert/update rows{processed_suffix}"
        if kind.is_incremental_by_partition:
            annotation = f"insert partitions{processed_suffix}"

    if annotation:
        return annotation

    return f"{interval_info}{processed_suffix}" if interval_info else ""
4177+
4178+
4179+
def _calculate_interval_str_len(
4180+
snapshot: Snapshot, intervals: t.List[Interval], rows_processed: t.Optional[int] = None
4181+
) -> int:
41584182
interval_str_len = 0
41594183
for interval in intervals:
41604184
interval_str_len = max(
41614185
interval_str_len,
41624186
len(
41634187
_create_evaluation_model_annotation(
4164-
snapshot, _format_evaluation_model_interval(snapshot, interval)
4188+
snapshot, _format_evaluation_model_interval(snapshot, interval), rows_processed
41654189
)
41664190
),
41674191
)
@@ -4214,13 +4238,16 @@ def _calculate_audit_str_len(snapshot: Snapshot, audit_padding: int = 0) -> int:
42144238

42154239

42164240
def _calculate_annotation_str_len(
    batched_intervals: t.Dict[Snapshot, t.List[Interval]],
    audit_padding: int = 0,
    rows_processed_len: int = 0,
) -> int:
    """Return the widest annotation column needed across all batched snapshots.

    For each snapshot the width is the sum of its longest interval annotation, its audit
    string length (computed with ``audit_padding``) and ``rows_processed_len``, the space
    reserved for a row-count suffix. The result is never below 0, which is also what is
    returned when ``batched_intervals`` is empty.
    """
    widths = (
        _calculate_interval_str_len(snapshot, intervals)
        + _calculate_audit_str_len(snapshot, audit_padding)
        + rows_processed_len
        for snapshot, intervals in batched_intervals.items()
    )
    # Clamp at 0 to match a running maximum that starts from 0.
    return max(0, max(widths, default=0))

sqlmesh/core/engine_adapter/base.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
)
4141
from sqlmesh.core.model.kind import TimeColumn
4242
from sqlmesh.core.schema_diff import SchemaDiffer
43+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
4344
from sqlmesh.utils import CorrelationId, columns_to_types_all_known, random_id
4445
from sqlmesh.utils.connection_pool import ConnectionPool, create_connection_pool
4546
from sqlmesh.utils.date import TimeLike, make_inclusive, to_time_column
@@ -759,6 +760,7 @@ def _create_table_from_source_queries(
759760
table_description: t.Optional[str] = None,
760761
column_descriptions: t.Optional[t.Dict[str, str]] = None,
761762
table_kind: t.Optional[str] = None,
763+
track_row_count: bool = True,
762764
**kwargs: t.Any,
763765
) -> None:
764766
table = exp.to_table(table_name)
@@ -802,11 +804,15 @@ def _create_table_from_source_queries(
802804
replace=replace,
803805
table_description=table_description,
804806
table_kind=table_kind,
807+
track_row_count=track_row_count,
805808
**kwargs,
806809
)
807810
else:
808811
self._insert_append_query(
809-
table_name, query, columns_to_types or self.columns(table)
812+
table_name,
813+
query,
814+
columns_to_types or self.columns(table),
815+
track_row_count=track_row_count,
810816
)
811817

812818
# Register comments with commands if the engine supports comments and we weren't able to
@@ -830,6 +836,7 @@ def _create_table(
830836
table_description: t.Optional[str] = None,
831837
column_descriptions: t.Optional[t.Dict[str, str]] = None,
832838
table_kind: t.Optional[str] = None,
839+
track_row_count: bool = True,
833840
**kwargs: t.Any,
834841
) -> None:
835842
self.execute(
@@ -846,7 +853,8 @@ def _create_table(
846853
),
847854
table_kind=table_kind,
848855
**kwargs,
849-
)
856+
),
857+
track_row_count=track_row_count,
850858
)
851859

852860
def _build_create_table_exp(
@@ -1308,34 +1316,44 @@ def insert_append(
13081316
table_name: TableName,
13091317
query_or_df: QueryOrDF,
13101318
columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
1319+
track_row_count: bool = True,
13111320
) -> None:
13121321
source_queries, columns_to_types = self._get_source_queries_and_columns_to_types(
13131322
query_or_df, columns_to_types, target_table=table_name
13141323
)
1315-
self._insert_append_source_queries(table_name, source_queries, columns_to_types)
1324+
self._insert_append_source_queries(
1325+
table_name, source_queries, columns_to_types, track_row_count
1326+
)
13161327

13171328
def _insert_append_source_queries(
    self,
    table_name: TableName,
    source_queries: t.List[SourceQuery],
    columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
    track_row_count: bool = True,
) -> None:
    """Append the result of every source query to ``table_name``.

    Args:
        table_name: Target table of the inserts.
        source_queries: Source queries; each is entered as a context manager that yields
            the query to insert.
        columns_to_types: Target columns and types. When falsy, they are looked up with
            ``self.columns(table_name)``.
        track_row_count: Forwarded to ``_insert_append_query`` for each insert.
    """
    has_queries = len(source_queries) > 0
    with self.transaction(condition=has_queries):
        # Resolved inside the transaction, once, and shared by all inserts.
        resolved_columns = columns_to_types or self.columns(table_name)
        for source_query in source_queries:
            with source_query as query:
                self._insert_append_query(
                    table_name,
                    query,
                    resolved_columns,
                    track_row_count=track_row_count,
                )
13281342

13291343
def _insert_append_query(
    self,
    table_name: TableName,
    query: Query,
    columns_to_types: t.Dict[str, exp.DataType],
    order_projections: bool = True,
    track_row_count: bool = True,
) -> None:
    """Execute an ``INSERT`` of ``query`` into ``table_name``.

    Args:
        table_name: Target table of the insert.
        query: Query producing the rows to insert.
        columns_to_types: Target columns; their names form the insert's column list.
        order_projections: When true, the query is first passed through
            ``self._order_projections_and_filter`` together with ``columns_to_types``.
        track_row_count: Forwarded to ``self.execute`` for this statement.
    """
    source = (
        self._order_projections_and_filter(query, columns_to_types)
        if order_projections
        else query
    )
    insert_exp = exp.insert(source, table_name, columns=list(columns_to_types))
    self.execute(insert_exp, track_row_count=track_row_count)
13391357

13401358
def insert_overwrite_by_partition(
13411359
self,
@@ -1459,7 +1477,7 @@ def _insert_overwrite_by_condition(
14591477
)
14601478
if insert_overwrite_strategy.is_replace_where:
14611479
insert_exp.set("where", where or exp.true())
1462-
self.execute(insert_exp)
1480+
self.execute(insert_exp, track_row_count=True)
14631481

14641482
def update_table(
14651483
self,
@@ -1480,7 +1498,7 @@ def _merge(
14801498
using = exp.alias_(
14811499
exp.Subquery(this=query), alias=MERGE_SOURCE_ALIAS, copy=False, table=True
14821500
)
1483-
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens))
1501+
self.execute(exp.Merge(this=this, using=using, on=on, whens=whens), track_row_count=True)
14841502

14851503
def scd_type_2_by_time(
14861504
self,
@@ -2214,6 +2232,7 @@ def execute(
22142232
expressions: t.Union[str, exp.Expression, t.Sequence[exp.Expression]],
22152233
ignore_unsupported_errors: bool = False,
22162234
quote_identifiers: bool = True,
2235+
track_row_count: bool = False,
22172236
**kwargs: t.Any,
22182237
) -> None:
22192238
"""Execute a sql query."""
@@ -2235,7 +2254,7 @@ def execute(
22352254
expression=e if isinstance(e, exp.Expression) else None,
22362255
quote_identifiers=quote_identifiers,
22372256
)
2238-
self._execute(sql, **kwargs)
2257+
self._execute(sql, track_row_count, **kwargs)
22392258

22402259
def _attach_correlation_id(self, sql: str) -> str:
22412260
if self.ATTACH_CORRELATION_ID and self.correlation_id:
@@ -2260,9 +2279,20 @@ def _log_sql(
22602279

22612280
logger.log(self._execute_log_level, "Executing SQL: %s", sql_to_log)
22622281

2263-
def _execute(self, sql: str, track_row_count: bool = False, **kwargs: t.Any) -> None:
    """Run ``sql`` on the adapter's cursor and optionally record its row count.

    Args:
        sql: The SQL text to execute.
        track_row_count: When true, the cursor's ``rowcount`` is read after execution and
            passed to ``track_execution_record`` together with ``sql``.
        **kwargs: Forwarded unchanged to ``self.cursor.execute``.

    The recorded row count is ``None`` when the cursor has no ``rowcount`` attribute, when
    the value cannot be converted to ``int``, or when it is negative.
    """
    self.cursor.execute(sql, **kwargs)

    if not track_row_count:
        return

    rowcount_raw = getattr(self.cursor, "rowcount", None)
    rowcount: t.Optional[int] = None
    if rowcount_raw is not None:
        try:
            rowcount = int(rowcount_raw)
        except (TypeError, ValueError):
            # Best effort: a driver-specific, non-numeric rowcount is treated as unknown.
            rowcount = None
        else:
            # PEP 249 lets drivers report -1 when the count cannot be determined; recording
            # that as a real count would corrupt any total built from these records.
            if rowcount < 0:
                rowcount = None

    track_execution_record(sql, rowcount)
2295+
22662296
@contextlib.contextmanager
22672297
def temp_table(
22682298
self,
@@ -2303,6 +2333,7 @@ def temp_table(
23032333
exists=True,
23042334
table_description=None,
23052335
column_descriptions=None,
2336+
track_row_count=False,
23062337
**kwargs,
23072338
)
23082339

@@ -2547,7 +2578,7 @@ def _replace_by_key(
25472578
insert_statement.set("where", delete_filter)
25482579
insert_statement.set("this", exp.to_table(target_table))
25492580

2550-
self.execute(insert_statement)
2581+
self.execute(insert_statement, track_row_count=True)
25512582
finally:
25522583
self.drop_table(temp_table)
25532584

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
SourceQuery,
2121
set_catalog,
2222
)
23+
from sqlmesh.core.execution_tracker import record_execution as track_execution_record
2324
from sqlmesh.core.node import IntervalUnit
2425
from sqlmesh.core.schema_diff import SchemaDiffer
2526
from sqlmesh.utils import optional_import
@@ -1036,6 +1037,7 @@ def _db_call(self, func: t.Callable[..., t.Any], *args: t.Any, **kwargs: t.Any)
10361037
def _execute(
10371038
self,
10381039
sql: str,
1040+
track_row_count: bool = False,
10391041
**kwargs: t.Any,
10401042
) -> None:
10411043
"""Execute a sql query."""
@@ -1081,6 +1083,9 @@ def _execute(
10811083
self.cursor._set_rowcount(query_results)
10821084
self.cursor._set_description(query_results.schema)
10831085

1086+
if track_row_count:
1087+
track_execution_record(sql, query_results.total_rows)
1088+
10841089
def _get_data_objects(
10851090
self, schema_name: SchemaName, object_names: t.Optional[t.Set[str]] = None
10861091
) -> t.List[DataObject]:

sqlmesh/core/engine_adapter/clickhouse.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def _insert_overwrite_by_condition(
282282
)
283283

284284
try:
285-
self.execute(existing_records_insert_exp)
285+
self.execute(existing_records_insert_exp, track_row_count=True)
286286
finally:
287287
if table_partition_exp:
288288
self.drop_table(partitions_temp_table_name)
@@ -469,6 +469,7 @@ def _create_table(
469469
table_description: t.Optional[str] = None,
470470
column_descriptions: t.Optional[t.Dict[str, str]] = None,
471471
table_kind: t.Optional[str] = None,
472+
track_row_count: bool = True,
472473
**kwargs: t.Any,
473474
) -> None:
474475
"""Creates a table in the database.
@@ -505,6 +506,7 @@ def _create_table(
505506
column_descriptions,
506507
table_kind,
507508
empty_ctas=(self.engine_run_mode.is_cloud and expression is not None),
509+
track_row_count=track_row_count,
508510
**kwargs,
509511
)
510512

sqlmesh/core/engine_adapter/duckdb.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ def _create_table(
153153
table_description: t.Optional[str] = None,
154154
column_descriptions: t.Optional[t.Dict[str, str]] = None,
155155
table_kind: t.Optional[str] = None,
156+
track_row_count: bool = True,
156157
**kwargs: t.Any,
157158
) -> None:
158159
catalog = self.get_current_catalog()
@@ -176,6 +177,7 @@ def _create_table(
176177
table_description,
177178
column_descriptions,
178179
table_kind,
180+
track_row_count=track_row_count,
179181
**kwargs,
180182
)
181183

0 commit comments

Comments
 (0)