coord: smorgasbord of improvements for the crdb-backed timestamp oracle #23421
Conversation
This PR has higher risk. Make sure to carefully review the file hotspots. In addition to having a knowledgeable reviewer, it may be useful to add observability and/or a feature flag.
Buggy File Hotspots:
@benesch I think GitHub insists on adding you as a reviewer because of the ore changes (I added a constant for histogram bucketing of counts). @def- I added a commit that enables the crdb/postgres oracle for benchmarks again and will trigger a nightly run. But we might want to do something more sophisticated where we compare "with pg oracle" to "with catalog oracle" on the same commit.
Force-pushed from 255ac7d to 6330d26.
Thanks for following up on the TODOs that quickly. I triggered a benchmark of timestamp oracles:
https://buildkite.com/materialize/nightlies/builds/5359
https://buildkite.com/materialize/release-qualification/builds/375
We should probably wait for the results before submitting.
I again broke something with the special case explain/tracing-subscriber thing; investigating!
Force-pushed from 6330d26 to af80bb4.
```rust
assert_eq!(
    oracle.peek_write_ts().await,
    Timestamp::MAX.step_back().expect("known to work")
);
```
Maybe add that after calling `apply_write()`, `read_ts()` returns `Timestamp::MAX.step_back()`.
Did you mean an `apply_write(MAX-1)` or an `apply_write(0)` here? If the latter, the `read_ts` will not actually advance to `MAX-1`, because `apply_write` doesn't involve the `NowFn`.
I meant `apply_write(Timestamp::MAX.step_back())`. It's not that important of a case to cover, though.
```rust
/// Get a shared, shallow clone of the oracle. Returns `None` if this oracle
/// is not shareable.
fn get_shared(&self) -> Option<Arc<dyn ShareableTimestampOracle<T> + Send + Sync>>;
```
This is probably beyond my Rust expertise, but why do we need a new trait instead of returning `Option<Arc<dyn TimestampOracle<T> + Send + Sync>>`?
There are two reasons:

1. `TimestampOracle` has some methods that take a `&mut self`, because that's what the old `CatalogOracle` needs: it does actually mutate itself. The `ShareableTimestampOracle` methods all only take `&self`, which is important because you can only call methods that take `&self` behind a plain `Arc<T>`. For the crdb oracle this works out because all the calls go to crdb and don't need to mutate oracle state.
2. `TimestampOracle` is `#[async_trait(?Send)]`, again because `CatalogOracle` cannot be `Send` (or `Sync`). So it cannot be sent over to those async tasks that call `read_ts` asynchronously (off the coord main loop). `ShareableTimestampOracle` is `#[async_trait]`, without the `?Send`.
And yes, subtle and annoying! 🙈
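A condensed sketch of the two-trait split described above (the real trait definitions in this PR have more methods and bounds; this just illustrates the `&mut self`/`&self` and `?Send` distinction):

```rust
use std::sync::Arc;

use async_trait::async_trait;

// Not `Send`: the old CatalogOracle mutates itself, so methods take
// `&mut self` and the trait is declared `#[async_trait(?Send)]`.
#[async_trait(?Send)]
pub trait TimestampOracle<T: 'static> {
    async fn read_ts(&mut self) -> T;

    /// Get a shared, shallow clone of the oracle. Returns `None` if this
    /// oracle is not shareable.
    fn get_shared(&self) -> Option<Arc<dyn ShareableTimestampOracle<T> + Send + Sync>>;
}

// `&self`-only and `Send`: callable behind a plain `Arc<T>` from async
// tasks running off the coordinator main loop.
#[async_trait]
pub trait ShareableTimestampOracle<T> {
    async fn read_ts(&self) -> T;
}
```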
```rust
fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
    let timeline = match timeline_context {
        TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
        // We default to the `Timeline::EpochMilliseconds` timeline if one doesn't exist.
        TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
        TimelineContext::TimestampIndependent => None,
    };

    timeline
}
```
CC @ggevay, I think you were looking for a method like this recently (you can ignore the rest of this PR).
Almost, but I think in my case I'll need `Timeline::EpochMilliseconds` even for `TimelineContext::TimestampIndependent`, because I'd like to handle `mz_now()` in the `REFRESH AT` even if the query is a constant (which is the case where `validate_timeline_context` returns `TimelineContext::TimestampIndependent`).
```rust
/// Returns a `Timeline` whose timestamp oracle we have to use to get a
/// linearized read timestamp, _iff_ linearization is needed.
fn get_linearized_timeline(
    isolation_level: &IsolationLevel,
    when: &QueryWhen,
    timeline_context: &TimelineContext,
) -> Option<Timeline> {
    let timeline = Self::get_timeline(timeline_context);

    // In order to use a timestamp oracle, we must be in the context of some timeline. In that
    // context we would use the timestamp oracle in the following scenarios:
    // - The isolation level is Strict Serializable and the `when` allows us to use the
    //   timestamp oracle (ex: queries with no AS OF).
    // - The `when` requires us to use the timestamp oracle (ex: read-then-write queries).
    let linearized_timeline = match &timeline {
        Some(timeline)
            if when.must_advance_to_timeline_ts()
                || (when.can_advance_to_timeline_ts()
                    && isolation_level == &IsolationLevel::StrictSerializable) =>
        {
            Some(timeline.clone())
        }
        _ => None,
    };

    linearized_timeline
}
```
It is a bit of a shame that we are pulling this out of `determine_timestamp_for`, because it's spreading out all of the logic that determines the final timestamp. We've had some recent conversations about wanting it to be easier to reason about what led us to a specific timestamp, but I'm somewhat worried this makes it a bit trickier to pin down.
There's probably nothing to do about that, but I thought I'd call it out in case you had an idea.
It's related to #23421 (comment): `TimestampProvider`, which `Coordinator` implements, is `#[async_trait(?Send)]`, because `Coordinator` cannot be `Send` and/or `Sync`.

Also very annoying, but what I did is essentially pull out the thing that can be done asynchronously (the `read_ts` call on a shareable oracle). We might get back to something better again further down the road.
> We might get back to something better again further down the road.
Maybe worth leaving this as a TODO?
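Roughly, the pull-out looks like this (a simplified sketch assuming a tokio runtime, not the actual coordinator code): the shareable oracle is `Send + Sync`, so the expensive `read_ts` call can run on a spawned task instead of blocking the main loop.

```rust
// Sketch: clone a shareable handle and do the oracle round-trip off the
// coordinator main loop.
let shared_oracle = oracle.get_shared().expect("crdb oracle is shareable");
tokio::spawn(async move {
    let oracle_read_ts = shared_oracle.read_ts().await;
    // ... hand `oracle_read_ts` back to the coordinator, which finishes
    // timestamp determination with the rest of the (non-Send) state ...
});
```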
```rust
#[allow(clippy::as_conversions)]
read_ts_metrics.observe((pending_cmds.len()) as f64);
```
Would `mz_ore::cast::CastLossy` work here?
good point!
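The suggested change would look roughly like this (a sketch; `read_ts_metrics` and `pending_cmds` are from the surrounding coordinator code, and `CastLossy` is the mz_ore trait already used elsewhere in this diff):

```rust
use mz_ore::cast::CastLossy;

// No clippy allow needed: CastLossy makes the lossy usize -> f64 cast explicit.
read_ts_metrics.observe(f64::cast_lossy(pending_cmds.len()));
```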
```rust
if timestamp <= read_ts {
    // See what the oracle timestamp is now and delay when needed.
    let current_oracle_ts = timestamp_oracle.read_ts().await;
```
Is it possible to only call this once, instead of once per pending transaction?
Thought about that, but then didn't do it because I was lazy... 🙈 will do!
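A rough sketch of that follow-up (names like `pending_txns`, `retire`, and `defer` are illustrative, not the actual coordinator code): query the oracle once and compare every pending transaction against the same result.

```rust
// One oracle round-trip for the whole batch of pending transactions,
// instead of one read_ts() call per transaction.
let current_oracle_ts = timestamp_oracle.read_ts().await;
for pending in pending_txns {
    if pending.timestamp <= current_oracle_ts {
        // The oracle has already advanced past this timestamp.
        retire(pending);
    } else {
        // Delay until the oracle catches up.
        defer(pending);
    }
}
```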
The run isn't done yet, but there seem to be some clear regressions compared to the catalog-based timestamp oracle:

```
NAME                   | TYPE      | THIS  | OTHER | Regression? | 'THIS' is:
-----------------------|-----------|-------|-------|-------------|---------------------
ConnectionLatency      | wallclock | 3.666 | 2.247 | !!YES!!     | 63.2 pct more/slower
SubscribeParallelKafka | wallclock | 2.242 | 1.528 | !!YES!!     | 46.8 pct more/slower
SubscribeParallelTable | wallclock | 3.922 | 3.401 | !!YES!!     | 15.3 pct more/slower
StartupLoaded          | wallclock | 9.671 | 8.134 | !!YES!!     | 18.9 pct more/slower
```
Force-pushed from af80bb4 to 605810d.
src/postgres-client/src/lib.rs (outdated)

```diff
@@ -238,7 +238,7 @@ impl PostgresClient {
     fn status_metrics(&self, status: Status) {
         self.metrics
             .connpool_available
-            .set(f64::cast_lossy(status.available));
+            .set(i64::cast_from(status.available));
```
@danhhz I changed this from a `Gauge` to an `IntGauge` because that seems to make more sense, but you might have opinions.
Also seems better to me, but I don't recall the context. Probably worth quickly doing the archaeology of why it was this way in the first place, to be sure
@bkirwi, as the person who added this: the commit comment calls out that it's a float. Was there a reason for choosing that over an `i64` / `isize`?
src/postgres-client/src/metrics.rs (outdated)

```rust
/// Creates a new [`PostgresClientMetrics`] with metrics scoped to the given
/// `purpose`.
pub fn for_purpose(&self, purpose: &str) -> PostgresClientMetrics {
```
@danhhz I added the `purpose` label so that I can have metrics scoped to timelines, but it does mean that the postgres/pool metrics for persist now also have this label. I think the dashboards should continue working, because we would ignore the label, but I still wanted to flag it.
Seems fine, but tbh I'm a bit skeptical. Metrics definitely have costs, do we really think the granularity of breaking this out by timeline is worth it?
Unclear to me! I did it because it was low enough effort and I had time during Thanksgiving when US folks were out. But I'm very happy to just drop this commit!

Context: we currently have only one timeline (epoch millis). We will probably add at least a second timeline for catalog state. But then again, we will likely evolve the `TimestampOracle` API to allow `read_ts` calls that get timestamps for multiple timelines as one CRDB query, to be more efficient. In that case we wouldn't be able to separate metrics out by timeline.

Which means I should probably drop this commit, eh? 😅
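For illustration, the evolved API hinted at here might look like this (purely hypothetical, it does not exist in this PR; `Timeline` is the Materialize type used throughout):

```rust
use std::collections::BTreeMap;

#[async_trait::async_trait]
pub trait ShareableTimestampOracle<T> {
    async fn read_ts(&self) -> T;

    /// Hypothetical: fetch read timestamps for several timelines with a
    /// single round-trip to the backing CRDB store. With this shape, a
    /// per-timeline metric label could no longer be separated out.
    async fn read_ts_batch(&self, timelines: &[Timeline]) -> BTreeMap<Timeline, T>;
}
```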
@def- what should we do about the scalability benchmark? As it is, it compares the PR (or whatever commit the nightly is running for) against a baseline commit. We'd have to have a matrix of tests, but that sounds expensive. I could manually run tests to compare the two oracle configurations.

Also, I'll look at the feature benchmark results next week. But it might be that we have to accept slowdowns in the interim period, before we remove the old timestamp oracle and put in place more optimizations that use the new possibilities of the new oracle. I'll run the benchmarks against a version that has more of these optimizations to see what's what.
Adding another dimension for scalability testing sounds like it would double the test runtimes. I'll also trigger a one-off run to compare the oracles. (Edit: https://buildkite.com/materialize/nightlies/builds/5367) Then in the future I'd be OK with just testing the crdb-backed ts oracle. This should at least allow us to catch regressions. @nrainer-materialize Since you are more familiar with scalability testing, does this make sense to you?
Result of the scalability run:

(@nrainer-materialize Would be cool if the scalability framework would also print the cases where we are improving performance, not just regressing.)
Sounds like a good idea; I will add that.
Yes, I agree that adding a dimension will significantly increase the test durations, though it is on the feature list to allow influencing more dimensions. One option I see is to add a second (parallel) build that considers a further dimension but uses fewer values in the existing dimension (that is, running the scalability test with two different feature flags, but thinning out the concurrency list instead of testing all of 1, 2, 4, 8, 16, ..., 256).
Makes absolute sense.
@def- that one-off test run might not be testing the right thing. I manually ran the benchmark on my machine with 3 configurations:
Results: they show that 2. is faster than 1. in some cases and slower in others. And 3. is faster in most cases (except some scenarios with inserts and …).
Force-pushed from a70d05b to 38bbe67.
```rust
Frontiers::get_linearized_timeline(&isolation_level, &when, &timeline_ctx);

let oracle_read_ts = if let Some(timeline) = linearized_timeline {
    // WIP: we're reaching into the TimestampProvider here. yikes!
```
WIP
Yeah, I might have to leave this comment but remove the `WIP`, because I didn't find an easy way around it.
Yeah, that's fine. Was just pointing it out so we didn't accidentally commit a WIP :)
src/adapter/src/coord.rs (outdated)

```rust
    .await?;
// We create a throwaway MetricsRegistry just for
// get_initial_oracle_timestamps, because we cannot create a
// timeline-scoped Metrics object, and because we don't need the metrics
```
> because we don't need the metrics from this one call.

Famous last words! I lean pretty strongly toward figuring out some way to get this into metrics. Maybe just actually using the "all_timelines" timeline?
makes sense! And this problem conveniently vanishes when I drop the commit to split out metrics by timeline!
```diff
     }
 }

-    fn external_op_metrics(&self, op: &str) -> ExternalOpMetrics {
+    fn external_op_metrics(&self, op: &str, timeline: &str) -> ExternalOpMetrics {
```
nit: I'd mirror the order of the labels in the arg order here, especially since they're both just strs (i.e. swap them)
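That is, a sketch of the suggested swap, mirroring the `["timeline", "op"]` label order used in the metric definitions:

```rust
// Argument order now matches var_labels: ["timeline", "op"].
fn external_op_metrics(&self, timeline: &str, op: &str) -> ExternalOpMetrics {
```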
name: "mz_ts_oracle_batched_op_count", | ||
help: "number of operations that were batched into one external operation", | ||
var_labels: ["timeline", "op"], | ||
buckets: HISTOGRAM_COUNT_BUCKETS.to_vec(), |
I am quite skeptical that this deserves a histogram! Would definitely lean toward asking what sorts of questions we think this needs to answer and trying to set up a way to answer them with a smaller set of counters first
I picked a histogram because we'll only have one instance: there's one timeline and one op that does batching. And I didn't come up with other ways of recording how well batching works. The one thing I considered is two metrics, roughly: `ops_count` and `batches_count`. And then you could get `ops_count / batches_count` as your "how good is my batching on average". But on a mostly idle env that would be strongly dragged towards 1, because at idle not much batching is happening. So you don't see spikes in batching well, for example when running a benchmark or just a large enough parallel workload.
I think my take here is that you don't necessarily care about the batch sizes specifically (at least not enough to warrant a histogram). The things you do care about are more like how long a `read_ts` call is waiting in the channel, the throughput of `read_ts` calls, etc.
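A sketch of the counter-based alternative (metric names are illustrative, and this uses the generic prometheus crate rather than the actual Materialize metrics types): two monotonic counters suffice, because `rate(ops) / rate(batches)` over a query window gives the average batch size during that window, so batching spikes under load still show up even if the environment is mostly idle otherwise.

```rust
use prometheus::{IntCounter, Registry};

fn batching_counters(registry: &Registry) -> (IntCounter, IntCounter) {
    // Total operations submitted for batching.
    let ops = IntCounter::new(
        "mz_ts_oracle_ops_total",
        "total number of oracle operations submitted for batching",
    )
    .expect("valid metric");
    // Total external (batched) operations actually issued.
    let batches = IntCounter::new(
        "mz_ts_oracle_batches_total",
        "total number of batched external oracle operations",
    )
    .expect("valid metric");
    registry.register(Box::new(ops.clone())).expect("registered");
    registry.register(Box::new(batches.clone())).expect("registered");
    (ops, batches)
}

// On every flushed batch:
//     ops.inc_by(batch.len() as u64);
//     batches.inc();
```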
```diff
-    TimelineTimestamp(Timeline, T),
+    TimelineTimestamp {
+        timeline: Timeline,
+        /// The timestamp that was chosen for a read. This can differ from the
```
Nice comments! 👏
@def- follow-up to my comment above, for the feature-benchmark test: I think I can't get those numbers "good" easily with the timestamp oracle. The tests where we regress perform a bunch of operations in series, so slight increases in time add up. At least I believe they do things in series, from looking at the scenarios, if I understand them correctly.

We do have batching in place in many parts, which makes it so we don't do as many of the expensive timestamp oracle operations. So if these operations were actually done concurrently, we wouldn't see as big an impact. Similar to how the batching makes it so the scalability benchmarks don't regress too much, or actually improve results in some cases. What do you think?

I can also merge this without turning the postgres oracle on for benchmarks and iterate some more on optimizations. But as I mentioned somewhere above, we can only do some of the optimizations once we remove the legacy oracle, which we can only do once we roll out the postgres oracle to production. It's a chicken-and-egg problem. 😅
OK for me to switch over to the crdb ts oracle in CI and then improve on perf. We can still manually check performance by comparing against older Mz versions with the old ts oracle.
Force-pushed from 38bbe67 to a18a6ca.
Because that's what it does, and it frees up the name for a real stage that determines read timestamps.
…es oracle

This makes the crdb/postgres oracle Send/Sync, such that we can, for example, move a handle to an oracle to an async task and do calls there asynchronously. We add a separate trait for shareable oracles, but this is a temporary solution while we still have the legacy/Catalog oracle. Once all oracles are shareable (like the CRDB/postgres oracle) we can make the regular TimestampOracle shareable and remove the distinction.
This uses the previously introduced shareable TimestampOracle. And we only determine the timestamp off the main loop for the shareable crdb/postgres oracle.
This, together with the previous change to determine timestamps off the coordinator main thread, makes it so `read_ts` calls for multiple peeks can be batched together. Resolves #23406
We do this because the `read_ts()` calls on the oracle we do as part of `message_linearize_reads()` are expensive when using the postgres/distributed timestamp oracle.
Force-pushed from 16c9fbc to bbff2c2.
New metrics lgtm, thanks! Will leave the approve to Joe
Motivation
Fixes part of the follow-up issues tracked in https://github.com/MaterializeInc/database-issues/issues/6635.
Tips for reviewer
Commits have meaningful messages and there's description/comments in the code.
Checklist
If this PR evolves an existing `$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.