diff --git a/doc/developer/design/20240919_dataflow_expiration.md b/doc/developer/design/20240919_dataflow_expiration.md new file mode 100644 index 0000000000000..585e0c6740112 --- /dev/null +++ b/doc/developer/design/20240919_dataflow_expiration.md @@ -0,0 +1,133 @@ +# Dataflow Expiration + +- Associated issues/prs: [#26029](https://github.com/MaterializeInc/materialize/issues/26029) [#29587](https://github.com/MaterializeInc/materialize/pull/29587) + +## The Problem + +Temporal filters currently require Materialize to maintain all future retractions +of data that is currently visible. For long windows, the retractions could be at +timestamps beyond our next scheduled restart, which is typically our weekly +DB release. + +For instance, in the example below, the temporal filter in the `last_30_days` +view causes two diffs to be generated for every row inserted into `events`: the +row itself and a retraction 30 days later. However, if the replica is +restarted in the next few days, the retraction diff is never processed, making +it wasteful to keep the diff around waiting to be processed. + +```sql +-- Create a table of timestamped events. +CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP +); +-- Create a view of events from the last 30 days. +CREATE VIEW last_30_days AS +SELECT event_ts, content +FROM events +WHERE mz_now() <= event_ts + INTERVAL '30 days'; + +INSERT INTO events VALUES ('hello', now()); + +COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_days)) TO STDOUT; +1727130590201 1 2024-09-23 22:29:50.201 hello -- now() +1729722590222 -1 2024-10-23 22:29:50.222 hello -- now() + 30 days +``` + +Dataflows with large temporal windows (e.g. a year) can generate a large number +of retractions that consume memory and CPU but are never used. Instead, all +such retractions can be dropped. + +## Success Criteria + +When temporal filters are in use, retraction diffs associated with timestamps +beyond a set expiration time can be dropped without affecting correctness, +resulting in lower memory and CPU utilization by roughly halving the number of +processed diffs. + +## Solution Proposal + +A new LaunchDarkly feature flag is introduced that specifies an _expiration +offset_ (a `Duration`). The _replica expiration_ time is computed as the offset +added to the start time of the replica. Dataflows matching certain +criteria (detailed below) are then configured with a _dataflow expiration_ +derived from the replica expiration. Diffs generated in these dataflows beyond +the dataflow expiration are dropped. To ensure correctness, panic checks are +added to these dataflows to verify that the frontier does not exceed the +dataflow expiration before the replica is restarted.
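For concreteness, the arithmetic described above can be sketched as follows. This is an illustrative, simplified model only: it uses plain `u64` millisecond timestamps and `Option<u64>` in place of the `Antichain<mz_repr::Timestamp>` frontiers used in the implementation, and the helper names are hypothetical, not part of this change.

```rust
// Simplified sketch of the expiration arithmetic (hypothetical helpers; not
// this PR's API). `None` stands in for the empty frontier, i.e. "no bound".

/// Replica expiration: the replica start time plus the configured offset.
/// A zero offset disables expiration.
fn replica_expiration(replica_start_ms: u64, offset_ms: u64) -> Option<u64> {
    (offset_ms != 0).then(|| replica_start_ms + offset_ms)
}

/// The effective `until` of a dataflow is the earlier (meet) of its own
/// `until` and the dataflow expiration, so diffs past either bound are dropped.
fn effective_until(until: Option<u64>, dataflow_expiration: Option<u64>) -> Option<u64> {
    match (until, dataflow_expiration) {
        (Some(u), Some(e)) => Some(u.min(e)),
        (u, e) => u.or(e),
    }
}

fn main() {
    let start_ms = 1_727_130_590_201; // replica start, ms since the Unix epoch
    let offset_ms = 21 * 24 * 60 * 60 * 1_000; // e.g. a 21-day offset
    let expiration = replica_expiration(start_ms, offset_ms);
    // A dataflow with no explicit `until` now drops every diff past the expiration.
    assert_eq!(effective_until(None, expiration), expiration);
}
```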
+ +An overview of the logic used for these features is as follows: +``` +# Consider the `upper` for different dataflows +if mv_view_with_constant_values: + upper := [] +else if mv_with_refresh_every: + upper := [next_refresh()] +else: + upper := [write_frontier()] + +# The `upper` for a dataflow considering all its transitive inputs +inputs_upper := meet(for all inputs i: i_upper) + +# Dataflow expiration logic +if compute_replica_expiration_offset is not set: + dataflow_expiration := [] +else for dataflows of type in [materialized view, index, subscribe]: + replica_expiration := replica_start + compute_replica_expiration_offset + if dataflow_timeline is not EpochMilliseconds: + # Dataflows that do not depend on any source or table are not in the + # EpochMilliseconds timeline + dataflow_expiration := [] + else if refresh_interval set in any transitive dependency of dataflow: + dataflow_expiration := [] + else if inputs_upper == []: + dataflow_expiration := [] + else if inputs_upper > replica_expiration: + dataflow_expiration := inputs_upper + else: + dataflow_expiration := replica_expiration + +dataflow_until := dataflow_until.meet(dataflow_expiration) +``` + +Note that we only consider dataflows representing materialized views, indexes, +and subscribes. These are long-running dataflows that maintain state during +their lifetime. Other dataflows such as peeks are transient and do not need to +explicitly drop retraction diffs. + +More concretely, we make the following changes: + +* Introduce a new dyncfg `compute_replica_expiration_offset`. +* If the offset is configured with a non-zero value, compute + `replica_expiration = now() + offset`. This value specifies the maximum + time for which the replica is expected to be running. Consequently, diffs + associated with timestamps beyond this limit do not have to be stored and can + be dropped. +* When building a dataflow, compute `dataflow_expiration` as per the logic + described above. If non-empty, the `dataflow_expiration` is combined with the + dataflow `until`, which ensures that any diff beyond this limit is dropped in + `mfp.evaluate()`. +* To ensure correctness, we attach checks in `Context::export_index` and + `Context::export_sink` that panic if the dataflow frontier exceeds the + configured `dataflow_expiration`. This is to prevent the dataflow from + serving potentially incorrect results due to dropped data. +* On a replica restart, `replica_expiration` and `dataflow_expiration` are + recomputed relative to the new start time. Any diffs whose timestamps + fall within the new limit are no longer dropped. + +## Open Questions + +- What is the appropriate default expiration time? + - Given that we currently restart replicas every week as part of the DB release, + and leaving some buffer for skipped weeks, 3 weeks (+1 day margin) seems like + a good limit to start with. + +## Out of Scope + +Dataflow expiration is disabled for the following cases: + +- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the + frontier timestamp being comparable to the wall-clock time of the replica. +- Dataflows that transitively depend on a materialized view with a non-default + refresh schedule.
Handling such materialized views would require additional + logic to track the refresh schedules and ensure that the dataflow expiration diff --git a/misc/python/materialize/cli/ci_annotate_errors.py b/misc/python/materialize/cli/ci_annotate_errors.py index a8d0a0eb89f42..c3442e72d2085 100644 --- a/misc/python/materialize/cli/ci_annotate_errors.py +++ b/misc/python/materialize/cli/ci_annotate_errors.py @@ -125,6 +125,7 @@ | restart-materialized-1\ *|\ thread\ 'coordinator'\ panicked\ at\ 'external\ operation\ .*\ failed\ unrecoverably.* # Expected in cluster test | cluster-clusterd[12]-1\ .*\ halting\ process:\ new\ timely\ configuration\ does\ not\ match\ existing\ timely\ configuration + | cluster-clusterd1-1\ .*\ has\ exceeded\ expiration # Emitted by tests employing explicit mz_panic() | forced\ panic # Emitted by broken_statements.slt in order to stop panic propagation, as 'forced panic' will unwantedly panic the `environmentd` thread. diff --git a/misc/python/materialize/feature_benchmark/benchmark_versioning.py b/misc/python/materialize/feature_benchmark/benchmark_versioning.py index 2aa2967719b94..6661772ccb6ba 100644 --- a/misc/python/materialize/feature_benchmark/benchmark_versioning.py +++ b/misc/python/materialize/feature_benchmark/benchmark_versioning.py @@ -23,7 +23,7 @@ # Consider increasing the scenario's class #version() if changes are expected to impact results! SHA256_BY_SCENARIO_FILE: dict[str, str] = { - "benchmark_main.py": "9419ffb7f17de4584fc35e2f531c3e6181ad2b0284bf6aeb2f15ebb966a5e007", + "benchmark_main.py": "0e328994c56b9bae2a9da0db10974b4fa30dd8c825e3290e00f5cecbb32ec338", "concurrency.py": "2e9c149c136b83b3853abc923a1adbdaf55a998ab4557712f8424c8b16f2adb1", "customer.py": "d1e72837a342c3ebf1f4a32ec583b1b78a78644cdba495030a6df45ebbffe703", "optbench.py": "ae411afe1ba595021f2f9d2d21500ba0c1c6941561493eabcae113373f493bfa", diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py index 75da958eaaf83..a5fa83b04bcfa 100644 --- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py +++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py @@ -2182,3 +2182,48 @@ def benchmark(self) -> MeasurementSource: """ ) ) + + +class ReplicaExpiration(Scenario): + def init(self) -> list[Action]: + return [ + TdAction( + """ +> CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP + ); +> CREATE VIEW last_30_days AS + SELECT event_ts, content + FROM events + WHERE mz_now() <= event_ts + INTERVAL '30 days'; + +> CREATE DEFAULT INDEX ON last_30_days +""" + ), + ] + + def benchmark(self) -> MeasurementSource: + return Td( + dedent( + f""" + > DELETE FROM events; + + > SELECT COUNT(*) FROM last_30_days + 0 + + > SELECT 1; + /* A */ + 1 + + > INSERT INTO events SELECT concat('somelongstringthatdoesntmattermuchatallbutrequiresmemorytostoreXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', x::text), now() FROM generate_series(1, {self.n()}) AS x + + > SELECT COUNT(*) FROM last_30_days + {self.n()} + + > SELECT 1; + /* B */ + 1 + """ + ) + ) diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py index 96ee10ad31ea0..c8270989a846b 100644 --- a/misc/python/materialize/mzcompose/__init__.py +++ 
b/misc/python/materialize/mzcompose/__init__.py @@ -79,6 +79,7 @@ def get_default_system_parameters( "cluster_always_use_disk": "true", "compute_dataflow_max_inflight_bytes": "134217728", # 128 MiB "compute_hydration_concurrency": "2", + "compute_replica_expiration_offset": "3d", "disk_cluster_replicas_default": "true", "enable_0dt_deployment": "true" if zero_downtime else "false", "enable_0dt_deployment_panic_after_timeout": "true", diff --git a/misc/python/materialize/mzcompose/services/clusterd.py b/misc/python/materialize/mzcompose/services/clusterd.py index b8f8d0fe8e12c..fa90d1e184d34 100644 --- a/misc/python/materialize/mzcompose/services/clusterd.py +++ b/misc/python/materialize/mzcompose/services/clusterd.py @@ -24,6 +24,7 @@ def __init__( environment_extra: list[str] = [], memory: str | None = None, options: list[str] = [], + restart: str = "no", ) -> None: environment = [ "CLUSTERD_LOG_FILTER", @@ -58,6 +59,7 @@ def __init__( "ports": [2100, 2101, 6878], "environment": environment, "volumes": DEFAULT_MZ_VOLUMES, + "restart": restart, } ) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 550f6510538e1..f1d8d1b5f1863 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -95,6 +95,7 @@ pub(crate) mod consistency; mod migrate; mod apply; +mod dataflow_expiration; mod open; mod state; mod transact; diff --git a/src/adapter/src/catalog/dataflow_expiration.rs b/src/adapter/src/catalog/dataflow_expiration.rs new file mode 100644 index 0000000000000..7fa28e27a4cf8 --- /dev/null +++ b/src/adapter/src/catalog/dataflow_expiration.rs @@ -0,0 +1,28 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. + +//! Helper function for dataflow expiration checks. + +use mz_repr::GlobalId; + +use crate::catalog::Catalog; + +impl Catalog { + /// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view + /// with a refresh schedule. Used to disable dataflow expiration if found. + pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool { + let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool { + if let Some(mv) = self.get_entry(&dep).materialized_view() { + return mv.refresh_schedule.is_some(); + } + false + }; + test_has_transitive_refresh_schedule(id) + || self + .state() + .transitive_uses(id) + .any(test_has_transitive_refresh_schedule) + } +} diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 2b23a4e2e2e3f..c0e7bf73d51a0 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -742,6 +742,10 @@ pub struct CreateMaterializedViewOptimize { /// An optional context set iff the state machine is initiated from /// sequencing an EXPLAIN for this statement. explain_ctx: ExplainContext, + /// Whether the timeline is [`mz_storage_types::sources::Timeline::EpochMilliseconds`]. + /// + /// Used to determine if it is safe to enable dataflow expiration. 
+ is_timeline_epoch_ms: bool, } #[derive(Debug)] @@ -2572,8 +2576,15 @@ impl Coordinator { .catalog() .resolve_full_name(entry.name(), None) .to_string(); + let is_timeline_epoch_ms = self.get_timeline_context(*id).is_timeline_epoch_ms(); let (_optimized_plan, physical_plan, _metainfo) = self - .optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name) + .optimize_create_continual_task( + &ct, + *id, + self.owned_catalog(), + debug_name, + is_timeline_epoch_ms, + ) .expect("builtin CT should optimize successfully"); // Determine an as of for the new continual task. @@ -2616,6 +2627,7 @@ impl Coordinator { for entry in ordered_catalog_entries { let id = entry.id(); + let is_timeline_epoch_ms = self.get_timeline_context(id).is_timeline_epoch_ms(); match entry.item() { CatalogItem::Index(idx) => { // Collect optimizer parameters. @@ -2646,6 +2658,7 @@ impl Coordinator { entry.name().clone(), idx.on, idx.keys.to_vec(), + is_timeline_epoch_ms, ); let global_mir_plan = optimizer.optimize(index_plan)?; let optimized_plan = global_mir_plan.df_desc().clone(); @@ -2700,6 +2713,7 @@ impl Coordinator { debug_name, optimizer_config.clone(), self.optimizer_metrics(), + is_timeline_epoch_ms, ); // MIR ⇒ MIR optimization (global) @@ -2743,7 +2757,13 @@ impl Coordinator { .resolve_full_name(entry.name(), None) .to_string(); let (optimized_plan, physical_plan, metainfo) = self - .optimize_create_continual_task(ct, id, self.owned_catalog(), debug_name)?; + .optimize_create_continual_task( + ct, + id, + self.owned_catalog(), + debug_name, + is_timeline_epoch_ms, + )?; let catalog = self.catalog_mut(); catalog.set_optimized_plan(id, optimized_plan); diff --git a/src/adapter/src/coord/introspection.rs b/src/adapter/src/coord/introspection.rs index e425a74df4d76..5ffca416b7d76 100644 --- a/src/adapter/src/coord/introspection.rs +++ b/src/adapter/src/coord/introspection.rs @@ -255,7 +255,11 @@ impl Coordinator { let read_holds = self.acquire_read_holds(&id_bundle); let as_of = read_holds.least_valid_read(); - let global_mir_plan = global_mir_plan.resolve(as_of); + // Introspection subscribes only read from system collections, which are always in + // the `EpochMilliseconds` timeline. + let is_timeline_epoch_ms = true; + + let global_mir_plan = global_mir_plan.resolve(as_of, is_timeline_epoch_ms); let span = Span::current(); Ok(StageResult::Handle(mz_ore::task::spawn_blocking( diff --git a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs index ca7e332019609..7bf0ca92e58d0 100644 --- a/src/adapter/src/coord/sequencer/inner/create_continual_task.rs +++ b/src/adapter/src/coord/sequencer/inner/create_continual_task.rs @@ -82,6 +82,10 @@ impl Coordinator { }, }; + let is_timeline_epoch_ms = self + .validate_timeline_context(resolved_ids.0.clone())? + .is_timeline_epoch_ms(); + // Construct the CatalogItem for this CT and optimize it. 
let mut item = crate::continual_task::ct_item_from_plan(plan, sink_id, resolved_ids)?; let full_name = bootstrap_catalog.resolve_full_name(&name, Some(session.conn_id())); @@ -90,6 +94,7 @@ impl Coordinator { sink_id, Arc::new(bootstrap_catalog), full_name.to_string(), + is_timeline_epoch_ms, )?; // Timestamp selection @@ -156,6 +161,7 @@ impl Coordinator { output_id: GlobalId, catalog: Arc, debug_name: String, + is_timeline_epoch_ms: bool, ) -> Result< ( DataflowDescription, @@ -185,6 +191,7 @@ impl Coordinator { debug_name, optimizer_config, self.optimizer_metrics(), + is_timeline_epoch_ms, ); // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global) diff --git a/src/adapter/src/coord/sequencer/inner/create_index.rs b/src/adapter/src/coord/sequencer/inner/create_index.rs index 9ef721be60b66..ec506d9e022a6 100644 --- a/src/adapter/src/coord/sequencer/inner/create_index.rs +++ b/src/adapter/src/coord/sequencer/inner/create_index.rs @@ -295,7 +295,7 @@ impl Coordinator { }: CreateIndexOptimize, ) -> Result>, AdapterError> { let plan::CreateIndexPlan { - index: plan::Index { cluster_id, .. }, + index: plan::Index { cluster_id, on, .. }, .. } = &plan; @@ -321,6 +321,7 @@ impl Coordinator { self.optimizer_metrics(), ); let span = Span::current(); + let is_timeline_epoch_ms = self.get_timeline_context(*on).is_timeline_epoch_ms(); Ok(StageResult::Handle(mz_ore::task::spawn_blocking( || "optimize create index", move || { @@ -332,7 +333,7 @@ impl Coordinator { let _dispatch_guard = explain_ctx.dispatch_guard(); let index_plan = - optimize::index::Index::new(plan.name.clone(), plan.index.on, plan.index.keys.clone()); + optimize::index::Index::new(plan.name.clone(), plan.index.on, plan.index.keys.clone(), is_timeline_epoch_ms); // MIR ⇒ MIR optimization (global) let global_mir_plan = optimizer.catch_unwind_optimize(index_plan)?; @@ -421,6 +422,8 @@ impl Coordinator { .. }: CreateIndexFinish, ) -> Result>, AdapterError> { + let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); + let ops = vec![catalog::Op::CreateItem { id: exported_index_id, name: name.clone(), @@ -437,6 +440,10 @@ impl Coordinator { owner_id: *self.catalog().get_entry(&on).owner_id(), }]; + // Collect properties for `DataflowExpirationDesc`. + let transitive_upper = self.least_valid_write(&id_bundle); + let has_transitive_refresh_schedule = self.catalog.item_has_transitive_refresh_schedule(on); + // Pre-allocate a vector of transient GlobalIds for each notice. let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id()) .take(global_lir_plan.df_meta().optimizer_notices.len()) @@ -458,9 +465,6 @@ impl Coordinator { .process_dataflow_metainfo(df_meta, exported_index_id, session, notice_ids) .await; - // Timestamp selection - let id_bundle = dataflow_import_id_bundle(&df_desc, cluster_id); - // We're putting in place read holds, such that ship_dataflow, // below, which calls update_read_capabilities, can successfully // do so. 
Otherwise, the since of dependencies might move along @@ -472,6 +476,11 @@ impl Coordinator { let since = coord.least_valid_read(&read_holds); df_desc.set_as_of(since); + df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper); + df_desc + .dataflow_expiration_desc + .has_transitive_refresh_schedule = has_transitive_refresh_schedule; + coord .ship_dataflow_and_notice_builtin_table_updates( df_desc, diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs index 65bd2338e5775..f06eaae858025 100644 --- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs +++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs @@ -340,7 +340,7 @@ impl Coordinator { // We want to reject queries that depend on log sources, for example, // even if we can *technically* optimize that reference away. let expr_depends_on = expr.depends_on(); - self.validate_timeline_context(expr_depends_on.iter().cloned())?; + let timeline_ctx = self.validate_timeline_context(expr_depends_on.iter().cloned())?; self.validate_system_column_references(*ambiguous_columns, &expr_depends_on)?; // Materialized views are not allowed to depend on log sources, as replicas // are not producing the same definite collection for these. @@ -398,6 +398,7 @@ impl Coordinator { plan, resolved_ids, explain_ctx, + is_timeline_epoch_ms: timeline_ctx.is_timeline_epoch_ms(), }, )) } @@ -410,6 +411,7 @@ impl Coordinator { plan, resolved_ids, explain_ctx, + is_timeline_epoch_ms, }: CreateMaterializedViewOptimize, ) -> Result>, AdapterError> { let plan::CreateMaterializedViewPlan { @@ -452,6 +454,7 @@ impl Coordinator { debug_name, optimizer_config, self.optimizer_metrics(), + is_timeline_epoch_ms, ); let span = Span::current(); @@ -570,6 +573,14 @@ impl Coordinator { // Timestamp selection let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); + // Collect properties for `DataflowExpirationDesc`. + let transitive_upper = self.least_valid_write(&id_bundle); + let has_transitive_refresh_schedule = refresh_schedule.is_some() + || raw_expr + .depends_on() + .into_iter() + .any(|id| self.catalog.item_has_transitive_refresh_schedule(id)); + let read_holds_owned; let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) { // In some cases, for example when REFRESH is used, the preparatory @@ -663,6 +674,11 @@ impl Coordinator { df_desc.set_initial_as_of(initial_as_of); df_desc.until = until; + df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper); + df_desc + .dataflow_expiration_desc + .has_transitive_refresh_schedule = has_transitive_refresh_schedule; + let storage_metadata = coord.catalog.state().storage_metadata(); // Announce the creation of the materialized view source. 
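The two properties collected in the hunk above, the inputs' transitive write frontier and whether any transitive dependency has a refresh schedule, are later combined with the replica expiration in `DataflowDescription::expire_dataflow_at` (see the `dataflows.rs` changes further below). The following is a simplified sketch of that combination, again with `Option<u64>` standing in for `Antichain<Timestamp>` (`None` meaning the empty frontier) and a hypothetical function name; the point it illustrates is that when the inputs' upper has already advanced past the replica expiration, the expiration is pushed up to that upper rather than applied as-is.

```rust
// Hypothetical, simplified stand-in for the expiration decision; not the PR's
// code. `None` plays the role of the empty frontier, i.e. "no expiration".
fn dataflow_expiration(
    replica_expiration: Option<u64>, // None: expiration disabled on the replica
    transitive_upper: Option<u64>,   // None: the inputs' write frontier is empty
    has_transitive_refresh_schedule: bool,
    is_timeline_epoch_ms: bool,
) -> Option<u64> {
    // Cases in which expiration is disabled outright for this dataflow.
    let Some(expiration) = replica_expiration else {
        return None;
    };
    if has_transitive_refresh_schedule || !is_timeline_epoch_ms {
        return None;
    }
    match transitive_upper {
        // Empty upper: the inputs are complete; nothing will ever need expiring.
        None => None,
        // Otherwise take the max ("join") of the upper and the replica
        // expiration, so a dataflow whose inputs have already advanced past the
        // expiration is not given an expiration that lies in its past.
        Some(upper) => Some(upper.max(expiration)),
    }
}

fn main() {
    // Inputs already past the expiration: the expiration is pushed up to 1_500.
    assert_eq!(dataflow_expiration(Some(1_000), Some(1_500), false, true), Some(1_500));
    // Normal case: the replica expiration applies.
    assert_eq!(dataflow_expiration(Some(1_000), Some(800), false, true), Some(1_000));
    // A transitive refresh schedule disables expiration for this dataflow.
    assert_eq!(dataflow_expiration(Some(1_000), Some(800), true, true), None);
}
```

Expressed with the real frontier types, the "push up" step corresponds to the lattice `join` of the transitive upper with the replica expiration, and the disabled cases return the empty antichain.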
diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index cf8fd1289dba8..8d9c7c9f51b29 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -23,9 +23,10 @@ use crate::coord::{ SubscribeStage, SubscribeTimestampOptimizeLir, TargetCluster, }; use crate::error::AdapterError; +use crate::optimize::dataflows::dataflow_import_id_bundle; use crate::optimize::Optimize; use crate::session::{Session, TransactionOps}; -use crate::{optimize, AdapterNotice, ExecuteContext, TimelineContext}; +use crate::{optimize, AdapterNotice, ExecuteContext, TimelineContext, TimestampProvider}; impl Staged for SubscribeStage { type Ctx = ExecuteContext; @@ -283,7 +284,12 @@ impl Coordinator { self.store_transaction_read_holds(ctx.session(), read_holds); - let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); + let is_timeline_epoch_ms = self + .validate_timeline_context(plan.from.depends_on().iter().cloned())? + .is_timeline_epoch_ms(); + + let global_mir_plan = + global_mir_plan.resolve(Antichain::from_elem(as_of), is_timeline_epoch_ms); // Optimize LIR let span = Span::current(); @@ -318,6 +324,7 @@ impl Coordinator { cluster_id, plan: plan::SubscribePlan { + from, copy_to, emit_progress, output, @@ -328,6 +335,15 @@ impl Coordinator { replica_id, }: SubscribeFinish, ) -> Result>, AdapterError> { + let id_bundle = dataflow_import_id_bundle(global_lir_plan.df_desc(), cluster_id); + + // Collect properties for `DataflowExpirationDesc`. + let transitive_upper = self.least_valid_write(&id_bundle); + let has_transitive_refresh_schedule = from + .depends_on() + .into_iter() + .any(|id| self.catalog.item_has_transitive_refresh_schedule(id)); + let sink_id = global_lir_plan.sink_id(); let (tx, rx) = mpsc::unbounded_channel(); @@ -347,7 +363,13 @@ impl Coordinator { }; active_subscribe.initialize(); - let (df_desc, df_meta) = global_lir_plan.unapply(); + let (mut df_desc, df_meta) = global_lir_plan.unapply(); + + df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper); + df_desc + .dataflow_expiration_desc + .has_transitive_refresh_schedule = has_transitive_refresh_schedule; + // Emit notices. self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices); diff --git a/src/adapter/src/coord/timeline.rs b/src/adapter/src/coord/timeline.rs index 3ff2e049992c6..fba8a457ba09d 100644 --- a/src/adapter/src/coord/timeline.rs +++ b/src/adapter/src/coord/timeline.rs @@ -70,6 +70,11 @@ impl TimelineContext { Self::TimestampIndependent | Self::TimestampDependent => None, } } + + /// Whether the context contains a timeline of type [`Timeline::EpochMilliseconds`]. + pub fn is_timeline_epoch_ms(&self) -> bool { + matches!(self, &Self::TimelineDependent(Timeline::EpochMilliseconds)) + } } /// Global state for a single timeline. diff --git a/src/adapter/src/coord/timestamp_selection.rs b/src/adapter/src/coord/timestamp_selection.rs index 94d12fc2f83d0..d109ab64790a9 100644 --- a/src/adapter/src/coord/timestamp_selection.rs +++ b/src/adapter/src/coord/timestamp_selection.rs @@ -132,6 +132,17 @@ impl TimestampContext { pub fn antichain(&self) -> Antichain { Antichain::from_elem(self.timestamp_or_default()) } + + /// Whether the context contains a timestamp of type [`Timeline::EpochMilliseconds`]. + pub fn is_timeline_epoch_ms(&self) -> bool { + matches!( + self, + &Self::TimelineTimestamp { + timeline: Timeline::EpochMilliseconds, + .. 
+ } + ) + } } #[async_trait(?Send)] diff --git a/src/adapter/src/optimize/index.rs b/src/adapter/src/optimize/index.rs index 6503187187f03..04cd674249cb6 100644 --- a/src/adapter/src/optimize/index.rs +++ b/src/adapter/src/optimize/index.rs @@ -90,11 +90,26 @@ pub struct Index { name: QualifiedItemName, on: GlobalId, keys: Vec, + /// Whether the timeline is [`mz_storage_types::sources::Timeline::EpochMilliseconds`]. + /// + /// Used to determine if it is safe to enable dataflow expiration. + is_timeline_epoch_ms: bool, } impl Index { - pub fn new(name: QualifiedItemName, on: GlobalId, keys: Vec) -> Self { - Self { name, on, keys } + /// Construct a new [`Index`]. Arguments are recorded as-is. + pub fn new( + name: QualifiedItemName, + on: GlobalId, + keys: Vec, + is_timeline_epoch_ms: bool, + ) -> Self { + Self { + name, + on, + keys, + is_timeline_epoch_ms, + } } } @@ -153,6 +168,8 @@ impl Optimize for Optimizer { }; let mut df_desc = MirDataflowDescription::new(full_name.to_string()); + df_desc.dataflow_expiration_desc.is_timeline_epoch_ms = index.is_timeline_epoch_ms; + df_builder.import_into_dataflow(&index.on, &mut df_desc, &self.config.features)?; df_builder.maybe_reoptimize_imported_views(&mut df_desc, &self.config)?; diff --git a/src/adapter/src/optimize/materialized_view.rs b/src/adapter/src/optimize/materialized_view.rs index e6cd2979c9840..b9e06adc37211 100644 --- a/src/adapter/src/optimize/materialized_view.rs +++ b/src/adapter/src/optimize/materialized_view.rs @@ -76,6 +76,10 @@ pub struct Optimizer { metrics: OptimizerMetrics, /// The time spent performing optimization so far. duration: Duration, + /// Whether the timeline is [`mz_storage_types::sources::Timeline::EpochMilliseconds`]. + /// + /// Used to determine if it is safe to enable dataflow expiration. + is_timeline_epoch_ms: bool, } impl Optimizer { @@ -90,6 +94,7 @@ impl Optimizer { debug_name: String, config: OptimizerConfig, metrics: OptimizerMetrics, + is_timeline_epoch_ms: bool, ) -> Self { Self { typecheck_ctx: empty_context(), @@ -104,6 +109,7 @@ impl Optimizer { config, metrics, duration: Default::default(), + is_timeline_epoch_ms, } } } @@ -221,6 +227,8 @@ impl Optimize for Optimizer { }; let mut df_desc = MirDataflowDescription::new(self.debug_name.clone()); + df_desc.dataflow_expiration_desc.is_timeline_epoch_ms = self.is_timeline_epoch_ms; + df_desc.refresh_schedule.clone_from(&self.refresh_schedule); df_builder.import_view_into_dataflow( diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 67f4e17a90870..9ec4ad0266759 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -161,6 +161,11 @@ impl GlobalLirPlan { let sink_desc = sink_exports.values().next().expect("valid sink"); sink_desc } + + /// Get the plan's [`LirDataflowDescription`]. + pub fn df_desc(&self) -> &LirDataflowDescription { + &self.df_desc + } } /// Marker type for [`GlobalMirPlan`] structs representing an optimization @@ -294,7 +299,11 @@ impl GlobalMirPlan { /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan` /// optimization stage in order to profit from possible single-time /// optimizations in the `Plan::finalize_dataflow` call. - pub fn resolve(mut self, as_of: Antichain) -> GlobalMirPlan { + pub fn resolve( + mut self, + as_of: Antichain, + is_timeline_epoch_ms: bool, + ) -> GlobalMirPlan { // A dataflow description for a `SUBSCRIBE` statement should not have // index exports. 
soft_assert_or_log!( @@ -305,6 +314,9 @@ impl GlobalMirPlan { // Set the `as_of` timestamp for the dataflow. self.df_desc.set_as_of(as_of); + // Detect the timeline type. + self.df_desc.dataflow_expiration_desc.is_timeline_epoch_ms = is_timeline_epoch_ms; + // The only outputs of the dataflow are sinks, so we might be able to // turn off the computation early, if they all have non-trivial // `up_to`s. diff --git a/src/catalog/src/memory/objects.rs b/src/catalog/src/memory/objects.rs index 668a28e891108..6a7e5c92f6edc 100644 --- a/src/catalog/src/memory/objects.rs +++ b/src/catalog/src/memory/objects.rs @@ -1732,6 +1732,14 @@ impl CatalogEntry { } } + /// Returns the inner [`MaterializedView`] if this entry is a materialized view, else `None`. + pub fn materialized_view(&self) -> Option<&MaterializedView> { + match self.item() { + CatalogItem::MaterializedView(mv) => Some(mv), + _ => None, + } + } + /// Returns the inner [`Source`] if this entry is a source, else `None`. pub fn source(&self) -> Option<&Source> { match self.item() { diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index 8fc277fabda29..239eafc6b1220 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -743,8 +743,8 @@ mod tests { use async_trait::async_trait; use futures::future::BoxFuture; - use mz_compute_types::dataflows::IndexDesc; use mz_compute_types::dataflows::IndexImport; + use mz_compute_types::dataflows::{DataflowExpirationDesc, IndexDesc}; use mz_compute_types::sinks::ComputeSinkConnection; use mz_compute_types::sinks::ComputeSinkDesc; use mz_compute_types::sinks::PersistSinkConnection; @@ -1005,6 +1005,7 @@ mod tests { initial_storage_as_of: Default::default(), refresh_schedule: Default::default(), debug_name: Default::default(), + dataflow_expiration_desc: DataflowExpirationDesc::default(), } } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index ba89ece29d54a..8c5ebb428537b 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -1296,6 +1296,7 @@ where initial_storage_as_of: dataflow.initial_storage_as_of, refresh_schedule: dataflow.refresh_schedule, debug_name: dataflow.debug_name, + dataflow_expiration_desc: dataflow.dataflow_expiration_desc, }; if augmented_dataflow.is_transient() { diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index 95e0351fe34db..cdbe2ce21c59b 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -48,6 +48,12 @@ message ProtoDataflowDescription { sinks.ProtoComputeSinkDesc sink_desc = 2; } + message ProtoDataflowExpirationDesc { + optional mz_repr.antichain.ProtoU64Antichain transitive_upper = 1; + bool has_transitive_refresh_schedule = 2; + bool is_timeline_epoch_ms = 3; + } + repeated ProtoSourceImport source_imports = 1; repeated ProtoIndexImport index_imports = 2; repeated ProtoBuildDesc objects_to_build = 3; @@ -57,6 +63,7 @@ message ProtoDataflowDescription { mz_repr.antichain.ProtoU64Antichain until = 7; optional mz_repr.antichain.ProtoU64Antichain initial_storage_as_of = 9; optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 10; + ProtoDataflowExpirationDesc dataflow_expiration_desc = 11; string debug_name = 8; } diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index 83ecc574030ca..a4b5779ac8c4b 100644 --- 
a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -12,6 +12,7 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt; +use differential_dataflow::lattice::Lattice; use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr}; use mz_ore::soft_assert_or_log; use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError}; @@ -25,7 +26,8 @@ use serde::{Deserialize, Serialize}; use timely::progress::Antichain; use crate::dataflows::proto_dataflow_description::{ - ProtoIndexExport, ProtoIndexImport, ProtoSinkExport, ProtoSourceImport, + ProtoDataflowExpirationDesc, ProtoIndexExport, ProtoIndexImport, ProtoSinkExport, + ProtoSourceImport, }; use crate::plan::flat_plan::FlatPlan; use crate::plan::Plan; @@ -72,9 +74,11 @@ pub struct DataflowDescription { pub refresh_schedule: Option, /// Human readable name pub debug_name: String, + /// Information about the dataflow used to determine if dataflow expiration can be enabled. + pub dataflow_expiration_desc: DataflowExpirationDesc, } -impl DataflowDescription, (), mz_repr::Timestamp> { +impl DataflowDescription { /// Tests if the dataflow refers to a single timestamp, namely /// that `as_of` has a single coordinate and that the `until` /// value corresponds to the `as_of` value plus one, or `as_of` @@ -108,6 +112,38 @@ impl DataflowDescription, (), mz_repr::Timestamp> { as_of.try_step_forward().as_ref() == until.as_option() } + /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be + /// dropped. + /// + /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under + /// which dataflow expiration should be disabled. + pub fn expire_dataflow_at( + &self, + replica_expiration: &Antichain, + ) -> Antichain { + let dataflow_expiration_desc = &self.dataflow_expiration_desc; + + // Disable dataflow expiration if `replica_expiration` is unset, the current dataflow has a + // refresh schedule, has a transitive dependency with a refresh schedule, or the dataflow's + // timeline is not `Timeline::EpochMilliSeconds`. + if replica_expiration.is_empty() + || self.refresh_schedule.is_some() + || dataflow_expiration_desc.has_transitive_refresh_schedule + || !dataflow_expiration_desc.is_timeline_epoch_ms + { + return Antichain::default(); + } + + if let Some(upper) = &dataflow_expiration_desc.transitive_upper { + // Returns empty if `upper` is empty, else the max of `upper` and `replica_expiration`. + upper.join(replica_expiration) + } else { + replica_expiration.clone() + } + } +} + +impl DataflowDescription, (), mz_repr::Timestamp> { /// Check invariants expected to be true about `DataflowDescription`s. 
pub fn check_invariants(&self) -> Result<(), String> { let mut plans: Vec<_> = self.objects_to_build.iter().map(|o| &o.plan).collect(); @@ -141,6 +177,7 @@ impl DataflowDescription { initial_storage_as_of: None, refresh_schedule: None, debug_name: name, + dataflow_expiration_desc: DataflowExpirationDesc::default(), } } @@ -543,6 +580,7 @@ where initial_storage_as_of: self.initial_storage_as_of.clone(), refresh_schedule: self.refresh_schedule.clone(), debug_name: self.debug_name.clone(), + dataflow_expiration_desc: self.dataflow_expiration_desc.clone(), } } } @@ -560,6 +598,7 @@ impl RustType for DataflowDescription for DataflowDescription(), 1..5), refresh_schedule_some in any::(), refresh_schedule in any::(), + dataflow_expiration_desc in any::>(), ) -> DataflowDescription { DataflowDescription { source_imports: BTreeMap::from_iter(source_imports.into_iter()), @@ -743,6 +788,7 @@ proptest::prop_compose! { None }, debug_name, + dataflow_expiration_desc, } } } @@ -842,6 +888,80 @@ impl RustType for BuildDesc { } } +/// A description of dataflow properties used to determine if dataflow expiration can be enabled. +#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)] +pub struct DataflowExpirationDesc { + /// The upper of the dataflow considering all transitive dependencies. + pub transitive_upper: Option>, + /// Whether the dataflow has a transitive dependency with a refresh schedule. + pub has_transitive_refresh_schedule: bool, + /// Whether the timeline of the dataflow is [`mz_storage_types::sources::Timeline::EpochMilliseconds`]. + pub is_timeline_epoch_ms: bool, +} + +impl Default for DataflowExpirationDesc { + fn default() -> Self { + Self { + transitive_upper: None, + // Assume present unless explicitly checked. + has_transitive_refresh_schedule: true, + // Assume any timeline type possible unless explicitly checked. 
+ is_timeline_epoch_ms: false, + } + } +} + +impl RustType for DataflowExpirationDesc { + fn into_proto(&self) -> ProtoDataflowExpirationDesc { + ProtoDataflowExpirationDesc { + transitive_upper: self.transitive_upper.into_proto(), + has_transitive_refresh_schedule: self.has_transitive_refresh_schedule.into_proto(), + is_timeline_epoch_ms: self.is_timeline_epoch_ms.into_proto(), + } + } + + fn from_proto(x: ProtoDataflowExpirationDesc) -> Result { + Ok(Self { + transitive_upper: x.transitive_upper.into_rust()?, + has_transitive_refresh_schedule: x.has_transitive_refresh_schedule.into_rust()?, + is_timeline_epoch_ms: x.is_timeline_epoch_ms.into_rust()?, + }) + } +} + +impl Arbitrary for DataflowExpirationDesc { + type Strategy = BoxedStrategy; + type Parameters = (); + + fn arbitrary_with(_: Self::Parameters) -> Self::Strategy { + ( + any::(), + any::(), + proptest::collection::vec(any::(), 1..5), + any::(), + ) + .prop_map( + |( + has_transitive_refresh_schedule, + transitive_upper_some, + transitive_upper, + is_timeline_epoch_ms, + )| { + DataflowExpirationDesc { + transitive_upper: if transitive_upper_some { + Some(Antichain::from(transitive_upper)) + } else { + None + }, + has_transitive_refresh_schedule, + is_timeline_epoch_ms, + } + }, + ) + .boxed() + } +} + #[cfg(test)] mod tests { use mz_ore::assert_ok; diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index 58eff79a972be..856dc1eb7657b 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -122,6 +122,17 @@ pub const COPY_TO_S3_MULTIPART_PART_SIZE_BYTES: Config = Config::new( "The size of each part in a multipart upload to S3.", ); +/// The maximum lifetime of a replica configured as an offset to the replica start time. +/// Used in temporal filters to drop diffs generated at timestamps beyond the expiration time. +/// +/// Disabled by default. Once set, cannot be disabled again during the lifetime of a replica. +/// When set multiple times, existing replicas only accept strictly decreasing offsets. +pub const COMPUTE_REPLICA_EXPIRATION_OFFSET: Config = Config::new( + "compute_replica_expiration_offset", + Duration::ZERO, + "The expiration time offset for replicas. Zero disables expiration.", +); + /// Adds the full set of all compute `Config`s. 
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -139,4 +150,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(©_TO_S3_PARQUET_ROW_GROUP_FILE_RATIO) .add(©_TO_S3_ARROW_BUILDER_BUFFER_RATIO) .add(©_TO_S3_MULTIPART_PART_SIZE_BYTES) + .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET) } diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index 2c5adf013ed31..d94b03d59e186 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -117,6 +117,7 @@ impl Context { initial_storage_as_of: desc.initial_storage_as_of, refresh_schedule: desc.refresh_schedule, debug_name: desc.debug_name, + dataflow_expiration_desc: desc.dataflow_expiration_desc, }) } diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 40437f511ab75..1d4b1a8ae4f78 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -15,6 +15,7 @@ use std::sync::{mpsc, Arc}; use std::time::{Duration, Instant}; use bytesize::ByteSize; +use differential_dataflow::lattice::Lattice; use differential_dataflow::trace::cursor::IntoOwned; use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::Hashable; @@ -34,6 +35,7 @@ use mz_dyncfg::ConfigSet; use mz_expr::SafeMfpPlan; use mz_ore::cast::CastFrom; use mz_ore::metrics::UIntGauge; +use mz_ore::now::EpochMillis; use mz_ore::task::AbortOnDropHandle; use mz_ore::tracing::{OpenTelemetryContext, TracingHandle}; use mz_persist_client::cache::PersistClientCache; @@ -162,6 +164,16 @@ pub struct ComputeState { /// Interval at which to perform server maintenance tasks. Set to a zero interval to /// perform maintenance with every `step_or_park` invocation. pub server_maintenance_interval: Duration, + + /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started. + /// + /// Used to compute `replica_expiration`. + pub init_system_time: EpochMillis, + + /// The maximum time for which the replica is expected to live. If not empty, dataflows in the + /// replica can drop diffs associated with timestamps beyond the replica expiration. + /// The replica will panic if such dataflows are not dropped before the replica has expired. + pub replica_expiration: Antichain, } impl ComputeState { @@ -208,6 +220,8 @@ impl ComputeState { read_only_tx, read_only_rx, server_maintenance_interval: Duration::ZERO, + init_system_time: mz_ore::now::SYSTEM_TIME(), + replica_expiration: Antichain::default(), } } @@ -294,6 +308,53 @@ impl ComputeState { // Remember the maintenance interval locally to avoid reading it from the config set on // every server iteration. self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config); + + // Set replica expiration. + { + let offset = COMPUTE_REPLICA_EXPIRATION_OFFSET.get(&self.worker_config); + if offset.is_zero() { + if !self.replica_expiration.is_empty() { + warn!( + current_replica_expiration = ?self.replica_expiration, + "replica_expiration: cannot disable once expiration is enabled", + ); + } + } else { + let offset: EpochMillis = offset + .as_millis() + .try_into() + .expect("duration must fit within u64"); + let replica_expiration_millis = self.init_system_time + offset; + let replica_expiration = Timestamp::from(replica_expiration_millis); + + // We only allow updating `replica_expiration` to an earlier time. Allowing it to be + // updated to a later time could be surprising since existing dataflows would still + // panic at their original expiration. 
+ if !self.replica_expiration.less_equal(&replica_expiration) { + info!( + offset = %offset, + replica_expiration_millis = %replica_expiration_millis, + replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis), + "updating replica_expiration", + ); + self.replica_expiration = Antichain::from_elem(replica_expiration); + } else { + warn!( + new_offset = %offset, + current_replica_expiration_millis = %replica_expiration_millis, + current_replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis), + "replica_expiration: ignoring new offset as it is greater than current value", + ); + } + } + + // Record the replica expiration in the metrics. + if let Some(expiration) = self.replica_expiration.as_option() { + self.worker_metrics + .replica_expiration_timestamp_seconds + .set(expiration.into()); + } + } } /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as @@ -416,22 +477,31 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { let dataflow_index = self.timely_worker.next_dataflow_index(); let as_of = dataflow.as_of.clone().unwrap(); + // Determine the dataflow expiration, if any. + let dataflow_expiration = + dataflow.expire_dataflow_at(&self.compute_state.replica_expiration); + + // Add the dataflow expiration to `until`. + let until = dataflow.until.meet(&dataflow_expiration); + if dataflow.is_transient() { - tracing::debug!( + debug!( name = %dataflow.debug_name, import_ids = %dataflow.display_import_ids(), export_ids = %dataflow.display_export_ids(), as_of = ?as_of.elements(), - until = ?dataflow.until.elements(), + dataflow_expiration = ?dataflow_expiration.elements(), + until = ?until.elements(), "creating dataflow", ); } else { - tracing::info!( + info!( name = %dataflow.debug_name, import_ids = %dataflow.display_import_ids(), export_ids = %dataflow.display_export_ids(), as_of = ?as_of.elements(), - until = ?dataflow.until.elements(), + dataflow_expiration = ?dataflow_expiration.elements(), + until = ?until.elements(), "creating dataflow", ); }; @@ -481,6 +551,8 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { self.compute_state, dataflow, start_signal, + until, + dataflow_expiration, ); } @@ -759,6 +831,19 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> { } } + /// Report per-worker metrics. + pub(crate) fn report_metrics(&self) { + if let Some(expiration) = self.compute_state.replica_expiration.as_option() { + let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64(); + let expiration = Duration::from_millis(::from(expiration)).as_secs_f64(); + let remaining = expiration - now; + self.compute_state + .worker_metrics + .replica_expiration_remaining_seconds + .set(remaining) + } + } + /// Either complete the peek (and send the response) or put it in the pending set. fn process_peek(&mut self, upper: &mut Antichain, mut peek: PendingPeek) { let response = match &mut peek { diff --git a/src/compute/src/metrics.rs b/src/compute/src/metrics.rs index aa2ce8a7a0a1c..6f153e402e07b 100644 --- a/src/compute/src/metrics.rs +++ b/src/compute/src/metrics.rs @@ -63,12 +63,22 @@ pub struct ComputeMetrics { /// Histogram of command handling durations. pub(crate) handle_command_duration_seconds: HistogramVec, + + /// The timestamp of replica expiration. + pub(crate) replica_expiration_timestamp_seconds: raw::UIntGaugeVec, + + /// Remaining seconds until replica expiration. + pub(crate) replica_expiration_remaining_seconds: raw::GaugeVec, } /// Per-worker metrics. 
pub struct WorkerMetrics { /// Histogram of command handling durations. pub(crate) handle_command_duration_seconds: CommandMetrics, + /// The timestamp of replica expiration. + pub(crate) replica_expiration_timestamp_seconds: UIntGauge, + /// Remaining seconds until replica expiration. + pub(crate) replica_expiration_remaining_seconds: raw::Gauge, } impl WorkerMetrics { @@ -81,8 +91,18 @@ impl WorkerMetrics { .with_label_values(&[&worker, typ]) }); + let replica_expiration_timestamp_seconds = metrics + .replica_expiration_timestamp_seconds + .with_label_values(&[&worker]); + + let replica_expiration_remaining_seconds = metrics + .replica_expiration_remaining_seconds + .with_label_values(&[&worker]); + Self { handle_command_duration_seconds, + replica_expiration_timestamp_seconds, + replica_expiration_remaining_seconds, } } } @@ -170,6 +190,16 @@ impl ComputeMetrics { var_labels: ["worker_id", "command_type"], buckets: mz_ore::stats::histogram_seconds_buckets(0.000_128, 8.0), )), + replica_expiration_timestamp_seconds: registry.register(metric!( + name: "mz_dataflow_replica_expiration_timestamp_seconds", + help: "The replica expiration timestamp in seconds since epoch.", + var_labels: ["worker_id"], + )), + replica_expiration_remaining_seconds: registry.register(metric!( + name: "mz_dataflow_replica_expiration_remaining_seconds", + help: "The remaining seconds until replica expiration. Can go negative, can lag behind.", + var_labels: ["worker_id"], + )), } } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index f077382a40ba1..6bfd1352ce3b8 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -127,7 +127,7 @@ use mz_repr::{Datum, GlobalId, Row, SharedRow}; use mz_storage_operators::persist_source; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; -use mz_timely_util::operator::CollectionExt; +use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::communication::Allocate; use timely::container::columnation::Columnation; use timely::dataflow::channels::pact::Pipeline; @@ -176,6 +176,8 @@ pub fn build_compute_dataflow( compute_state: &mut ComputeState, dataflow: DataflowDescription, start_signal: StartSignal, + until: Antichain, + dataflow_expiration: Antichain, ) { // Mutually recursive view definitions require special handling. let recursive = dataflow @@ -241,7 +243,7 @@ pub fn build_compute_dataflow( source.storage_metadata.clone(), dataflow.as_of.clone(), snapshot_mode, - dataflow.until.clone(), + until.clone(), mfp.as_mut(), compute_state.dataflow_max_inflight_bytes(), start_signal.clone(), @@ -296,8 +298,13 @@ pub fn build_compute_dataflow( // in order to support additional timestamp coordinates for iteration. 
if recursive { scope.clone().iterative::, _, _>(|region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + dataflow_expiration, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -366,8 +373,13 @@ pub fn build_compute_dataflow( }); } else { scope.clone().region_named(&build_name, |region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + dataflow_expiration, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -527,7 +539,14 @@ where }); match bundle.arrangement(&idx.key) { - Some(ArrangementFlavor::Local(oks, errs)) => { + Some(ArrangementFlavor::Local(mut oks, mut errs)) => { + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.dataflow_expiration.as_option() { + oks.expire_arrangement_at(expiration); + errs.stream = errs.stream.expire_stream_at(expiration); + } + // Obtain a specialized handle matching the specialized arrangement. let oks_trace = oks.trace_handle(); @@ -594,14 +613,22 @@ where match bundle.arrangement(&idx.key) { Some(ArrangementFlavor::Local(oks, errs)) => { - let oks = self.dispatch_rearrange_iterative(oks, "Arrange export iterative"); - let oks_trace = oks.trace_handle(); + let mut oks = self.dispatch_rearrange_iterative(oks, "Arrange export iterative"); - let errs = errs + let mut errs = errs .as_collection(|k, v| (k.clone(), v.clone())) .leave() .mz_arrange("Arrange export iterative err"); + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.dataflow_expiration.as_option() { + oks.expire_arrangement_at(expiration); + errs.stream = errs.stream.expire_stream_at(expiration); + } + + let oks_trace = oks.trace_handle(); + // Attach logging of dataflow errors. if let Some(logger) = compute_state.compute_logger.clone() { errs.stream.log_dataflow_errors(logger, idx_id); diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 0a0a4d03d6a7a..44a1573501cde 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -27,7 +27,7 @@ use mz_repr::fixed_length::{FromDatumIter, ToDatumIter}; use mz_repr::{DatumVec, DatumVecBorrow, Diff, GlobalId, Row, RowArena, SharedRow}; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; -use mz_timely_util::operator::CollectionExt; +use mz_timely_util::operator::{CollectionExt, StreamExt}; use timely::container::columnation::Columnation; use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::Pipeline; @@ -87,6 +87,9 @@ where pub(super) hydration_logger: Option, /// Specification for rendering linear joins. pub(super) linear_join_spec: LinearJoinSpec, + /// The expiration time for dataflows in this context. The output's frontier should never advance + /// past this frontier, except the empty frontier. 
+ pub dataflow_expiration: Antichain, } impl Context @@ -98,6 +101,8 @@ where dataflow: &DataflowDescription, scope: S, compute_state: &ComputeState, + until: Antichain, + dataflow_expiration: Antichain, ) -> Self { use mz_ore::collections::CollectionExt as IteratorExt; let dataflow_id = *scope.addr().into_first(); @@ -123,11 +128,12 @@ where debug_name: dataflow.debug_name.clone(), dataflow_id, as_of_frontier, - until: dataflow.until.clone(), + until, bindings: BTreeMap::new(), shutdown_token: Default::default(), hydration_logger, linear_join_spec: compute_state.linear_join_spec, + dataflow_expiration, } } } @@ -209,6 +215,7 @@ where hydration_logger: self.hydration_logger.clone(), linear_join_spec: self.linear_join_spec.clone(), bindings, + dataflow_expiration: self.dataflow_expiration.clone(), } } } @@ -299,6 +306,15 @@ where } } + /// Panic if the frontier of the underlying arrangement's stream exceeds `expiration` time. + pub fn expire_arrangement_at(&mut self, expiration: S::Timestamp) { + match self { + MzArrangement::RowRow(inner) => { + inner.stream = inner.stream.expire_stream_at(expiration); + } + } + } + /// Brings the underlying arrangement into a region. pub fn enter_region<'a>( &self, diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index a4d94457ce8b7..950dff3239eb9 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -92,6 +92,13 @@ where let mut ok_collection = ok_collection.leave(); let mut err_collection = err_collection.leave(); + // Ensure that the frontier does not advance past the expiration time, if set. Otherwise, + // we might write down incorrect data. + if let Some(&expiration) = self.dataflow_expiration.as_option() { + ok_collection = ok_collection.expire_collection_at(expiration); + err_collection = err_collection.expire_collection_at(expiration); + } + let non_null_assertions = sink.non_null_assertions.clone(); let from_desc = sink.from_desc.clone(); if !non_null_assertions.is_empty() { diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 48e7b6397fcb1..47135eb26b720 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -324,6 +324,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> { debug_name: dataflow.debug_name.clone(), initial_storage_as_of: dataflow.initial_storage_as_of.clone(), refresh_schedule: dataflow.refresh_schedule.clone(), + dataflow_expiration_desc: dataflow.dataflow_expiration_desc.clone(), }) .map(ComputeCommand::CreateDataflow) .collect() @@ -473,6 +474,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> { compute_state.report_operator_hydration(); compute_state.report_frontiers(); compute_state.report_dropped_collections(); + compute_state.report_metrics(); } self.metrics diff --git a/src/ore/src/metrics.rs b/src/ore/src/metrics.rs index 7a11d6156cd2c..835bdf8c0b206 100644 --- a/src/ore/src/metrics.rs +++ b/src/ore/src/metrics.rs @@ -190,11 +190,11 @@ use crate::assert_none; pub mod raw { use prometheus::core::{AtomicU64, GenericGaugeVec}; - /// The unsigned integer version of [`GaugeVec`](prometheus::GaugeVec). + /// The unsigned integer version of [`GaugeVec`]. /// Provides better performance if metric values are all unsigned integers. 
pub type UIntGaugeVec = GenericGaugeVec; - pub use prometheus::{CounterVec, HistogramVec, IntCounterVec, IntGaugeVec}; + pub use prometheus::{CounterVec, Gauge, GaugeVec, HistogramVec, IntCounterVec, IntGaugeVec}; } impl MetricsRegistry { diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs index 22e3565cd3111..183e99fa5e53e 100644 --- a/src/timely-util/src/operator.rs +++ b/src/timely-util/src/operator.rs @@ -27,7 +27,9 @@ use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as Operato use timely::dataflow::operators::generic::operator::{self, Operator}; use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore}; use timely::dataflow::operators::Capability; +use timely::dataflow::operators::InspectCore; use timely::dataflow::{Scope, StreamCore}; +use timely::progress::frontier::AntichainRef; use timely::progress::{Antichain, Timestamp}; use timely::{Container, Data, ExchangeData, PartialOrder}; @@ -159,6 +161,9 @@ where I: IntoIterator>, L: for<'a> FnMut(C1::Item<'a>) -> I + 'static; + /// Panic if the frontier of a [`StreamCore`] exceeds `expiration` time. + fn expire_stream_at(&self, expiration: G::Timestamp) -> StreamCore; + /// Take a Timely stream and convert it to a Differential stream, where each diff is "1" /// and each time is the current Timely timestamp. fn pass_through(&self, name: &str, unit: R) -> StreamCore @@ -230,6 +235,9 @@ where I: IntoIterator>, L: FnMut(D1) -> I + 'static; + /// Panic if the frontier of a [`Collection`] exceeds `expiration` time. + fn expire_collection_at(&self, expiration: G::Timestamp) -> Collection; + /// Replaces each record with another, with a new difference type. /// /// This method is most commonly used to take records containing aggregatable data (e.g. 
numbers to be summed) @@ -454,6 +462,17 @@ where }) } + fn expire_stream_at(&self, expiration: G::Timestamp) -> StreamCore { + self.inspect_container(move |data_or_frontier| { + if let Err(frontier) = data_or_frontier { + assert!( + frontier.is_empty() || AntichainRef::new(frontier).less_than(&expiration), + "frontier {frontier:?} has exceeded expiration {expiration:?}!", + ); + } + }) + } + fn pass_through(&self, name: &str, unit: R) -> StreamCore where CB: ContainerBuilder + for<'a> PushInto<(C1::Item<'a>, G::Timestamp, R)>, @@ -542,6 +561,10 @@ where (ok_stream.as_collection(), err_stream.as_collection()) } + fn expire_collection_at(&self, expiration: G::Timestamp) -> Collection { + self.inner.expire_stream_at(expiration).as_collection() + } + fn explode_one(&self, mut logic: L) -> Collection>::Output> where D2: differential_dataflow::Data, diff --git a/test/cluster/mzcompose.py b/test/cluster/mzcompose.py index 30de35c71f997..2ac90d0a1cfd1 100644 --- a/test/cluster/mzcompose.py +++ b/test/cluster/mzcompose.py @@ -4830,3 +4830,219 @@ def gracefully_alter(): port=6877, user="mz_system", ) + + +def workflow_crash_on_replica_expiration_mv( + c: Composition, parser: WorkflowArgumentParser +) -> None: + """ + Tests that clusterd crashes when a replica is set to expire + """ + c.down(destroy_volumes=True) + with c.override( + Clusterd(name="clusterd1", restart="on-failure"), + ): + offset = 20 + + c.up("materialized") + c.up("clusterd1") + c.sql( + f""" + ALTER SYSTEM SET enable_unorchestrated_cluster_replicas = 'true'; + ALTER SYSTEM SET compute_replica_expiration_offset = '{offset}s'; + + DROP CLUSTER IF EXISTS test CASCADE; + DROP TABLE IF EXISTS t CASCADE; + + CREATE CLUSTER test REPLICAS ( + test ( + STORAGECTL ADDRESSES ['clusterd1:2100'], + STORAGE ADDRESSES ['clusterd1:2103'], + COMPUTECTL ADDRESSES ['clusterd1:2101'], + COMPUTE ADDRESSES ['clusterd1:2102'], + WORKERS 1 + ) + ); + SET CLUSTER TO test; + + CREATE TABLE t (x int); + INSERT INTO t VALUES (42); + CREATE MATERIALIZED VIEW mv AS SELECT * FROM t WHERE x < 84; + SELECT mz_unsafe.mz_sleep({offset + 10}); + """, + port=6877, + user="mz_system", + ) + + results = c.sql_query( + """ + SELECT * from mv; + """, + port=6877, + user="mz_system", + ) + assert results == [(42,)], f"Results mismatch: expected [42], found {results}" + + c1 = c.invoke("logs", "clusterd1", capture=True) + assert ( + "has exceeded expiration" in c1.stdout + ), "unexpected success in crash-on-replica-expiration" + + +def workflow_crash_on_replica_expiration_index( + c: Composition, parser: WorkflowArgumentParser +) -> None: + + def fetch_metrics() -> Metrics: + resp = c.exec( + "clusterd1", "curl", "localhost:6878/metrics", capture=True + ).stdout + return Metrics(resp) + + """ + Tests that clusterd crashes when a replica is set to expire + """ + c.down(destroy_volumes=True) + with c.override( + Clusterd(name="clusterd1", restart="on-failure"), + ): + offset = 20 + + c.up("materialized") + c.up("clusterd1") + c.sql( + f""" + ALTER SYSTEM SET enable_unorchestrated_cluster_replicas = 'true'; + ALTER SYSTEM SET compute_replica_expiration_offset = '{offset}s'; + + DROP CLUSTER IF EXISTS test CASCADE; + DROP TABLE IF EXISTS t CASCADE; + + CREATE CLUSTER test REPLICAS ( + test ( + STORAGECTL ADDRESSES ['clusterd1:2100'], + STORAGE ADDRESSES ['clusterd1:2103'], + COMPUTECTL ADDRESSES ['clusterd1:2101'], + COMPUTE ADDRESSES ['clusterd1:2102'], + WORKERS 1 + ) + ); + SET CLUSTER TO test; + + CREATE TABLE t (x int); + INSERT INTO t VALUES (42); + CREATE VIEW mv AS 
SELECT * FROM t WHERE x < 84; + CREATE DEFAULT INDEX ON mv; + SELECT mz_unsafe.mz_sleep({offset + 10}); + """, + port=6877, + user="mz_system", + ) + + results = c.sql_query( + """ + SELECT * from mv; + """, + port=6877, + user="mz_system", + ) + assert results == [(42,)], f"Results mismatch: expected [42], found {results}" + + c1 = c.invoke("logs", "clusterd1", capture=True) + assert ( + "has exceeded expiration" in c1.stdout + ), "unexpected success in crash-on-replica-expiration" + + # Wait a bit to let the controller refresh its metrics. + time.sleep(2) + + # Check that expected metrics exist and have sensible values. + metrics = fetch_metrics() + + expected_expiration_timestamp_sec = int(time.time()) + + expiration_timestamp_sec = ( + metrics.get_value("mz_dataflow_replica_expiration_timestamp_seconds") / 1000 + ) + # Just ensure the expiration_timestamp is within a reasonable range of now(). + assert ( + (expected_expiration_timestamp_sec - offset) + < expiration_timestamp_sec + < (expected_expiration_timestamp_sec + offset) + ), f"expiration_timestamp: expected={expected_expiration_timestamp_sec}[{datetime.fromtimestamp(expected_expiration_timestamp_sec)}], got={expiration_timestamp_sec}[{[{datetime.fromtimestamp(expiration_timestamp_sec)}]}]" + + expiration_remaining = metrics.get_value( + "mz_dataflow_replica_expiration_remaining_seconds" + ) + # Ensure the expiration_remaining is within the configured offset. + assert ( + 1.0 < expiration_remaining < float(offset) + ), f"expiration_remaining: expected < 10s, got={expiration_remaining}" + + +def workflow_replica_expiration_creates_retraction_diffs_after_panic( + c: Composition, parser: WorkflowArgumentParser +) -> None: + """ + Test that retraction diffs within the expiration time are generated after the replica expires and panics + """ + c.down(destroy_volumes=True) + with c.override( + Testdrive(no_reset=True), + Clusterd(name="clusterd1", restart="on-failure"), + ): + + c.up("testdrive", persistent=True) + c.up("materialized") + c.up("clusterd1") + c.testdrive( + dedent( + """ + $ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} + ALTER SYSTEM SET enable_unorchestrated_cluster_replicas = 'true'; + ALTER SYSTEM SET compute_replica_expiration_offset = '50s'; + + > DROP CLUSTER IF EXISTS test CASCADE; + > DROP TABLE IF EXISTS events CASCADE; + + > CREATE CLUSTER test REPLICAS ( + test ( + STORAGECTL ADDRESSES ['clusterd1:2100'], + STORAGE ADDRESSES ['clusterd1:2103'], + COMPUTECTL ADDRESSES ['clusterd1:2101'], + COMPUTE ADDRESSES ['clusterd1:2102'], + WORKERS 1 + ) + ); + > SET CLUSTER TO test; + + > CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP + ); + + > CREATE VIEW events_view AS + SELECT event_ts, content + FROM events + WHERE mz_now() <= event_ts + INTERVAL '80s'; + + > CREATE DEFAULT INDEX ON events_view; + + > INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x; + + # Retraction diffs are not generated + > SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes + WHERE name LIKE '%events_view_primary_idx'; + 1000 + # Sleep until the replica expires + > SELECT mz_unsafe.mz_sleep(60); + + # Retraction diffs are now within the expiration time and should be generated + > SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes + WHERE name LIKE '%events_view_primary_idx'; + 2000 + > DROP TABLE events CASCADE; + > DROP CLUSTER test CASCADE; + """ + ) + ) diff --git a/test/testdrive/replica-expiration.td 
b/test/testdrive/replica-expiration.td
new file mode 100644
index 0000000000000..abe629c80af15
--- /dev/null
+++ b/test/testdrive/replica-expiration.td
@@ -0,0 +1,241 @@
+# Copyright Materialize, Inc. and contributors. All rights reserved.
+#
+# Use of this software is governed by the Business Source License
+# included in the LICENSE file at the root of this repository.
+#
+# As of the Change Date specified in that file, in accordance with
+# the Business Source License, use of this software will be governed
+# by the Apache License, Version 2.0.
+
+# Test indexes.
+
+## No retractions: expiration=30d, temporal filter width=20d
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET compute_replica_expiration_offset = '30d'
+> CREATE CLUSTER test (SIZE = '1')
+> SET CLUSTER TO test
+> CREATE TABLE events (
+    content TEXT,
+    event_ts TIMESTAMP
+  );
+> CREATE VIEW events_view AS
+  SELECT event_ts, content
+  FROM events
+  WHERE mz_now() <= event_ts + INTERVAL '20 days';
+> CREATE DEFAULT INDEX ON events_view;
+> INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x;
+> SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes
+  WHERE name LIKE '%events_view_primary_idx';
+2000
+> DROP TABLE events CASCADE;
+> DROP CLUSTER test;
+
+## Does retractions: expiration=20d, temporal filter width=30d
+$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr}
+ALTER SYSTEM SET compute_replica_expiration_offset = '20d'
+> CREATE CLUSTER test (SIZE = '1')
+> SET CLUSTER TO test
+> CREATE TABLE events (
+    content TEXT,
+    event_ts TIMESTAMP
+  );
+> CREATE VIEW events_view AS
+  SELECT event_ts, content
+  FROM events
+  WHERE mz_now() <= event_ts + INTERVAL '30 days';
+> CREATE DEFAULT INDEX ON events_view;
+> INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x;
+> SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes
+  WHERE name LIKE '%events_view_primary_idx';
+1000
+> DROP TABLE events CASCADE;
+> DROP CLUSTER test;
+
+
+# Test materialized views. `mz_introspection` does not report the number of records, but we just
+# make sure that the queries work.
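To make the arithmetic behind the two record counts above concrete, the snippet below is a simplified illustration of the `until`-based check that decides whether a retraction diff is kept; the function name and the `u64` millisecond timestamps are assumptions, not the actual MFP code. With a 30-day expiration and a 20-day filter the retraction timestamp still lies below the bound, so the index arranges 2000 records; with a 20-day expiration and a 30-day filter it does not, so only the 1000 insertions remain.

```rust
use timely::progress::Antichain;

/// Keep an update only if its timestamp is not at or beyond the dataflow
/// `until`, which expiration caps at the replica expiration time.
fn retain_update(update_ts: u64, until: &Antichain<u64>) -> bool {
    // An empty `until` places no bound, so everything is retained.
    !until.less_equal(&update_ts)
}

fn main() {
    const DAY_MS: u64 = 24 * 60 * 60 * 1000;
    let insert_ts = 0;

    // expiration=30d, filter width=20d: the retraction survives (2000 records).
    let until = Antichain::from_elem(30 * DAY_MS);
    assert!(retain_update(insert_ts, &until));
    assert!(retain_update(insert_ts + 20 * DAY_MS, &until));

    // expiration=20d, filter width=30d: the retraction is dropped (1000 records).
    let until = Antichain::from_elem(20 * DAY_MS);
    assert!(!retain_update(insert_ts + 30 * DAY_MS, &until));
}
```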
+ +## No retractions: expiration=30d, temporal filter width=20d +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '30d' +> CREATE CLUSTER test (SIZE = '1') +> SET CLUSTER TO test +> CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP + ); +> INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x; +> CREATE MATERIALIZED VIEW events_mv AS + SELECT event_ts, content + FROM events + WHERE mz_now() <= event_ts + INTERVAL '20 days'; +> SELECT count(*) FROM events_mv where content like '1%'; +112 +> DROP TABLE events CASCADE; +> DROP CLUSTER test; + +## Does retractions: expiration=20d, temporal filter width=30d +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '20d' +> CREATE CLUSTER test (SIZE = '1') +> SET CLUSTER TO test +> CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP + ); +> INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x; +> CREATE MATERIALIZED VIEW events_mv AS + SELECT event_ts, content + FROM events + WHERE mz_now() <= event_ts + INTERVAL '30 days'; +> SELECT count(*) FROM events_mv where content like '1%'; +112 +> DROP TABLE events CASCADE; +> DROP CLUSTER test; + + +# Check that transitive refresh disables expiration even when enabled: expiration=20d, temporal filter width=30d + +## No transitive refresh enables expiration, thus no retractions +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '20d' +> CREATE CLUSTER test (SIZE = '1') +> SET CLUSTER TO test +> CREATE TABLE events ( + id INT, + content TEXT, + event_ts TIMESTAMP + ); +> INSERT INTO events SELECT x, x::text, now() FROM generate_series(1, 1000) AS x; +> CREATE MATERIALIZED VIEW mv AS + SELECT id, content, event_ts from events WHERE id < 100; +> CREATE VIEW view1 AS + SELECT id, content, event_ts + FROM mv + WHERE content like '1%'; +> CREATE VIEW view2 AS + SELECT id, content, event_ts + FROM view1 + WHERE content like '__'; +> CREATE VIEW view3 AS + SELECT id, content, event_ts + FROM view2 + WHERE mz_now() <= event_ts + INTERVAL '30 days'; +> CREATE DEFAULT INDEX ON view3; +> SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes + WHERE name LIKE '%view3_primary_idx'; +10 +> DROP TABLE events CASCADE; +> DROP MATERIALIZED VIEW if exists mv CASCADE; +> DROP VIEW if exists view1 CASCADE; +> DROP VIEW if exists view2 CASCADE; +> DROP VIEW if exists view3 CASCADE; +> DROP CLUSTER test; + +## Transitive refresh disables expiration, resulting in retractions +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '20d' +> CREATE CLUSTER test (SIZE = '1') +> SET CLUSTER TO test +> CREATE TABLE events ( + id INT, + content TEXT, + event_ts TIMESTAMP + ); +> INSERT INTO events SELECT x, x::text, now() FROM generate_series(1, 1000) AS x; +> CREATE MATERIALIZED VIEW mv + WITH (REFRESH AT CREATION, REFRESH AT '3000-01-01 23:59') AS + SELECT id, content, event_ts from events WHERE id < 100; +> CREATE VIEW view1 AS + SELECT id, content, event_ts + FROM mv + WHERE content like '1%'; +> CREATE VIEW view2 AS + SELECT id, content, event_ts + FROM view1 + WHERE content like '__'; +> CREATE VIEW view3 AS + SELECT id, content, 
event_ts + FROM view2 + WHERE mz_now() <= event_ts + INTERVAL '30 days'; +> CREATE DEFAULT INDEX ON view3; +> SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes + WHERE name LIKE '%view3_primary_idx'; +20 +> DROP TABLE events CASCADE; +> DROP MATERIALIZED VIEW if exists mv CASCADE; +> DROP VIEW if exists view1 CASCADE; +> DROP VIEW if exists view2 CASCADE; +> DROP VIEW if exists view3 CASCADE; +> DROP CLUSTER test; + + +# Views with constant values disable expiration even when enabled: expiration=20d, temporal filter width=30d + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '20d'; +> CREATE CLUSTER test (SIZE = '1'); +> SET CLUSTER TO test; +> CREATE VIEW events_over_time AS VALUES ('joe', 100), ('mike', 101), ('sam', 200), ('end', 18446144073709551615); +> CREATE VIEW events AS SELECT * FROM events_over_time WHERE mz_now() <= column2 + 2592000000; -- 30d in ms +> CREATE DEFAULT INDEX ON events; +> SUBSCRIBE events WITH (progress) AS OF 0; +mz_timestamp mz_progressed mz_diff column1 column2 +---- +0 false 1 end 18446144073709551615 +0 false 1 joe 100 +0 false 1 mike 101 +0 false 1 sam 200 +0 true +18446144076301551616 false -1 end 18446144073709551615 +2592000101 false -1 joe 100 +2592000102 false -1 mike 101 +2592000201 false -1 sam 200 +> DROP VIEW if exists events_over_time CASCADE; +> DROP VIEW if exists events CASCADE; +> DROP CLUSTER test; + + +# Ensure disabling expiration results in retractions + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = 0; +> CREATE CLUSTER test (SIZE = '1') +> SET CLUSTER TO test +> CREATE TABLE events ( + content TEXT, + event_ts TIMESTAMP + ); +> CREATE VIEW events_view AS + SELECT event_ts, content + FROM events + WHERE mz_now() <= event_ts + INTERVAL '30 days'; +> CREATE DEFAULT INDEX ON events_view; +> INSERT INTO events SELECT x::text, now() FROM generate_series(1, 1000) AS x; +> SELECT records FROM mz_introspection.mz_dataflow_arrangement_sizes + WHERE name LIKE '%events_view_primary_idx'; +2000 +> DROP TABLE events CASCADE; +> DROP CLUSTER test; + + +# Test that a constant collection is not expired on a cluster with replication factor 0 + +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET compute_replica_expiration_offset = '30d'; +> CREATE CLUSTER test (SIZE = '1', REPLICATION FACTOR = 0); +> SET CLUSTER TO test; +# `now()` cannot be materialized. +> CREATE MATERIALIZED VIEW events_mv AS + SELECT x::text AS content, '2024-10-09 07:05:10.318+00'::timestamptz AS event_ts + FROM generate_series(1, 1000) AS x; +> CREATE VIEW events_view AS + SELECT event_ts, content + FROM events_mv + WHERE mz_now() <= event_ts + INTERVAL '30 years'; +> CREATE DEFAULT INDEX ON events_view; +> ALTER CLUSTER test SET (REPLICATION FACTOR = 1); +> SELECT count(*) FROM events_view; +1000 +> DROP MATERIALIZED VIEW events_mv CASCADE; +> DROP CLUSTER test CASCADE;
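Finally, a small sketch of the replica-level switch exercised by the tests above, under assumed names and `u64` millisecond timestamps: setting `compute_replica_expiration_offset` to zero leaves the expiration frontier empty, so retractions are kept as before, while a non-zero offset bounds it at the replica start time plus the offset.

```rust
use std::time::Duration;
use timely::progress::Antichain;

/// A minimal sketch (not the actual configuration code) of how the offset
/// turns into a replica expiration frontier.
fn replica_expiration(replica_start_ms: u64, offset: Duration) -> Antichain<u64> {
    if offset.is_zero() {
        // Offset of 0 disables expiration: the empty frontier bounds nothing,
        // matching the "disabling expiration results in retractions" case above.
        Antichain::new()
    } else {
        Antichain::from_elem(replica_start_ms + offset.as_millis() as u64)
    }
}
```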