Implement dataflow expiration to limit temporal data retention #29587

Merged: 47 commits, Oct 10, 2024

Commits
b1d23f7
compute: add new dyncfg to set replica expiration time
sdht0 Sep 19, 2024
d0d0570
compute: configure dataflows to respect the expiration time
sdht0 Sep 19, 2024
ccf1b76
*: configure expiration based on dataflow timeline
sdht0 Sep 19, 2024
9e9485a
doc: add design doc for improved temporal filters
sdht0 Sep 20, 2024
f7375c7
Address comments
sdht0 Sep 23, 2024
bb33ea4
Thread through timeline
sdht0 Sep 25, 2024
40b9597
Exclude peek queries
sdht0 Sep 25, 2024
66403fd
Remove stray changes
sdht0 Sep 25, 2024
2abeca7
Remove more stray changes
sdht0 Sep 25, 2024
0a9c9dc
Remove more stray changes
sdht0 Sep 25, 2024
fa049fb
Implement transitive dependency checks
sdht0 Oct 3, 2024
16b7f6e
Fixes
sdht0 Oct 3, 2024
3035f4a
Tests for replica expiration
def- Oct 4, 2024
869e16c
Always apply worker config
sdht0 Oct 4, 2024
029f230
Address comments
sdht0 Oct 7, 2024
29b3506
Resolve conflicts, update SHA256_BY_SCENARIO_FILE
sdht0 Oct 7, 2024
70ded84
Improve tests
def- Oct 7, 2024
2270e02
Simplify
sdht0 Oct 7, 2024
4e3f67e
Move attached data to expiration specific struct
sdht0 Oct 7, 2024
cab072c
Rename
sdht0 Oct 7, 2024
b212370
Revert
sdht0 Oct 7, 2024
8b1bfbe
Refactor
sdht0 Oct 7, 2024
2bb7101
Design doc
sdht0 Oct 7, 2024
4fa7c38
Move catalog impl
sdht0 Oct 7, 2024
055de92
Paths
sdht0 Oct 7, 2024
c3fd0c6
Fix bug
sdht0 Oct 7, 2024
9c99688
Fix replica-expiration.td
def- Oct 8, 2024
03b51d8
Address comments
sdht0 Oct 8, 2024
42c87a8
Fix rebase on main
sdht0 Oct 9, 2024
5dbdcd1
Reimplement applying config changes
sdht0 Oct 9, 2024
530a3a5
Address comments
sdht0 Oct 9, 2024
b3ae7a6
Log expiration time
sdht0 Oct 9, 2024
0656388
Make explicit
sdht0 Oct 9, 2024
7f8529d
Tests
sdht0 Oct 9, 2024
445796f
Fmt
sdht0 Oct 10, 2024
6e2bcf4
ci: Ignore expected panic
def- Oct 10, 2024
9727741
Provide metrics
antiguru Oct 10, 2024
0cfb43c
Tests
sdht0 Oct 10, 2024
422a4fd
Metrics tests
sdht0 Oct 10, 2024
74e7819
Address comments
sdht0 Oct 10, 2024
f89ccf7
lint fixes
sdht0 Oct 10, 2024
29b36ad
Saner time offsets
sdht0 Oct 10, 2024
59ce8a3
In memory test makes more sense for VIEW+INDEX
sdht0 Oct 10, 2024
6dfa86e
Lint fixes
sdht0 Oct 10, 2024
74635c2
Fix flaky tests
sdht0 Oct 10, 2024
65feae5
Fix merge skew
sdht0 Oct 10, 2024
5b847d4
cluster test: Add workflow_replica_expiration_creates_retraction_diff…
def- Oct 10, 2024
133 changes: 133 additions & 0 deletions doc/developer/design/20240919_dataflow_expiration.md
@@ -0,0 +1,133 @@
# Dataflow Expiration

- Associated issues/prs: [#26029](https://github.com/MaterializeInc/materialize/issues/26029) [#29587](https://github.com/MaterializeInc/materialize/pull/29587)

## The Problem

Temporal filters currently require Materialize to maintain all future retractions
of data that is currently visible. For long windows, the retractions could be at
timestamps beyond our next scheduled restart, which is typically our weekly
DB release.

For instance, in the example below, the temporal filter in the `last_30_days`
view causes two diffs to be generated for every row inserted into `events`: the
row itself and a retraction 30 days later. However, if the replica is restarted
within the next few days, the retraction diff is never processed, making it
wasteful to keep it around in the meantime.

```sql
-- Create a table of timestamped events.
CREATE TABLE events (
    content TEXT,
    event_ts TIMESTAMP
);

-- Create a view of events from the last 30 days.
CREATE VIEW last_30_days AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30 days';

INSERT INTO events VALUES ('hello', now());

COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_days)) TO STDOUT;
1727130590201 1  2024-09-23 22:29:50.201 hello -- now()
1729722590222 -1 2024-10-23 22:29:50.222 hello -- now() + 30 days
```

Dataflows with large temporal windows (e.g. a year) can generate a large number
of retractions that consume memory and CPU but are never used. Instead, all
such retractions can be dropped.

## Success Criteria

When temporal filters are in use, retraction diffs associated with timestamps
beyond a set expiration time can be dropped without affecting correctness,
resulting in lower memory and CPU utilization from halving the number of
processed diffs.

## Solution Proposal

A new LaunchDarkly feature flag is introduced that specifies an _expiration
offset_ (a `Duration`). The _replica expiration_ time is computed as the offset
added to the start time of the replica. Dataflows matching certain criteria
(detailed below) are then configured with a _dataflow expiration_ derived from
the replica expiration, and diffs generated in these dataflows beyond the
dataflow expiration are dropped. To ensure correctness, these dataflows also
carry checks that panic if the frontier exceeds the dataflow expiration before
the replica is restarted.

An overview of the logic used for these features is as follows:
```
# Consider the `upper` for different dataflows
if mv_view_with_constant_values:
    upper := []
else if mv_with_refresh_every:
    upper := [next_refresh()]
else:
    upper := [write_frontier()]

# The `upper` for a dataflow considering all its transitive inputs
inputs_upper := meet(for all inputs i: i_upper)

# Dataflow expiration logic
if compute_replica_expiration_offset is not set:
    dataflow_expiration := []
else for dataflows of type in [materialized view, index, subscribe]:
    replica_expiration := replica_start + compute_replica_expiration_offset
    if dataflow_timeline is not EpochMilliseconds:
        # Dataflows that do not depend on any source or table are not in the
        # EpochMilliseconds timeline
        dataflow_expiration := []
    else if refresh_interval set in any transitive dependency of dataflow:
        dataflow_expiration := []
    else if inputs_upper == []:
        dataflow_expiration := []
    else if inputs_upper > replica_expiration:
        dataflow_expiration := inputs_upper
    else:
        dataflow_expiration := replica_expiration

    dataflow_until := dataflow_until.meet(dataflow_expiration)
```
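
For concreteness, here is a minimal, self-contained Rust sketch of the same decision as the pseudocode above. It is an illustration only, not the actual implementation: plain `u64` millisecond timestamps and `Option` stand in for the real frontier/antichain types, and every name in it is hypothetical.

```rust
#[derive(PartialEq)]
#[allow(dead_code)]
enum Timeline {
    EpochMilliseconds,
    Other,
}

/// Returns the dataflow expiration; `None` models the empty frontier `[]`.
fn dataflow_expiration(
    offset: Option<u64>,          // compute_replica_expiration_offset, if set
    replica_start: u64,           // wall-clock ms at replica startup
    timeline: Timeline,           // timeline of the dataflow
    has_transitive_refresh: bool, // any transitive dependency has a refresh schedule
    inputs_upper: Option<u64>,    // meet of the inputs' uppers; `None` = empty
) -> Option<u64> {
    let offset = offset?; // flag unset: no expiration at all
    if timeline != Timeline::EpochMilliseconds || has_transitive_refresh {
        // Frontier not comparable to wall-clock time, or refresh schedules involved.
        return None;
    }
    let replica_expiration = replica_start + offset;
    match inputs_upper {
        None => None, // inputs are already sealed
        Some(upper) if upper > replica_expiration => Some(upper),
        Some(_) => Some(replica_expiration),
    }
}

fn main() {
    // Replica started at some wall-clock time, with a 3-day offset (in ms).
    let expiration = dataflow_expiration(
        Some(3 * 24 * 60 * 60 * 1000),
        1_727_130_590_201,
        Timeline::EpochMilliseconds,
        false,
        Some(1_727_130_600_000),
    );
    println!("dataflow expiration: {expiration:?}");
}
```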

Note that we only consider dataflows representing materialized views, indexes,
and subscribes. These are long-running dataflows that maintain state during
their lifetime. Other dataflows such as peeks are transient and do not need to
explicitly drop retraction diffs.

More concretely, we make the following changes:

* Introduce a new dyncfg `compute_replica_expiration_offset`.
* If the offset is configured with a non-zero value, compute
`replica_expiration = now() + offset`. This value specifies the maximum
time for which the replica is expected to be running. Consequently, diffs
associated with timestamps beyond this limit do not have to be stored and can
be dropped.
* When building a dataflow, compute `dataflow_expiration` as per the logic
described above. If non-empty, `dataflow_expiration` is folded into the
dataflow's `until`, which ensures that any diff beyond this limit is dropped
in `mfp.evaluate()`.
* To ensure correctness, we attach checks in `Context::export_index` and
`Context::export_sink` that panic if the dataflow frontier exceeds the
configured `dataflow_expiration`, preventing the dataflow from serving
potentially incorrect results due to dropped data (a sketch follows this list).
* On a replica restart, `replica_expiration` and `dataflow_expiration` are
recomputed relative to the new start time. Data whose timestamps fall within
the new limit is no longer dropped.
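
The panic check can be illustrated with a small sketch. The names and the simplified `u64` timestamps below are assumptions made for the illustration; the real checks live in `Context::export_index` and `Context::export_sink`.

```rust
/// Panic if a dataflow's write frontier has moved past its configured expiration.
/// `None` models an unset value.
fn check_expiration(frontier: Option<u64>, dataflow_expiration: Option<u64>) {
    if let (Some(ts), Some(expiration)) = (frontier, dataflow_expiration) {
        if ts > expiration {
            // Mirrors the intent of the "has exceeded expiration" panic that the
            // CI annotator is taught to expect elsewhere in this PR.
            panic!("frontier {ts} has exceeded expiration {expiration}; restart the replica");
        }
    }
}

fn main() {
    // Frontier still below the expiration: no panic.
    check_expiration(Some(1_000), Some(2_000));
    println!("ok");
}
```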

## Open Questions

- What is the appropriate default expiration time?
- Given that we currently restart replicas every week as part of the DB release,
and allowing some buffer for a skipped week, 3 weeks (plus a 1-day margin)
seems like a good default to start with.

## Out of Scope

Dataflow expiration is disabled for the following cases:

- Dataflows whose timeline type is not `Timeline::EpochMilliseconds`. We rely on
the frontier timestamp being comparable to the wall-clock time of the replica.
- Dataflows that transitively depend on a materialized view with a non-default
refresh schedule. Handling such materialized views would require additional
logic to track the refresh schedules and ensure that the dataflow expiration
accounts for future refresh times.
1 change: 1 addition & 0 deletions misc/python/materialize/cli/ci_annotate_errors.py
@@ -125,6 +125,7 @@
| restart-materialized-1\ *|\ thread\ 'coordinator'\ panicked\ at\ 'external\ operation\ .*\ failed\ unrecoverably.*
# Expected in cluster test
| cluster-clusterd[12]-1\ .*\ halting\ process:\ new\ timely\ configuration\ does\ not\ match\ existing\ timely\ configuration
| cluster-clusterd1-1\ .*\ has\ exceeded\ expiration
# Emitted by tests employing explicit mz_panic()
| forced\ panic
# Emitted by broken_statements.slt in order to stop panic propagation, as 'forced panic' will unwantedly panic the `environmentd` thread.
@@ -23,7 +23,7 @@

# Consider increasing the scenario's class #version() if changes are expected to impact results!
SHA256_BY_SCENARIO_FILE: dict[str, str] = {
"benchmark_main.py": "9419ffb7f17de4584fc35e2f531c3e6181ad2b0284bf6aeb2f15ebb966a5e007",
"benchmark_main.py": "0e328994c56b9bae2a9da0db10974b4fa30dd8c825e3290e00f5cecbb32ec338",
"concurrency.py": "2e9c149c136b83b3853abc923a1adbdaf55a998ab4557712f8424c8b16f2adb1",
"customer.py": "d1e72837a342c3ebf1f4a32ec583b1b78a78644cdba495030a6df45ebbffe703",
"optbench.py": "ae411afe1ba595021f2f9d2d21500ba0c1c6941561493eabcae113373f493bfa",
@@ -2182,3 +2182,48 @@ def benchmark(self) -> MeasurementSource:
"""
)
)


class ReplicaExpiration(Scenario):
    def init(self) -> list[Action]:
        return [
            TdAction(
                """
> CREATE TABLE events (
content TEXT,
event_ts TIMESTAMP
);
> CREATE VIEW last_30_days AS
SELECT event_ts, content
FROM events
WHERE mz_now() <= event_ts + INTERVAL '30 days';

> CREATE DEFAULT INDEX ON last_30_days
"""
            ),
        ]

    def benchmark(self) -> MeasurementSource:
        return Td(
            dedent(
                f"""
> DELETE FROM events;

> SELECT COUNT(*) FROM last_30_days
0

> SELECT 1;
/* A */
1

> INSERT INTO events SELECT concat('somelongstringthatdoesntmattermuchatallbutrequiresmemorytostoreXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', x::text), now() FROM generate_series(1, {self.n()}) AS x

> SELECT COUNT(*) FROM last_30_days
{self.n()}

> SELECT 1;
/* B */
1
"""
)
)
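
As in the other scenarios in this file, the `/* A */` and `/* B */` probe queries appear to bracket the measured interval, so the benchmark times how long it takes for the inserted rows (and the diffs produced by the temporal filter) to become visible in the indexed `last_30_days` view.
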
1 change: 1 addition & 0 deletions misc/python/materialize/mzcompose/__init__.py
@@ -79,6 +79,7 @@ def get_default_system_parameters(
"cluster_always_use_disk": "true",
"compute_dataflow_max_inflight_bytes": "134217728", # 128 MiB
"compute_hydration_concurrency": "2",
"compute_replica_expiration_offset": "3d",
"disk_cluster_replicas_default": "true",
"enable_0dt_deployment": "true" if zero_downtime else "false",
"enable_0dt_deployment_panic_after_timeout": "true",
2 changes: 2 additions & 0 deletions misc/python/materialize/mzcompose/services/clusterd.py
@@ -24,6 +24,7 @@ def __init__(
environment_extra: list[str] = [],
memory: str | None = None,
options: list[str] = [],
restart: str = "no",
) -> None:
environment = [
"CLUSTERD_LOG_FILTER",
@@ -58,6 +59,7 @@
"ports": [2100, 2101, 6878],
"environment": environment,
"volumes": DEFAULT_MZ_VOLUMES,
"restart": restart,
}
)

1 change: 1 addition & 0 deletions src/adapter/src/catalog.rs
@@ -95,6 +95,7 @@ pub(crate) mod consistency;
mod migrate;

mod apply;
mod dataflow_expiration;
mod open;
mod state;
mod transact;
28 changes: 28 additions & 0 deletions src/adapter/src/catalog/dataflow_expiration.rs
@@ -0,0 +1,28 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Helper function for dataflow expiration checks.

use mz_repr::GlobalId;

use crate::catalog::Catalog;

impl Catalog {
    /// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
    /// with a refresh schedule. Used to disable dataflow expiration if found.
    pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool {
        let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool {
            if let Some(mv) = self.get_entry(&dep).materialized_view() {
                return mv.refresh_schedule.is_some();
            }
            false
        };
        test_has_transitive_refresh_schedule(id)
            || self
                .state()
                .transitive_uses(id)
                .any(test_has_transitive_refresh_schedule)
    }
}
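
As a hypothetical illustration (not part of this diff), the helper above might be consulted roughly as follows when deciding whether a dataflow may be configured with an expiration; `may_set_dataflow_expiration` and its signature are assumptions, and only `item_has_transitive_refresh_schedule` comes from the change above.

```rust
// Hypothetical call site: expiration is only safe when the dataflow lives in the
// wall-clock (EpochMilliseconds) timeline and none of its transitive dependencies
// carries a custom refresh schedule.
fn may_set_dataflow_expiration(
    catalog: &Catalog,
    id: GlobalId,
    is_timeline_epoch_ms: bool,
) -> bool {
    is_timeline_epoch_ms && !catalog.item_has_transitive_refresh_schedule(id)
}
```
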
24 changes: 22 additions & 2 deletions src/adapter/src/coord.rs
@@ -742,6 +742,10 @@ pub struct CreateMaterializedViewOptimize {
/// An optional context set iff the state machine is initiated from
/// sequencing an EXPLAIN for this statement.
explain_ctx: ExplainContext,
/// Whether the timeline is [`mz_storage_types::sources::Timeline::EpochMilliseconds`].
///
/// Used to determine if it is safe to enable dataflow expiration.
is_timeline_epoch_ms: bool,
}

#[derive(Debug)]
@@ -2572,8 +2576,15 @@ impl Coordinator {
.catalog()
.resolve_full_name(entry.name(), None)
.to_string();
let is_timeline_epoch_ms = self.get_timeline_context(*id).is_timeline_epoch_ms();
let (_optimized_plan, physical_plan, _metainfo) = self
.optimize_create_continual_task(&ct, *id, self.owned_catalog(), debug_name)
.optimize_create_continual_task(
&ct,
*id,
self.owned_catalog(),
debug_name,
is_timeline_epoch_ms,
)
.expect("builtin CT should optimize successfully");

// Determine an as of for the new continual task.
@@ -2616,6 +2627,7 @@

for entry in ordered_catalog_entries {
let id = entry.id();
let is_timeline_epoch_ms = self.get_timeline_context(id).is_timeline_epoch_ms();
match entry.item() {
CatalogItem::Index(idx) => {
// Collect optimizer parameters.
@@ -2646,6 +2658,7 @@
entry.name().clone(),
idx.on,
idx.keys.to_vec(),
is_timeline_epoch_ms,
);
let global_mir_plan = optimizer.optimize(index_plan)?;
let optimized_plan = global_mir_plan.df_desc().clone();
@@ -2700,6 +2713,7 @@
debug_name,
optimizer_config.clone(),
self.optimizer_metrics(),
is_timeline_epoch_ms,
);

// MIR ⇒ MIR optimization (global)
@@ -2743,7 +2757,13 @@
.resolve_full_name(entry.name(), None)
.to_string();
let (optimized_plan, physical_plan, metainfo) = self
.optimize_create_continual_task(ct, id, self.owned_catalog(), debug_name)?;
.optimize_create_continual_task(
ct,
id,
self.owned_catalog(),
debug_name,
is_timeline_epoch_ms,
)?;

let catalog = self.catalog_mut();
catalog.set_optimized_plan(id, optimized_plan);
6 changes: 5 additions & 1 deletion src/adapter/src/coord/introspection.rs
@@ -255,7 +255,11 @@ impl Coordinator {
let read_holds = self.acquire_read_holds(&id_bundle);
let as_of = read_holds.least_valid_read();

let global_mir_plan = global_mir_plan.resolve(as_of);
// Introspection subscribes only read from system collections, which are always in
// the `EpochMilliseconds` timeline.
let is_timeline_epoch_ms = true;

let global_mir_plan = global_mir_plan.resolve(as_of, is_timeline_epoch_ms);

let span = Span::current();
Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
@@ -82,6 +82,10 @@ impl Coordinator {
},
};

let is_timeline_epoch_ms = self
.validate_timeline_context(resolved_ids.0.clone())?
.is_timeline_epoch_ms();

// Construct the CatalogItem for this CT and optimize it.
let mut item = crate::continual_task::ct_item_from_plan(plan, sink_id, resolved_ids)?;
let full_name = bootstrap_catalog.resolve_full_name(&name, Some(session.conn_id()));
@@ -90,6 +94,7 @@
sink_id,
Arc::new(bootstrap_catalog),
full_name.to_string(),
is_timeline_epoch_ms,
)?;

// Timestamp selection
@@ -156,6 +161,7 @@
output_id: GlobalId,
catalog: Arc<dyn OptimizerCatalog>,
debug_name: String,
is_timeline_epoch_ms: bool,
) -> Result<
(
DataflowDescription<OptimizedMirRelationExpr>,
@@ -185,6 +191,7 @@
debug_name,
optimizer_config,
self.optimizer_metrics(),
is_timeline_epoch_ms,
);

// HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)