doc: Update expr cache design doc #31215

Merged 1 commit on Jan 28, 2025
52 changes: 26 additions & 26 deletions doc/developer/design/20241008_expression_cache.md
@@ -19,25 +19,25 @@ enough, then the time spent on this part of startup should be small.

The cache will present as a key-value store where the key is a composite of

- deployment generation
- build version
- object global ID

The value will be a serialized version of the optimized expression. An `environmentd` process with
deploy generation `n` will never be expected to look at a serialized expression with a deploy
generation `m` s.t. `n != m`. Therefore, no forwards or backwards compatibility is needed on
build version `n` will never be expected to look at a serialized expression with a build
version `m` s.t. `n != m`. Therefore, no forwards or backwards compatibility is needed on
the serialized representation of expressions. The cache will also be made durable so that it's
available after a restart, at least within the same deployment generation.
available after a restart, at least within the same build version.
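
To make the keying concrete, here is a minimal, self-contained sketch of that namespace, using a plain `BTreeMap` and a string build version as stand-ins for the persist-backed store and the real `Version` type:

```rust
use std::collections::BTreeMap;

/// Hypothetical composite key: one logical namespace per build version,
/// one entry per optimized object.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct CacheKey {
    /// Build version, e.g. "0.130.0". A process running version `n` never
    /// reads entries written by version `m != n`.
    build_version: String,
    /// Global ID of the cached object.
    global_id: u64,
}

/// Opaque, version-specific serialization of the optimized expressions, so no
/// cross-version compatibility of this payload is required.
type SerializedExpressions = Vec<u8>;

fn main() {
    let mut cache: BTreeMap<CacheKey, SerializedExpressions> = BTreeMap::new();
    cache.insert(
        CacheKey { build_version: "0.130.0".into(), global_id: 42 },
        vec![/* serialized optimized plan bytes */],
    );
    // A process on a different build version simply never looks up these keys.
    let other = CacheKey { build_version: "0.131.0".into(), global_id: 42 };
    assert!(cache.get(&other).is_none());
}
```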

Upgrading an environment will look something like this:

1. Start deploy generation `n` in read-only mode.
2. Populate the expression cache for generation `n`.
3. Start deploy generation `n` in read-write mode.
1. Start build version `n` in read-only mode.
2. Populate the expression cache for version `n`.
3. Start build version `n` in read-write mode.
4. Read optimized expressions from cache.

Restarting an environment will look something like this:

1. Start deploy generation `n` in read-write mode.
1. Start build version `n` in read-write mode.
2. Read optimized expressions from cache.

### Prior Art
@@ -74,31 +74,31 @@ struct Expressions {
}

struct ExpressionCache {
deploy_generation: u64,
build_version: Version,
information_needed_to_connect_to_durable_store: _,
}

impl ExpressionCache {
/// Creates a new [`ExpressionCache`] for `deploy_generation`.
fn new(&mut self, deploy_generation: u64, information_needed_to_connect_to_durable_store: _) -> Self;
/// Creates a new [`ExpressionCache`] for `build_version`.
fn new(&mut self, build_version: Version, information_needed_to_connect_to_durable_store: _) -> Self;

/// Reconciles all entries in current deploy generation with the current objects, `current_ids`,
/// Reconciles all entries in current build version with the current objects, `current_ids`,
/// and current optimizer features, `optimizer_features`.
///
/// If `remove_prior_gens` is `true`, all previous generations are durably removed from the
/// If `remove_prior_versions` is `true`, all previous versions are durably removed from the
/// cache.
///
/// Returns all cached expressions in the current deploy generation, after reconciliation.
fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_gens: bool) -> Vec<(GlobalId, Expressions)>;
/// Returns all cached expressions in the current build version, after reconciliation.
fn open(&mut self, current_ids: &BTreeSet<GlobalId>, optimizer_features: &OptimizerFeatures, remove_prior_versions: bool) -> Vec<(GlobalId, Expressions)>;

/// Durably removes all entries given by `invalidate_ids` and inserts `new_entries` into
/// current deploy generation.
/// current build version.
///
/// If there is a duplicate ID in both `invalidate_ids` and `new_entries`, then the final value
/// will be taken from `new_entries`.
fn insert_expressions(&mut self, new_entries: Vec<(GlobalId, Expressions)>, invalidate_ids: BTreeSet<GlobalId>);

/// Durably remove and return all entries in current deploy generation that depend on an ID in
/// Durably remove and return all entries in current build version that depend on an ID in
    /// `dropped_ids`.
///
/// Optional for v1.
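
To illustrate how this interface would be driven during startup, here is a hypothetical call sequence. It leans on the sketched signatures above (treating `new` as a plain constructor), and every free variable (`build_version`, `connection_info`, `current_ids`, `optimizer_features`, `read_only_mode`, `optimize`) is a placeholder, not a real API:

```rust
// Illustrative only: wiring against the sketched API above, not the real
// environmentd call sites.
let mut cache = ExpressionCache::new(build_version, connection_info);

// Reconcile against the catalog and load everything cached for this build
// version. Outside of read-only mode we also drop prior build versions.
let cached = cache.open(&current_ids, &optimizer_features, !read_only_mode);
let cached_ids: BTreeSet<GlobalId> = cached.iter().map(|(id, _)| *id).collect();

// Anything missing from the cache is optimized from scratch and written back.
for id in current_ids.difference(&cached_ids) {
    let exprs = optimize(*id); // placeholder for the real optimizer pipeline
    cache.insert_expressions(vec![(*id, exprs)], BTreeSet::new());
}
```
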
@@ -112,7 +112,7 @@ Below is a detailed set of steps that will happen in startup.

1. Call `ExpressionCache::open` to read the cache into memory and perform reconciliation (See
[Startup Reconciliation](#startup-reconciliation)). When passing in the arguments,
`remove_prior_gens == !read_only_mode`.
`remove_prior_versions == !read_only_mode`.
2. While opening the catalog, for each object:
a. If the object is present in the cache, use cached optimized expression.
b. Else generate the optimized expressions and insert the expressions via
@@ -123,7 +123,7 @@ Below is a detailed set of steps that will happen in startup.
When opening the cache for the first time, we need to perform the following reconciliation tasks:

- Remove any entries that exist in the cache but not in the catalog.
- If `remove_prior_gens` is true, then remove all prior gens.
- If `remove_prior_versions` is true, then remove all prior versions.
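
A self-contained sketch of these reconciliation rules, with an in-memory map standing in for the persist-backed cache. The real logic also removes entries whose cached optimizer features differ from the current ones, which is elided here; all names are illustrative:

```rust
use std::collections::{BTreeMap, BTreeSet};

type BuildVersion = String;
type GlobalId = u64;

/// Drop cache entries that no longer correspond to a catalog object and, when
/// `remove_prior_versions` is set (i.e. `!read_only_mode`), drop every entry
/// written by an older build version.
fn reconcile(
    cache: &mut BTreeMap<(BuildVersion, GlobalId), Vec<u8>>,
    current_version: &BuildVersion,
    current_ids: &BTreeSet<GlobalId>,
    remove_prior_versions: bool,
) {
    cache.retain(|(version, id), _value| {
        if version == current_version {
            // Keep only entries whose object still exists in the catalog.
            current_ids.contains(id)
        } else {
            // Entries from other build versions survive only in read-only mode.
            !remove_prior_versions
        }
    });
}

fn main() {
    let mut cache = BTreeMap::from([
        (("0.130.0".to_string(), 1), vec![1u8]),
        (("0.130.0".to_string(), 2), vec![2u8]),
        (("0.129.0".to_string(), 1), vec![3u8]),
    ]);
    let ids = BTreeSet::from([1]);
    reconcile(&mut cache, &"0.130.0".to_string(), &ids, true);
    // Only (0.130.0, 1) survives: id 2 is gone from the catalog and 0.129.0
    // is a prior build version.
    assert_eq!(cache.len(), 1);
}
```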


### DDL - Create
@@ -154,7 +154,7 @@ correct state.
### Implementation

The implementation will use persist for durability. The cache will be a single dedicated shard.
Each cache entry will be keyed by `(deploy_generation, global_id)` and the value will be a
Each cache entry will be keyed by `(build_version, global_id)` and the value will be a
serialized version of the expression.
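
One way to picture the single-shard layout: because the key leads with the build version, all entries for one build version form a contiguous range of the keyspace. A sketch with stand-in types (the real store is a persist shard, not a `BTreeMap`, and the real build version is a semver `Version`):

```rust
use std::collections::BTreeMap;
use std::ops::Bound::Included;

type BuildVersion = (u64, u64, u64); // e.g. (0, 130, 0); stand-in for semver
type GlobalId = u64;

fn main() {
    // One logical shard holding every build version, keyed by
    // (build_version, global_id); modeled here with a BTreeMap.
    let mut shard: BTreeMap<(BuildVersion, GlobalId), Vec<u8>> = BTreeMap::new();
    shard.insert(((0, 129, 0), 7), vec![0]);
    shard.insert(((0, 130, 0), 7), vec![1]);
    shard.insert(((0, 130, 0), 9), vec![2]);

    // Entries for the current build version are one contiguous key range, so
    // lookups and prior-version cleanup are conceptually range operations.
    let current: BuildVersion = (0, 130, 0);
    let live: Vec<_> = shard
        .range((Included((current, u64::MIN)), Included((current, u64::MAX))))
        .collect();
    assert_eq!(live.len(), 2);
}
```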

#### Conflict Resolution
@@ -164,15 +164,15 @@ time. This would manifest in an upper mismatch error during an insert or invalid
this error, the cache should read in all new updates, apply each update as described below, and
retry the operation from the beginning.

If the update is in a different deploy generation than the current cache, then ignore it. It is in a
If the update is in a different build version than the current cache, then ignore it. It is in a
different logical namespace and won't conflict with the operation.

If the update is in the same deploy generation, then we must be in a split-brain scenario where
If the update is in the same build version, then we must be in a split-brain scenario where
both the current process and another process think they are the leader. We should still update any
in-memory state as if the current cache had made that change. This relies on the following
invariants:

- Two processes with the same deploy generation MUST be running the same version of code.
- Two processes with the same build version MUST be running the same version of code.
- A global ID only ever maps to a single object.
- Optimization is deterministic.
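
The retry-and-apply rule above, sketched with stand-in types; the real code reacts to persist `compare_and_append` results and listen events rather than this `ApplyOutcome` enum, and all names here are illustrative:

```rust
/// Outcome of attempting a durable cache write.
enum ApplyOutcome {
    Done,
    /// Someone else wrote first; carries the updates we had not yet seen.
    UpperMismatch { newer_updates: Vec<Update> },
}

struct Update {
    build_version: String,
    // key/value payload elided
}

/// Retry a write until it lands, folding in concurrent updates as the design
/// describes: ignore other build versions, apply same-version updates to
/// in-memory state, then retry from the beginning.
fn write_with_retries(
    current_version: &str,
    mut try_write: impl FnMut() -> ApplyOutcome,
    mut apply_to_memory: impl FnMut(&Update),
) {
    loop {
        match try_write() {
            ApplyOutcome::Done => return,
            ApplyOutcome::UpperMismatch { newer_updates } => {
                for update in &newer_updates {
                    if update.build_version != current_version {
                        // Different build version: a disjoint namespace, so
                        // the concurrent write cannot conflict. Ignore it.
                        continue;
                    }
                    // Same build version: a split-brain writer. Optimization
                    // is deterministic and each ID maps to one object, so we
                    // can fold the update into our own in-memory state.
                    apply_to_memory(update);
                }
                // Then retry the original operation from the beginning.
            }
        }
    }
}

fn main() {
    // Toy demonstration: the first attempt hits a mismatch caused by a writer
    // at a different build version, so it is ignored and the retry succeeds.
    let mut attempts = 0;
    write_with_retries(
        "0.130.0",
        || {
            attempts += 1;
            if attempts == 1 {
                ApplyOutcome::UpperMismatch {
                    newer_updates: vec![Update { build_version: "0.129.0".into() }],
                }
            } else {
                ApplyOutcome::Done
            }
        },
        |_update| { /* fold the update into in-memory state */ },
    );
    assert_eq!(attempts, 2);
}
```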

@@ -200,7 +200,7 @@ following:

## Alternatives

- For the persist implementation, we could mint a new shard for each deploy generation. This would
- For the persist implementation, we could mint a new shard for each build version. This would
require us to finalize old shards during startup which would accumulate shard tombstones in CRDB.
- We could use persist's `FileBlob` for durability. It's extremely well tested (most of CI uses it
for persist) and solves at least some of the file system cons.
@@ -211,7 +211,7 @@ require us to finalize old shards during startup which would accumulate shard to

One potential implementation is via the filesystem of durable storage attached to `environmentd`.
Each cache entry would be saved as a file of the format
`/path/to/cache/<deploy_generation>/<global_id>`.
`/path/to/cache/<build_version>/<global_id>`.
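
For this filesystem alternative, the on-disk layout is simply a directory per build version with one file per object; a minimal sketch (the root path and version string are examples):

```rust
use std::path::PathBuf;

/// Illustrative layout for the filesystem alternative: one file per cached
/// expression, namespaced by build version.
fn cache_path(root: &str, build_version: &str, global_id: u64) -> PathBuf {
    PathBuf::from(root).join(build_version).join(global_id.to_string())
}

fn main() {
    let path = cache_path("/path/to/cache", "0.130.0", 42);
    assert_eq!(path.to_str(), Some("/path/to/cache/0.130.0/42"));
}
```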

#### Pros
- No need to worry about coordination across K8s pods.
@@ -230,5 +230,5 @@ Each cache entry would be saved as a file of the format
- If we use the persist implementation, how do we coordinate writes across pods?
- I haven't thought much about this, but here's one idea. The cache will maintain a subscribe on
the persist shard. Every time it experiences an upper mismatch, it will listen for all new
changes. If any of the changes contain the current deploy generation, then panic, else ignore
changes. If any of the changes contain the current build version, then panic, else ignore
them.
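
A sketch of that coordination idea; the `Change` type and the way changes arrive are stand-ins for a persist subscribe/listen, not a real API:

```rust
struct Change {
    build_version: String,
}

/// Called after an upper mismatch with every change observed since our last
/// successful write.
fn handle_concurrent_changes(current_version: &str, changes: &[Change]) {
    for change in changes {
        if change.build_version == current_version {
            // Another writer is active at our own build version: we have lost
            // leadership (or are in a split-brain), so halt rather than risk
            // writing conflicting cache entries.
            panic!("fenced out by a concurrent writer at {current_version}");
        }
        // Changes from other build versions live in a different namespace and
        // can be safely ignored.
    }
}

fn main() {
    // Changes from other build versions are ignored; a change at our own
    // build version would panic (not triggered here).
    handle_concurrent_changes(
        "0.130.0",
        &[Change { build_version: "0.129.0".into() }],
    );
}
```
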
2 changes: 1 addition & 1 deletion src/adapter/src/catalog/open.rs
@@ -469,7 +469,7 @@ impl Catalog {
.expect("expression cache shard should exist for opened catalogs"),
persist: config.persist_client,
current_ids,
remove_prior_gens: !config.read_only,
remove_prior_versions: !config.read_only,
compact_shard: config.read_only,
dyncfgs,
};
32 changes: 16 additions & 16 deletions src/catalog/src/expr_cache.rs
@@ -112,7 +112,7 @@ pub struct ExpressionCacheConfig {
pub persist: PersistClient,
pub shard_id: ShardId,
pub current_ids: BTreeSet<GlobalId>,
pub remove_prior_gens: bool,
pub remove_prior_versions: bool,
pub compact_shard: bool,
pub dyncfgs: ConfigSet,
}
@@ -128,7 +128,7 @@ impl ExpressionCache {
/// Reconciliation will remove all entries that are not in `current_ids` and remove all
/// entries that have optimizer features that are not equal to `optimizer_features`.
///
/// If `remove_prior_gens` is `true`, then all previous generations are durably removed from the
/// If `remove_prior_versions` is `true`, then all previous generations are durably removed from the
/// cache.
///
/// If `compact_shard` is `true`, then this function will block on fully compacting the backing
@@ -141,7 +141,7 @@
persist,
shard_id,
current_ids,
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs,
}: ExpressionCacheConfig,
@@ -159,7 +159,7 @@
const RETRIES: usize = 100;
for _ in 0..RETRIES {
match cache
.try_open(&current_ids, remove_prior_gens, compact_shard, &dyncfgs)
.try_open(&current_ids, remove_prior_versions, compact_shard, &dyncfgs)
.await
{
Ok((local_expressions, global_expressions)) => {
@@ -175,7 +175,7 @@
async fn try_open(
&mut self,
current_ids: &BTreeSet<GlobalId>,
remove_prior_gens: bool,
remove_prior_versions: bool,
compact_shard: bool,
dyncfgs: &ConfigSet,
) -> Result<
@@ -242,7 +242,7 @@
}
}
}
} else if remove_prior_gens {
} else if remove_prior_versions {
// Remove expressions from previous versions.
keys_to_remove.push((key.clone(), None));
}
@@ -567,7 +567,7 @@ mod tests {
let shard_id = ShardId::new();

let mut current_ids = BTreeSet::new();
let mut remove_prior_gens = false;
let mut remove_prior_versions = false;
// Compacting the shard takes too long, so we leave it to integration tests.
let compact_shard = false;
let dyncfgs = &mz_persist_client::cfg::all_dyncfgs(ConfigSet::default());
@@ -582,7 +582,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -626,7 +626,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -655,7 +655,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -698,7 +698,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -721,7 +721,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -773,7 +773,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -790,14 +790,14 @@

{
// Open the cache at a new generation and clear previous generations.
remove_prior_gens = true;
remove_prior_versions = true;
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
build_version: second_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
@@ -820,7 +820,7 @@
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
remove_prior_gens,
remove_prior_versions,
compact_shard,
dyncfgs: dyncfgs.clone(),
})
2 changes: 1 addition & 1 deletion src/durable-cache/src/lib.rs
Expand Up @@ -337,7 +337,7 @@ impl<C: DurableCacheCodec> DurableCache<C> {
// TODO(jkosh44) With the proper lifetime incantations, we might be able to accept
// references to `C::KeyCodec` and `C::ValCodec`, since that's what
// `WriteHandle::compare_and_append` wants. That would avoid some clones from callers of
// this method.i
// this method.
I: IntoIterator<Item = ((C::KeyCodec, C::ValCodec), i64)>,
{
let expected_upper = write_ts;