diff --git a/src/adapter/src/catalog/migrate.rs b/src/adapter/src/catalog/migrate.rs
index f3ab840ef4e23..93d2decf6b264 100644
--- a/src/adapter/src/catalog/migrate.rs
+++ b/src/adapter/src/catalog/migrate.rs
@@ -208,6 +208,23 @@ pub(crate) fn durable_migrate(
             Some(EXPR_CACHE_MIGRATION_DONE),
         )?;
     }
+
+    // Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
+    // binary version instead of the deploy generation.
+    const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
+    const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
+    if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
+        != Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
+    {
+        if let Some(shard_id) = tx.get_builtin_migration_shard() {
+            tx.mark_shards_as_finalized(btreeset! {shard_id});
+            tx.set_builtin_migration_shard(ShardId::new())?;
+        }
+        tx.set_config(
+            BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
+            Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
+        )?;
+    }
     Ok(())
 }
diff --git a/src/adapter/src/catalog/open/builtin_item_migration.rs b/src/adapter/src/catalog/open/builtin_item_migration.rs
index a88882b8634a3..077ffb1da93aa 100644
--- a/src/adapter/src/catalog/open/builtin_item_migration.rs
+++ b/src/adapter/src/catalog/open/builtin_item_migration.rs
@@ -68,7 +68,6 @@ pub(crate) async fn migrate_builtin_items(
         }
         BuiltinItemMigrationConfig::ZeroDownTime {
             persist_client,
-            deploy_generation,
             read_only,
         } => {
             migrate_builtin_items_0dt(
@@ -77,7 +76,6 @@ pub(crate) async fn migrate_builtin_items(
                 local_expr_cache,
                 persist_client,
                 migrated_builtins,
-                deploy_generation,
                 read_only,
             )
             .await
@@ -134,11 +132,11 @@ async fn migrate_builtin_items_legacy(
 ///
 /// 1. Each environment has a dedicated persist shard, called the migration shard, that allows
 ///    environments to durably write down metadata while in read-only mode. The shard is a
-///    mapping of `(GlobalId, deploy_generation)` to `ShardId`.
-/// 2. Collect the `GlobalId` of all migrated tables for the current deploy generation.
+///    mapping of `(GlobalId, build_version)` to `ShardId`.
+/// 2. Collect the `GlobalId` of all migrated tables for the current build version.
 /// 3. Read in the current contents of the migration shard.
 /// 4. Collect all the `ShardId`s from the migration shard that are not at the current
-///    `deploy_generation` or are not in the set of migrated tables.
+///    `build_version` or are not in the set of migrated tables.
 ///    a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
 ///       from an incomplete migration. Finalize them and remove them from the migration shard.
 ///       Note: care must be taken to not remove the shard from the migration shard until we are
@@ -146,10 +144,10 @@ async fn migrate_builtin_items_legacy(
 ///    b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
 ///       complete migration. Remove them from the migration shard.
 /// 5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
-///    for the current deploy generation. Generate new `ShardId`s and add them to the migration
+///    for the current build version. Generate new `ShardId`s and add them to the migration
 ///    shard.
 /// 6. At this point the migration shard should only logically contain a mapping of migrated
-///    table `GlobalId`s to new `ShardId`s for the current deploy generation. For each of these
+///    table `GlobalId`s to new `ShardId`s for the current build version. For each of these
 ///    `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
 ///    a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
 ///    b. Finalize the removed `ShardId`s.
@@ -177,7 +175,6 @@ async fn migrate_builtin_items_0dt(
     local_expr_cache: &mut LocalExpressionCache,
     persist_client: PersistClient,
     migrated_builtins: Vec,
-    deploy_generation: u64,
     read_only: bool,
 ) -> Result {
     assert_eq!(
@@ -186,6 +183,8 @@ async fn migrate_builtin_items_0dt(
         "txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
     );
 
+    let build_version = state.config.build_info.semver_version();
+
     // 0. Update durably stored fingerprints.
     let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
         .map(|builtin| {
@@ -237,7 +236,9 @@ async fn migrate_builtin_items_0dt(
         .expect("builtin migration shard should exist for opened catalogs");
     let diagnostics = Diagnostics {
         shard_name: "builtin_migration".to_string(),
-        handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
+        handle_purpose: format!(
+            "builtin table migration shard for org {organization_id:?} version {build_version:?}"
+        ),
     };
     let mut since_handle: SinceHandle = persist_client
         .open_critical_since(
@@ -348,16 +349,16 @@ async fn migrate_builtin_items_0dt(
         .expect("builtin migration shard should exist for opened catalogs");
         txn.get_collection_metadata()
     };
     for (table_key, shard_id) in global_id_shards.clone() {
-        if table_key.deploy_generation > deploy_generation {
+        if table_key.build_version > build_version {
             halt!(
-                "saw deploy generation {}, which is greater than current deploy generation {}",
-                table_key.deploy_generation,
-                deploy_generation
+                "saw build version {}, which is greater than current build version {}",
+                table_key.build_version,
+                build_version
             );
         }
         if !migrated_storage_collections.contains(&table_key.global_id)
-            || table_key.deploy_generation < deploy_generation
+            || table_key.build_version < build_version
         {
             global_id_shards.remove(&table_key);
             if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
@@ -370,7 +371,7 @@ async fn migrate_builtin_items_0dt(
         }
     }
-    // 5. Add migrated tables to migration shard for current generation.
+    // 5. Add migrated tables to migration shard for current build version.
     let mut global_id_shards: BTreeMap<_, _> = global_id_shards
         .into_iter()
         .map(|(table_key, shard_id)| (table_key.global_id, shard_id))
         .collect();
@@ -381,7 +382,7 @@ async fn migrate_builtin_items_0dt(
         global_id_shards.insert(global_id, shard_id);
         let table_key = TableKey {
             global_id,
-            deploy_generation,
+            build_version: build_version.clone(),
         };
         migrated_shard_updates.push(((table_key, shard_id), upper, 1));
     }
@@ -541,15 +542,15 @@ mod persist_schema {
     use mz_persist_types::stats::NoneStats;
     use mz_persist_types::Codec;
 
-    #[derive(Debug, Clone, Default, Eq, Ord, PartialEq, PartialOrd)]
+    #[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
     pub(super) struct TableKey {
         pub(super) global_id: u64,
-        pub(super) deploy_generation: u64,
+        pub(super) build_version: semver::Version,
     }
 
     impl std::fmt::Display for TableKey {
         fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "{}-{}", self.global_id, self.deploy_generation)
+            write!(f, "{}-{}", self.global_id, self.build_version)
         }
     }
@@ -557,19 +558,19 @@ mod persist_schema {
         type Err = String;
 
         fn from_str(s: &str) -> Result<Self, Self::Err> {
-            let parts: Vec<_> = s.split('-').collect();
-            let &[global_id, deploy_generation] = parts.as_slice() else {
+            let parts: Vec<_> = s.splitn(2, '-').collect();
+            let &[global_id, build_version] = parts.as_slice() else {
                 return Err(format!("invalid TableKey '{s}'"));
             };
             let global_id = global_id
                 .parse()
                 .map_err(|e: ParseIntError| e.to_string())?;
-            let deploy_generation = deploy_generation
+            let build_version = build_version
                 .parse()
-                .map_err(|e: ParseIntError| e.to_string())?;
+                .map_err(|e: semver::Error| e.to_string())?;
             Ok(TableKey {
                 global_id,
-                deploy_generation,
+                build_version,
             })
         }
     }
@@ -588,6 +589,15 @@ mod persist_schema {
         }
     }
 
+    impl Default for TableKey {
+        fn default() -> Self {
+            Self {
+                global_id: Default::default(),
+                build_version: semver::Version::new(0, 0, 0),
+            }
+        }
+    }
+
     impl Codec for TableKey {
         type Storage = ();
         type Schema = TableKeySchema;
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 2f26cff656e02..0bd74636268a3 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -3986,7 +3986,6 @@ pub fn serve(
     let builtin_item_migration_config = if enable_0dt_deployment {
         BuiltinItemMigrationConfig::ZeroDownTime {
             persist_client: persist_client.clone(),
-            deploy_generation: controller_config.deploy_generation,
             read_only: read_only_controllers,
         }
     } else {
diff --git a/src/catalog/src/config.rs b/src/catalog/src/config.rs
index b088ad52060c9..f4a0acb81ae10 100644
--- a/src/catalog/src/config.rs
+++ b/src/catalog/src/config.rs
@@ -98,7 +98,6 @@ pub enum BuiltinItemMigrationConfig {
     Legacy,
     ZeroDownTime {
         persist_client: PersistClient,
-        deploy_generation: u64,
         read_only: bool,
     },
 }
diff --git a/src/catalog/src/durable/transaction.rs b/src/catalog/src/durable/transaction.rs
index 45fc9ec2e0b81..47b4f465af9f1 100644
--- a/src/catalog/src/durable/transaction.rs
+++ b/src/catalog/src/durable/transaction.rs
@@ -1831,6 +1831,13 @@ impl<'a> Transaction<'a> {
             .map(|shard_id| shard_id.parse().expect("valid ShardId"))
     }
 
+    pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
+        self.set_setting(
+            BUILTIN_MIGRATION_SHARD_KEY.to_string(),
+            Some(shard_id.to_string()),
+        )
+    }
+
     pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
         self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
             .map(|shard_id| shard_id.parse().expect("valid ShardId"))
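Note on the new key encoding (an illustrative sketch, not part of this diff): `TableKey` round-trips through the string `{global_id}-{build_version}`, and a semver build version, such as a pre-release, can itself contain `-`, which is why `from_str` now splits with `splitn(2, '-')` instead of `split('-')`. The standalone program below mirrors that parsing logic; the struct is a pared-down copy and the id/version values are made up.

// Standalone sketch: why the new `FromStr` uses `splitn(2, '-')`.
// A pre-release build version like `0.130.0-dev.1` contains '-' itself,
// so splitting on every '-' would yield too many parts and fail to parse.
use std::str::FromStr;

#[derive(Debug, PartialEq)]
struct TableKey {
    global_id: u64,
    build_version: semver::Version,
}

impl std::fmt::Display for TableKey {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same encoding as the migration shard key: "{global_id}-{build_version}".
        write!(f, "{}-{}", self.global_id, self.build_version)
    }
}

impl FromStr for TableKey {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        // Split only on the first '-': everything after it belongs to the
        // semver version, which may contain further '-' characters.
        let parts: Vec<_> = s.splitn(2, '-').collect();
        let &[global_id, build_version] = parts.as_slice() else {
            return Err(format!("invalid TableKey '{s}'"));
        };
        let global_id = global_id.parse::<u64>().map_err(|e| e.to_string())?;
        let build_version = build_version
            .parse::<semver::Version>()
            .map_err(|e| e.to_string())?;
        Ok(TableKey { global_id, build_version })
    }
}

fn main() {
    let key = TableKey {
        global_id: 42,
        build_version: semver::Version::parse("0.130.0-dev.1").unwrap(),
    };
    // Encode and decode: the embedded '-' in the pre-release tag survives.
    let encoded = key.to_string();
    assert_eq!(encoded, "42-0.130.0-dev.1");
    assert_eq!(encoded.parse::<TableKey>().unwrap(), key);
}

Because `TableKey` derives `Ord`, the `table_key.build_version > build_version` and `< build_version` checks in `migrate_builtin_items_0dt` compare `semver::Version` values using SemVer precedence (a pre-release such as `0.130.0-dev.1` sorts below `0.130.0`), rather than an opaque generation counter.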