Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

catalog: Key migration shard by binary version #31210

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/adapter/src/catalog/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,23 @@ pub(crate) fn durable_migrate(
Some(EXPR_CACHE_MIGRATION_DONE),
)?;
}

// Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
// binary version instead of the deploy generation.
const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
!= Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
{
if let Some(shard_id) = tx.get_builtin_migration_shard() {
tx.mark_shards_as_finalized(btreeset! {shard_id});
tx.set_builtin_migration_shard(ShardId::new())?;
}
tx.set_config(
BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
)?;
}
Ok(())
}

Expand Down
58 changes: 34 additions & 24 deletions src/adapter/src/catalog/open/builtin_item_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ pub(crate) async fn migrate_builtin_items(
}
BuiltinItemMigrationConfig::ZeroDownTime {
persist_client,
deploy_generation,
read_only,
} => {
migrate_builtin_items_0dt(
Expand All @@ -77,7 +76,6 @@ pub(crate) async fn migrate_builtin_items(
local_expr_cache,
persist_client,
migrated_builtins,
deploy_generation,
read_only,
)
.await
Expand Down Expand Up @@ -134,22 +132,22 @@ async fn migrate_builtin_items_legacy(
///
/// 1. Each environment has a dedicated persist shard, called the migration shard, that allows
/// environments to durably write down metadata while in read-only mode. The shard is a
/// mapping of `(GlobalId, deploy_generation)` to `ShardId`.
/// 2. Collect the `GlobalId` of all migrated tables for the current deploy generation.
/// mapping of `(GlobalId, build_version)` to `ShardId`.
/// 2. Collect the `GlobalId` of all migrated tables for the current build version.
/// 3. Read in the current contents of the migration shard.
/// 4. Collect all the `ShardId`s from the migration shard that are not at the current
/// `deploy_generation` or are not in the set of migrated tables.
/// `build_version` or are not in the set of migrated tables.
/// a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
/// from an incomplete migration. Finalize them and remove them from the migration shard.
/// Note: care must be taken to not remove the shard from the migration shard until we are
/// sure that they will be finalized, otherwise the shard will leak.
/// b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
/// complete migration. Remove them from the migration shard.
/// 5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
/// for the current deploy generation. Generate new `ShardId`s and add them to the migration
/// for the current build version. Generate new `ShardId`s and add them to the migration
/// shard.
/// 6. At this point the migration shard should only logically contain a mapping of migrated
/// table `GlobalId`s to new `ShardId`s for the current deploy generation. For each of these
/// table `GlobalId`s to new `ShardId`s for the current build version. For each of these
/// `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
/// a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
/// b. Finalize the removed `ShardId`s.
Expand Down Expand Up @@ -177,7 +175,6 @@ async fn migrate_builtin_items_0dt(
local_expr_cache: &mut LocalExpressionCache,
persist_client: PersistClient,
migrated_builtins: Vec<CatalogItemId>,
deploy_generation: u64,
read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
assert_eq!(
Expand All @@ -186,6 +183,8 @@ async fn migrate_builtin_items_0dt(
"txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
);

let build_version = state.config.build_info.semver_version();

// 0. Update durably stored fingerprints.
let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
.map(|builtin| {
Expand Down Expand Up @@ -237,7 +236,9 @@ async fn migrate_builtin_items_0dt(
.expect("builtin migration shard should exist for opened catalogs");
let diagnostics = Diagnostics {
shard_name: "builtin_migration".to_string(),
handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
handle_purpose: format!(
"builtin table migration shard for org {organization_id:?} version {build_version:?}"
),
};
let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
.open_critical_since(
Expand Down Expand Up @@ -348,16 +349,16 @@ async fn migrate_builtin_items_0dt(
txn.get_collection_metadata()
};
for (table_key, shard_id) in global_id_shards.clone() {
if table_key.deploy_generation > deploy_generation {
if table_key.build_version > build_version {
halt!(
"saw deploy generation {}, which is greater than current deploy generation {}",
table_key.deploy_generation,
deploy_generation
"saw build version {}, which is greater than current build version {}",
table_key.build_version,
build_version
);
}

if !migrated_storage_collections.contains(&table_key.global_id)
|| table_key.deploy_generation < deploy_generation
|| table_key.build_version < build_version
{
global_id_shards.remove(&table_key);
if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
Expand All @@ -370,7 +371,7 @@ async fn migrate_builtin_items_0dt(
}
}

// 5. Add migrated tables to migration shard for current generation.
// 5. Add migrated tables to migration shard for current build version.
let mut global_id_shards: BTreeMap<_, _> = global_id_shards
.into_iter()
.map(|(table_key, shard_id)| (table_key.global_id, shard_id))
Expand All @@ -381,7 +382,7 @@ async fn migrate_builtin_items_0dt(
global_id_shards.insert(global_id, shard_id);
let table_key = TableKey {
global_id,
deploy_generation,
build_version: build_version.clone(),
};
migrated_shard_updates.push(((table_key, shard_id), upper, 1));
}
Expand Down Expand Up @@ -541,35 +542,35 @@ mod persist_schema {
use mz_persist_types::stats::NoneStats;
use mz_persist_types::Codec;

#[derive(Debug, Clone, Default, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub(super) struct TableKey {
pub(super) global_id: u64,
pub(super) deploy_generation: u64,
pub(super) build_version: semver::Version,
}

impl std::fmt::Display for TableKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}", self.global_id, self.deploy_generation)
write!(f, "{}-{}", self.global_id, self.build_version)
}
}

impl std::str::FromStr for TableKey {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<_> = s.split('-').collect();
let &[global_id, deploy_generation] = parts.as_slice() else {
let parts: Vec<_> = s.splitn(2, '-').collect();
let &[global_id, build_version] = parts.as_slice() else {
return Err(format!("invalid TableKey '{s}'"));
};
let global_id = global_id
.parse()
.map_err(|e: ParseIntError| e.to_string())?;
let deploy_generation = deploy_generation
let build_version = build_version
.parse()
.map_err(|e: ParseIntError| e.to_string())?;
.map_err(|e: semver::Error| e.to_string())?;
Ok(TableKey {
global_id,
deploy_generation,
build_version,
})
}
}
Expand All @@ -588,6 +589,15 @@ mod persist_schema {
}
}

/// Manual `Default` for `TableKey`: a zero `global_id` paired with build version `0.0.0`.
// NOTE(review): presumably hand-written because `semver::Version` offers no `Default`
// impl to derive from; confirm against the semver crate.
impl Default for TableKey {
    fn default() -> Self {
        let global_id = u64::default();
        let build_version = semver::Version::new(0, 0, 0);
        Self {
            global_id,
            build_version,
        }
    }
}

impl Codec for TableKey {
type Storage = ();
type Schema = TableKeySchema;
Expand Down
1 change: 0 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3986,7 +3986,6 @@ pub fn serve(
let builtin_item_migration_config = if enable_0dt_deployment {
BuiltinItemMigrationConfig::ZeroDownTime {
persist_client: persist_client.clone(),
deploy_generation: controller_config.deploy_generation,
read_only: read_only_controllers,
}
} else {
Expand Down
1 change: 0 additions & 1 deletion src/catalog/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ pub enum BuiltinItemMigrationConfig {
Legacy,
ZeroDownTime {
persist_client: PersistClient,
deploy_generation: u64,
read_only: bool,
},
}
Expand Down
7 changes: 7 additions & 0 deletions src/catalog/src/durable/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1831,6 +1831,13 @@ impl<'a> Transaction<'a> {
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
}

/// Stores `shard_id`, rendered as a string, as the setting under
/// `BUILTIN_MIGRATION_SHARD_KEY`.
///
/// # Errors
///
/// Returns whatever error `set_setting` reports.
pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
    let setting_name = BUILTIN_MIGRATION_SHARD_KEY.to_string();
    let setting_value = Some(shard_id.to_string());
    self.set_setting(setting_name, setting_value)
}

pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
Expand Down
Loading