Skip to content

Commit bbce016

Browse files
author
Siddhartha Sahu
committed
Initial refresh support
1 parent f2f02f5 commit bbce016

File tree

6 files changed

+177
-142
lines changed

6 files changed

+177
-142
lines changed

src/adapter/src/catalog/dataflow_expiration.rs

Lines changed: 21 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -6,78 +6,32 @@
66
//! Helper function for dataflow expiration checks.
77
88
use crate::catalog::Catalog;
9-
use differential_dataflow::lattice::Lattice;
10-
use mz_catalog::memory::objects::CatalogItem;
11-
use mz_expr::CollectionPlan;
12-
use mz_repr::{GlobalId, Timestamp};
9+
use mz_compute_types::dataflows::{RefreshDep, RefreshDepIndex};
10+
use mz_repr::GlobalId;
1311

1412
impl Catalog {
15-
/// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
16-
/// with a refresh schedule. Used to disable dataflow expiration if found.
17-
pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool {
18-
let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool {
19-
if let Some(mv) = self.get_entry(&dep).materialized_view() {
20-
return mv.refresh_schedule.is_some();
21-
}
22-
false
23-
};
24-
test_has_transitive_refresh_schedule(id)
25-
|| self
26-
.state()
27-
.transitive_uses(id)
28-
.any(test_has_transitive_refresh_schedule)
29-
}
3013
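/// Recursively walks `deps` and their transitive uses, appending to `deps_tree` every entry that has a refresh schedule or transitively uses one.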
31-
pub(crate) fn compute_expiration_with_refresh(
14+
pub(crate) fn get_refresh_dependencies(
3215
&self,
33-
id: GlobalId,
34-
replica_expiration: Timestamp,
35-
) -> Timestamp {
36-
let entry = self.get_entry(&id);
37-
// TODO: use a queue to deduplicate work.
38-
// TODO: don't use Timestamp::MAX as the initial value.
39-
match &entry.item {
40-
CatalogItem::MaterializedView(mv) => {
41-
let mut new_expiration = mv.raw_expr.depends_on().into_iter().fold(
42-
Timestamp::MAX,
43-
|new_expiration, dep| {
44-
// Pick the minimum refresh time of all dependencies.
45-
new_expiration
46-
.meet(&self.compute_expiration_with_refresh(dep, replica_expiration))
47-
},
48-
);
49-
if let Some(refresh_schedule) = &mv.refresh_schedule {
50-
if let Some(next_refresh) = refresh_schedule.round_up_timestamp(new_expiration)
51-
{
52-
new_expiration = next_refresh;
53-
}
54-
}
55-
new_expiration
56-
}
57-
CatalogItem::View(view) => view.raw_expr.depends_on().into_iter().fold(
58-
Timestamp::MAX,
59-
|new_expiration, dep| {
60-
new_expiration
61-
.meet(&self.compute_expiration_with_refresh(dep, replica_expiration))
62-
},
63-
),
64-
CatalogItem::Index(index) => {
65-
self.compute_expiration_with_refresh(index.on, replica_expiration)
66-
}
67-
CatalogItem::Func(_) => {
68-
todo!()
69-
}
70-
CatalogItem::ContinualTask(_) => {
71-
todo!()
16+
deps: impl Iterator<Item = GlobalId>,
17+
deps_tree: &mut Vec<RefreshDep>,
18+
) -> Option<RefreshDepIndex> {
19+
let start = deps_tree.len();
20+
for dep in deps {
21+
let entry = self.get_entry(&dep);
22+
let refresh_dep_index =
23+
self.get_refresh_dependencies(entry.uses().into_iter(), deps_tree);
24+
let refresh_schedule = entry
25+
.materialized_view()
26+
.and_then(|mv| mv.refresh_schedule.clone());
27+
if refresh_dep_index.is_some() || refresh_schedule.is_some() {
28+
deps_tree.push(RefreshDep {
29+
refresh_dep_index,
30+
refresh_schedule,
31+
});
7232
}
73-
// These will not have a transitive dependency on REFRESH.
74-
CatalogItem::Table(_)
75-
| CatalogItem::Source(_)
76-
| CatalogItem::Sink(_)
77-
| CatalogItem::Log(_)
78-
| CatalogItem::Type(_)
79-
| CatalogItem::Secret(_)
80-
| CatalogItem::Connection(_) => replica_expiration,
8133
}
34+
let end = deps_tree.len();
35+
(end > start).then_some(RefreshDepIndex { start, end })
8236
}
8337
}

src/adapter/src/coord/sequencer/inner/create_index.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,6 @@ impl Coordinator {
442442

443443
// Collect properties for `DataflowExpirationDesc`.
444444
let transitive_upper = self.least_valid_write(&id_bundle);
445-
let has_transitive_refresh_schedule = self.catalog.item_has_transitive_refresh_schedule(on);
446445

447446
// Pre-allocate a vector of transient GlobalIds for each notice.
448447
let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
@@ -477,9 +476,6 @@ impl Coordinator {
477476
df_desc.set_as_of(since);
478477

479478
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
480-
df_desc
481-
.dataflow_expiration_desc
482-
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
483479

484480
coord
485481
.ship_dataflow_and_notice_builtin_table_updates(

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

Lines changed: 7 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use mz_repr::explain::{ExprHumanizerExt, TransientItem};
2121
use mz_repr::optimize::OptimizerFeatures;
2222
use mz_repr::optimize::OverrideFrom;
2323
use mz_repr::refresh_schedule::RefreshSchedule;
24-
use mz_repr::{Datum, GlobalId, Row, Timestamp};
24+
use mz_repr::{Datum, GlobalId, Row};
2525
use mz_sql::ast::ExplainStage;
2626
use mz_sql::catalog::CatalogError;
2727
use mz_sql::names::ResolvedIds;
@@ -575,33 +575,10 @@ impl Coordinator {
575575

576576
// Collect properties for `DataflowExpirationDesc`.
577577
let transitive_upper = self.least_valid_write(&id_bundle);
578-
let has_transitive_refresh_schedule = refresh_schedule.is_some()
579-
|| raw_expr
580-
.depends_on()
581-
.into_iter()
582-
.any(|id| self.catalog.item_has_transitive_refresh_schedule(id));
583-
584-
// Collect properties for `DataflowExpirationDesc`.
585-
let transitive_upper = self.least_valid_write(&id_bundle);
586-
let replica_expiration = Timestamp::default(); // TODO: Can we get replica_expiration information here somehow?
587-
588-
let mut expiration_with_refresh =
589-
raw_expr
590-
.depends_on()
591-
.into_iter()
592-
.fold(Timestamp::MAX, |new_expiration, id| {
593-
new_expiration.meet(
594-
&self
595-
.catalog
596-
.compute_expiration_with_refresh(id, replica_expiration),
597-
)
598-
});
599-
if let Some(refresh_schedule) = &refresh_schedule {
600-
if let Some(next_refresh) = refresh_schedule.round_up_timestamp(expiration_with_refresh)
601-
{
602-
expiration_with_refresh = next_refresh;
603-
}
604-
}
578+
let mut refresh_deps = Vec::new();
579+
let refresh_deps_index = self
580+
.catalog
581+
.get_refresh_dependencies(raw_expr.depends_on().into_iter(), &mut refresh_deps);
605582

606583
let read_holds_owned;
607584
let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
@@ -697,9 +674,8 @@ impl Coordinator {
697674
df_desc.until = until;
698675

699676
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
700-
df_desc
701-
.dataflow_expiration_desc
702-
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
677+
df_desc.dataflow_expiration_desc.refresh_deps = refresh_deps;
678+
df_desc.dataflow_expiration_desc.refresh_deps_index = refresh_deps_index;
703679

704680
let storage_metadata = coord.catalog.state().storage_metadata();
705681

src/adapter/src/coord/sequencer/inner/subscribe.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,6 @@ impl Coordinator {
324324
cluster_id,
325325
plan:
326326
plan::SubscribePlan {
327-
from,
328327
copy_to,
329328
emit_progress,
330329
output,
@@ -339,10 +338,6 @@ impl Coordinator {
339338

340339
// Collect properties for `DataflowExpirationDesc`.
341340
let transitive_upper = self.least_valid_write(&id_bundle);
342-
let has_transitive_refresh_schedule = from
343-
.depends_on()
344-
.into_iter()
345-
.any(|id| self.catalog.item_has_transitive_refresh_schedule(id));
346341

347342
let sink_id = global_lir_plan.sink_id();
348343

@@ -366,9 +361,6 @@ impl Coordinator {
366361
let (mut df_desc, df_meta) = global_lir_plan.unapply();
367362

368363
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
369-
df_desc
370-
.dataflow_expiration_desc
371-
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
372364

373365
// Emit notices.
374366
self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);

src/compute-types/src/dataflows.proto

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,18 @@ message ProtoDataflowDescription {
4949
}
5050

5151
message ProtoDataflowExpirationDesc {
52+
message ProtoRefreshDepIndex {
53+
uint64 start = 1;
54+
uint64 end = 2;
55+
}
56+
message ProtoRefreshDep {
57+
optional ProtoRefreshDepIndex refresh_dep_index = 1;
58+
optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 2;
59+
}
5260
optional mz_repr.antichain.ProtoU64Antichain transitive_upper = 1;
53-
bool has_transitive_refresh_schedule = 2;
5461
bool is_timeline_epoch_ms = 3;
62+
repeated ProtoRefreshDep refresh_deps = 4;
63+
optional ProtoRefreshDepIndex refresh_deps_index = 5;
5564
}
5665

5766
repeated ProtoSourceImport source_imports = 1;

0 commit comments

Comments
 (0)