Skip to content

Commit

Permalink
Logic for expiration with refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
sdht0 committed Oct 14, 2024
1 parent 1d4a531 commit fc2feb2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 5 deletions.
59 changes: 57 additions & 2 deletions src/adapter/src/catalog/dataflow_expiration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

//! Helper function for dataflow expiration checks.
use mz_repr::GlobalId;

use crate::catalog::Catalog;
use differential_dataflow::lattice::Lattice;
use mz_catalog::memory::objects::CatalogItem;
use mz_expr::CollectionPlan;
use mz_repr::{GlobalId, Timestamp};

impl Catalog {
/// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
Expand All @@ -25,4 +27,57 @@ impl Catalog {
.transitive_uses(id)
.any(test_has_transitive_refresh_schedule)
}
/// Computes an expiration timestamp for the catalog item `id`, accounting for `REFRESH`
/// schedules on the item itself and on anything it transitively depends on.
///
/// The computation recurses through the item's dependencies:
///
/// * Materialized views and views combine the expirations of every collection their
///   `raw_expr` depends on using the lattice `meet` (i.e. the minimum), seeded with
///   `Timestamp::MAX`. A materialized view that has a refresh schedule then replaces the
///   result with `round_up_timestamp` of it, whenever that call yields a value.
/// * Indexes delegate to the item they are defined on.
/// * Tables, sources, sinks, logs, types, secrets and connections return
///   `replica_expiration` unchanged.
///
/// # Panics
///
/// Panics (via `todo!()`) when `id` refers to a function or a continual task; neither is
/// handled yet.
pub(crate) fn compute_expiration_with_refresh(
    &self,
    id: GlobalId,
    replica_expiration: Timestamp,
) -> Timestamp {
    // TODO: use a queue to deduplicate work.
    // TODO: don't use Timestamp::MAX as the initial value.
    match &self.get_entry(&id).item {
        CatalogItem::MaterializedView(mv) => {
            // Pick the minimum refresh time of all dependencies.
            let mut expiration = Timestamp::MAX;
            for dep in mv.raw_expr.depends_on() {
                let dep_expiration =
                    self.compute_expiration_with_refresh(dep, replica_expiration);
                expiration = expiration.meet(&dep_expiration);
            }
            // With a refresh schedule, move the expiration to the timestamp the schedule
            // rounds it up to; keep it as is when the schedule yields nothing.
            mv.refresh_schedule
                .as_ref()
                .and_then(|schedule| schedule.round_up_timestamp(expiration))
                .unwrap_or(expiration)
        }
        CatalogItem::View(view) => {
            // Same reduction as for materialized views, minus the refresh rounding.
            let mut expiration = Timestamp::MAX;
            for dep in view.raw_expr.depends_on() {
                let dep_expiration =
                    self.compute_expiration_with_refresh(dep, replica_expiration);
                expiration = expiration.meet(&dep_expiration);
            }
            expiration
        }
        // An index expires together with the collection it indexes.
        CatalogItem::Index(index) => {
            self.compute_expiration_with_refresh(index.on, replica_expiration)
        }
        // Not implemented yet.
        CatalogItem::Func(_) | CatalogItem::ContinualTask(_) => {
            todo!()
        }
        // These will not have a transitive dependency on REFRESH.
        CatalogItem::Table(_)
        | CatalogItem::Source(_)
        | CatalogItem::Sink(_)
        | CatalogItem::Log(_)
        | CatalogItem::Type(_)
        | CatalogItem::Secret(_)
        | CatalogItem::Connection(_) => replica_expiration,
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use mz_repr::explain::{ExprHumanizerExt, TransientItem};
use mz_repr::optimize::OptimizerFeatures;
use mz_repr::optimize::OverrideFrom;
use mz_repr::refresh_schedule::RefreshSchedule;
use mz_repr::{Datum, GlobalId, Row};
use mz_repr::{Datum, GlobalId, Row, Timestamp};
use mz_sql::ast::ExplainStage;
use mz_sql::catalog::CatalogError;
use mz_sql::names::ResolvedIds;
Expand Down Expand Up @@ -581,6 +581,28 @@ impl Coordinator {
.into_iter()
.any(|id| self.catalog.item_has_transitive_refresh_schedule(id));

// Collect properties for `DataflowExpirationDesc`.
let transitive_upper = self.least_valid_write(&id_bundle);
let replica_expiration = Timestamp::default(); // TODO: Can we get replica_expiration information here somehow?

let mut expiration_with_refresh =
raw_expr
.depends_on()
.into_iter()
.fold(Timestamp::MAX, |new_expiration, id| {
new_expiration.meet(
&self
.catalog
.compute_expiration_with_refresh(id, replica_expiration),
)
});
if let Some(refresh_schedule) = &refresh_schedule {
if let Some(next_refresh) = refresh_schedule.round_up_timestamp(expiration_with_refresh)
{
expiration_with_refresh = next_refresh;
}
}

let read_holds_owned;
let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
// In some cases, for example when REFRESH is used, the preparatory
Expand Down
4 changes: 2 additions & 2 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl ComputeState {
let new_offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.worker_config);
if new_offset.is_zero() {
info!(
current_replica_expiration_millis = %self.replica_expiration.elements(),
current_replica_expiration_millis = ?self.replica_expiration.elements(),
"disabling replica_expiration",
);
self.replica_expiration.clear();
Expand All @@ -340,7 +340,7 @@ impl ComputeState {
} else {
warn!(
new_offset = %new_offset,
current_replica_expiration = %self.replica_expiration.elements(),
current_replica_expiration = ?self.replica_expiration.elements(),
"replica_expiration: ignoring new offset as it results in a larger replica_expiration",
);
}
Expand Down

0 comments on commit fc2feb2

Please sign in to comment.