diff --git a/doc/developer/design/20240919_dataflow_expiration.md b/doc/developer/design/20240919_dataflow_expiration.md
new file mode 100644
index 0000000000000..4bf1fbb1c09b0
--- /dev/null
+++ b/doc/developer/design/20240919_dataflow_expiration.md
@@ -0,0 +1,131 @@
+# Dataflow Expiration
+
+- Associated issues/prs: [#26029](https://github.com/MaterializeInc/materialize/issues/26029) [#29587](https://github.com/MaterializeInc/materialize/pull/29587)
+
+## The Problem
+
+Temporal filters currently require Materialize to maintain all future retractions
+of data that is currently visible. For long windows, the retractions could be at
+timestamps beyond our next scheduled restart, which is typically our weekly
+DB release.
+
+For instance, in the example below, the temporal filter in the `last_30_days`
+view causes two diffs to be generated for every row inserted into `events`: the
+row itself and a retraction 30 days later. However, if the replica is
+restarted within the next few days, the retraction diff is never processed, so
+there is no benefit to keeping it around.
+
+```sql
+-- Create a table of timestamped events.
+CREATE TABLE events (
+    content TEXT,
+    event_ts TIMESTAMP
+);
+-- Create a view of events from the last 30 days.
+CREATE VIEW last_30_days AS
+SELECT event_ts, content
+FROM events
+WHERE mz_now() <= event_ts + INTERVAL '30 days';
+
+INSERT INTO events VALUES ('hello', now());
+
+COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_days)) TO STDOUT;
+1727130590201 1 2023-09-23 22:29:50.201 hello -- now()
+1729722590222 -1 2023-10-23 22:29:50.222 hello -- now() + 30 days
+```
+
+Dataflows with large temporal windows (e.g. a year) can generate a large number
+of retractions that consume memory and CPU but are never used. Instead, all
+such retractions can be dropped.
+
+## Success Criteria
+
+When temporal filters are in use, retraction diffs associated with timestamps
+beyond a set expiration time can be dropped without affecting correctness,
+resulting in lower memory and CPU utilization because the number of diffs such
+dataflows have to process is roughly halved.
+
+## Solution Proposal
+
+A new LaunchDarkly feature flag is introduced that specifies an _expiration
+offset_ (a `Duration`). The _replica expiration_ time is computed as the offset
+added to the start time of the replica. Dataflows matching certain criteria
+(detailed below) are then configured with a _dataflow expiration_ derived from
+the replica expiration. Diffs generated in these dataflows beyond the dataflow
+expiration are dropped. To ensure correctness, panic checks are added to these
+dataflows to ensure that their frontiers do not exceed the dataflow expiration
+before the replica is restarted.
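+
+As a rough illustration, here is a minimal sketch of the arithmetic behind
+these two values. It is not the actual Materialize code: the function names
+are hypothetical, and frontiers are simplified to `Option<u64>` millisecond
+timestamps, with `None` standing in for the empty frontier.
+
+```rust
+/// Hypothetical sketch: derive the replica expiration from the configured
+/// offset and the replica start time. A zero (unset) offset disables
+/// expiration entirely.
+fn replica_expiration(replica_start_ms: u64, offset_ms: u64) -> Option<u64> {
+    if offset_ms == 0 {
+        None
+    } else {
+        Some(replica_start_ms + offset_ms)
+    }
+}
+
+/// Fold a dataflow expiration into an existing `until`: the effective bound
+/// is the meet (minimum) of the two, and an empty frontier never constrains.
+fn apply_expiration(until: Option<u64>, expiration: Option<u64>) -> Option<u64> {
+    match (until, expiration) {
+        (Some(u), Some(e)) => Some(u.min(e)),
+        (Some(u), None) => Some(u),
+        (None, e) => e,
+    }
+}
+
+fn main() {
+    // Replica started at some wall-clock time; offset of 21 days.
+    let expiration = replica_expiration(1_727_130_590_201, 21 * 24 * 60 * 60 * 1_000);
+    // Diffs at timestamps at or beyond the resulting `until` can be dropped.
+    println!("{:?}", apply_expiration(None, expiration));
+}
+```
+
+The actual implementation operates on timestamp frontiers and the existing
+`until` plumbing rather than plain integers, but the shape of the computation
+is the same.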
+
+An overview of the logic used for these features is as follows:
+```
+# Consider the `upper` for different dataflows
+if mv_view_with_constant_values:
+  upper := []
+else if mv_with_refresh_every:
+  upper := [next_refresh()]
+else:
+  upper := [write_frontier()]
+
+# The `upper` for a dataflow considering all its transitive inputs
+inputs_upper := meet(for all inputs i: i_upper)
+
+# Dataflow expiration logic
+if compute_replica_expiration_offset is not set:
+  dataflow_expiration := []
+else for dataflows of type in [materialized view, index, subscribe]:
+  replica_expiration := replica_start + compute_replica_expiration_offset
+  if dataflow_timeline is not EpochMilliseconds:
+    dataflow_expiration := []
+  else if refresh_interval set in any transitive dependency of dataflow:
+    dataflow_expiration := []
+  else if inputs_upper == []:
+    dataflow_expiration := []
+  else if inputs_upper > replica_expiration:
+    dataflow_expiration := inputs_upper
+  else:
+    dataflow_expiration := replica_expiration
+
+dataflow_until := dataflow_until.meet(dataflow_expiration)
+```
+
+Note that we only consider dataflows representing materialized views, indexes,
+and subscribes. These are long-running dataflows that maintain state during
+their lifetime. Other dataflows, such as peeks, are transient and do not need
+to explicitly drop retraction diffs.
+
+More concretely, we make the following changes:
+
+* Introduce a new dyncfg `compute_replica_expiration_offset`.
+* If the offset is configured with a non-zero value, compute
+  `replica_expiration = now() + offset`. This value specifies the maximum
+  time for which the replica is expected to be running. Consequently, diffs
+  associated with timestamps beyond this limit do not have to be stored and can
+  be dropped.
+* When building a dataflow, compute `dataflow_expiration` as per the logic
+  described above. If non-empty, `dataflow_expiration` is incorporated into the
+  dataflow's `until`, which ensures that any diff beyond this limit is dropped
+  in `mfp.evaluate()`.
+* To ensure correctness, we attach checks in `Context::export_index` and
+  `Context::export_sink` that panic if the dataflow frontier exceeds the
+  configured `dataflow_expiration`. This is to prevent the dataflow from
+  serving potentially incorrect results due to dropped data.
+* On a replica restart, `replica_expiration` and `dataflow_expiration` are
+  recomputed as the offset added to the new start time. Data whose timestamps
+  fall within the new limit is no longer dropped.
+
+## Open Questions
+
+- What is the appropriate default expiration time?
+  - Given that we currently restart replicas every week as part of the DB
+    release, and leaving some buffer for a skipped week, 3 weeks (+1 day of
+    margin) seems like a good limit to start with.
+
+## Out of Scope
+
+Dataflow expiration is disabled for the following cases:
+
+- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the
+  frontier timestamp being comparable to the wall-clock time of the replica.
+- Dataflows that transitively depend on a materialized view with a non-default
+  refresh schedule. Handling such materialized views would require additional
+  logic to track the refresh schedules and ensure that the dataflow expiration
diff --git a/doc/developer/design/20240919_replica_expiration.md b/doc/developer/design/20240919_replica_expiration.md
deleted file mode 100644
index fdd823b17c05f..0000000000000
--- a/doc/developer/design/20240919_replica_expiration.md
+++ /dev/null
@@ -1,97 +0,0 @@
-# Replica Expiration
-
-- Associated issues/prs: [#26029](https://github.com/MaterializeInc/materialize/issues/26029) [#29587](https://github.com/MaterializeInc/materialize/pull/29587)
-
-## The Problem
-
-Temporal filters currently require Materialize to maintain all future retractions
-of data that is currently visible. For long windows, the retractions could be at
-timestamps beyond our next scheduled restart, which is typically our weekly
-DB release.
-
-For instance, in the example below, the temporal filter in the `last_30_sec`
-view causes two diffs to be generated for every row inserted into `events`, the
-row itself and a retraction 30 seconds later. However, if the replica is
-restarted in the next 30 seconds, the retraction diff is never processed, making
-it unnecessary to generate and store that extra diff.
-
-```sql
--- Create a table of timestamped events.
-CREATE TABLE events (
-    content TEXT,
-    event_ts TIMESTAMP
-);
--- Create a view of events from the last 30 seconds.
-CREATE VIEW last_30_days AS
-SELECT event_ts, content
-FROM events
-WHERE mz_now() <= event_ts + INTERVAL '30 days';
-
-INSERT INTO events VALUES ('hello', now());
-
-COPY (SUBSCRIBE (SELECT event_ts, content FROM last_30_days)) TO STDOUT;
-1727130590201 1 2023-09-23 22:29:50.201 hello -- now()
-1729722590222 -1 2023-10-23 22:29:50.222 hello -- now() + 30 days
-```
-
-## Success Criteria
-
-Diffs associated with timestamps beyond a set expiration time are dropped
-without affecting correctness, resulting in lower number of diffs to process and
-consequently lower memory and CPU utilization. This change is mainly expected to
-benefit from dataflows with temporal filters that generate retraction diffs for
-future timestamps.
-
-## Out of Scope
-
-- Dataflows whose timeline type is not `Timeline::EpochMillis`. We rely on the
-  frontier timestamp being comparable to wall clock time of the replica.
-
-## Solution Proposal
-
-A new LaunchDarkly feature flag in introduced that allows replicas to be
-configured to expire. The flag specifies an expiration offset (a `Duration`),
-which is added to the start time of each replica to derive the absolute
-expiration time for that replica. When a non-zero offset is given, replicas
-filter out diffs that are beyond the expiration time. To ensure correctness,
-the replicas panic if their frontier exceeds the expiration time before the
-replica is restarted.
-
-More concretely, we make the following changes:
-
-* Introduce a new dyncfg `compute_replica_expiration_offset` to set an offset `Duration`
-  for each environment using LaunchDarkly.
-* If the offset is configured with a non-zero value, compute the
-  `replica_expiration` time as `now() + offset`. This value specifies the maximum
-  time for which the replica is expected to be running. Consequently, diffs
-  associated with timestamps beyond this limit do not have to be stored and can
-  be dropped.
-* We only consider dataflows with timeline kind `Timeline::EpochMillis`. For
-  these dataflows, we propagate `replica_expiration` to the existing `until`
-  checks in `mfp.evaluate()`, such that any data beyond the expiration time is
-  filtered out.
-* To ensure correctness, we add checks in `Context::export_index` to stop the
-  replica with a panic if the frontier exceeds the expiration time. This is to
-  prevent the replica from serving requests without any data that has
-  potentially been dropped.
-  * Explain where the checks have been added and why.
-* If the expiration time is exceeded, the replica will panic and restart with a
-  new expiration limit as an offset of the new start time. This time, any data
-  whose timestamps fall within the new limit are not filtered, thus maintaining
-  correctness.
-
-* Replica now().
-
-## Alternatives
-
--
-
-## Open Questions
-
-- What is the appropriate default expiration time?
-  - Given that we currently restart replicas every week as part of the DB release
-    and leaving some buffer for skipped week, 3 weeks (+1 day margin) seems like
-    a good limit to start with.
-- What to do with refresh at dataflows?
-  - We can ignore refresh at dataflows for now, as they are not expected to
-    generate retractions