From 5df1932649885bc5417142daef16afc5cdf0e76b Mon Sep 17 00:00:00 2001 From: Joseph Koshakow Date: Mon, 27 Jan 2025 17:21:27 -0500 Subject: [PATCH] catalog: Key expression cache by binary version (#31206) The expression cache caches optimized expressions in a persist shard. It uses the environment's deploy generation as part of the key for the cache. The assumption was that two environments with the same deploy generation would always have the same binary version. This assumption would allow two environments with the same deploy generation to safely deserialize each other's expressions, without worrying about backwards and forwards compatibility. This assumption was not correct. Two environments with different binary version can use the same deploy generation as long as one environment never fully completed a deployment. This is especially bad because the expression cache is written to and read from in read-only mode, before a deployment is complete. This commit updates the key of the expression cache to explicitly use the binary version of environmentd so that two environments with the same version can safely deserialize each other's expressions. Fixes #MaterializeInc/database-issues/issues/8917 --- src/adapter/src/catalog/migrate.rs | 20 ++++++++- src/adapter/src/catalog/open.rs | 3 +- src/catalog/src/durable/transaction.rs | 7 ++++ src/catalog/src/expr_cache.rs | 57 +++++++++++++++----------- 4 files changed, 61 insertions(+), 26 deletions(-) diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index c62d768a838e6..f3ab840ef4e23 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -9,17 +9,21 @@ use std::collections::BTreeMap; +use maplit::btreeset; use mz_catalog::builtin::BuiltinTable; use mz_catalog::durable::Transaction; use mz_catalog::memory::objects::StateUpdate; use mz_ore::collections::CollectionExt; use mz_ore::now::NowFn; +use mz_persist_types::ShardId; use mz_repr::{CatalogItemId, Timestamp}; use mz_sql::ast::display::AstDisplay; use mz_sql_parser::ast::{Raw, Statement}; +use mz_storage_client::controller::StorageTxn; use semver::Version; use tracing::info; use uuid::Uuid; + // DO NOT add any more imports from `crate` outside of `crate::catalog`. use crate::catalog::open::into_consolidatable_updates_startup; use crate::catalog::state::LocalExpressionCache; @@ -186,10 +190,24 @@ pub(crate) async fn migrate( /// Migrations that run only on the durable catalog before any data is loaded into memory. pub(crate) fn durable_migrate( - _tx: &mut Transaction, + tx: &mut Transaction, _organization_id: Uuid, _boot_ts: Timestamp, ) -> Result<(), anyhow::Error> { + // Migrate the expression cache to a new shard. We're updating the keys to use the explicit + // binary version instead of the deploy generation. + const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration"; + const EXPR_CACHE_MIGRATION_DONE: u64 = 1; + if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) { + if let Some(shard_id) = tx.get_expression_cache_shard() { + tx.mark_shards_as_finalized(btreeset! {shard_id}); + tx.set_expression_cache_shard(ShardId::new())?; + } + tx.set_config( + EXPR_CACHE_MIGRATION_KEY.to_string(), + Some(EXPR_CACHE_MIGRATION_DONE), + )?; + } Ok(()) } diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index eab051fd2057b..e1174cc21290a 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -293,7 +293,6 @@ impl Catalog { aws_privatelink_availability_zones: config.aws_privatelink_availability_zones, http_host_name: config.http_host_name, }; - let deploy_generation = storage.get_deployment_generation().await?; let mut updates: Vec<_> = storage.sync_to_current_updates().await?; assert!(!updates.is_empty(), "initial catalog snapshot is missing"); @@ -464,7 +463,7 @@ impl Catalog { .collect(); let dyncfgs = config.persist_client.dyncfgs().clone(); let expr_cache_config = ExpressionCacheConfig { - deploy_generation, + build_version: config.build_info.semver_version(), shard_id: txn .get_expression_cache_shard() .expect("expression cache shard should exist for opened catalogs"), diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 544b6702f5721..45fc9ec2e0b81 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -1836,6 +1836,13 @@ impl<'a> Transaction<'a> { .map(|shard_id| shard_id.parse().expect("valid ShardId")) } + pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> { + self.set_setting( + EXPRESSION_CACHE_SHARD_KEY.to_string(), + Some(shard_id.to_string()), + ) + } + /// Updates the catalog `enable_0dt_deployment` "config" value to /// match the `enable_0dt_deployment` "system var" value. /// diff --git a/src/catalog/src/expr_cache.rs b/src/catalog/src/expr_cache.rs index c3ee62ce17303..20e5820876a58 100644 --- a/src/catalog/src/expr_cache.rs +++ b/src/catalog/src/expr_cache.rs @@ -32,10 +32,11 @@ use mz_repr::GlobalId; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::OptimizerNotice; use proptest_derive::Arbitrary; +use semver::Version; use serde::{Deserialize, Serialize}; use timely::Container; use tokio::sync::mpsc; -use tracing::debug; +use tracing::{debug, warn}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] enum ExpressionType { @@ -70,7 +71,7 @@ impl GlobalExpressions { #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] struct CacheKey { - deploy_generation: u64, + build_version: String, id: GlobalId, expr_type: ExpressionType, } @@ -107,7 +108,7 @@ impl DurableCacheCodec for ExpressionCodec { /// Configuration needed to initialize an [`ExpressionCache`]. #[derive(Debug, Clone)] pub struct ExpressionCacheConfig { - pub deploy_generation: u64, + pub build_version: Version, pub persist: PersistClient, pub shard_id: ShardId, pub current_ids: BTreeSet, @@ -118,7 +119,7 @@ pub struct ExpressionCacheConfig { /// A durable cache of optimized expressions. pub struct ExpressionCache { - deploy_generation: u64, + build_version: Version, durable_cache: DurableCache, } @@ -136,7 +137,7 @@ impl ExpressionCache { /// Returns all cached expressions in the current deploy generation, after reconciliation. pub async fn open( ExpressionCacheConfig { - deploy_generation, + build_version, persist, shard_id, current_ids, @@ -151,7 +152,7 @@ impl ExpressionCache { ) { let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await; let mut cache = Self { - deploy_generation, + build_version, durable_cache, }; @@ -189,8 +190,16 @@ impl ExpressionCache { let mut global_expressions = BTreeMap::new(); for (key, expressions) in self.durable_cache.entries_local() { - if key.deploy_generation == self.deploy_generation { - // Only deserialize the current generation. + let build_version = match key.build_version.parse::() { + Ok(build_version) => build_version, + Err(err) => { + warn!("unable to parse build version: {key:?}: {err:?}"); + keys_to_remove.push((key.clone(), None)); + continue; + } + }; + if build_version == self.build_version { + // Only deserialize the current version. match key.expr_type { ExpressionType::Local => { let expressions: LocalExpressions = match bincode::deserialize(expressions) @@ -234,7 +243,7 @@ impl ExpressionCache { } } } else if remove_prior_gens { - // Remove expressions from previous generations. + // Remove expressions from previous versions. keys_to_remove.push((key.clone(), None)); } } @@ -268,13 +277,14 @@ impl ExpressionCache { invalidate_ids: BTreeSet, ) { let mut entries = BTreeMap::new(); + let build_version = self.build_version.to_string(); // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate // keys. for id in invalidate_ids { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Local, }, None, @@ -282,7 +292,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Global, }, None, @@ -301,7 +311,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Local, }, Some(expressions), @@ -320,7 +330,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Global, }, Some(expressions), @@ -433,6 +443,7 @@ mod tests { use proptest::proptest; use proptest::strategy::{Strategy, ValueTree}; use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner}; + use semver::Version; use tracing::info; use crate::expr_cache::{ @@ -550,8 +561,8 @@ mod tests { let local_tree: ArbitraryTimeout = ArbitraryTimeout::new(); let global_tree: ArbitraryTimeout = ArbitraryTimeout::new(); - let first_deploy_generation = 0; - let second_deploy_generation = 1; + let first_version = Version::new(0, 1, 0); + let second_version = Version::new(0, 2, 0); let persist = PersistClient::new_for_tests().await; let shard_id = ShardId::new(); @@ -567,7 +578,7 @@ mod tests { // Open a new empty cache. let (cache, local_exprs, global_exprs) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -611,7 +622,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -640,7 +651,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -683,7 +694,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -706,7 +717,7 @@ mod tests { // Open the cache at a new generation. let (cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: second_deploy_generation, + build_version: second_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -758,7 +769,7 @@ mod tests { // Re-open the cache at the first generation. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -782,7 +793,7 @@ mod tests { remove_prior_gens = true; let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: second_deploy_generation, + build_version: second_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -805,7 +816,7 @@ mod tests { // Re-open the cache at the first generation. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(),