Skip to content

Commit

Permalink
support recursive nullability changes in backwards compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
ParkMyCar committed Nov 6, 2024
1 parent f7c7a37 commit 23d5c11
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 31 deletions.
257 changes: 227 additions & 30 deletions src/persist-types/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,47 @@ fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigrati
});
}
Some(NoOp) => continue,
// For now, don't support both making a field nullable and also
// modifying it in some other way. It doesn't seem that we need this for
// mz usage.
Some(_) if make_nullable => return None,
Some(migration) => field_migrations.push(Recurse {
name: n.name().clone(),
migration,
}),
Some(migration) => {
/// Checks if an [`ArrayMigration`] is only recursively making fields nullable.
fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
match migration {
NoOp => true,
List(_field, child) => recursively_all_nullable(child),
Struct(children) => children.iter().all(|child| match child {
AddFieldNullableAtEnd { .. } | DropField { .. } => false,
AlterFieldNullable { .. } => true,
Recurse { migration, .. } => recursively_all_nullable(migration),
}),
}
}

// We only support making a field nullable concurrently with other changes to said
// field, if those other changes are making children nullable as well. Otherwise we
// don't allow the migration.
//
// Note: There's nothing that should really prevent us from supporting this, but at
// the moment it's not needed in Materialize.
if make_nullable {
if recursively_all_nullable(&migration) {
field_migrations.extend([
AlterFieldNullable {
name: n.name().clone(),
},
Recurse {
name: n.name().clone(),
migration,
},
]);
} else {
return None;
}
} else {
field_migrations.push(Recurse {
name: n.name().clone(),
migration,
})
}
}
}
}

Expand Down Expand Up @@ -390,34 +423,34 @@ mod tests {

use super::*;

#[track_caller]
fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
let migration = super::backward_compatible_typ(&old, &new);
let actual = migration.as_ref().map(|x| x.contains_drop());
assert_eq!(actual, expected);
// If it's backward compatible, make sure that the migration
// logic works.
if let Some(migration) = migration {
let (old, new) = (new_empty_array(&old), new_empty_array(&new));
let migrated = migration.migrate(old);
assert_eq!(new.data_type(), migrated.data_type());
}
}

fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
let fields = fields
.into_iter()
.map(|(name, typ, nullable)| Field::new(name, typ, nullable))
.collect();
DataType::Struct(fields)
}

// NB: We also have proptest coverage of all this, but it works on
// RelationDesc+SourceData and so lives in src/storage-types.
#[mz_ore::test]
fn backward_compatible() {
use DataType::*;

#[track_caller]
fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
let migration = super::backward_compatible_typ(&old, &new);
let actual = migration.as_ref().map(|x| x.contains_drop());
assert_eq!(actual, expected);
// If it's backward compatible, make sure that the migration
// logic works.
if let Some(migration) = migration {
let (old, new) = (new_empty_array(&old), new_empty_array(&new));
let migrated = migration.migrate(old);
assert_eq!(new.data_type(), migrated.data_type());
}
}

fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
let fields = fields
.into_iter()
.map(|(name, typ, nullable)| Field::new(name, typ, nullable))
.collect();
DataType::Struct(fields)
}

// Matching primitive types
testcase(Boolean, Boolean, Some(false));
testcase(Utf8, Utf8, Some(false));
Expand Down Expand Up @@ -584,5 +617,169 @@ mod tests {
),
Some(false),
);

// Nested nullability changes
testcase(
struct_([("0", struct_([("foo", Utf8, false)]), false)]),
struct_([("0", struct_([("foo", Utf8, true)]), true)]),
Some(false),
)
}

/// This is a regression test for a case we found when trying to merge [#30205]
///
/// [#30205]: https://github.com/MaterializeInc/materialize/pull/30205
#[mz_ore::test]
fn backwards_compatible_nested_types() {
use DataType::*;

testcase(
struct_([
(
"ok",
struct_([
(
"0",
List(
Field::new_struct(
"map_entries",
vec![
Field::new("key", Utf8, false),
Field::new("val", Int32, true),
],
false,
)
.into(),
),
true,
),
(
"1",
List(
Field::new_struct(
"map_entries",
vec![
Field::new("key", Utf8, false),
Field::new("val", Int32, true),
],
false,
)
.into(),
),
false,
),
(
"2",
List(
Field::new_list("item", Field::new_list_field(Int32, true), true)
.into(),
),
true,
),
(
"3",
List(
Field::new_list("item", Field::new_list_field(Int32, true), true)
.into(),
),
false,
),
("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
(
"5",
struct_([("0", Int32, false), ("1", Utf8, false)]),
false,
),
("6", Utf8, true),
(
"7",
struct_([
(
"dims",
List(Field::new_list_field(FixedSizeBinary(16), true).into()),
true,
),
("vals", List(Field::new_list_field(Utf8, true).into()), true),
]),
false,
),
]),
true,
),
("err", Binary, true),
]),
struct_([
(
"ok",
struct_([
(
"0",
List(
Field::new_struct(
"map_entries",
vec![
Field::new("key", Utf8, false),
Field::new("val", Int32, true),
],
false,
)
.into(),
),
true,
),
(
"1",
List(
Field::new_struct(
"map_entries",
vec![
Field::new("key", Utf8, false),
Field::new("val", Int32, true),
],
false,
)
.into(),
),
true,
),
(
"2",
List(
Field::new_list("item", Field::new_list_field(Int32, true), true)
.into(),
),
true,
),
(
"3",
List(
Field::new_list("item", Field::new_list_field(Int32, true), true)
.into(),
),
true,
),
("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
("5", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
("6", Utf8, true),
(
"7",
struct_([
(
"dims",
List(Field::new_list_field(FixedSizeBinary(16), true).into()),
true,
),
("vals", List(Field::new_list_field(Utf8, true).into()), true),
]),
true,
),
]),
true,
),
("err", Binary, true),
]),
// Should be able to migrate, should not contain any drops.
Some(false),
)
}
}
4 changes: 3 additions & 1 deletion src/storage-controller/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,11 @@ where
.latest_schema::<SourceData, (), T, Diff>(shard_id, diagnostics)
.await
.expect("invalid persist usage");
let Some((schema_id, _, _)) = latest_schema else {
let Some((schema_id, current_schema, _)) = latest_schema else {
tracing::debug!(?global_id, "no schema registered");
continue;
};
tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");

let diagnostics = Diagnostics {
shard_name: global_id.to_string(),
Expand Down

0 comments on commit 23d5c11

Please sign in to comment.