diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs index c62d768a838e6..f3ab840ef4e23 100644 --- a/src/adapter/src/catalog/migrate.rs +++ b/src/adapter/src/catalog/migrate.rs @@ -9,17 +9,21 @@ use std::collections::BTreeMap; +use maplit::btreeset; use mz_catalog::builtin::BuiltinTable; use mz_catalog::durable::Transaction; use mz_catalog::memory::objects::StateUpdate; use mz_ore::collections::CollectionExt; use mz_ore::now::NowFn; +use mz_persist_types::ShardId; use mz_repr::{CatalogItemId, Timestamp}; use mz_sql::ast::display::AstDisplay; use mz_sql_parser::ast::{Raw, Statement}; +use mz_storage_client::controller::StorageTxn; use semver::Version; use tracing::info; use uuid::Uuid; + // DO NOT add any more imports from `crate` outside of `crate::catalog`. use crate::catalog::open::into_consolidatable_updates_startup; use crate::catalog::state::LocalExpressionCache; @@ -186,10 +190,24 @@ pub(crate) async fn migrate( /// Migrations that run only on the durable catalog before any data is loaded into memory. pub(crate) fn durable_migrate( - _tx: &mut Transaction, + tx: &mut Transaction, _organization_id: Uuid, _boot_ts: Timestamp, ) -> Result<(), anyhow::Error> { + // Migrate the expression cache to a new shard. We're updating the keys to use the explicit + // binary version instead of the deploy generation. + const EXPR_CACHE_MIGRATION_KEY: &str = "expr_cache_migration"; + const EXPR_CACHE_MIGRATION_DONE: u64 = 1; + if tx.get_config(EXPR_CACHE_MIGRATION_KEY.to_string()) != Some(EXPR_CACHE_MIGRATION_DONE) { + if let Some(shard_id) = tx.get_expression_cache_shard() { + tx.mark_shards_as_finalized(btreeset! {shard_id}); + tx.set_expression_cache_shard(ShardId::new())?; + } + tx.set_config( + EXPR_CACHE_MIGRATION_KEY.to_string(), + Some(EXPR_CACHE_MIGRATION_DONE), + )?; + } Ok(()) } diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index eab051fd2057b..e1174cc21290a 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -293,7 +293,6 @@ impl Catalog { aws_privatelink_availability_zones: config.aws_privatelink_availability_zones, http_host_name: config.http_host_name, }; - let deploy_generation = storage.get_deployment_generation().await?; let mut updates: Vec<_> = storage.sync_to_current_updates().await?; assert!(!updates.is_empty(), "initial catalog snapshot is missing"); @@ -464,7 +463,7 @@ impl Catalog { .collect(); let dyncfgs = config.persist_client.dyncfgs().clone(); let expr_cache_config = ExpressionCacheConfig { - deploy_generation, + build_version: config.build_info.semver_version(), shard_id: txn .get_expression_cache_shard() .expect("expression cache shard should exist for opened catalogs"), diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs index 544b6702f5721..45fc9ec2e0b81 100644 --- a/src/catalog/src/durable/transaction.rs +++ b/src/catalog/src/durable/transaction.rs @@ -1836,6 +1836,13 @@ impl<'a> Transaction<'a> { .map(|shard_id| shard_id.parse().expect("valid ShardId")) } + pub fn set_expression_cache_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> { + self.set_setting( + EXPRESSION_CACHE_SHARD_KEY.to_string(), + Some(shard_id.to_string()), + ) + } + /// Updates the catalog `enable_0dt_deployment` "config" value to /// match the `enable_0dt_deployment` "system var" value. /// diff --git a/src/catalog/src/expr_cache.rs b/src/catalog/src/expr_cache.rs index c3ee62ce17303..20e5820876a58 100644 --- a/src/catalog/src/expr_cache.rs +++ b/src/catalog/src/expr_cache.rs @@ -32,10 +32,11 @@ use mz_repr::GlobalId; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::notice::OptimizerNotice; use proptest_derive::Arbitrary; +use semver::Version; use serde::{Deserialize, Serialize}; use timely::Container; use tokio::sync::mpsc; -use tracing::debug; +use tracing::{debug, warn}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] enum ExpressionType { @@ -70,7 +71,7 @@ impl GlobalExpressions { #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize, Arbitrary)] struct CacheKey { - deploy_generation: u64, + build_version: String, id: GlobalId, expr_type: ExpressionType, } @@ -107,7 +108,7 @@ impl DurableCacheCodec for ExpressionCodec { /// Configuration needed to initialize an [`ExpressionCache`]. #[derive(Debug, Clone)] pub struct ExpressionCacheConfig { - pub deploy_generation: u64, + pub build_version: Version, pub persist: PersistClient, pub shard_id: ShardId, pub current_ids: BTreeSet, @@ -118,7 +119,7 @@ pub struct ExpressionCacheConfig { /// A durable cache of optimized expressions. pub struct ExpressionCache { - deploy_generation: u64, + build_version: Version, durable_cache: DurableCache, } @@ -136,7 +137,7 @@ impl ExpressionCache { /// Returns all cached expressions in the current deploy generation, after reconciliation. pub async fn open( ExpressionCacheConfig { - deploy_generation, + build_version, persist, shard_id, current_ids, @@ -151,7 +152,7 @@ impl ExpressionCache { ) { let durable_cache = DurableCache::new(&persist, shard_id, "expressions").await; let mut cache = Self { - deploy_generation, + build_version, durable_cache, }; @@ -189,8 +190,16 @@ impl ExpressionCache { let mut global_expressions = BTreeMap::new(); for (key, expressions) in self.durable_cache.entries_local() { - if key.deploy_generation == self.deploy_generation { - // Only deserialize the current generation. + let build_version = match key.build_version.parse::() { + Ok(build_version) => build_version, + Err(err) => { + warn!("unable to parse build version: {key:?}: {err:?}"); + keys_to_remove.push((key.clone(), None)); + continue; + } + }; + if build_version == self.build_version { + // Only deserialize the current version. match key.expr_type { ExpressionType::Local => { let expressions: LocalExpressions = match bincode::deserialize(expressions) @@ -234,7 +243,7 @@ impl ExpressionCache { } } } else if remove_prior_gens { - // Remove expressions from previous generations. + // Remove expressions from previous versions. keys_to_remove.push((key.clone(), None)); } } @@ -268,13 +277,14 @@ impl ExpressionCache { invalidate_ids: BTreeSet, ) { let mut entries = BTreeMap::new(); + let build_version = self.build_version.to_string(); // Important to do `invalidate_ids` first, so that `new_X_expressions` overwrites duplicate // keys. for id in invalidate_ids { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Local, }, None, @@ -282,7 +292,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Global, }, None, @@ -301,7 +311,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Local, }, Some(expressions), @@ -320,7 +330,7 @@ impl ExpressionCache { entries.insert( CacheKey { id, - deploy_generation: self.deploy_generation, + build_version: build_version.clone(), expr_type: ExpressionType::Global, }, Some(expressions), @@ -433,6 +443,7 @@ mod tests { use proptest::proptest; use proptest::strategy::{Strategy, ValueTree}; use proptest::test_runner::{RngAlgorithm, TestRng, TestRunner}; + use semver::Version; use tracing::info; use crate::expr_cache::{ @@ -550,8 +561,8 @@ mod tests { let local_tree: ArbitraryTimeout = ArbitraryTimeout::new(); let global_tree: ArbitraryTimeout = ArbitraryTimeout::new(); - let first_deploy_generation = 0; - let second_deploy_generation = 1; + let first_version = Version::new(0, 1, 0); + let second_version = Version::new(0, 2, 0); let persist = PersistClient::new_for_tests().await; let shard_id = ShardId::new(); @@ -567,7 +578,7 @@ mod tests { // Open a new empty cache. let (cache, local_exprs, global_exprs) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -611,7 +622,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -640,7 +651,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -683,7 +694,7 @@ mod tests { // Re-open the cache. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -706,7 +717,7 @@ mod tests { // Open the cache at a new generation. let (cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: second_deploy_generation, + build_version: second_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -758,7 +769,7 @@ mod tests { // Re-open the cache at the first generation. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -782,7 +793,7 @@ mod tests { remove_prior_gens = true; let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: second_deploy_generation, + build_version: second_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(), @@ -805,7 +816,7 @@ mod tests { // Re-open the cache at the first generation. let (_cache, local_entries, global_entries) = ExpressionCacheHandle::spawn_expression_cache(ExpressionCacheConfig { - deploy_generation: first_deploy_generation, + build_version: first_version.clone(), persist: persist.clone(), shard_id, current_ids: current_ids.clone(),