Add design doc for adapter listening to the catalog #25944
Conversation
Force-pushed from ff60c69 to a586839, then from a586839 to d2f4c64.
/// Update in-memory catalog state and generate builtin table updates and controller commands.
fn apply_updates(&mut self, updates: Vec<StateUpdate>) -> (Vec<BuiltinTableUpdate>, Vec<ControllerCommand>) {
Ah cool, and you don't have to worry about idempotence because "Multi-subscriber catalog" is out of scope, nice!
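To make the shape of `apply_updates` concrete, here is a toy sketch of how it could fold durable updates into in-memory state while collecting the side effects for the caller to apply; every type and variant below is an illustrative stand-in, not the actual design:

```rust
use std::collections::BTreeMap;

// Stand-in types; the real `StateUpdate`, `BuiltinTableUpdate`, and
// `ControllerCommand` live in the adapter crates and look different.
enum StateUpdate {
    CreateTable { id: u64, name: String },
    DropTable { id: u64 },
}
struct BuiltinTableUpdate { id: u64, diff: i64 }
enum ControllerCommand { CreateCollection(u64), DropCollection(u64) }

#[derive(Default)]
struct CatalogState { tables: BTreeMap<u64, String> }

impl CatalogState {
    /// Fold durable updates into in-memory state, collecting the builtin table
    /// updates and controller commands for the coordinator to apply afterwards.
    fn apply_updates(
        &mut self,
        updates: Vec<StateUpdate>,
    ) -> (Vec<BuiltinTableUpdate>, Vec<ControllerCommand>) {
        let mut table_updates = Vec::new();
        let mut commands = Vec::new();
        for update in updates {
            match update {
                StateUpdate::CreateTable { id, name } => {
                    self.tables.insert(id, name);
                    table_updates.push(BuiltinTableUpdate { id, diff: 1 });
                    commands.push(ControllerCommand::CreateCollection(id));
                }
                StateUpdate::DropTable { id } => {
                    self.tables.remove(&id);
                    table_updates.push(BuiltinTableUpdate { id, diff: -1 });
                    commands.push(ControllerCommand::DropCollection(id));
                }
            }
        }
        (table_updates, commands)
    }
}
```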
// IMPORTANT: This may trigger a catalog transaction which only updates the durable catalog.
self.sequence_X(plan);
// ...
let (builtin_table_updates, controller_commands) = self.catalog.sync().await;
So this `sync` could potentially get two things worth of updates depending on races, but again, that's fine because we're not doing "Multi-subscriber" yet.
Can you expand on this a bit? The race here is that multiple subscribers will get the exact same set of `builtin_table_updates` and `controller_commands`. If they all try to apply them, then we'll end up with duplicate rows in the builtin tables and who knows what in storage/compute. However, there shouldn't be a race in updating the state of `catalog`. I.e., all subscribers should have exactly the correct state of objects in `catalog`.
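To illustrate the duplicate-application problem: if builtin table updates behave like differential `(row, diff)` pairs (an assumption for this sketch, not a statement about the real `BuiltinTableUpdate` type), then two subscribers applying the same batch doubles every count.

```rust
use std::collections::BTreeMap;

// Toy stand-in for applying builtin table updates: each update is a row plus
// a count diff, and the "table" is a multiset of rows.
fn apply(table: &mut BTreeMap<String, i64>, updates: &[(String, i64)]) {
    for (row, diff) in updates {
        *table.entry(row.clone()).or_insert(0) += diff;
    }
}

fn main() {
    let mut mz_tables = BTreeMap::new();
    let updates = vec![("t1".to_string(), 1)];

    // One subscriber applies the updates: `t1` has multiplicity 1.
    apply(&mut mz_tables, &updates);
    // A second subscriber applies the *same* updates: `t1` now has
    // multiplicity 2, i.e. a duplicate row in the builtin table.
    apply(&mut mz_tables, &updates);

    assert_eq!(mz_tables["t1"], 2);
}
```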
Ah, it's possible that the race I was imagining doesn't exist, but I was picturing two pgwire sessions concurrently running e.g. a `CREATE TABLE`. They'd write to the durable catalog in some order, but who knows in what order they'd call `sync`.
It is possible that two catalogs are arbitrarily ahead/behind each other. However, they will observe the changes to the persist shard in the same order. For example consider the following orderings:
1. `A` creates table `t1`. `B` creates table `t2`. `A` calls `sync` and observes `t1` and `t2`. `B` calls `sync` and observes `t1` and `t2`.
2. `A` creates table `t1`. `B` creates table `t2`. `B` calls `sync` and observes `t1` and `t2`. `A` calls `sync` and observes `t1` and `t2`.
3. `A` creates table `t1`. `A` calls `sync` and observes `t1`. `B` creates table `t2`. `B` calls `sync` and observes `t1` and `t2`.
etc.
I think maybe the piece that is not clear from my doc is that calling `catalog.sync()` has no effect on other catalogs. Which events have or haven't already been consumed is stored entirely in memory and not durably written down anywhere.
Just to check my own understanding, as you've written it, there's no possibility of concurrency between a call to `sequence` and a call to `sync`, right? That's all happening on the coord main loop, right? So you're guaranteed that every `sequence_foo` will be immediately followed by a call to `sync`, without the possibility of the coordinator running off and sequencing another command?
Ah, good question/point. No, there is no longer any guarantee that `sequence_X` will run to completion without the possibility of the coordinator running off and sequencing another command. I think in practice none of the DDL statements perform off-thread work after calling `catalog_transact`. Still, that's a bit scary to rely on. I think we should move this logic into `catalog_transact`. Something like:
impl Coordinator {
    async fn catalog_transact(&mut self, ops: Vec<Op>, ...) {
        // Execute catalog transaction updating the durable catalog only.
        let (builtin_table_updates, controller_commands) = self.catalog.sync().await;
        self.apply_builtin_table_updates(builtin_table_updates).await;
        self.apply_controller_commands(controller_commands).await;
    }
}
Ah, because they're all `async`?

I think moving the logic to `catalog_transact` still allows for misuse, though, because you could, like, call `catalog_transact` twice, or do something after calling `catalog_transact`.

What if we made the `sequence` functions return `Vec<Op>`? Does that make it even harder to misuse? The only thing you get to do in a sequence function is return some ops. Arguably we also want the sequence functions to take `&Coordinator` and `&Session` instead of `&mut` so they can't mutate any coordinator state inappropriately.
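For concreteness, here is a toy sketch of that proposed shape; every type and method name below is an illustrative stand-in, not an existing API. Sequence functions take shared references and only return ops, and the main loop is the only place that transacts.

```rust
// Illustrative stubs only; the real `Op`, plan, and `Session` types differ.
struct Session;
struct CreateTablePlan { name: String }
enum Op { CreateTable { name: String } }

struct Coordinator { catalog: Vec<String> }

impl Coordinator {
    // Sequence functions take `&self`, so they cannot mutate coordinator
    // state; all they may do is describe the catalog ops they want applied.
    fn sequence_create_table(&self, _session: &Session, plan: CreateTablePlan) -> Vec<Op> {
        vec![Op::CreateTable { name: plan.name }]
    }

    // Only the main loop applies ops: one transact call, nothing after it.
    fn catalog_transact(&mut self, ops: Vec<Op>) {
        for op in ops {
            match op {
                Op::CreateTable { name } => self.catalog.push(name),
            }
        }
    }
}

fn main() {
    let mut coord = Coordinator { catalog: Vec::new() };
    let ops = coord.sequence_create_table(&Session, CreateTablePlan { name: "t1".into() });
    coord.catalog_transact(ops);
    assert_eq!(coord.catalog, vec!["t1".to_string()]);
}
```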
> Ah, because they're all `async`?
Not exactly. Many of the `sequence` functions are structured in a way that they create new tasks and perform some work in that task while allowing the Coordinator to go and process new messages. For example, `CREATE MATERIALIZED VIEW` does this with its optimization stage:

materialize/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs, lines 420 to 499 in 38eabd8:
Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
    || "optimize create materialized view",
    move || {
        span.in_scope(|| {
            let mut pipeline = || -> Result<(
                optimize::materialized_view::LocalMirPlan,
                optimize::materialized_view::GlobalMirPlan,
                optimize::materialized_view::GlobalLirPlan,
            ), AdapterError> {
                let _dispatch_guard = explain_ctx.dispatch_guard();
                let raw_expr = plan.materialized_view.expr.clone();

                // HIR ⇒ MIR lowering and MIR ⇒ MIR optimization (local and global)
                let local_mir_plan = optimizer.catch_unwind_optimize(raw_expr)?;
                let global_mir_plan = optimizer.catch_unwind_optimize(local_mir_plan.clone())?;
                // MIR ⇒ LIR lowering and LIR ⇒ LIR optimization (global)
                let global_lir_plan = optimizer.catch_unwind_optimize(global_mir_plan.clone())?;

                Ok((local_mir_plan, global_mir_plan, global_lir_plan))
            };

            let stage = match pipeline() {
                Ok((local_mir_plan, global_mir_plan, global_lir_plan)) => {
                    if let ExplainContext::Plan(explain_ctx) = explain_ctx {
                        let (_, df_meta) = global_lir_plan.unapply();
                        CreateMaterializedViewStage::Explain(
                            CreateMaterializedViewExplain {
                                validity,
                                sink_id,
                                plan,
                                df_meta,
                                explain_ctx,
                            },
                        )
                    } else {
                        CreateMaterializedViewStage::Finish(CreateMaterializedViewFinish {
                            validity,
                            sink_id,
                            plan,
                            resolved_ids,
                            local_mir_plan,
                            global_mir_plan,
                            global_lir_plan,
                        })
                    }
                }
                // Internal optimizer errors are handled differently
                // depending on the caller.
                Err(err) => {
                    let ExplainContext::Plan(explain_ctx) = explain_ctx else {
                        // In `sequence_~` contexts, immediately return the error.
                        return Err(err.into());
                    };

                    if explain_ctx.broken {
                        // In `EXPLAIN BROKEN` contexts, just log the error
                        // and move to the next stage with default
                        // parameters.
                        tracing::error!("error while handling EXPLAIN statement: {}", err);
                        CreateMaterializedViewStage::Explain(
                            CreateMaterializedViewExplain {
                                validity,
                                sink_id,
                                plan,
                                df_meta: Default::default(),
                                explain_ctx,
                            },
                        )
                    } else {
                        // In regular `EXPLAIN` contexts, immediately return the error.
                        return Err(err.into());
                    }
                }
            };

            Ok(Box::new(stage))
        })
    },
)))
> What if we made the `sequence` functions return `Vec<Op>`?
For the same reason listed above, we can't have the `sequence` functions return anything meaningful. They return early before they're complete and finish their work in a new task that eventually sends a message back to the Coordinator.

I'll have to have a think about the best way of doing this. An even more intermediate step would be to ignore the builtin table updates and controller commands and just focus on having `Catalog` update its in-memory state in `Catalog::sync`. Then I think that logic can live in `catalog_transact` and we can kick the can on everything else.
If builtin table updates and controller commands are not necessary for a multi-subscriber catalog, then maybe it's more prudent to ignore them for now.
> If builtin table updates and controller commands are not necessary for a multi-subscriber catalog, then maybe it's more prudent to ignore them for now.
I think it'd be good to handle them! Even though we don't know how to apply builtin table updates from multiple live environments today, that's exactly the thing we'll want to solve for full UCI and 0DT. So having the catalog API expose them will be really helpful, I think.
Your idea of just doing it all in `catalog_transact` sounds good to me. There's still the potential for misuse, but c'est la vie. We can just enforce the code structure rules (only one call to `catalog_transact`; don't do anything after) by convention and comments for now.
## Open questions

- Is this a good incremental step towards multi-subscriber or should we go right to
We need to do multi-subscriber for UCI, but do we need it for 0dt? If the latter is no, then I think this is a very useful intermediate step!
It depends on what we mean by "multi-subscriber", right? 0dt will require that two different versions of the same environment can do this loop at the same time:
let catalog = Catalog::open();
loop {
    catalog.sync();
}
But those two different versions will be applying the commands to two different controllers so that's a little easier. And we can probably just ignore the builtin tables in the new version, and just let the old version handle that until the new version is ready to take over.
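Here is a toy sketch of that 0dt shape, under the assumption (not stated in the doc) that the new version simply suppresses builtin-table writes until it takes over; none of these names are real APIs.

```rust
// Toy model: two versions of the same environment share one durable log, each
// runs its own sync loop and drives its own controller, but only the active
// version writes builtin table rows.
struct SharedLog(Vec<&'static str>);

struct Env {
    name: &'static str,
    consumed: usize,
    writes_builtin_tables: bool,
}

impl Env {
    fn sync(&mut self, log: &SharedLog) {
        for update in &log.0[self.consumed..] {
            // Both versions update their own in-memory catalog and issue
            // commands to their own controller (elided here)...
            println!("{}: observed {update}", self.name);
            // ...but only the old/active version writes builtin table rows, so
            // the two versions never double-write the same rows.
            if self.writes_builtin_tables {
                println!("{}: wrote builtin table row for {update}", self.name);
            }
        }
        self.consumed = log.0.len();
    }
}

fn main() {
    let log = SharedLog(vec!["create t1"]);
    let mut old = Env { name: "old", consumed: 0, writes_builtin_tables: true };
    let mut new = Env { name: "new", consumed: 0, writes_builtin_tables: false };
    old.sync(&log);
    new.sync(&log);
}
```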
- Should `StateUpdate`s contain timestamps? I.e., while consuming updates from the durable catalog, should the in-memory catalog care at all about timestamps?
I think this question here is the key to answering "how do multiple coordinators actually subscribe to the catalog at the same time?", but seems like something we can tackle down the road.
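One possible shape, purely as an illustrative assumption: give each `StateUpdate` the timestamp at which it was committed to the durable catalog, and have `sync` apply updates in timestamp order up to an upper bound, tracking progress only in memory.

```rust
// Sketch with made-up types: timestamps give every subscriber the same order
// in which to apply updates, and a well-defined notion of "caught up to `upper`".
struct StateUpdate {
    ts: u64,
    desc: String,
}

struct Catalog {
    // In-memory only: everything at or below this timestamp has been applied.
    last_applied_ts: u64,
    objects: Vec<String>,
}

impl Catalog {
    fn sync(&mut self, mut updates: Vec<StateUpdate>, upper: u64) {
        // Apply only updates we haven't seen yet and that are below `upper`,
        // in timestamp order.
        updates.retain(|u| u.ts > self.last_applied_ts && u.ts < upper);
        updates.sort_by_key(|u| u.ts);
        for update in updates {
            self.objects.push(update.desc);
            self.last_applied_ts = update.ts;
        }
    }
}

fn main() {
    let mut catalog = Catalog { last_applied_ts: 0, objects: Vec::new() };
    catalog.sync(
        vec![
            StateUpdate { ts: 2, desc: "create t2".into() },
            StateUpdate { ts: 1, desc: "create t1".into() },
        ],
        3,
    );
    assert_eq!(catalog.objects, vec!["create t1".to_string(), "create t2".to_string()]);
}
```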