
Improved replica and dataflow expiration #30162

Merged: antiguru merged 27 commits into MaterializeInc:main from replica_expiration on Oct 31, 2024

Conversation

@antiguru antiguru commented Oct 23, 2024

Support expiration of dataflows depending on wall-clock time and with refresh schedules.

This is a partial re-implementation of #29587 to enable more dataflows to participate in expiration. Specifically, it introduces the abstraction of time dependence to describe how a dataflow follows wall-clock time. Using this information, we can then determine how a replica's expiration time relates to a specific dataflow. This allows us to support dataflows that have custom refresh policies.
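
For orientation, here is a minimal sketch of the shape of this abstraction. The names, the enum layout, and the "refresh every `period` milliseconds" schedule are illustrative stand-ins, not the types this PR actually adds:

// Illustrative stand-ins only; the real types live in the compute crates.
type Timestamp = u64; // milliseconds, for the sake of the example

/// Hypothetical "refresh every `period` milliseconds" schedule.
#[derive(Clone, Debug)]
struct RefreshEvery {
    period: Timestamp,
}

impl RefreshEvery {
    /// Round `t` up to the next refresh boundary.
    fn round_up(&self, t: Timestamp) -> Timestamp {
        t.div_ceil(self.period) * self.period
    }
}

/// How a dataflow follows wall-clock time.
#[derive(Clone, Debug)]
enum TimeDependence {
    /// Follows wall-clock time directly.
    WallClock,
    /// Follows its inputs, optionally delayed to the next scheduled refresh.
    RefreshSchedule(Option<RefreshEvery>, Vec<TimeDependence>),
    /// No known relation to wall-clock time, e.g., a constant collection.
    Indeterminate,
}

impl TimeDependence {
    /// The time at which this dataflow observes `wall_clock`, if known; the
    /// replica expiration maps through this to a per-dataflow expiration.
    fn apply(&self, wall_clock: Timestamp) -> Option<Timestamp> {
        match self {
            TimeDependence::WallClock => Some(wall_clock),
            TimeDependence::RefreshSchedule(schedule, inner) => {
                // The dataflow's frontier follows the slowest of its inputs,
                // hence the minimum; in this sketch, an indeterminate input
                // (`None`) makes the whole result indeterminate.
                let result = inner.iter().map(|d| d.apply(wall_clock)).min()??;
                Some(schedule.as_ref().map_or(result, |s| s.round_up(result)))
            }
            TimeDependence::Indeterminate => None,
        }
    }
}

Roughly, a materialized view that refreshes every hour observes a replica expiration time only at the next refresh at or after it, so its dataflow expiration is the replica expiration rounded up to that refresh.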

I'm not sold on the names introduced by this PR, but they're the best I came up with. Open to suggestions!

The implementation deviates from the existing one in some important ways:

  • We do not panic in the dataflow operator that checks for frontier advancements, but rather retain a capability until the dataflow is shut down. This avoids a race condition where dataflow shutdown happens in parallel with dropping the shutdown token, and it avoids needing to reason about which dataflows produce error streams; some have an error output that immediately advances to the empty frontier.
  • We do not handle the empty frontier in a special way. Previously, we considered advancing to the empty frontier acceptable. However, this makes it difficult to distinguish a shutdown from a source reaching the expiration time. In the first case, the operator should drop its capability; in the second, it must not, for correctness reasons.
  • We check in the worker thread whether the replica has expired and panic if needed (a sketch follows this list).
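
A hedged sketch of that worker-side check; the function name and the millisecond representation are illustrative, not this PR's code:

fn check_replica_expiration(now_ms: u64, replica_expiration_ms: Option<u64>) {
    if let Some(expiration) = replica_expiration_ms {
        // Once wall-clock time reaches the expiration, panicking (and thereby
        // restarting the replica) is preferable to serving results past the
        // point the expiration optimization assumes is never reached.
        assert!(
            now_ms < expiration,
            "replica expired at {expiration} ms, wall clock is {now_ms} ms"
        );
    }
}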

There are some problems this PR does not address:

  • Caching the time dependence information in the physical plans seems like a hack. I think a better place would be the controller. Happy to try this in a follow-up PR.
  • We need a separate kill-switch to disable the feature because, as implemented, we capture the expiration time in the controller once per replica. A second kill-switch would enable us to override the expiration to stabilize the system.

Fixes MaterializeInc/database-issues#8688.
Fixes MaterializeInc/database-issues#8683.

Tips for the reviewer

Don't look at individual commits; they're a work log and do not carry semantic meaning.

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered. (trigger-ci for additional test/nightly runs)
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • If this PR includes major user-facing behavior changes, I have pinged the relevant PM to schedule a changelog post.

@antiguru antiguru requested review from a team as code owners October 23, 2024 13:32
@antiguru antiguru requested a review from ParkMyCar October 23, 2024 13:33
@antiguru antiguru marked this pull request as draft October 23, 2024 13:33

shepherdlybot bot commented Oct 23, 2024

Risk Score: 82/100 · Bug Hotspots: 8 · Resilience Coverage: 50%

Mitigations

Completing required mitigations increases Resilience Coverage.

  • (Required) Code Review 🔍 Detected
  • (Required) Feature Flag
  • (Required) Integration Test 🔍 Detected
  • (Required) Observability
  • (Required) QA Review 🔍 Detected
  • (Required) Run Nightly Tests
  • Unit Test 🔍 Detected
Risk Summary:

The pull request carries a high risk score of 82, driven by predictors such as the average age of files, cognitive complexity within files, and the delta of executable lines. Historically, PRs with these predictors are 119% more likely to cause a bug than the repository baseline. Additionally, the repository has an increasing trend in observed bugs.

Note: The risk score is not based on semantic analysis but on historical predictors of bug occurrence in the repository. The attributes above were deemed the strongest predictors based on that history. Predictors and the score may change as the PR evolves in code, time, and review activity.

Bug Hotspots:

File Percentile
../src/render.rs 97
../controller/instance.rs 100
../src/as_of_selection.rs 98
../inner/create_continual_task.rs 98
../src/coord.rs 99
../src/catalog.rs 93
../src/dataflows.rs 90
../inner/create_materialized_view.rs 91

@antiguru antiguru force-pushed the replica_expiration branch 6 times, most recently from 701e64c to 1d206d2 Compare October 28, 2024 12:01

@antiguru antiguru changed the title from "DNM WIP replica expiration" to "Improved replica and dataflow expiration" Oct 28, 2024
@antiguru antiguru marked this pull request as ready for review October 28, 2024 15:36
@antiguru antiguru requested a review from a team as a code owner October 28, 2024 15:36
@antiguru antiguru requested review from teskje and sdht0 October 28, 2024 15:36

@antiguru antiguru (Member, Author) commented Oct 28, 2024

Asking specific people for feedback on the following parts:

@antiguru antiguru requested review from petrosagg and ggevay October 28, 2024 15:38

@ParkMyCar ParkMyCar (Member) left a comment:

Adapter bits LGTM

@sdht0 sdht0 (Contributor) left a comment:

Very nice! I commented with a bunch of observations.

I'm still grokking the refresh schedule logic.

match self {
    TimeDependence::Indeterminate => None,
    TimeDependence::RefreshSchedule(schedule, inner) => {
        let result = inner.iter().map(|inner| inner.apply(wall_clock)).min()??;

A Contributor commented:

A comment explicitly laying out the logic would be nice here.
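
For illustration, a hedged sketch of what such a comment could spell out, using made-up helpers rather than the PR's types (`inputs` stands for the `apply` results of the nested dependences, and the "refresh every N" period is a stand-in for a real refresh schedule):

fn earliest_observation(
    inputs: &[Option<u64>],      // apply(wall_clock) of each nested dependence
    refresh_period: Option<u64>, // hypothetical "refresh every N" schedule
) -> Option<u64> {
    // `Option` orders `None` before `Some`, so a single input with no known
    // wall-clock relation makes the overall minimum `None`; the `??` in the
    // snippet above then propagates that outward.
    let result = inputs.iter().copied().min()??;
    // A refresh schedule defers visibility to the next refresh boundary.
    Some(match refresh_period {
        Some(period) => result.div_ceil(period) * period,
        None => result,
    })
}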

pub fn determine_dataflow_expiration<P, S>(
    &self,
    plan: &DataflowDescription<P, S, mz_repr::Timestamp>,
) -> Antichain<mz_repr::Timestamp> {

@sdht0 sdht0 (Contributor) commented Oct 28, 2024:

Optional, but I think it'd be better to return a more explicit Option<mz_repr::Timestamp> here, as later we almost always have to do self.dataflow_expiration.as_option(). Same for compute_state.replica_expiration.

Antichain helps with less_than() and meet(), but otherwise it is slightly less transparent about what it represents, which is exactly an optional Timestamp.

A Contributor commented:

I agree with this take! Seems like some surrounding code would become less awkward if it treated expiry times as optional timestamps instead of frontiers.
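
To illustrate the ergonomic difference being discussed, a small hedged sketch using timely's Antichain (the expiration value is made up, and the example assumes a timely dependency):

use timely::progress::Antichain;

fn main() {
    let expiration_ms: u64 = 1_700_000_000_000;

    // Frontier-shaped: call sites unwrap it back into an Option.
    let as_frontier = Antichain::from_elem(expiration_ms);
    if let Some(expiration) = as_frontier.as_option() {
        println!("expires at {expiration}");
    }

    // Option-shaped: "no expiration" is an explicit None at the call site.
    let as_option: Option<u64> = Some(expiration_ms);
    if let Some(expiration) = as_option {
        println!("expires at {expiration}");
    }
}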

@sdht0 sdht0 (Contributor) left a comment:

I went over the refresh schedule logic and it looks pretty good!

After adding tests for refresh schedules, this would be good to go!

@@ -7,7 +7,7 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

# Test indexes.
# Test replica expiration.

A Contributor commented:

Thanks.

How about refresh schedule related tests?

/// Potentially valid for all times.
Indeterminate,
/// Valid up to some nested time, rounded according to the refresh schedule.
RefreshSchedule(Option<RefreshSchedule>, Vec<Self>),

A Contributor commented:

RefreshSchedule does not really describe what this variant does, which is to capture a tree of nested times with an optional refresh schedule. Rename to NestedTimes or equivalent?

use TimeDependence::*;

if let Some(dependence) = self.seen.get(&id).cloned() {
    return dependence;

@sdht0 sdht0 (Contributor) commented Oct 28, 2024:

(Just an observation, on the apply() side, these common subtrees will be traversed multiple times. If this ever becomes a problem, we could preserve the ids and apply this seen logic even there. Although the reduction/normalization logic will interfere.)
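
A hedged sketch of what memoizing on the apply() side could look like, with made-up stand-ins for the id and dependence types (and ignoring the normalization concern mentioned above):

use std::collections::BTreeMap;

type GlobalId = u64;
type Timestamp = u64;

/// A dependence that remembers which collection it describes, so `apply`
/// results can be cached per collection id.
enum Dependence {
    WallClock,
    Nested(GlobalId, Vec<Dependence>),
}

fn apply_memo(
    dep: &Dependence,
    wall_clock: Timestamp,
    cache: &mut BTreeMap<GlobalId, Option<Timestamp>>,
) -> Option<Timestamp> {
    match dep {
        Dependence::WallClock => Some(wall_clock),
        Dependence::Nested(id, inner) => {
            // Reuse the result for a common subtree instead of re-traversing it.
            if let Some(cached) = cache.get(id) {
                return *cached;
            }
            let result = inner
                .iter()
                .map(|d| apply_memo(d, wall_clock, cache))
                .min()
                .flatten();
            cache.insert(*id, result);
            result
        }
    }
}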

@antiguru antiguru force-pushed the replica_expiration branch 2 times, most recently from 4e5e1ed to 09dc8b2 Compare October 29, 2024 12:46

@teskje teskje (Contributor) left a comment:

Not done yet, but posting the first part of my review already.

///
/// The default value indicates the dataflow follows wall-clock without modifications.
#[derive(Debug, Clone, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct TimeDependence {

A Contributor commented:

I liked the previous implementation more, as it was more explicit. In the new code:

  • There is no distinction between an "unset" time_dependence (defaults to None) and a time_dependence that was computed to be "indeterminate" (now None). Perhaps there is another way to distinguish between the two cases when debugging? (See the sketch after this list.)
  • The wall-clock time represented using the default value is an additional thing to keep in mind while paging in the code.
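
A minimal hedged sketch of the distinction raised in the first bullet; WallClockRelation is a placeholder, not the PR's type:

#[derive(Debug, Clone)]
struct WallClockRelation; // placeholder for "how the dataflow follows wall-clock time"

/// With an explicit state, "never computed" and "computed but unknown" stay
/// distinguishable when debugging.
#[derive(Debug, Clone)]
enum DependenceState {
    Unset,
    Indeterminate,
    Known(WallClockRelation),
}

fn main() {
    // With Option<WallClockRelation>, both of these would print as `None`.
    println!("{:?}", DependenceState::Unset);
    println!("{:?}", DependenceState::Indeterminate);
}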

I don't know how subscribes shut down, deferring to later.

Commits pushed to the branch (message excerpts):

  • Remove `Indeterminate` variant and replace it by `None`. Restructure some code.
  • … for continual task dataflows (not storage collections). Update docs.
  • We don't know the exact time semantics continual tasks will have, so disable expiration for now.

@antiguru antiguru (Member, Author) commented:

Another nightly run: https://buildkite.com/materialize/nightly/builds/10226

@ggevay ggevay (Contributor) left a comment:

Still thinking about some things, but sending the comments that I have so far. It's looking good to me.

TimeDependence::new(None, vec![TimeDependence::default()]).apply(100.into())
);

// Default refresh schedules refresh never, no wall-clock dependence.

@ggevay ggevay (Contributor) commented Oct 31, 2024:

(Btw. it never actually happens that an MV's refresh schedule is Default, because if the user doesn't specify any refresh options, then the MV ends up being a conventional, non-refresh MV.)

//! * A meet of anything but wall-clock time and a refresh schedule results in a refresh schedule
//! that depends on the deduplicated collection of dependencies.
//! * Otherwise, a dataflow is indeterminate, which expresses that we either don't know how it
//! follows wall-clock time, or is a constant collection.

A Contributor commented:

You could add a note here that if there are both determinate and indeterminate TimeDependences, then it's ok whatever the actual time dependence of the indeterminate ones is:

  • It's ok if the indeterminate ones have a later frontier than the determinate ones, because then the frontier will be determined by the determinate ones.
  • It's also ok if the indeterminate ones have an earlier frontier than the determinate ones. This will mean that the dataflow will expire later than ideal, but it won't cause a panic or wrong results.

//! The time dependence needs to be computed on the actual dependencies, and not on catalog
//! uses. An optimized dataflow depends on concrete indexes, and has unnecessary dependencies
//! pruned. Additionally, transitive dependencies can depend on indexes that do not exist anymore,
//! which makes combining run-time information with catalog-based information inconclusive.

A Contributor commented:

(Note that in theory, it could happen that an MIR plan involves a GlobalId that actually happens to be "dead code" in the MIR plan. In this case, DataflowDescription::import_ids would still list this dependency, but the dataflow's actual frontier wouldn't reflect this dependency. In practice, this shouldn't happen currently, because one of the last things that we do with MIR plans is call NormalizeLets, which removes CTEs that are not referenced.)

use super::*;

#[mz_ore::test]
fn test_time_dependence_normalize() {

A Contributor commented:

In both of the tests in test_time_dependence_normalize, normalize doesn't have any actual work to do, i.e., the struct is unchanged. (But normalize is pretty simple, so it's not a big problem.)

let (mut df_desc, df_meta) = global_lir_plan.unapply();

df_desc.time_dependence =
time_dependence(coord.catalog(), df_desc.import_ids(), refresh_schedule);

A Contributor commented:

Could these calls to time_dependence be moved into the final optimize call, which returns this df_desc? Then this call wouldn't need to be duplicated between bootstrapping and sequencing.

@antiguru antiguru (Member, Author) replied:

If I interpret this correctly, you're suggesting moving the time_dependence call into the optimizer? I looked into this, but the optimizer doesn't have access to the catalog (it only has an OptimizerCatalog, which doesn't expose access to the plans). On top of that, we use it for continual tasks, where we don't want the feature to be enabled.

That said, I'll file a follow-up issue to move the time dependence determination into the controller, which would avoid the issue altogether.

A Contributor replied:

Ok!

@teskje teskje (Contributor) left a comment:

I didn't review again in detail, but my one blocking concern about CTs has been addressed, so I'm good with merging this now and addressing the style comments later.

@ggevay ggevay (Contributor) left a comment:

I'm also ok with merging as it is, and addressing the minor comment later.

@def- def- (Contributor) left a comment:

All nightly failures are also present in main (some are already fixed, but there's no need to rebase).

@antiguru antiguru merged commit 24d2fc2 into MaterializeInc:main Oct 31, 2024
213 of 222 checks passed
@antiguru antiguru deleted the replica_expiration branch October 31, 2024 16:04

@antiguru antiguru (Member, Author) commented:

Thanks for the reviews!

@antiguru antiguru restored the replica_expiration branch November 19, 2024 09:02