Skip to content

Commit

Permalink
catalog: Key expression cache by binary version (#31206)
Browse files Browse the repository at this point in the history
The expression cache caches optimized expressions in a persist shard. It
uses the environment's deploy generation as part of the key for the
cache. The assumption was that two environments with the same deploy
generation would always have the same binary version. This assumption
would allow two environments with the same deploy generation to safely
deserialize each other's expressions, without worrying about backwards
and forwards compatibility.

This assumption was not correct. Two environments with different binary
version can use the same deploy generation as long as one environment
never fully completed a deployment. This is especially bad because the
expression cache is written to and read from in read-only mode, before a
deployment is complete.

This commit updates the key of the expression cache to explicitly use
the binary version of environmentd so that two environments with the
same version can safely deserialize each other's expressions.

Fixes #MaterializeInc/database-issues/issues/8917
  • Loading branch information
jkosh44 authored Jan 27, 2025
1 parent 5b5b37b commit 5df1932
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 26 deletions.
20 changes: 19 additions & 1 deletion src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@

use std::collections::BTreeMap;

use maplit::btreeset;
use mz_catalog::builtin::BuiltinTable;
use mz_catalog::durable::Transaction;
use mz_catalog::memory::objects::StateUpdate;
use mz_ore::collections::CollectionExt;
use mz_ore::now::NowFn;
use mz_persist_types::ShardId;
use mz_repr::{CatalogItemId, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql_parser::ast::{Raw, Statement};
use mz_storage_client::controller::StorageTxn;
use semver::Version;
use tracing::info;
use uuid::Uuid;

// DO NOT add any more imports from `crate` outside of `crate::catalog`.
use crate::catalog::open::into_consolidatable_updates_startup;
use crate::catalog::state::LocalExpressionCache;
Expand Down Expand Up @@ -186,10 +190,24 @@ pub(crate) async fn migrate(

/// Migrations that run only on the durable catalog before any data is loaded into memory.
pub(crate) fn durable_migrate(
_tx: &mut Transaction,
tx: &mut Transaction,
_organization_id: Uuid,
_boot_ts: Timestamp,
) -> Result<(), anyhow::Error> {
// Migrate the expression cache to a new shard. We're updating the keys to use the explicit
// binary version instead of the deploy generation.
const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration";
const EXPR_CACHE_MIGRATION_DONE: u64 = 1;
if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) {
if let Some(shard_id) = tx.get_expression_cache_shard() {
tx.mark_shards_as_finalized(btreeset! {shard_id});
tx.set_expression_cache_shard(ShardId::new())?;
}
tx.set_config(
EXPR_CACHE_MIGRATION_KEY.to_string(),
Some(EXPR_CACHE_MIGRATION_DONE),
)?;
}
Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ impl Catalog {
aws_privatelink_availability_zones: config.aws_privatelink_availability_zones,
http_host_name: config.http_host_name,
};
let deploy_generation = storage.get_deployment_generation().await?;

let mut updates: Vec<_> = storage.sync_to_current_updates().await?;
assert!(!updates.is_empty(), "initial catalog snapshot is missing");
Expand Down Expand Up @@ -464,7 +463,7 @@ impl Catalog {
.collect();
let dyncfgs = config.persist_client.dyncfgs().clone();
let expr_cache_config = ExpressionCacheConfig {
deploy_generation,
build_version: config.build_info.semver_version(),
shard_id: txn
.get_expression_cache_shard()
.expect("expression cache shard should exist for opened catalogs"),
Expand Down
7 changes: 7 additions & 0 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1836,6 +1836,13 @@ impl<'a> Transaction<'a> {
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
}

pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
self.set_setting(
EXPRESSION_CACHE_SHARD_KEY.to_string(),
Some(shard_id.to_string()),
)
}

/// Updates the catalog `enable_0dt_deployment` "config" value to
/// match the `enable_0dt_deployment` "system var" value.
///
Expand Down
57 changes: 34 additions & 23 deletions src/catalog/src/expr_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ use mz_repr::GlobalId;
use mz_transform::dataflow::DataflowMetainfo;
use mz_transform::notice::OptimizerNotice;
use proptest_derive::Arbitrary;
use semver::Version;
use serde::{Deserialize, Serialize};
use timely::Container;
use tokio::sync::mpsc;
use tracing::debug;
use tracing::{debug, warn};

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
enum ExpressionType {
Expand Down Expand Up @@ -70,7 +71,7 @@ impl GlobalExpressions {

#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)]
struct CacheKey {
deploy_generation: u64,
build_version: String,
id: GlobalId,
expr_type: ExpressionType,
}
Expand Down Expand Up @@ -107,7 +108,7 @@ impl DurableCacheCodec for ExpressionCodec {
/// Configuration needed to initialize an [`ExpressionCache`].
#[derive(Debug, Clone)]
pub struct ExpressionCacheConfig {
pub deploy_generation: u64,
pub build_version: Version,
pub persist: PersistClient,
pub shard_id: ShardId,
pub current_ids: BTreeSet<GlobalId>,
Expand All @@ -118,7 +119,7 @@ pub struct ExpressionCacheConfig {

/// A durable cache of optimized expressions.
pub struct ExpressionCache {
deploy_generation: u64,
build_version: Version,
durable_cache: DurableCache<ExpressionCodec>,
}

Expand All @@ -136,7 +137,7 @@ impl ExpressionCache {
/// Returns all cached expressions in the current deploy generation, after reconciliation.
pub async fn open(
ExpressionCacheConfig {
deploy_generation,
build_version,
persist,
shard_id,
current_ids,
Expand All @@ -151,7 +152,7 @@ impl ExpressionCache {
) {
let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await;
let mut cache = Self {
deploy_generation,
build_version,
durable_cache,
};

Expand Down Expand Up @@ -189,8 +190,16 @@ impl ExpressionCache {
let mut global_expressions = BTreeMap::new();

for (key, expressions) in self.durable_cache.entries_local() {
if key.deploy_generation == self.deploy_generation {
// Only deserialize the current generation.
let build_version = match key.build_version.parse::<Version>() {
Ok(build_version) => build_version,
Err(err) => {
warn!("unable to parse build version: {key:?}: {err:?}");
keys_to_remove.push((key.clone(), None));
continue;
}
};
if build_version == self.build_version {
// Only deserialize the current version.
match key.expr_type {
ExpressionType::Local => {
let expressions: LocalExpressions = match bincode::deserialize(expressions)
Expand Down Expand Up @@ -234,7 +243,7 @@ impl ExpressionCache {
}
}
} else if remove_prior_gens {
// Remove expressions from previous generations.
// Remove expressions from previous versions.
keys_to_remove.push((key.clone(), None));
}
}
Expand Down Expand Up @@ -268,21 +277,22 @@ impl ExpressionCache {
invalidate_ids: BTreeSet<GlobalId>,
) {
let mut entries = BTreeMap::new();
let build_version = self.build_version.to_string();
// Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate
// keys.
for id in invalidate_ids {
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
build_version: build_version.clone(),
expr_type: ExpressionType::Local,
},
None,
);
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
build_version: build_version.clone(),
expr_type: ExpressionType::Global,
},
None,
Expand All @@ -301,7 +311,7 @@ impl ExpressionCache {
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
build_version: build_version.clone(),
expr_type: ExpressionType::Local,
},
Some(expressions),
Expand All @@ -320,7 +330,7 @@ impl ExpressionCache {
entries.insert(
CacheKey {
id,
deploy_generation: self.deploy_generation,
build_version: build_version.clone(),
expr_type: ExpressionType::Global,
},
Some(expressions),
Expand Down Expand Up @@ -433,6 +443,7 @@ mod tests {
use proptest::proptest;
use proptest::strategy::{Strategy, ValueTree};
use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner};
use semver::Version;
use tracing::info;

use crate::expr_cache::{
Expand Down Expand Up @@ -550,8 +561,8 @@ mod tests {
let local_tree: ArbitraryTimeout<LocalExpressions> = ArbitraryTimeout::new();
let global_tree: ArbitraryTimeout<GlobalExpressions> = ArbitraryTimeout::new();

let first_deploy_generation = 0;
let second_deploy_generation = 1;
let first_version = Version::new(0, 1, 0);
let second_version = Version::new(0, 2, 0);
let persist = PersistClient::new_for_tests().await;
let shard_id = ShardId::new();

Expand All @@ -567,7 +578,7 @@ mod tests {
// Open a new empty cache.
let (cache, local_exprs, global_exprs) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand Down Expand Up @@ -611,7 +622,7 @@ mod tests {
// Re-open the cache.
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand Down Expand Up @@ -640,7 +651,7 @@ mod tests {
// Re-open the cache.
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand Down Expand Up @@ -683,7 +694,7 @@ mod tests {
// Re-open the cache.
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand All @@ -706,7 +717,7 @@ mod tests {
// Open the cache at a new generation.
let (cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: second_deploy_generation,
build_version: second_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand Down Expand Up @@ -758,7 +769,7 @@ mod tests {
// Re-open the cache at the first generation.
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand All @@ -782,7 +793,7 @@ mod tests {
remove_prior_gens = true;
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: second_deploy_generation,
build_version: second_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand All @@ -805,7 +816,7 @@ mod tests {
// Re-open the cache at the first generation.
let (_cache, local_entries, global_entries) =
ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig {
deploy_generation: first_deploy_generation,
build_version: first_version.clone(),
persist: persist.clone(),
shard_id,
current_ids: current_ids.clone(),
Expand Down

0 comments on commit 5df1932

Please sign in to comment.