Skip to content

Commit 6be78ee

Browse files
committed
Fix rebase
1 parent 8ae058f commit 6be78ee

File tree

2 files changed

+48
-54
lines changed

2 files changed

+48
-54
lines changed

sqlmesh/core/scheduler.py

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -490,33 +490,33 @@ def run_node(node: SchedulingUnit) -> None:
490490
if isinstance(node, DummyNode):
491491
return
492492

493-
with QueryExecutionTracker.track_execution(
494-
f"{snapshot.name}_{batch_idx}"
495-
) as execution_context:
496-
snapshot = self.snapshots_by_name[node.snapshot_name]
497-
498-
if isinstance(node, EvaluateNode):
499-
self.console.start_snapshot_evaluation_progress(snapshot)
500-
execution_start_ts = now_timestamp()
501-
evaluation_duration_ms: t.Optional[int] = None
502-
start, end = node.interval
503-
504-
audit_results: t.List[AuditResult] = []
505-
try:
506-
assert execution_time # mypy
507-
assert deployability_index # mypy
508-
509-
if audit_only:
510-
audit_results = self._audit_snapshot(
511-
snapshot=snapshot,
512-
environment_naming_info=environment_naming_info,
513-
deployability_index=deployability_index,
514-
snapshots=self.snapshots_by_name,
515-
start=start,
516-
end=end,
517-
execution_time=execution_time,
518-
)
519-
else:
493+
snapshot = self.snapshots_by_name[node.snapshot_name]
494+
495+
if isinstance(node, EvaluateNode):
496+
self.console.start_snapshot_evaluation_progress(snapshot)
497+
execution_start_ts = now_timestamp()
498+
evaluation_duration_ms: t.Optional[int] = None
499+
start, end = node.interval
500+
501+
audit_results: t.List[AuditResult] = []
502+
try:
503+
assert execution_time # mypy
504+
assert deployability_index # mypy
505+
506+
if audit_only:
507+
audit_results = self._audit_snapshot(
508+
snapshot=snapshot,
509+
environment_naming_info=environment_naming_info,
510+
deployability_index=deployability_index,
511+
snapshots=self.snapshots_by_name,
512+
start=start,
513+
end=end,
514+
execution_time=execution_time,
515+
)
516+
else:
517+
with self.snapshot_evaluator.execution_tracker.track_execution(
518+
f"{snapshot.name}_{node.batch_index}"
519+
) as execution_context:
520520
audit_results = self.evaluate(
521521
snapshot=snapshot,
522522
environment_naming_info=environment_naming_info,
@@ -530,35 +530,30 @@ def run_node(node: SchedulingUnit) -> None:
530530
target_table_exists=snapshot.snapshot_id not in snapshots_to_create,
531531
)
532532

533-
evaluation_duration_ms = now_timestamp() - execution_start_ts
534-
finally:
535-
num_audits = len(audit_results)
536-
num_audits_failed = sum(1 for result in audit_results if result.count)
533+
evaluation_duration_ms = now_timestamp() - execution_start_ts
534+
finally:
535+
num_audits = len(audit_results)
536+
num_audits_failed = sum(1 for result in audit_results if result.count)
537537

538-
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
539-
f"{snapshot.snapshot_id}_{batch_idx}"
540-
)
538+
execution_stats = self.snapshot_evaluator.execution_tracker.get_execution_stats(
539+
f"{snapshot.snapshot_id}_{node.batch_index}"
540+
)
541541

542-
self.console.update_snapshot_evaluation_progress(
543-
snapshot,
544-
batched_intervals[snapshot][node.batch_index],
545-
node.batch_index,
546-
evaluation_duration_ms,
547-
num_audits - num_audits_failed,
548-
num_audits_failed,
549-
execution_stats=execution_stats,
550-
auto_restatement_triggers=auto_restatement_triggers.get(
551-
snapshot.snapshot_id
552-
),
542+
self.console.update_snapshot_evaluation_progress(
543+
snapshot,
544+
batched_intervals[snapshot][node.batch_index],
545+
node.batch_index,
546+
evaluation_duration_ms,
547+
num_audits - num_audits_failed,
548+
num_audits_failed,
549+
execution_stats=execution_stats,
553550
)
554-
elif isinstance(node, CreateNode):
555-
self.snapshot_evaluator.create_snapshot(
556-
snapshot=snapshot,
557-
snapshots=self.snapshots_by_name,
558-
deployability_index=deployability_index,
559-
allow_destructive_snapshots=allow_destructive_snapshots or set(),
560-
allow_additive_snapshots=allow_additive_snapshots or set(),
561-
rows_processed=rows_processed,
551+
elif isinstance(node, CreateNode):
552+
self.snapshot_evaluator.create_snapshot(
553+
snapshot=snapshot,
554+
snapshots=self.snapshots_by_name,
555+
deployability_index=deployability_index,
556+
allow_destructive_snapshots=allow_destructive_snapshots or set(),
562557
)
563558

564559
try:

sqlmesh/core/snapshot/evaluator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@
6464
SnapshotInfoLike,
6565
SnapshotTableCleanupTask,
6666
)
67-
from sqlmesh.core.snapshot.definition import parent_snapshots_by_name
6867
from sqlmesh.core.snapshot.execution_tracker import QueryExecutionTracker
6968
from sqlmesh.utils import random_id, CorrelationId
7069
from sqlmesh.utils.concurrency import (

0 commit comments

Comments
 (0)