catalog: Hook up expression cache into startup #30227
Conversation
Force-pushed from e8d3a28 to 350f435.
Force-pushed from 87d3275 to 9fbaec2.
The nightly panics look related: https://buildkite.com/materialize/nightly/builds/10328
Since the cache is supposed to improve performance, can we create a simple feature-benchmark scenario to show that it performs faster now?
One thing I'm worried about here is that the expression cache might hide optimization being slow in other benchmarks. If so, we could disable it in ADDITIONAL_BENCHMARKING_SYSTEM_PARAMETERS in misc/python/materialize/mzcompose/__init__.py.
How does the expression cache work with upgrades? What if the previous optimization led to a wrong plan? Do we still keep using the wrong plan after the upgrade?
Force-pushed from 3fc0ed5 to 5402c2f.
Force-pushed from 5402c2f to c4015d1.
I think all's good but I had one question about a method, see inline, and some questions to check my understanding. Very happy to approve quickly once we sort these out!
src/adapter/src/catalog/apply.rs (outdated)

@@ -1289,10 +1313,18 @@ impl CatalogState {
    let span = info_span!(parent: None, "parse builtin view", name = view.name);
    OpenTelemetryContext::obtain().attach_as_parent_to(&span);
    let task_state = Arc::clone(&spawn_state);
    let cached_expr = local_expression_cache.remove_cached_expression(&id);
Why are the expressions removed from the cache? Because we expect each one to be used at most once?
Yes, and I didn't want to clone every expression.
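For illustration, a minimal sketch of the take-on-use pattern being described; the types GlobalId and OptimizedExpr are hypothetical stand-ins, and only the method name remove_cached_expression comes from the PR's diff:

use std::collections::BTreeMap;

type GlobalId = u64;

#[derive(Debug)]
struct OptimizedExpr(String);

struct LocalExpressionCache {
    cached: BTreeMap<GlobalId, OptimizedExpr>,
}

impl LocalExpressionCache {
    /// Each cached expression is consumed by exactly one item during
    /// startup, so `remove` hands out ownership and no clone is needed.
    fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<OptimizedExpr> {
        self.cached.remove(id)
    }
}

fn main() {
    let mut cache = LocalExpressionCache {
        cached: BTreeMap::from([(42, OptimizedExpr("optimized plan".into()))]),
    };
    // The first lookup takes ownership; a second lookup misses.
    assert!(cache.remove_cached_expression(&42).is_some());
    assert!(cache.remove_cached_expression(&42).is_none());
}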
src/adapter/src/catalog.rs (outdated)

    let entry = self.get_entry(&on);
    let uses = entry.uses();
    queue.extend(uses.clone());
    seen.extend(uses);
My brain might be too smooth already, but I don't get how this one works. When we start out, queue == seen, so won't the check on seen.insert(cur) always be false, so we don't do anything?
Hmm, I believe that it is my brain that is the smooth one. This line shouldn't be here. I've pushed a fix.
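A minimal sketch (all names hypothetical) of the corrected traversal: seen is populated only by the insert check inside the loop, so the initial queue entries actually get processed. Pre-seeding seen with the starting uses, as the removed line did, would make that check always false.

use std::collections::{BTreeSet, VecDeque};

type Id = u32;

/// Collect the transitive closure of `uses` starting from `roots`.
fn transitive_uses(roots: &[Id], uses: impl Fn(Id) -> Vec<Id>) -> BTreeSet<Id> {
    let mut queue: VecDeque<Id> = roots.iter().copied().collect();
    let mut seen = BTreeSet::new();
    while let Some(cur) = queue.pop_front() {
        // `insert` returns true only the first time we see `cur`.
        if seen.insert(cur) {
            queue.extend(uses(cur));
        }
    }
    seen
}

fn main() {
    // 1 uses 2; 2 uses 3 and 4.
    let uses = |id| match id {
        1 => vec![2],
        2 => vec![3, 4],
        _ => vec![],
    };
    assert_eq!(transitive_uses(&[1], uses), BTreeSet::from([1, 2, 3, 4]));
}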
✊
src/adapter/src/catalog/apply.rs (outdated)

-    Ok(item) => {
+    Ok((item, uncached_expr)) => {
+        if let Some((uncached_expr, optimizer_features)) = uncached_expr {
+            local_expression_cache.insert_uncached_expression(id, uncached_expr, optimizer_features);
What's this business of cached and uncached expressions? Might be easier to quickly huddle once you're online.
Aha! Now that I read further, it's: uncached expressions = expressions that were not cached but should be added to the cache once we update it. When I first read this, I thought these might be expressions that we don't want to cache or something... 🙈
Yes, that's correct. This was one of the most painful parts of this PR, plumbing everything around to various parts of startup. I've added a doc-comment to this method to try to make it clearer.
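To make the terminology concrete, a rough sketch of the two-sided bookkeeping; the types and the drain method into_uncached are hypothetical, while the two other method names appear in the PR's diffs. Hits are taken out of cached; misses are re-optimized by the caller and recorded as uncached for the bulk write-back at the end of startup.

use std::collections::BTreeMap;

type GlobalId = u64;

struct Expr(String);
struct OptimizerFeatures(bool); // stand-in for the real feature flags

#[derive(Default)]
struct LocalExpressionCache {
    /// Expressions read from the durable cache at startup; consumed on use.
    cached: BTreeMap<GlobalId, Expr>,
    /// Expressions that were not in the cache and had to be optimized;
    /// these are written back to the durable cache in one bulk operation.
    uncached: BTreeMap<GlobalId, (Expr, OptimizerFeatures)>,
}

impl LocalExpressionCache {
    fn remove_cached_expression(&mut self, id: &GlobalId) -> Option<Expr> {
        self.cached.remove(id)
    }

    fn insert_uncached_expression(
        &mut self,
        id: GlobalId,
        expr: Expr,
        features: OptimizerFeatures,
    ) {
        self.uncached.insert(id, (expr, features));
    }

    /// Drain everything that must be written back at the end of startup.
    fn into_uncached(self) -> BTreeMap<GlobalId, (Expr, OptimizerFeatures)> {
        self.uncached
    }
}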
-    | BootstrapStateUpdateKind::IntrospectionSourceIndex(_)
-    | BootstrapStateUpdateKind::ClusterReplica(_)
-    | BootstrapStateUpdateKind::SystemObjectMapping(_) => {
+    | BootstrapStateUpdateKind::ClusterReplica(_) => {
Why was this re-shuffling needed? Is it because of how we measure/report timings below, for pre_item_updates and system_item_updates?
We don't want to open the cache until we've loaded SystemConfiguration, because that contains the dyncfg that tells us whether or not the cache is enabled. However, we want to use the cache to load IntrospectionSourceIndex and SystemObjectMapping, because those contain system items that sometimes need to be optimized. Splitting these up into two separate collections allows us to use the cache on system items.
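A compact sketch of that ordering, with every name hypothetical, showing why the split lets system items use the cache:

struct SystemConfig {
    enable_expression_cache: bool,
}

struct ExpressionCache;

// Applies `SystemConfiguration` updates, which carry the dyncfg that
// gates the expression cache.
fn apply_pre_item_updates() -> SystemConfig {
    SystemConfig { enable_expression_cache: true }
}

fn open_expression_cache() -> ExpressionCache {
    ExpressionCache
}

// Applies `IntrospectionSourceIndex` and `SystemObjectMapping` updates,
// which may need to optimize items and can therefore consult the cache.
fn apply_system_item_updates(_cache: Option<&mut ExpressionCache>) {}

fn main() {
    // 1. Load the system configuration first...
    let config = apply_pre_item_updates();
    // 2. ...so we know whether to open the cache...
    let mut cache = config.enable_expression_cache.then(open_expression_cache);
    // 3. ...before applying the system items that benefit from it.
    apply_system_item_updates(cache.as_mut());
}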
-    if !current_ids.contains(&key.id)
-        || expressions.optimizer_features != *optimizer_features
-    {
+    if !current_ids.contains(&key.id) {
Why'd you move the check for optimizer features out to the call sites?
It turns out that during startup, depending on the context, we fiddle around with the optimizer features. So there's no single OptimizerFeatures that can be used for all expressions.
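So the features travel with each cache entry instead. A minimal sketch (types and the function name remove_if_compatible are hypothetical) where each call site passes the OptimizerFeatures it is about to optimize with, and only an entry produced under identical features counts as a hit:

use std::collections::BTreeMap;

type GlobalId = u64;

#[derive(PartialEq, Eq)]
struct OptimizerFeatures {
    flags: u32, // stand-in for the real set of feature flags
}

struct CachedExpressions {
    plan: String,
    optimizer_features: OptimizerFeatures,
}

/// The caller supplies the features it would optimize under; an entry
/// optimized under different features is not usable and is discarded.
fn remove_if_compatible(
    cache: &mut BTreeMap<GlobalId, CachedExpressions>,
    id: &GlobalId,
    features: &OptimizerFeatures,
) -> Option<CachedExpressions> {
    match cache.remove(id) {
        Some(e) if e.optimizer_features == *features => Some(e),
        _ => None,
    }
}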
-    ListenEvent::Updates(x) => {
+    ListenEvent::Updates(mut x) => {
+        consolidate_updates(&mut x);
+        x.sort_by(|(_, ts1, d1), (_, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)));
Why's this now needed? You noticed that something doesn't work right without it?
Yes, the panic below on line 158 was triggered in a Nightly test.
assert_eq!(val, prev, "removed val does not match expected val");
We have to process retractions before inserts or else we may retract the wrong value.
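A small self-contained sketch of why the sort fixes this: after consolidation, sorting by (ts, diff) puts the negative-diff retraction before the insertion at the same timestamp, so the old value is removed before its replacement is applied. Everything here is illustrative except the sort closure and the assertion message, which come from the PR.

fn main() {
    // (value, timestamp, diff) updates for one key: at ts = 5 the value
    // "a" is retracted and "b" is inserted, but they arrive out of order.
    let mut updates = vec![("b", 5_u64, 1_i64), ("a", 5, -1)];

    // Sorting by (ts, diff) puts the diff = -1 retraction first.
    updates.sort_by(|(_, ts1, d1), (_, ts2, d2)| ts1.cmp(ts2).then(d1.cmp(d2)));

    let mut current = Some("a");
    for (val, _ts, diff) in updates {
        match diff {
            -1 => {
                let prev = current.take().expect("retraction with no value");
                // This mirrors the assertion that fired in the Nightly test
                // when an insert was applied before the retraction.
                assert_eq!(val, prev, "removed val does not match expected val");
            }
            1 => {
                assert!(current.is_none(), "insert over existing value");
                current = Some(val);
            }
            _ => unreachable!("this sketch only handles diff = +1/-1"),
        }
    }
    assert_eq!(current, Some("b"));
}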
Those should be resolved, but there's still another CI panic that I'm looking into.
I'm hoping that the existing
The results seem all over the place so it's hard to tell what's going on. I'll look into it and make sure that the expression cache is even being used here.
The cache is ONLY used during startup, so it shouldn't hide the latency when creating new objects or when running
Each optimized expression is tagged with the deploy generation that it was created in. An
No, due to the deploy generation described above.
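A sketch of how that tagging could look; the shape is hypothetical, since the thread only states that entries are tagged with the deploy generation. Because the generation is part of the key, a new version never observes plans written by an old one, which is why a wrong plan from before an upgrade is not reused.

use std::collections::BTreeMap;

type GlobalId = u64;

/// Cache key scoped to the deploy generation that produced the entry.
#[derive(PartialEq, Eq, PartialOrd, Ord)]
struct Key {
    deploy_generation: u64,
    id: GlobalId,
}

fn main() {
    let mut cache = BTreeMap::new();
    // A plan written by generation 7, possibly by a buggy optimizer.
    cache.insert(Key { deploy_generation: 7, id: 1 }, "old plan");

    // After upgrading to generation 8, the lookup misses, so the item is
    // re-optimized instead of reusing the potentially wrong plan.
    assert!(cache.get(&Key { deploy_generation: 8, id: 1 }).is_none());
}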
Here's a more recent benchmark run, still sort of all over the place.
misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py, lines 1927 to 1952 at 77b43ee
If I wanted to restart Materialize here BEFORE we start timing for the benchmark, how would I do that?
src/catalog/src/expr_cache.rs (outdated)

+    // Remove dropped IDs and expressions that rely on dropped indexes.
+    let index_dependencies: BTreeSet<_> = expressions
+        .global_mir
+        .index_imports
+        .keys()
+        .chain(expressions.physical_plan.index_imports.keys())
+        .cloned()
+        .collect();
     if !current_ids.contains(&key.id)
         || expressions.optimizer_features != *optimizer_features
+        || !index_dependencies.is_subset(current_ids)
@ggevay does this make sense in terms of the calls to index_imports? This is called during startup. I have a set of all the items in the catalog, current_ids. I'm trying to remove any expression from the cache that may rely on an index that no longer exists, i.e., doesn't exist in current_ids. Should I also be looking in other places inside of expressions?
Looking at index_imports should be enough: a dataflow can use an index only if the index is imported. (Note that, currently, the index_imports of the MIR and LIR plans are the same, so strictly speaking it would be enough to look at only one of them. But I think it's good practice to check both here, as your PR currently does, so that your code is future-proof against possible changes where the MIR and LIR index_imports drift apart.)
Simply add a query like this:

diff --git a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
index 3c2f4a5a40..b631a2eaf8 100644
--- a/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
+++ b/misc/python/materialize/feature_benchmark/scenarios/benchmark_main.py
@@ -1966,6 +1966,9 @@ ALTER SYSTEM SET max_clusters = {self.n() * 6};
             Lambda(lambda e: e.RestartMzClusterd()),
             Td(
                 f"""
+> SELECT 1;
+  /* A */
+1
 {check_views}
 {check_sources}
 {check_tables}
I want to include the restart time in the time measurements of each iteration. I just also want to restart Materialize once at the very beginning of the test before we start measuring the other restarts.
The first measurement is disregarded anyway, so this should already do what you want by default?
Ah ok, yeah so it should already be doing exactly what I want.
…k-up-expr-cache

# Conflicts:
#   src/adapter/src/catalog.rs
#   src/adapter/src/catalog/apply.rs
#   src/adapter/src/catalog/open/builtin_item_migration.rs
#   src/adapter/src/catalog/state.rs
#   src/adapter/src/coord.rs
This commit hooks up the expression cache into the startup process.
During startup if an expression is found in the cache, then we use that
expression instead of re-optimizing an item. If an expression is not
found in the cache, then we optimize the item and insert the
optimized expression into the cache.
The insertion into the cache happens in bulk as a single operation at
the end of startup instead of once for each object. The reason is that
inserting new expressions may need to invalidate old expressions.
Inserting all expressions in bulk allows us to easily figure out what
should be invalidated vs what shouldn't be invalidated. If we update
the expressions one at a time, then we may accidentally invalidate new
expressions.
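(As a hedged illustration of the bulk update described above, with every name hypothetical: invalidation is computed once against the whole batch, so new entries can never knock each other out the way one-at-a-time inserts could.)

use std::collections::BTreeMap;

type Id = u64;
type Expr = &'static str;

/// Sketch of a bulk cache update: decide which existing entries the whole
/// batch invalidates, drop them, then insert the batch. A new entry can
/// never be dropped by another new entry.
fn bulk_update(
    cache: &mut BTreeMap<Id, Expr>,
    new: Vec<(Id, Expr)>,
    invalidated_by: impl Fn(Id, Id) -> bool, // (existing, new) -> stale?
) {
    cache.retain(|&existing, _| !new.iter().any(|&(n, _)| invalidated_by(existing, n)));
    cache.extend(new);
}

fn main() {
    let mut cache = BTreeMap::from([(1, "old"), (2, "old")]);
    // Say updating id 1 invalidates id 2 (2 depends on 1). The new entries
    // for 1 and 2 are inserted together, so both survive.
    bulk_update(
        &mut cache,
        vec![(1, "new"), (2, "new")],
        |existing, new| existing == 2 && new == 1,
    );
    assert_eq!(cache, BTreeMap::from([(1, "new"), (2, "new")]));
}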
Much of the code in this commit involves plumbing things around and
keeping track of what is and isn't cached.
After startup we do not cache newly created items, this is left for
future work. As a result, if environmentd crashes in-between releases
then recovery may be slow if there are many new objects. However,
read-only instances should fully populate the cache during a release.
Works towards resolving MaterializeInc/database-issues#8384
Motivation
This PR adds a known-desirable feature.
Checklist
If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.