Skip to content

Commit 5f9194d

Browse files
committed
Move time dependence into controller
This doesn't yet handle timelines correctly. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent a12af6e commit 5f9194d

24 files changed

+245
-201
lines changed

misc/python/materialize/cli/ci_annotate_errors.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@
125125
| restart-materialized-1\ *|\ thread\ 'coordinator'\ panicked\ at\ 'external\ operation\ .*\ failed\ unrecoverably.*
126126
# Expected in cluster test
127127
| cluster-clusterd[12]-1\ .*\ halting\ process:\ new\ timely\ configuration\ does\ not\ match\ existing\ timely\ configuration
128-
| cluster-clusterd1-1\ .*\ not\ less\ than\ expiration
129-
| cluster-clusterd1-1\ .*\ Replica\ expired
128+
| cluster-clusterd1-1\ .*\ replica\ expired
130129
# Emitted by tests employing explicit mz_panic()
131130
| forced\ panic
132131
# Emitted by broken_statements.slt in order to stop panic propagation, as 'forced panic' will unwantedly panic the `environmentd` thread.

src/adapter/src/coord.rs

+4-16
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ use crate::error::AdapterError;
186186
use crate::explain::insights::PlanInsightsContext;
187187
use crate::explain::optimizer_trace::{DispatchGuard, OptimizerTrace};
188188
use crate::metrics::Metrics;
189-
use crate::optimize::dataflow_expiration::time_dependence;
190189
use crate::optimize::dataflows::{
191190
dataflow_import_id_bundle, ComputeInstanceSnapshot, DataflowBuilder,
192191
};
@@ -2667,9 +2666,7 @@ impl Coordinator {
26672666
(optimized_plan, global_lir_plan)
26682667
};
26692668

2670-
let (mut physical_plan, metainfo) = global_lir_plan.unapply();
2671-
physical_plan.time_dependence =
2672-
time_dependence(self.catalog(), physical_plan.import_ids(), None);
2669+
let (physical_plan, metainfo) = global_lir_plan.unapply();
26732670
let metainfo = {
26742671
// Pre-allocate a vector of transient GlobalIds for each notice.
26752672
let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
@@ -2728,12 +2725,7 @@ impl Coordinator {
27282725
(optimized_plan, global_lir_plan)
27292726
};
27302727

2731-
let (mut physical_plan, metainfo) = global_lir_plan.unapply();
2732-
physical_plan.time_dependence = time_dependence(
2733-
self.catalog(),
2734-
physical_plan.import_ids(),
2735-
mv.refresh_schedule.clone(),
2736-
);
2728+
let (physical_plan, metainfo) = global_lir_plan.unapply();
27372729
let metainfo = {
27382730
// Pre-allocate a vector of transient GlobalIds for each notice.
27392731
let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
@@ -2762,12 +2754,8 @@ impl Coordinator {
27622754
.catalog()
27632755
.resolve_full_name(entry.name(), None)
27642756
.to_string();
2765-
let (optimized_plan, mut physical_plan, metainfo) = self
2757+
let (optimized_plan, physical_plan, metainfo) = self
27662758
.optimize_create_continual_task(ct, id, self.owned_catalog(), debug_name)?;
2767-
// Filter our own id as it is not known yet.
2768-
let ids = physical_plan.import_ids().filter(|x| *x != id);
2769-
physical_plan.time_dependence = time_dependence(self.catalog(), ids, None);
2770-
27712759
let catalog = self.catalog_mut();
27722760
catalog.set_optimized_plan(id, optimized_plan);
27732761
catalog.set_physical_plan(id, physical_plan);
@@ -3104,7 +3092,7 @@ impl Coordinator {
31043092
catalog.expire().await;
31053093
}
31063094
}
3107-
.boxed_local()
3095+
.boxed_local()
31083096
}
31093097

31103098
/// Obtain a read-only Catalog reference.

src/adapter/src/coord/sequencer/inner/create_continual_task.rs

-5
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use crate::catalog;
3939
use crate::command::ExecuteResponse;
4040
use crate::coord::Coordinator;
4141
use crate::error::AdapterError;
42-
use crate::optimize::dataflow_expiration::time_dependence;
4342
use crate::optimize::dataflows::dataflow_import_id_bundle;
4443
use crate::optimize::{self, Optimize, OptimizerCatalog};
4544
use crate::session::Session;
@@ -126,10 +125,6 @@ impl Coordinator {
126125

127126
let () = self
128127
.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
129-
// We're referencing ourselves, so filter out our ID.
130-
let ids = physical_plan.import_ids().filter(|x| *x != sink_id);
131-
physical_plan.time_dependence = time_dependence(coord.catalog(), ids, None);
132-
133128
let catalog = coord.catalog_mut();
134129
catalog.set_optimized_plan(sink_id, optimized_plan);
135130
catalog.set_physical_plan(sink_id, physical_plan.clone());

src/adapter/src/coord/sequencer/inner/create_index.rs

-4
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ use crate::coord::{
3030
use crate::error::AdapterError;
3131
use crate::explain::explain_dataflow;
3232
use crate::explain::optimizer_trace::OptimizerTrace;
33-
use crate::optimize::dataflow_expiration::time_dependence;
3433
use crate::optimize::dataflows::dataflow_import_id_bundle;
3534
use crate::optimize::{self, Optimize};
3635
use crate::session::Session;
@@ -448,9 +447,6 @@ impl Coordinator {
448447
let transact_result = self
449448
.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
450449
let (mut df_desc, df_meta) = global_lir_plan.unapply();
451-
df_desc.time_dependence =
452-
time_dependence(coord.catalog(), df_desc.import_ids(), None);
453-
454450
// Save plan structures.
455451
coord
456452
.catalog_mut()

src/adapter/src/coord/sequencer/inner/create_materialized_view.rs

-5
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use crate::error::AdapterError;
4545
use crate::explain::explain_dataflow;
4646
use crate::explain::explain_plan;
4747
use crate::explain::optimizer_trace::OptimizerTrace;
48-
use crate::optimize::dataflow_expiration::time_dependence;
4948
use crate::optimize::dataflows::dataflow_import_id_bundle;
5049
use crate::optimize::{self, Optimize};
5150
use crate::session::Session;
@@ -649,10 +648,6 @@ impl Coordinator {
649648
.catalog_transact_with_side_effects(Some(session), ops, |coord| async {
650649
let output_desc = global_lir_plan.desc().clone();
651650
let (mut df_desc, df_meta) = global_lir_plan.unapply();
652-
653-
df_desc.time_dependence =
654-
time_dependence(coord.catalog(), df_desc.import_ids(), refresh_schedule);
655-
656651
// Save plan structures.
657652
coord
658653
.catalog_mut()

src/adapter/src/coord/sequencer/inner/subscribe.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use crate::coord::{
2323
SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster,
2424
};
2525
use crate::error::AdapterError;
26-
use crate::optimize::dataflow_expiration::time_dependence;
2726
use crate::optimize::Optimize;
2827
use crate::session::{Session, TransactionOps};
2928
use crate::{optimize, AdapterNotice, ExecuteContext, TimelineContext};
@@ -348,8 +347,7 @@ impl Coordinator {
348347
};
349348
active_subscribe.initialize();
350349

351-
let (mut df_desc, df_meta) = global_lir_plan.unapply();
352-
df_desc.time_dependence = time_dependence(self.catalog(), df_desc.import_ids(), None);
350+
let (df_desc, df_meta) = global_lir_plan.unapply();
353351

354352
// Emit notices.
355353
self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);

src/adapter/src/optimize.rs

-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
//! repository.
5454
5555
pub mod copy_to;
56-
pub mod dataflow_expiration;
5756
pub mod dataflows;
5857
pub mod index;
5958
pub mod materialized_view;

src/adapter/src/optimize/dataflow_expiration.rs

-146
This file was deleted.

src/compute-client/src/as_of_selection.rs

+8
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,7 @@ mod tests {
785785
use mz_storage_types::read_holds::ReadHoldError;
786786
use mz_storage_types::sources::SourceExportDataConfig;
787787
use mz_storage_types::sources::{GenericSourceConnection, SourceDesc};
788+
use mz_storage_types::time_dependence::TimeDependence;
788789

789790
use super::*;
790791

@@ -947,6 +948,13 @@ mod tests {
947948
}
948949
Ok(holds)
949950
}
951+
952+
fn get_time_dependence(
953+
&self,
954+
_id: GlobalId,
955+
) -> Result<Option<TimeDependence>, StorageError<Self::Timestamp>> {
956+
unimplemented!()
957+
}
950958
}
951959

952960
fn dataflow(

0 commit comments

Comments
 (0)