Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sdht0 committed Oct 7, 2024
1 parent f15943d commit 1ee6214
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 24 deletions.
5 changes: 2 additions & 3 deletions src/adapter/src/coord/sequencer/inner/create_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,6 @@ impl Coordinator {
..
}: CreateIndexFinish,
) -> Result<StageResult<Box<CreateIndexStage>>, AdapterError> {
// Timestamp selection
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

let ops = vec![catalog::Op::CreateItem {
Expand All @@ -442,7 +441,7 @@ impl Coordinator {
}];

// Collect properties for `DataflowExpirationDesc`.
let upper = self.least_valid_write(&id_bundle);
let transitive_upper = self.least_valid_write(&id_bundle);
let has_transitive_refresh_schedule = self.catalog.item_has_transitive_refresh_schedule(on);

// Pre-allocate a vector of transient GlobalIds for each notice.
Expand Down Expand Up @@ -477,7 +476,7 @@ impl Coordinator {
let since = coord.least_valid_read(&read_holds);
df_desc.set_as_of(since);

df_desc.dataflow_expiration_desc.transitive_upper = Some(upper);
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
df_desc
.dataflow_expiration_desc
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ impl Coordinator {
// We want to reject queries that depend on log sources, for example,
// even if we can *technically* optimize that reference away.
let expr_depends_on = expr.depends_on();
let timeline_ctx = self.validate_timeline_context(expr_depends_on.iter().cloned())?;
self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?;
// Materialized views are not allowed to depend on log sources, as replicas
// are not producing the same definite collection for these.
Expand Down Expand Up @@ -391,17 +392,13 @@ impl Coordinator {
}
}

let is_timeline_epochms = self
.validate_timeline_context(expr_depends_on.iter().cloned())?
.is_timeline_epochms();

Ok(CreateMaterializedViewStage::Optimize(
CreateMaterializedViewOptimize {
validity,
plan,
resolved_ids,
explain_ctx,
is_timeline_epochms,
is_timeline_epochms: timeline_ctx.is_timeline_epochms(),
},
))
}
Expand Down Expand Up @@ -577,7 +574,7 @@ impl Coordinator {
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

// Collect properties for `DataflowExpirationDesc`.
let upper = self.least_valid_write(&id_bundle);
let transitive_upper = self.least_valid_write(&id_bundle);
let has_transitive_refresh_schedule = refresh_schedule.is_some()
|| raw_expr
.depends_on()
Expand Down Expand Up @@ -677,7 +674,7 @@ impl Coordinator {
df_desc.set_initial_as_of(initial_as_of);
df_desc.until = until;

df_desc.dataflow_expiration_desc.transitive_upper = Some(upper);
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
df_desc
.dataflow_expiration_desc
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/coord/sequencer/inner/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ impl Coordinator {
let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id);

// Collect properties for `DataflowExpirationDesc`.
let upper = self.least_valid_write(&id_bundle);
let transitive_upper = self.least_valid_write(&id_bundle);
let has_transitive_refresh_schedule = from
.depends_on()
.into_iter()
Expand All @@ -363,7 +363,7 @@ impl Coordinator {

let (mut df_desc, df_meta) = global_lir_plan.unapply();

df_desc.dataflow_expiration_desc.transitive_upper = Some(upper);
df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
df_desc
.dataflow_expiration_desc
.has_transitive_refresh_schedule = has_transitive_refresh_schedule;
Expand Down
24 changes: 12 additions & 12 deletions src/compute-types/src/dataflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,18 @@ pub struct DataflowExpirationDesc<T> {
pub is_timeline_epochms: bool,
}

impl<T> Default for DataflowExpirationDesc<T> {
fn default() -> Self {
Self {
transitive_upper: None,
// Assume present unless explicitly checked.
has_transitive_refresh_schedule: true,
// Assume any timeline type possible unless explicitly checked.
is_timeline_epochms: false,
}
}
}

impl RustType<ProtoDataflowExpirationDesc> for DataflowExpirationDesc<mz_repr::Timestamp> {
fn into_proto(&self) -> ProtoDataflowExpirationDesc {
ProtoDataflowExpirationDesc {
Expand Down Expand Up @@ -961,18 +973,6 @@ impl Arbitrary for DataflowExpirationDesc<mz_repr::Timestamp> {
}
}

impl<T> Default for DataflowExpirationDesc<T> {
fn default() -> Self {
Self {
transitive_upper: None,
// Assume present unless explicitly checked.
has_transitive_refresh_schedule: true,
// Assume any timeline type possible unless explicitly checked.
is_timeline_epochms: false,
}
}
}

#[cfg(test)]
mod tests {
use mz_ore::assert_ok;
Expand Down

0 comments on commit 1ee6214

Please sign in to comment.