catalog: Key migration shard by binary version
The builtin migration shard is a persist shard used during version
upgrades. It uses the environment's deploy generation as part of the
key for the values. The assumption was that two environments with the
same deploy generation would always have the same binary version. This
assumption was meant to make all migration steps of two environments with
the same deploy generation idempotent.

This assumption was not correct. Two environments with different binary
versions can use the same deploy generation as long as one environment
never fully completed a deployment. This is especially bad because the
migration shard is written to and read from in read-only mode, before
a deployment is complete.

This commit updates the key of the builtin migration shard to
explicitly use the binary version of environmentd so that the
migration steps are idempotent.

Fixes MaterializeInc/database-issues#8917
jkosh44 committed Jan 27, 2025
1 parent 5df1932 commit 7a635c9
Showing 5 changed files with 58 additions and 26 deletions.
17 changes: 17 additions & 0 deletions src/adapter/src/catalog/migrate.rs
@@ -208,6 +208,23 @@ pub(crate) fn durable_migrate(
Some(EXPR_CACHE_MIGRATION_DONE),
)?;
}

// Migrate the builtin migration shard to a new shard. We're updating the keys to use the explicit
// binary version instead of the deploy generation.
const BUILTIN_MIGRATION_SHARD_MIGRATION_KEY: &str = "migration_shard_migration";
const BUILTIN_MIGRATION_SHARD_MIGRATION_DONE: u64 = 1;
if tx.get_config(BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string())
!= Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE)
{
if let Some(shard_id) = tx.get_builtin_migration_shard() {
tx.mark_shards_as_finalized(btreeset! {shard_id});
tx.set_builtin_migration_shard(ShardId::new())?;
}
tx.set_config(
BUILTIN_MIGRATION_SHARD_MIGRATION_KEY.to_string(),
Some(BUILTIN_MIGRATION_SHARD_MIGRATION_DONE),
)?;
}
Ok(())
}
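The guarded block in `durable_migrate` follows a run-once fencing pattern: a durable config key records that the shard swap completed, so the step is a no-op on every subsequent boot. A minimal sketch of the pattern, with a plain `BTreeMap` standing in for the catalog transaction (the names here are illustrative, not the real catalog API):

```rust
use std::collections::BTreeMap;

// Illustrative stand-ins for the real durable config keys.
const MIGRATION_KEY: &str = "migration_shard_migration";
const MIGRATION_DONE: u64 = 1;

/// Run `migrate` at most once, using `config` as the durable fence.
fn run_once(config: &mut BTreeMap<String, u64>, migrate: impl FnOnce()) {
    // Only migrate if the fence has not been written yet.
    if config.get(MIGRATION_KEY) != Some(&MIGRATION_DONE) {
        migrate();
        // Writing the fence and the migration commit in the same
        // transaction is what makes restarts safe in the real code.
        config.insert(MIGRATION_KEY.to_string(), MIGRATION_DONE);
    }
}

fn main() {
    let mut config = BTreeMap::new();
    let mut runs = 0;
    run_once(&mut config, || runs += 1);
    run_once(&mut config, || runs += 1); // second call is a no-op
    println!("migration ran {runs} time(s)");
}
```

In the actual commit the fence write and the shard swap happen inside one `Transaction`, so either both are durable or neither is.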

58 changes: 34 additions & 24 deletions src/adapter/src/catalog/open/builtin_item_migration.rs
@@ -68,7 +68,6 @@ pub(crate) async fn migrate_builtin_items(
}
BuiltinItemMigrationConfig::ZeroDownTime {
persist_client,
deploy_generation,
read_only,
} => {
migrate_builtin_items_0dt(
@@ -77,7 +76,6 @@
local_expr_cache,
persist_client,
migrated_builtins,
deploy_generation,
read_only,
)
.await
@@ -134,22 +132,22 @@ async fn migrate_builtin_items_legacy(
///
/// 1. Each environment has a dedicated persist shard, called the migration shard, that allows
/// environments to durably write down metadata while in read-only mode. The shard is a
/// mapping of `(GlobalId, deploy_generation)` to `ShardId`.
/// 2. Collect the `GlobalId` of all migrated tables for the current deploy generation.
/// mapping of `(GlobalId, build_version)` to `ShardId`.
/// 2. Collect the `GlobalId` of all migrated tables for the current build version.
/// 3. Read in the current contents of the migration shard.
/// 4. Collect all the `ShardId`s from the migration shard that are not at the current
/// `deploy_generation` or are not in the set of migrated tables.
/// `build_version` or are not in the set of migrated tables.
/// a. If they ARE NOT mapped to a `GlobalId` in the storage metadata then they are shards
/// from an incomplete migration. Finalize them and remove them from the migration shard.
/// Note: care must be taken to not remove the shard from the migration shard until we are
/// sure that they will be finalized, otherwise the shard will leak.
/// b. If they ARE mapped to a `GlobalId` in the storage metadata then they are shards from a
/// complete migration. Remove them from the migration shard.
/// 5. Collect all the `GlobalId`s of tables that are migrated, but not in the migration shard
/// for the current deploy generation. Generate new `ShardId`s and add them to the migration
/// for the current build version. Generate new `ShardId`s and add them to the migration
/// shard.
/// 6. At this point the migration shard should only logically contain a mapping of migrated
/// table `GlobalId`s to new `ShardId`s for the current deploy generation. For each of these
/// table `GlobalId`s to new `ShardId`s for the current build version. For each of these
/// `GlobalId`s such that the `ShardId` isn't already in the storage metadata:
/// a. Remove the current `GlobalId` to `ShardId` mapping from the storage metadata.
/// b. Finalize the removed `ShardId`s.
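The reconciliation in steps 4–5 above can be sketched as follows. This is a simplified model, not the real implementation: versions are plain strings rather than `semver::Version`, shard IDs are strings minted by a fake generator, and the halt-on-future-version check that the actual code performs is elided.

```rust
use std::collections::{BTreeMap, BTreeSet};

// Hypothetical stand-ins for the real catalog types.
type Version = &'static str;
type GlobalId = u64;
type ShardId = String;

/// Drop stale migration-shard entries (wrong version or no longer
/// migrated), returning their shards for finalization, and mint new
/// shards for migrated tables missing an entry at the current version.
fn reconcile(
    current_version: Version,
    migrated: &BTreeSet<GlobalId>,
    shard_contents: &mut BTreeMap<(GlobalId, Version), ShardId>,
) -> Vec<ShardId> {
    // Step 4: collect keys from other versions or for unmigrated tables.
    let stale: Vec<(GlobalId, Version)> = shard_contents
        .keys()
        .filter(|(gid, version)| *version != current_version || !migrated.contains(gid))
        .cloned()
        .collect();
    let mut to_finalize = Vec::new();
    for key in stale {
        if let Some(shard) = shard_contents.remove(&key) {
            to_finalize.push(shard);
        }
    }
    // Step 5: mint new shards for migrated tables that lack an entry at
    // the current version.
    for gid in migrated {
        shard_contents
            .entry((*gid, current_version))
            .or_insert_with(|| format!("new-shard-{gid}"));
    }
    to_finalize
}

fn main() {
    let migrated: BTreeSet<GlobalId> = [1, 2].into_iter().collect();
    let mut contents = BTreeMap::from([
        ((1, "0.130.0"), "old-a".to_string()), // stale version
        ((3, "0.131.0"), "old-b".to_string()), // no longer migrated
    ]);
    let finalized = reconcile("0.131.0", &migrated, &mut contents);
    println!("finalize: {finalized:?}");
    println!("contents: {contents:?}");
}
```

As the doc comment notes, the real code finalizes the stale shards before removing them from the migration shard, so a crash between the two steps cannot leak a shard.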
@@ -177,7 +175,6 @@ async fn migrate_builtin_items_0dt(
local_expr_cache: &mut LocalExpressionCache,
persist_client: PersistClient,
migrated_builtins: Vec<CatalogItemId>,
deploy_generation: u64,
read_only: bool,
) -> Result<BuiltinItemMigrationResult, Error> {
assert_eq!(
@@ -186,6 +183,8 @@
"txn must be in savepoint mode when read_only is true, and in writable mode when read_only is false"
);

let build_version = state.config.build_info.semver_version();

// 0. Update durably stored fingerprints.
let id_fingerprint_map: BTreeMap<_, _> = BUILTINS::iter(&state.config().builtins_cfg)
.map(|builtin| {
@@ -237,7 +236,9 @@
.expect("builtin migration shard should exist for opened catalogs");
let diagnostics = Diagnostics {
shard_name: "builtin_migration".to_string(),
handle_purpose: format!("builtin table migration shard for org {organization_id:?} generation {deploy_generation:?}"),
handle_purpose: format!(
"builtin table migration shard for org {organization_id:?} version {build_version:?}"
),
};
let mut since_handle: SinceHandle<TableKey, ShardId, Timestamp, Diff, i64> = persist_client
.open_critical_since(
@@ -348,16 +349,16 @@
txn.get_collection_metadata()
};
for (table_key, shard_id) in global_id_shards.clone() {
if table_key.deploy_generation > deploy_generation {
if table_key.build_version > build_version {
halt!(
"saw deploy generation {}, which is greater than current deploy generation {}",
table_key.deploy_generation,
deploy_generation
"saw build version {}, which is greater than current build version {}",
table_key.build_version,
build_version
);
}

if !migrated_storage_collections.contains(&table_key.global_id)
|| table_key.deploy_generation < deploy_generation
|| table_key.build_version < build_version
{
global_id_shards.remove(&table_key);
if storage_collection_metadata.get(&GlobalId::System(table_key.global_id))
@@ -370,7 +371,7 @@
}
}

// 5. Add migrated tables to migration shard for current generation.
// 5. Add migrated tables to migration shard for current build version.
let mut global_id_shards: BTreeMap<_, _> = global_id_shards
.into_iter()
.map(|(table_key, shard_id)| (table_key.global_id, shard_id))
@@ -381,7 +382,7 @@
global_id_shards.insert(global_id, shard_id);
let table_key = TableKey {
global_id,
deploy_generation,
build_version: build_version.clone(),
};
migrated_shard_updates.push(((table_key, shard_id), upper, 1));
}
@@ -541,35 +542,35 @@ mod persist_schema {
use mz_persist_types::stats::NoneStats;
use mz_persist_types::Codec;

#[derive(Debug, Clone, Default, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Debug, Clone, Eq, Ord, PartialEq, PartialOrd)]
pub(super) struct TableKey {
pub(super) global_id: u64,
pub(super) deploy_generation: u64,
pub(super) build_version: semver::Version,
}

impl std::fmt::Display for TableKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-{}", self.global_id, self.deploy_generation)
write!(f, "{}-{}", self.global_id, self.build_version)
}
}

impl std::str::FromStr for TableKey {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let parts: Vec<_> = s.split('-').collect();
let &[global_id, deploy_generation] = parts.as_slice() else {
let parts: Vec<_> = s.splitn(2, '-').collect();
let &[global_id, build_version] = parts.as_slice() else {
return Err(format!("invalid TableKey '{s}'"));
};
let global_id = global_id
.parse()
.map_err(|e: ParseIntError| e.to_string())?;
let deploy_generation = deploy_generation
let build_version = build_version
.parse()
.map_err(|e: ParseIntError| e.to_string())?;
.map_err(|e: semver::Error| e.to_string())?;
Ok(TableKey {
global_id,
deploy_generation,
build_version,
})
}
}
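The switch from `split('-')` to `splitn(2, '-')` in `from_str` matters because semver versions can themselves contain hyphens (pre-release identifiers such as `0.130.0-dev.0`), so splitting on every `-` would reject valid keys. A standalone sketch of the parse, using a plain `String` for the version so it runs without the `semver` crate:

```rust
/// Parse a `"<global_id>-<version>"` key, splitting on only the FIRST
/// hyphen so that hyphens inside the version survive.
fn parse_table_key(s: &str) -> Result<(u64, String), String> {
    let mut parts = s.splitn(2, '-');
    let global_id = parts
        .next()
        .ok_or_else(|| format!("invalid TableKey '{s}'"))?
        .parse::<u64>()
        .map_err(|e| e.to_string())?;
    let version = parts
        .next()
        .ok_or_else(|| format!("invalid TableKey '{s}'"))?
        .to_string();
    Ok((global_id, version))
}

fn main() {
    // The pre-release suffix is preserved intact.
    println!("{:?}", parse_table_key("42-0.130.0-dev.0"));
    // A key with no version part is rejected.
    println!("{:?}", parse_table_key("42"));
}
```

The real implementation additionally runs the version part through `semver::Version::from_str`, so malformed versions are rejected rather than stored as opaque strings.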
@@ -588,6 +589,15 @@
}
}

impl Default for TableKey {
fn default() -> Self {
Self {
global_id: Default::default(),
build_version: semver::Version::new(0, 0, 0),
}
}
}

impl Codec for TableKey {
type Storage = ();
type Schema = TableKeySchema;
1 change: 0 additions & 1 deletion src/adapter/src/coord.rs
@@ -3986,7 +3986,6 @@ pub fn serve(
let builtin_item_migration_config = if enable_0dt_deployment {
BuiltinItemMigrationConfig::ZeroDownTime {
persist_client: persist_client.clone(),
deploy_generation: controller_config.deploy_generation,
read_only: read_only_controllers,
}
} else {
1 change: 0 additions & 1 deletion src/catalog/src/config.rs
@@ -98,7 +98,6 @@ pub enum BuiltinItemMigrationConfig {
Legacy,
ZeroDownTime {
persist_client: PersistClient,
deploy_generation: u64,
read_only: bool,
},
}
7 changes: 7 additions & 0 deletions src/catalog/src/durable/transaction.rs
@@ -1831,6 +1831,13 @@ impl<'a> Transaction<'a> {
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
}

pub fn set_builtin_migration_shard(&mut self, shard_id: ShardId) -> Result<(), CatalogError> {
self.set_setting(
BUILTIN_MIGRATION_SHARD_KEY.to_string(),
Some(shard_id.to_string()),
)
}

pub fn get_expression_cache_shard(&self) -> Option<ShardId> {
self.get_setting(EXPRESSION_CACHE_SHARD_KEY.to_string())
.map(|shard_id| shard_id.parse().expect("valid ShardId"))
