Skip to content

Commit ff60c69

Browse files
committed
Add design doc for adapter listening to the catalog
1 parent 566e98c commit ff60c69

File tree

1 file changed

+171
-0
lines changed

1 file changed

+171
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
# Adapter Catalog Listen
2+
3+
## The Problem
4+
5+
Currently, in response to certain events (mostly DDL) the coordinator/adapter takes the following
6+
actions in no particular order:
7+
8+
- Update the durable catalog state.
9+
- Update the in-memory catalog state.
10+
- Append updates to builtin tables.
11+
- Send commands to controllers.
12+
13+
We would like to structure the coordinator/adapter in such a way that it can listen to changes made
14+
by other coordinators/adapters. In order to facilitate this in-memory catalog state updates,
15+
builtin table updates, and controller commands should be created in response to a change to the
16+
durable catalog state.
17+
18+
## Success Criteria
19+
20+
All `sequence_X` methods that correspond to DDL should be structured in the following way:
21+
22+
1. Keep doing the same thing before the catalog transaction.
23+
2. Perform catalog the catalog transaction which only updates the durable catalog state.
24+
3. Retrieve all changes to the durable catalog.
25+
4. Update in-memory catalog state based on the changes.
26+
5. Generate builtin table updates and controller commands from the changes.
27+
28+
`Catalog::open` should be structured in a way that all updates to in-memory state, builtin table
29+
updates, and controller commands are generated directly from a list of updates from the durable
30+
catalog.
31+
32+
## Out of Scope
33+
34+
- A catalog subscribe API.
35+
- Multi-subscriber catalog.
36+
37+
## Solution Proposal
38+
39+
### Data Structures
40+
41+
```Rust
42+
struct StateUpdate {
43+
kind: StateUpdateKind,
44+
diff: Diff,
45+
}
46+
47+
enum StateUpdateKind {
48+
AuditLog(mz_audit_log::VersionedEvent),
49+
Cluster(mz_catalog::durable::objects::Cluster),
50+
ClusterReplica(mz_catalog::durable::objects::ClusterReplica),
51+
Comment(mz_catalog::durable::objects::Comment),
52+
Database(mz_catalog::durable::objects::Database),
53+
DefaultPrivilege(mz_catalog::durable::objects::DefaultPrivilege),
54+
IntrospectionSourceIndex(mz_catalog::durable::objects::IntrospectionSourceIndex),
55+
Item(mz_catalog::durable::objects::Item),
56+
Role(mz_catalog::durable::objects::Role),
57+
Schema(mz_catalog::durable::objects::Schema),
58+
StorageUsage(mz_audit_log::VersionedStorageUsage),
59+
SystemConfiguration(mz_catalog::durable::objects::SystemConfiguration),
60+
SystemObjectMapping(mz_catalog::durable::objects::SystemObjectMapping),
61+
SystemPrivilege(MzAclItem),
62+
}
63+
64+
enum ControllerCommand {
65+
// ...
66+
}
67+
```
68+
69+
### Durable Catalog API
70+
71+
```Rust
72+
trait ReadOnlyDurableCatalogState {
73+
// ...
74+
75+
/// Fetches and returns all updates to the durable catalog state that have not yet been
76+
/// consumed.
77+
async fn sync(&mut self) -> Vec<StateUpdate>;
78+
79+
// ...
80+
}
81+
```
82+
83+
### In-memory Catalog API
84+
85+
```Rust
86+
impl Catalog {
87+
/// Fetches and applies all updates to the durable catalog state that have not yet been
88+
/// consumed.
89+
async fn sync(&mut self) -> Vec<(BuiltinTableUpdate, ControllerCommand)> {
90+
let updates = self.storage().await.sync().await;
91+
self.apply_updates(updates);
92+
}
93+
94+
/// Update in-memory catalog state and generate builtin table updates and controller commands.
95+
fn apply_updates(&mut self, updates: Vec<StateUpdate>) -> Vec<(BuiltinTableUpdate, ControllerCommand)> {
96+
let mut res: Vec<(BuiltinTableUpdate, ControllerCommand)> = Vec::new();
97+
for StateUpdate { kind, diff } in updates {
98+
match (kind, diff) {
99+
// ...
100+
}
101+
}
102+
res
103+
}
104+
}
105+
```
106+
107+
### Coordinator API
108+
109+
```Rust
110+
impl Coordinator {
111+
async fn sync_state(&mut self) {
112+
let (builtin_table_updates, controller_commands) = self.catalog.sync().await;
113+
self.apply_builtin_table_updates(builtin_table_updates).await;
114+
self.apply_controller_commands(controller_commands).await;
115+
}
116+
}
117+
```
118+
119+
### Coordinator Bootstrap Psuedo Code
120+
121+
Note: This simplifies the current structure of the code and combines some logic from
122+
`mz_adapter::coord::serve`, `Catalog::open`, and `Coordinator::bootstrap`.
123+
124+
```Rust
125+
impl Coordinator {
126+
async fn serve(&mut self) {
127+
// ...
128+
let durable_catalog_state = self.open_durable_catalog_state().await;
129+
let catalog = Catalog::new(durable_catalog_state);
130+
let (builtin_table_updates, controller_commands) = catalog.sync().await;
131+
let builtin_table_retractions = // get retractions of current builtin table contents.
132+
let builtin_table_updates = builtin_table_retractions.chain(builtin_table_updates);
133+
self.apply_builtin_table_updates(builtin_table_updates).await;
134+
self.apply_controller_commands(controller_commands).await;
135+
// ...
136+
}
137+
}
138+
```
139+
140+
### Coordinator Sequence Psuedo Code
141+
142+
```Rust
143+
impl Coordinator {
144+
pub(crate) async fn sequence_plan(
145+
&mut self,
146+
mut ctx: ExecuteContext,
147+
plan: Plan,
148+
resolved_ids: ResolvedIds,
149+
) -> LocalBoxFuture<'_, ()> {
150+
// ...
151+
// IMPORTANT: This may trigger a catalog transaction which only updates the durable catalog.
152+
self.sequence_X(plan);
153+
// ...
154+
let (builtin_table_updates, controller_commands) = self.catalog.sync().await;
155+
self.apply_builtin_table_updates(builtin_table_updates).await;
156+
self.apply_controller_commands(controller_commands).await;
157+
}
158+
}
159+
```
160+
161+
### Fencing
162+
163+
Fencing will not change as part of this work. That means that a new writer will fence out all
164+
existing readers and writers.
165+
166+
## Open questions
167+
168+
- Is this a good incremental step towards multi-subscriber or should we go right to
169+
multi-subscriber?
170+
- Should `StateUpdate`s contain timestamps? I.e., while consuming updates from the durable catalog,
171+
should the in-memory catalog care at all about timestamps?

0 commit comments

Comments
 (0)