From 8eda798570553fccf06ff64a2a69619d24e098ef Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 11:39:21 -0400
Subject: [PATCH 1/9] start, all columns nullable

* recursively mark all columns as nullable at the Arrow/Parquet level
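
A minimal sketch of the idea (a hypothetical helper, not the encoder code
itself; the real change lives in DatumColumnEncoder and RowColumnarEncoder
below):

    use arrow::datatypes::{DataType, Field};

    /// Build the Arrow field for a column, ignoring the column's SQL
    /// nullability and always declaring it nullable at the Arrow level.
    fn nullable_field(tag: usize, data_type: DataType) -> Field {
        Field::new(tag.to_string(), data_type, /* nullable */ true)
    }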
---
 src/repr/src/row/encode.rs | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/src/repr/src/row/encode.rs b/src/repr/src/row/encode.rs
index 5ca8cb4e9fe7e..084a9c2beebf8 100644
--- a/src/repr/src/row/encode.rs
+++ b/src/repr/src/row/encode.rs
@@ -827,7 +827,12 @@ impl DatumColumnEncoder {
                     .into_iter()
                     .enumerate()
                     .map(|(tag, encoder)| {
-                        let nullable = encoder.nullable;
+                        // Note: We mark all columns as nullable at the Arrow/Parquet level because
+                        // it has a negligible performance difference, but it protects us from
+                        // unintended nullability changes in the columns of SQL objects.
+                        //
+                        // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
+                        let nullable = true;
                         let array = encoder.finish();
                         let field =
                             Field::new(tag.to_string(), array.data_type().clone(), nullable);
@@ -1440,7 +1445,12 @@ impl ColumnEncoder<Row> for RowColumnarEncoder {
             .iter()
             .zip_eq(encoders)
             .map(|((col_idx, _col_name), encoder)| {
-                let nullable = encoder.nullable;
+                // Note: We mark all columns as nullable at the Arrow/Parquet level because it has
+                // a negligible performance difference, but it protects us from unintended
+                // nullability changes in the columns of SQL objects.
+                //
+                // See: <https://github.com/MaterializeInc/database-issues/issues/2488>
+                let nullable = true;
                 let array = encoder.finish();
                 let field = Field::new(col_idx.to_string(), array.data_type().clone(), nullable);
 

From c6b46b5311148ac0ab70a7aef83c47d1ed1ed8d2 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 13:34:39 -0400
Subject: [PATCH 2/9] implement one-time migration for Arrow DataTypes in
 Persist Schema registry
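
The migration re-registers an existing schema whose recorded Arrow DataType
differs from the one its Schema impl generates today, but only when the new
DataType is at least as nullable as the old one. Distilled to a single
field, the rule is (a self-contained sketch, not the code in schema.rs):

    /// A field may go from NOT NULL to nullable, never the reverse.
    fn nullability_ok(old_nullable: bool, new_nullable: bool) -> bool {
        !old_nullable || new_nullable
    }

    fn main() {
        assert!(nullability_ok(false, false)); // unchanged: ok
        assert!(nullability_ok(false, true));  // widened: ok
        assert!(!nullability_ok(true, false)); // narrowed: rejected
    }

is_atleast_as_nullable applies this check recursively through structs,
lists, and maps, and requires the DataTypes to otherwise match exactly.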

---
 src/persist-client/src/internal/state.rs | 115 +++++++++---
 src/persist-client/src/schema.rs         | 215 ++++++++++++++++++++++-
 src/persist-types/src/arrow.rs           |   2 +-
 3 files changed, 306 insertions(+), 26 deletions(-)

diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index 91fde2dd4495f..dd1988c602532 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -7,7 +7,7 @@
 // the Business Source License, use of this software will be governed
 // by the Apache License, Version 2.0.
 
-use anyhow::ensure;
+use anyhow::{ensure, Context};
 use async_stream::{stream, try_stream};
 use std::borrow::Cow;
 use std::cmp::Ordering;
@@ -1126,7 +1126,7 @@ pub struct EncodedSchemas {
     /// The arrow `DataType` produced by this `K::Schema` at the time it was
     /// registered, encoded as a `ProtoDataType`.
     pub key_data_type: Bytes,
-    /// A full in-mem `K::Schema` impl encoded via [Codec::encode_schema].
+    /// A full in-mem `V::Schema` impl encoded via [Codec::encode_schema].
     pub val: Bytes,
     /// The arrow `DataType` produced by this `V::Schema` at the time it was
     /// registered, encoded as a `ProtoDataType`.
@@ -1294,15 +1294,17 @@ where
         key_schema: &K::Schema,
         val_schema: &V::Schema,
     ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
-        fn data_type<T>(schema: &impl Schema2<T>) -> Bytes {
-            // To be defensive, create an empty batch and inspect the resulting
-            // data type (as opposed to something like allowing the `Schema2` to
-            // declare the DataType).
-            let array = Schema2::encoder(schema).expect("valid schema").finish();
-            let proto = Array::data_type(&array).into_proto();
+        fn encoded_data_type(data_type: &DataType) -> Bytes {
+            let proto = data_type.into_proto();
             prost::Message::encode_to_vec(&proto).into()
         }
 
+        fn decode_data_type(buf: Bytes) -> Result<DataType, anyhow::Error> {
+            let proto: mz_persist_types::arrow::ProtoDataType =
+                prost::Message::decode(buf).context("decoding schema DataType")?;
+            DataType::from_proto(proto).context("converting ProtoDataType into DataType")
+        }
+
         // Look for an existing registered SchemaId for these schemas.
         //
         // The common case is that this should be a recent one, so as a minor
@@ -1318,12 +1320,64 @@ where
             K::decode_schema(&x.key) == *key_schema && V::decode_schema(&x.val) == *val_schema
         });
         match existing_id {
-            Some((schema_id, _)) => {
-                // TODO: Validate that the decoded schemas still produce records
-                // of the recorded DataType, to detect shenanigans. Probably
-                // best to wait until we've turned on Schema2 in prod and thus
-                // committed to the current mappings.
-                Break(NoOpStateTransition(Some(*schema_id)))
+            Some((schema_id, encoded_schemas)) => {
+                let schema_id = *schema_id;
+                let new_k_datatype = mz_persist_types::columnar::data_type::<K>(key_schema)
+                    .expect("valid key schema");
+                let new_v_datatype = mz_persist_types::columnar::data_type::<V>(val_schema)
+                    .expect("valid val schema");
+
+                let new_k_encoded_datatype = encoded_data_type(&new_k_datatype);
+                let new_v_encoded_datatype = encoded_data_type(&new_v_datatype);
+
+                // Check if the generated Arrow DataTypes have changed.
+                if encoded_schemas.key_data_type != new_k_encoded_datatype
+                    || encoded_schemas.val_data_type != new_v_encoded_datatype
+                {
+                    let old_k_datatype =
+                        decode_data_type(Bytes::clone(&encoded_schemas.key_data_type))
+                            .expect("failed to roundtrip Arrow DataType");
+                    let old_v_datatype =
+                        decode_data_type(Bytes::clone(&encoded_schemas.val_data_type))
+                            .expect("failed to roundtrip Arrow DataType");
+
+                    let k_atleast_as_nullable =
+                        crate::schema::is_atleast_as_nullable(&old_k_datatype, &new_k_datatype);
+                    let v_atleast_as_nullable =
+                        crate::schema::is_atleast_as_nullable(&old_v_datatype, &new_v_datatype);
+
+                    // If the Arrow DataType for `k` or `v` has changed, but it's only become more
+                    // nullable, then we allow in-place re-writing of the schema.
+                    match (k_atleast_as_nullable, v_atleast_as_nullable) {
+                        // TODO(parkmycar): Remove this one-time migration after v0.123 ships.
+                        (Ok(()), Ok(())) => {
+                            let key = Bytes::clone(&encoded_schemas.key);
+                            let val = Bytes::clone(&encoded_schemas.val);
+                            self.schemas.insert(
+                                schema_id,
+                                EncodedSchemas {
+                                    key,
+                                    key_data_type: new_k_encoded_datatype,
+                                    val,
+                                    val_data_type: new_v_encoded_datatype,
+                                },
+                            );
+                            Continue(Some(schema_id))
+                        }
+                        (k_err, v_err) => {
+                            tracing::info!(
+                                "register schemas, Arrow DataType changed\nkey: {:?}\nval: {:?}\nold key: {:?}\nnew key: {:?}",
+                                k_err, v_err,
+                                old_k_datatype,
+                                new_k_datatype,
+                            );
+                            Break(NoOpStateTransition(None))
+                        }
+                    }
+                } else {
+                    // Everything matches.
+                    Break(NoOpStateTransition(Some(schema_id)))
+                }
             }
             None if self.is_tombstone() => {
                 // TODO: Is this right?
@@ -1334,13 +1388,17 @@ where
                 // generate the next id if/when we start supporting the removal
                 // of schemas.
                 let id = SchemaId(self.schemas.len());
+                let key_data_type = mz_persist_types::columnar::data_type::<K>(key_schema)
+                    .expect("valid key schema");
+                let val_data_type = mz_persist_types::columnar::data_type::<V>(val_schema)
+                    .expect("valid val schema");
                 let prev = self.schemas.insert(
                     id,
                     EncodedSchemas {
                         key: K::encode_schema(key_schema),
-                        key_data_type: data_type(key_schema),
+                        key_data_type: encoded_data_type(&key_data_type),
                         val: V::encode_schema(val_schema),
-                        val_data_type: data_type(val_schema),
+                        val_data_type: encoded_data_type(&val_data_type),
                     },
                 );
                 assert_eq!(prev, None);
@@ -1388,16 +1446,25 @@ where
             }));
         }
 
+        let current_key = K::decode_schema(&current.key);
+        let current_key_dt = EncodedSchemas::decode_data_type(&current.key_data_type);
+        let current_val = V::decode_schema(&current.val);
+        let current_val_dt = EncodedSchemas::decode_data_type(&current.val_data_type);
+
         let key_dt = data_type(key_schema);
         let val_dt = data_type(val_schema);
-        let key_fn = backward_compatible(
-            &EncodedSchemas::decode_data_type(&current.key_data_type),
-            &key_dt,
-        );
-        let val_fn = backward_compatible(
-            &EncodedSchemas::decode_data_type(&current.val_data_type),
-            &val_dt,
-        );
+
+        // If the schema is exactly the same as the current one, no-op.
+        if current_key == *key_schema
+            && current_key_dt == key_dt
+            && current_val == *val_schema
+            && current_val_dt == val_dt
+        {
+            return Break(NoOpStateTransition(CaESchema::Ok(*current_id)));
+        }
+
+        let key_fn = backward_compatible(&current_key_dt, &key_dt);
+        let val_fn = backward_compatible(&current_val_dt, &val_dt);
         let (Some(key_fn), Some(val_fn)) = (key_fn, val_fn) else {
             return Break(NoOpStateTransition(CaESchema::Incompatible));
         };
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index 98347a8e24c4e..a8929c573b330 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -14,6 +14,8 @@ use std::fmt::Debug;
 use std::sync::{Arc, RwLock};
 use std::time::Instant;
 
+use anyhow::Context;
+use arrow::datatypes::DataType;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use mz_ore::cast::CastFrom;
@@ -402,15 +404,98 @@ impl<K: Codec, V: Codec> PartMigration<K, V> {
     }
 }
 
+/// Returns `Ok(())` if `new` is at least as nullable as `old`.
+///
+/// Errors if `new` is less nullable than `old`, or if `old` and `new` are different types or
+/// have different nested fields.
+pub(crate) fn is_atleast_as_nullable(old: &DataType, new: &DataType) -> Result<(), anyhow::Error> {
+    fn check(old: &arrow::datatypes::Field, new: &arrow::datatypes::Field) -> bool {
+        !old.is_nullable() || new.is_nullable()
+    }
+
+    match (old, new) {
+        (DataType::Null, DataType::Null)
+        | (DataType::Boolean, DataType::Boolean)
+        | (DataType::Int8, DataType::Int8)
+        | (DataType::Int16, DataType::Int16)
+        | (DataType::Int32, DataType::Int32)
+        | (DataType::Int64, DataType::Int64)
+        | (DataType::UInt8, DataType::UInt8)
+        | (DataType::UInt16, DataType::UInt16)
+        | (DataType::UInt32, DataType::UInt32)
+        | (DataType::UInt64, DataType::UInt64)
+        | (DataType::Float16, DataType::Float16)
+        | (DataType::Float32, DataType::Float32)
+        | (DataType::Float64, DataType::Float64)
+        | (DataType::Binary, DataType::Binary)
+        | (DataType::Utf8, DataType::Utf8) => Ok(()),
+        (DataType::FixedSizeBinary(old_size), DataType::FixedSizeBinary(new_size))
+            if old_size == new_size =>
+        {
+            Ok(())
+        }
+        (DataType::List(old_field), DataType::List(new_field))
+        | (DataType::Map(old_field, _), DataType::Map(new_field, _)) => {
+            if !check(old_field, new_field) {
+                anyhow::bail!("'{}' is now less nullable", old_field.name());
+            }
+            // Recurse into our children and bail early if one fails.
+            let child_result = is_atleast_as_nullable(old_field.data_type(), new_field.data_type())
+                .with_context(|| format!("'{}'", old_field.name()));
+            if let Err(e) = child_result {
+                return Err(e);
+            }
+            Ok(())
+        }
+        (DataType::Struct(old_fields), DataType::Struct(new_fields)) => {
+            if old_fields.len() != new_fields.len() {
+                anyhow::bail!(
+                    "wrong number of fields, old: {}, new: {}",
+                    old_fields.len(),
+                    new_fields.len()
+                )
+            }
+
+            // Note: This nested loop approach is O(n^2), but we expect the number of fields to be
+            // relatively small, and it avoids allocations, so we consciously use this approach.
+            for new_field in new_fields {
+                let old_field = old_fields
+                    .iter()
+                    .find(|old| old.name() == new_field.name())
+                    .ok_or_else(|| anyhow::anyhow!("missing field '{}'", new_field.name()))?;
+
+                if !check(old_field, new_field) {
+                    anyhow::bail!("'{}' is now less nullable", old_field.name());
+                }
+
+                // Recurse into our children and bail early if one fails.
+                let child_result =
+                    is_atleast_as_nullable(old_field.data_type(), new_field.data_type())
+                        .with_context(|| format!("'{}'", old_field.name()));
+                if let Err(e) = child_result {
+                    return Err(e);
+                }
+            }
+
+            Ok(())
+        }
+        (old, new) => {
+            anyhow::bail!("found unsupported or mismatched datatypes! old: {old:?}, new: {new:?}")
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::array::{
         as_string_array, Array, ArrayBuilder, StringArray, StringBuilder, StructArray,
     };
-    use arrow::datatypes::{DataType, Field};
+    use arrow::datatypes::{DataType, Field, Fields};
     use bytes::BufMut;
     use futures::StreamExt;
     use mz_dyncfg::ConfigUpdates;
+    use mz_ore::error::ErrorExt;
+    use mz_ore::{assert_contains, assert_err, assert_ok};
     use mz_persist_types::codec_impls::UnitSchema;
     use mz_persist_types::columnar::{ColumnDecoder, ColumnEncoder, Schema2};
     use mz_persist_types::stats::{NoneStats, StructStats};
@@ -765,4 +850,132 @@ mod tests {
             info_log_non_zero_metrics(&client.metrics.registry.gather());
         }
     }
+
+    #[mz_ore::test]
+    fn test_as_nullable() {
+        assert_ok!(is_atleast_as_nullable(&DataType::UInt8, &DataType::UInt8));
+        assert_err!(is_atleast_as_nullable(&DataType::UInt8, &DataType::Utf8));
+
+        let old_type = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Boolean, false),
+        ]));
+        assert_ok!(is_atleast_as_nullable(&old_type, &old_type));
+
+        let more_nullable_type = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Boolean, true),
+        ]));
+        assert_ok!(is_atleast_as_nullable(&old_type, &more_nullable_type));
+
+        let less_nullable_type = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("a", DataType::Utf8, false),
+            Field::new("b", DataType::Boolean, false),
+        ]));
+        assert_err!(is_atleast_as_nullable(&old_type, &less_nullable_type));
+
+        let different_fields = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("foobar", DataType::Utf8, true),
+            Field::new("b", DataType::Boolean, true),
+        ]));
+        assert_err!(is_atleast_as_nullable(&old_type, &different_fields));
+
+        let different_number_of_fields = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Boolean, true),
+            Field::new("c", DataType::UInt64, true),
+        ]));
+        assert_err!(is_atleast_as_nullable(
+            &old_type,
+            &different_number_of_fields
+        ));
+
+        let out_of_order_fields = DataType::Struct(arrow::datatypes::Fields::from(vec![
+            Field::new("b", DataType::Boolean, false),
+            Field::new("a", DataType::Utf8, true),
+        ]));
+        assert_ok!(is_atleast_as_nullable(&old_type, &out_of_order_fields));
+    }
+
+    #[mz_ore::test]
+    fn test_as_nullable_deeply_nested() {
+        let old_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "k",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("event_type", DataType::Utf8, false),
+                    Field::new(
+                        "details",
+                        DataType::Struct(Fields::from(vec![
+                            Field::new("name", DataType::Utf8, false),
+                            Field::new("new_user", DataType::Boolean, false),
+                            Field::new("plan", DataType::Utf8, true),
+                        ])),
+                        false,
+                    ),
+                    Field::new("event_ts", DataType::UInt64, true),
+                ])),
+                true,
+            ),
+            Field::new("v", DataType::Boolean, false),
+            Field::new("t", DataType::UInt64, false),
+            Field::new("d", DataType::Int64, false),
+        ]));
+        assert_ok!(is_atleast_as_nullable(&old_type, &old_type));
+
+        let more_nullable_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "k",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("event_type", DataType::Utf8, false),
+                    Field::new(
+                        "details",
+                        DataType::Struct(Fields::from(vec![
+                            Field::new("name", DataType::Utf8, false),
+                            // More nullable than old_type.
+                            Field::new("new_user", DataType::Boolean, true),
+                            Field::new("plan", DataType::Utf8, true),
+                        ])),
+                        false,
+                    ),
+                    Field::new("event_ts", DataType::UInt64, true),
+                ])),
+                true,
+            ),
+            Field::new("v", DataType::Boolean, false),
+            Field::new("t", DataType::UInt64, false),
+            Field::new("d", DataType::Int64, false),
+        ]));
+        assert_ok!(is_atleast_as_nullable(&old_type, &more_nullable_type));
+
+        let less_nullable_type = DataType::Struct(Fields::from(vec![
+            Field::new(
+                "k",
+                DataType::Struct(Fields::from(vec![
+                    Field::new("event_type", DataType::Utf8, false),
+                    Field::new(
+                        "details",
+                        DataType::Struct(Fields::from(vec![
+                            Field::new("name", DataType::Utf8, false),
+                            Field::new("new_user", DataType::Boolean, false),
+                            // Less nullable than old type.
+                            Field::new("plan", DataType::Utf8, false),
+                        ])),
+                        false,
+                    ),
+                    Field::new("event_ts", DataType::UInt64, true),
+                ])),
+                true,
+            ),
+            Field::new("v", DataType::Boolean, false),
+            Field::new("t", DataType::UInt64, false),
+            Field::new("d", DataType::Int64, false),
+        ]));
+        let result = is_atleast_as_nullable(&old_type, &less_nullable_type);
+        assert_err!(result);
+        assert_contains!(
+            result.unwrap_err().to_string_with_causes(),
+            "'k': 'details': 'plan' is now less nullable"
+        );
+    }
 }
diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs
index be85a958a2145..e6cbd9f34078e 100644
--- a/src/persist-types/src/arrow.rs
+++ b/src/persist-types/src/arrow.rs
@@ -39,7 +39,7 @@ mod proto {
     include!(concat!(env!("OUT_DIR"), "/mz_persist_types.arrow.rs"));
 }
 use crate::arrow::proto::data_type;
-pub use proto::ProtoArrayData;
+pub use proto::{DataType as ProtoDataType, ProtoArrayData};
 
 /// Extract the list of fields for our recursive datatypes.
 pub fn fields_for_type(data_type: &DataType) -> &[FieldRef] {

From fd694e5adf2cdb17b1f66fae8b9e688008761028 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 14:24:57 -0400
Subject: [PATCH 3/9] deprecate all existing schema IDs in Part and Run
 metadata
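
While the persist_record_schema_id flag is off, we keep writing the ID into
the old proto field, now named deprecated_schema_id, so that mixed-version
deployments still roundtrip state cleanly; once the flag is on, the ID
moves to schema_id, which has been given a fresh field number. The routing
pattern repeated throughout batch.rs, as a sketch (u64 stands in for
SchemaId):

    /// Returns (schema_id, deprecated_schema_id).
    fn route(record_schema_id: bool, id: Option<u64>) -> (Option<u64>, Option<u64>) {
        if record_schema_id {
            (id, None)
        } else {
            (None, id)
        }
    }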

---
 misc/python/materialize/mzcompose/__init__.py |  3 ++
 .../materialize/parallel_workload/action.py   |  1 +
 src/buf.yaml                                  |  2 +
 src/persist-client/src/batch.rs               | 49 +++++++++++++++++--
 src/persist-client/src/cfg.rs                 |  1 +
 src/persist-client/src/internal/datadriven.rs |  1 +
 src/persist-client/src/internal/encoding.rs   | 12 +++++
 src/persist-client/src/internal/state.proto   | 12 +++--
 src/persist-client/src/internal/state.rs      | 25 +++++++++-
 src/persist-client/src/iter.rs                |  1 +
 src/persist-client/src/project.rs             |  1 +
 src/persist-proc/src/lib.rs                   |  5 ++
 12 files changed, 105 insertions(+), 8 deletions(-)

diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index 53d7b3048ac08..b0ec3b625f5ce 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -115,6 +115,9 @@ def get_default_system_parameters(
         "enable_table_keys": "true",
         "enable_variadic_left_join_lowering": "true",
         "enable_worker_core_affinity": "true",
+        "persist_record_schema_id": (
+            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
+        ),
         "persist_batch_columnar_format": "both_v2",
         "persist_batch_columnar_format_percent": "100",
         "persist_batch_delete_enabled": "true",
diff --git a/misc/python/materialize/parallel_workload/action.py b/misc/python/materialize/parallel_workload/action.py
index d892875e46dbb..8824a081d4d05 100644
--- a/misc/python/materialize/parallel_workload/action.py
+++ b/misc/python/materialize/parallel_workload/action.py
@@ -1024,6 +1024,7 @@ def __init__(
             "75",
             "100",
         ]
+        self.flags_with_values["persist_record_schema_id"] = BOOLEAN_FLAG_VALUES
         self.flags_with_values["persist_batch_structured_order"] = BOOLEAN_FLAG_VALUES
         self.flags_with_values["persist_batch_structured_key_lower_len"] = [
             "0",
diff --git a/src/buf.yaml b/src/buf.yaml
index d50cb186c1802..15cfabc43123e 100644
--- a/src/buf.yaml
+++ b/src/buf.yaml
@@ -53,6 +53,8 @@ breaking:
     - compute-types/src/sinks.proto
     # reason: Ignore because plans are currently not persisted.
     - expr/src/relation.proto
+    # reason: we very carefully evolve these protobuf definitions
+    - persist-client/src/internal/state.proto
     # reason: does currently not require backward-compatibility
     - storage-client/src/client.proto
     # reason: does currently not require backward-compatibility
diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs
index 36c388cbde285..d528b8e241402 100644
--- a/src/persist-client/src/batch.rs
+++ b/src/persist-client/src/batch.rs
@@ -233,6 +233,7 @@ where
                         updates,
                         ts_rewrite,
                         schema_id,
+                        deprecated_schema_id: _,
                     }) => (updates, ts_rewrite, schema_id),
                     other @ RunPart::Many(_) | other @ RunPart::Single(BatchPart::Hollow(_)) => {
                         parts.push(other.clone());
@@ -356,6 +357,7 @@ pub struct BatchBuilderConfig {
     pub(crate) write_diffs_sum: bool,
     pub(crate) encoding_config: EncodingConfig,
     pub(crate) preferred_order: RunOrder,
+    pub(crate) record_schema_id: bool,
     pub(crate) structured_key_lower_len: usize,
     pub(crate) run_length_limit: usize,
     /// The number of runs to cap the built batch at, or None if we should
@@ -395,6 +397,12 @@ pub(crate) const ENCODING_COMPRESSION_FORMAT: Config<&'static str> = Config::new
     "A feature flag to enable compression of Parquet data (Materialize).",
 );
 
+pub(crate) const RECORD_SCHEMA_ID: Config<bool> = Config::new(
+    "persist_record_schema_id",
+    false,
+    "If set, record the ID for the shard's schema in Part and Run metadata (Materialize).",
+);
+
 pub(crate) const STRUCTURED_ORDER: Config<bool> = Config::new(
     "persist_batch_structured_order",
     false,
@@ -467,6 +475,7 @@ impl BatchBuilderConfig {
             BatchColumnarFormat::from_str(&BATCH_COLUMNAR_FORMAT.get(value));
         let batch_columnar_format_percent = BATCH_COLUMNAR_FORMAT_PERCENT.get(value);
 
+        let record_schema_id = RECORD_SCHEMA_ID.get(value);
         let structured_order = STRUCTURED_ORDER.get(value) && {
             shard_id.to_string() < STRUCTURED_ORDER_UNTIL_SHARD.get(value)
         };
@@ -493,6 +502,7 @@ impl BatchBuilderConfig {
                 compression: CompressionFormat::from_str(&ENCODING_COMPRESSION_FORMAT.get(value)),
             },
             preferred_order,
+            record_schema_id,
             structured_key_lower_len: STRUCTURED_KEY_LOWER_LEN.get(value),
             run_length_limit: MAX_RUN_LEN.get(value).clamp(2, usize::MAX),
             max_runs: match MAX_RUNS.get(value) {
@@ -737,6 +747,13 @@ where
 
         let batch_delete_enabled = self.parts.cfg.batch_delete_enabled;
         let shard_metrics = Arc::clone(&self.parts.shard_metrics);
+        // If we haven't switched over to the new schema_id field yet, keep writing the old one.
+        let (new_schema_id, deprecated_schema_id) = if self.parts.cfg.record_schema_id {
+            (self.write_schemas.id, None)
+        } else {
+            (None, self.write_schemas.id)
+        };
+
         let runs = self.parts.finish().await;
 
         let mut run_parts = vec![];
@@ -751,7 +768,8 @@ where
             }
             run_meta.push(RunMeta {
                 order: Some(order),
-                schema: self.write_schemas.id,
+                schema: new_schema_id,
+                deprecated_schema: deprecated_schema_id,
             });
             run_parts.extend(parts);
         }
@@ -1023,12 +1041,19 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
                 let handle = mz_ore::task::spawn(
                     || "batch::compact_runs",
                     async move {
+                        // If we haven't switched over to the new schema_id field yet, keep writing the old one.
+                        let (new_schema_id, deprecated_schema_id) = if cfg.batch.record_schema_id {
+                            (schemas.id, None)
+                        } else {
+                            (None, schemas.id)
+                        };
                         let runs: Vec<_> = stream::iter(parts)
                             .then(|(order, parts)| async move {
                                 (
                                     RunMeta {
                                         order: Some(order),
-                                        schema: schemas.id,
+                                        schema: new_schema_id,
+                                        deprecated_schema: deprecated_schema_id,
                                     },
                                     parts.into_result().await,
                                 )
@@ -1171,6 +1196,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
         // Decide this once per part and plumb it around as necessary so that we
         // use a consistent answer for things like inline threshold.
         let part_write_columnar_data = self.cfg.part_write_columnar_data();
+        let record_schema_id = self.cfg.record_schema_id;
 
         // If we're going to encode structured data then halve our limit since we're storing
         // it twice, once as binary encoded and once as structured.
@@ -1215,10 +1241,18 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
                     batch_metrics
                         .step_inline
                         .inc_by(start.elapsed().as_secs_f64());
+                    // If we haven't switched over to the new schema_id field yet, keep writing the old one.
+                    let (new_schema_id, deprecated_schema_id) = if record_schema_id {
+                        (schema_id, None)
+                    } else {
+                        (None, schema_id)
+                    };
+
                     RunPart::Single(BatchPart::Inline {
                         updates,
                         ts_rewrite,
-                        schema_id,
+                        schema_id: new_schema_id,
+                        deprecated_schema_id,
                     })
                 }
                 .instrument(span)
@@ -1445,6 +1479,12 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
             }
             stats
         });
+        // If we haven't switched over to the new schema_id field yet, keep writing the old one.
+        let (new_schema_id, deprecated_schema_id) = if cfg.record_schema_id {
+            (schema_id, None)
+        } else {
+            (None, schema_id)
+        };
 
         BatchPart::Hollow(HollowBatchPart {
             key: partial_key,
@@ -1455,7 +1495,8 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
             ts_rewrite,
             diffs_sum: cfg.write_diffs_sum.then_some(diffs_sum),
             format: Some(cfg.batch_columnar_format),
-            schema_id,
+            schema_id: new_schema_id,
+            deprecated_schema_id,
         })
     }
 
diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index f57eb126311e8..d1cfac06573c2 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -315,6 +315,7 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&crate::batch::INLINE_WRITES_SINGLE_MAX_BYTES)
         .add(&crate::batch::ENCODING_ENABLE_DICTIONARY)
         .add(&crate::batch::ENCODING_COMPRESSION_FORMAT)
+        .add(&crate::batch::RECORD_SCHEMA_ID)
         .add(&crate::batch::STRUCTURED_ORDER)
         .add(&crate::batch::STRUCTURED_ORDER_UNTIL_SHARD)
         .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
diff --git a/src/persist-client/src/internal/datadriven.rs b/src/persist-client/src/internal/datadriven.rs
index 1fe52d34b6276..454b46717cf94 100644
--- a/src/persist-client/src/internal/datadriven.rs
+++ b/src/persist-client/src/internal/datadriven.rs
@@ -112,6 +112,7 @@ impl<'a> DirectiveArgs<'a> {
                         diffs_sum: None,
                         format: None,
                         schema_id: None,
+                        deprecated_schema_id: None,
                     }))
                 })
                 .collect(),
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index bf8fca490951b..eb7f8d23fdafa 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -1333,6 +1333,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
                 diffs_sum: None,
                 format: None,
                 schema_id: None,
+                deprecated_schema_id: None,
             }))
         }));
         // We discard default metadatas from the proto above; re-add them here.
@@ -1365,6 +1366,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
         ProtoRunMeta {
             order: order.into(),
             schema_id: self.schema.into_proto(),
+            deprecated_schema_id: self.deprecated_schema.into_proto(),
         }
     }
 
@@ -1378,6 +1380,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
         Ok(Self {
             order,
             schema: proto.schema_id.into_rust()?,
+            deprecated_schema: proto.deprecated_schema_id.into_rust()?,
         })
     }
 }
@@ -1415,6 +1418,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
             format: None,
             schema_id: None,
             structured_key_lower: None,
+            deprecated_schema_id: None,
         };
         part
     }
@@ -1450,11 +1454,13 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
                 diffs_sum: x.diffs_sum.as_ref().map(|x| i64::from_le_bytes(*x)),
                 format: x.format.map(|f| f.into_proto()),
                 schema_id: x.schema_id.into_proto(),
+                deprecated_schema_id: x.deprecated_schema_id.into_proto(),
             },
             BatchPart::Inline {
                 updates,
                 ts_rewrite,
                 schema_id,
+                deprecated_schema_id,
             } => ProtoHollowBatchPart {
                 kind: Some(proto_hollow_batch_part::Kind::Inline(updates.into_proto())),
                 encoded_size_bytes: 0,
@@ -1465,6 +1471,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
                 diffs_sum: None,
                 format: None,
                 schema_id: schema_id.into_proto(),
+                deprecated_schema_id: deprecated_schema_id.into_proto(),
             },
         }
     }
@@ -1475,6 +1482,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
             None => None,
         };
         let schema_id = proto.schema_id.into_rust()?;
+        let deprecated_schema_id = proto.deprecated_schema_id.into_rust()?;
         match proto.kind {
             Some(proto_hollow_batch_part::Kind::Key(key)) => {
                 Ok(BatchPart::Hollow(HollowBatchPart {
@@ -1487,6 +1495,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
                     diffs_sum: proto.diffs_sum.map(i64::to_le_bytes),
                     format: proto.format.map(|f| f.into_rust()).transpose()?,
                     schema_id,
+                    deprecated_schema_id,
                 }))
             }
             Some(proto_hollow_batch_part::Kind::Inline(x)) => {
@@ -1499,6 +1508,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
                     updates,
                     ts_rewrite,
                     schema_id,
+                    deprecated_schema_id,
                 })
             }
             _ => Err(TryFromProtoError::unknown_enum_variant(
@@ -1821,6 +1831,7 @@ mod tests {
                 diffs_sum: None,
                 format: None,
                 schema_id: None,
+                deprecated_schema_id: None,
             }))],
             4,
         );
@@ -1847,6 +1858,7 @@ mod tests {
                 diffs_sum: None,
                 format: None,
                 schema_id: None,
+                deprecated_schema_id: None,
             })));
         assert_eq!(<HollowBatch<u64>>::from_proto(old).unwrap(), expected);
     }
diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto
index 73f175a1bd6e6..0bbc8e52b0753 100644
--- a/src/persist-client/src/internal/state.proto
+++ b/src/persist-client/src/internal/state.proto
@@ -12,6 +12,8 @@
 // making your Proto changes in a release before you populate non-default values,
 // or guard the code changes behind a feature flag.
 
+// buf breaking: ignore (we very carefully evolve these protobuf definitions)
+
 syntax = "proto3";
 
 package mz_persist_client.internal.state;
@@ -55,11 +57,13 @@ message ProtoHollowBatchPart {
     google.protobuf.Empty row = 7;
     uint64 row_and_columnar = 8;
   }
-  optional uint64 schema_id = 9;
+  optional uint64 schema_id = 12;
 
   optional bytes key_stats = 536870906;
 
   reserved 536870907 to 536870911;
+
+  optional uint64 deprecated_schema_id = 9;
 }
 
 message ProtoInlineBatchPart {
@@ -83,7 +87,9 @@ enum ProtoRunOrder {
 // Data that should be common across all parts in a run.
 message ProtoRunMeta {
   ProtoRunOrder order = 1;
-  optional uint64 schema_id = 2;
+  optional uint64 schema_id = 3;
+
+  optional uint64 deprecated_schema_id = 2;
 }
 
 message ProtoHollowRun {
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index dd1988c602532..60da56a5ce332 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -208,6 +208,9 @@ pub enum BatchPart<T> {
         updates: LazyInlineBatchPart,
         ts_rewrite: Option<Antichain<T>>,
         schema_id: Option<SchemaId>,
+
+        /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
+        deprecated_schema_id: Option<SchemaId>,
     },
 }
 
@@ -603,21 +606,25 @@ impl<T: Ord> Ord for BatchPart<T> {
                     updates: s_updates,
                     ts_rewrite: s_ts_rewrite,
                     schema_id: s_schema_id,
+                    deprecated_schema_id: s_deprecated_schema_id,
                 },
                 BatchPart::Inline {
                     updates: o_updates,
                     ts_rewrite: o_ts_rewrite,
                     schema_id: o_schema_id,
+                    deprecated_schema_id: o_deprecated_schema_id,
                 },
             ) => (
                 s_updates,
                 s_ts_rewrite.as_ref().map(|x| x.elements()),
                 s_schema_id,
+                s_deprecated_schema_id,
             )
                 .cmp(&(
                     o_updates,
                     o_ts_rewrite.as_ref().map(|x| x.elements()),
                     o_schema_id,
+                    o_deprecated_schema_id,
                 )),
             (BatchPart::Hollow(_), BatchPart::Inline { .. }) => Ordering::Less,
             (BatchPart::Inline { .. }, BatchPart::Hollow(_)) => Ordering::Greater,
@@ -643,6 +650,9 @@ pub struct RunMeta {
     pub(crate) order: Option<RunOrder>,
     /// All parts in a run should have the same schema.
     pub(crate) schema: Option<SchemaId>,
+
+    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
+    pub(crate) deprecated_schema: Option<SchemaId>,
 }
 
 /// A subset of a [HollowBatch] corresponding 1:1 to a blob.
@@ -689,6 +699,9 @@ pub struct HollowBatchPart<T> {
     /// Or None for historical data written before the schema registry was
     /// added.
     pub schema_id: Option<SchemaId>,
+
+    /// ID of a schema that has since been deprecated and exists only to cleanly roundtrip.
+    pub deprecated_schema_id: Option<SchemaId>,
 }
 
 /// A [Batch] but with the updates themselves stored externally.
@@ -1020,6 +1033,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
             diffs_sum: self_diffs_sum,
             format: self_format,
             schema_id: self_schema_id,
+            deprecated_schema_id: self_deprecated_schema_id,
         } = self;
         let HollowBatchPart {
             key: other_key,
@@ -1031,6 +1045,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
             diffs_sum: other_diffs_sum,
             format: other_format,
             schema_id: other_schema_id,
+            deprecated_schema_id: other_deprecated_schema_id,
         } = other;
         (
             self_key,
@@ -1042,6 +1057,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
             self_diffs_sum,
             self_format,
             self_schema_id,
+            self_deprecated_schema_id,
         )
             .cmp(&(
                 other_key,
@@ -1053,6 +1069,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
                 other_diffs_sum,
                 other_format,
                 other_schema_id,
+                other_deprecated_schema_id,
             ))
     }
 }
@@ -2639,8 +2656,9 @@ pub(crate) mod tests {
                 any_hollow_batch_part(),
                 any::<Option<T>>(),
                 any::<Option<SchemaId>>(),
+                any::<Option<SchemaId>>(),
             ),
-            |(is_hollow, hollow, ts_rewrite, schema_id)| {
+            |(is_hollow, hollow, ts_rewrite, schema_id, deprecated_schema_id)| {
                 if is_hollow {
                     BatchPart::Hollow(hollow)
                 } else {
@@ -2650,6 +2668,7 @@ pub(crate) mod tests {
                         updates,
                         ts_rewrite,
                         schema_id,
+                        deprecated_schema_id,
                     }
                 }
             },
@@ -2672,6 +2691,7 @@ pub(crate) mod tests {
                 any::<[u8; 8]>(),
                 any::<Option<BatchColumnarFormat>>(),
                 any::<Option<SchemaId>>(),
+                any::<Option<SchemaId>>(),
             ),
             |(
                 key,
@@ -2682,6 +2702,7 @@ pub(crate) mod tests {
                 diffs_sum,
                 format,
                 schema_id,
+                deprecated_schema_id,
             )| {
                 HollowBatchPart {
                     key,
@@ -2693,6 +2714,7 @@ pub(crate) mod tests {
                     diffs_sum: Some(diffs_sum),
                     format,
                     schema_id,
+                    deprecated_schema_id,
                 }
             },
         )
@@ -2866,6 +2888,7 @@ pub(crate) mod tests {
                         diffs_sum: None,
                         format: None,
                         schema_id: None,
+                        deprecated_schema_id: None,
                     }))
                 })
                 .collect(),
diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs
index 4443bdd002ac9..4c3924abdfe64 100644
--- a/src/persist-client/src/iter.rs
+++ b/src/persist-client/src/iter.rs
@@ -1265,6 +1265,7 @@ mod tests {
                             diffs_sum: None,
                             format: None,
                             schema_id: None,
+                            deprecated_schema_id: None,
                         }))
                     })
                     .collect();
diff --git a/src/persist-client/src/project.rs b/src/persist-client/src/project.rs
index 58315a399f7b1..d588b63a4ef8d 100644
--- a/src/persist-client/src/project.rs
+++ b/src/persist-client/src/project.rs
@@ -160,6 +160,7 @@ impl ProjectionPushdown {
             updates: faked_data,
             ts_rewrite: None,
             schema_id: None,
+            deprecated_schema_id: None,
         })
     }
 }
diff --git a/src/persist-proc/src/lib.rs b/src/persist-proc/src/lib.rs
index a18dcf8445461..e063a05b3e488 100644
--- a/src/persist-proc/src/lib.rs
+++ b/src/persist-proc/src/lib.rs
@@ -95,6 +95,11 @@ fn test_impl(attr: TokenStream, item: TokenStream) -> TokenStream {
                     x.add_dynamic("persist_claim_unclaimed_compactions", ::mz_dyncfg::ConfigVal::Bool(true));
                     x
                 },
+                {
+                    let mut x = ::mz_dyncfg::ConfigUpdates::default();
+                    x.add_dynamic("persist_record_schema_id", ::mz_dyncfg::ConfigVal::Bool(true));
+                    x
+                },
                 {
                     let mut x = ::mz_dyncfg::ConfigUpdates::default();
                     x.add_dynamic("persist_batch_columnar_format", ::mz_dyncfg::ConfigVal::String("both_v2".into()));

From ae3dd111e19cbeb6caa4f4da9defad693e683c58 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 16:22:02 -0400
Subject: [PATCH 4/9] during bootstrapping of the coordinator, evolve the
 schema for Materialized Views and Continual Tasks, if need be
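
For each materialized view and continual task, bootstrap looks up the
shard's latest registered schema and, if one exists, calls
compare_and_evolve_schema with the catalog's current RelationDesc before
creating collections. A toy model of how the result is handled (the variant
and error names mirror the real CaESchema and StorageError types in the
diff; the types here are simplified stand-ins):

    enum CaESchema { Ok, ExpectedMismatch, Incompatible }

    #[derive(Debug)]
    enum StorageError { PersistSchemaEvolveRace, PersistInvalidSchemaEvolve }

    fn handle(result: CaESchema) -> Result<(), StorageError> {
        match result {
            // The schema evolved, or already matched.
            CaESchema::Ok => Ok(()),
            // Another process evolved the shard's schema concurrently.
            CaESchema::ExpectedMismatch => Err(StorageError::PersistSchemaEvolveRace),
            // The new schema is not a valid evolution, e.g. less nullable.
            CaESchema::Incompatible => Err(StorageError::PersistInvalidSchemaEvolve),
        }
    }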

---
 src/adapter/src/coord.rs             | 13 ++++++
 src/adapter/src/util.rs              |  2 +
 src/persist-client/src/cfg.rs        |  1 -
 src/persist-client/src/lib.rs        | 16 +------
 src/persist-client/src/schema.rs     | 10 ++---
 src/repr/src/lib.rs                  |  2 +-
 src/storage-client/src/controller.rs | 12 +++++
 src/storage-controller/src/lib.rs    | 67 ++++++++++++++++++++++++++++
 src/storage-types/src/controller.rs  | 31 +++++++++++++
 9 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index 92935ff0a2705..7241e5926c592 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -2601,6 +2601,7 @@ impl Coordinator {
                 }
             };
 
+        let mut compute_collections = vec![];
         let mut collections = vec![];
         let mut new_builtin_continual_tasks = vec![];
         for entry in catalog.entries() {
@@ -2640,6 +2641,7 @@ impl Coordinator {
                         status_collection_id: None,
                         timeline: None,
                     };
+                    compute_collections.push((mv.global_id(), mv.desc.clone()));
                     collections.push((mv.global_id(), collection_desc));
                 }
                 CatalogItem::ContinualTask(ct) => {
@@ -2656,6 +2658,7 @@ impl Coordinator {
                         // `create_collections_for_bootstrap`.
                         new_builtin_continual_tasks.push((ct.global_id(), collection_desc));
                     } else {
+                        compute_collections.push((ct.global_id(), ct.desc.clone()));
                         collections.push((ct.global_id(), collection_desc));
                     }
                 }
@@ -2677,6 +2680,16 @@ impl Coordinator {
             .flat_map(|item_id| self.catalog.get_entry(item_id).global_ids())
             .collect();
 
+        // Before possibly creating collections, make sure their schemas are correct.
+        //
+        // Across different versions of Materialize the nullability of columns can change based on
+        // updates to our optimizer.
+        self.controller
+            .storage
+            .evolve_nullability_for_bootstrap(storage_metadata, compute_collections)
+            .await
+            .unwrap_or_terminate("cannot fail to evolve collections");
+
         self.controller
             .storage
             .create_collections_for_bootstrap(
diff --git a/src/adapter/src/util.rs b/src/adapter/src/util.rs
index b71dba454d2d4..b48665327494b 100644
--- a/src/adapter/src/util.rs
+++ b/src/adapter/src/util.rs
@@ -366,6 +366,8 @@ impl<T> ShouldTerminateGracefully for StorageError<T> {
             StorageError::ResourceExhausted(_)
             | StorageError::CollectionMetadataAlreadyExists(_)
             | StorageError::PersistShardAlreadyInUse(_)
+            | StorageError::PersistSchemaEvolveRace { .. }
+            | StorageError::PersistInvalidSchemaEvolve { .. }
             | StorageError::TxnWalShardAlreadyExists
             | StorageError::UpdateBeyondUpper(_)
             | StorageError::ReadBeforeSince(_)
diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs
index d1cfac06573c2..5643c65307f77 100644
--- a/src/persist-client/src/cfg.rs
+++ b/src/persist-client/src/cfg.rs
@@ -373,7 +373,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_PREFIX)
         .add(&crate::stats::STATS_UNTRIMMABLE_COLUMNS_SUFFIX)
         .add(&crate::fetch::PART_DECODE_FORMAT)
-        .add(&crate::DANGEROUS_ENABLE_SCHEMA_EVOLUTION)
 }
 
 impl PersistConfig {
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 61383cc5f263c..a0b0108edb29a 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use mz_build_info::{build_info, BuildInfo};
-use mz_dyncfg::{Config, ConfigSet};
+use mz_dyncfg::ConfigSet;
 use mz_ore::{instrument, soft_assert_or_log};
 use mz_persist::location::{Blob, Consensus, ExternalError};
 use mz_persist_types::schema::SchemaId;
@@ -119,16 +119,6 @@ mod internal {
 /// Persist build information.
 pub const BUILD_INFO: BuildInfo = build_info!();
 
-pub(crate) const DANGEROUS_ENABLE_SCHEMA_EVOLUTION: Config<bool> = Config::new(
-    "persist_dangerous_enable_schema_evolution",
-    false,
-    "\
-DANGEROUS DO NOT ENABLE IN PRODUCTION OR STAGING ENVIRONMENTS!
-
-Enable evolving the schema of a Persist shard. Currently dangerous because \
-compaction does not yet handle batches of data with different schemas.",
-);
-
 // Re-export for convenience.
 pub use mz_persist_types::{PersistLocation, ShardId};
 
@@ -611,10 +601,6 @@ impl PersistClient {
         T: Timestamp + Lattice + Codec64 + Sync,
         D: Semigroup + Codec64 + Send + Sync,
     {
-        if !DANGEROUS_ENABLE_SCHEMA_EVOLUTION.get(&self.cfg.configs) {
-            panic!("tried to evolve the schema of a Persist shard without the feature enabled");
-        }
-
         let machine = self
             .make_machine::<K, V, T, D>(shard_id, diagnostics)
             .await?;
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index a8929c573b330..e9334af69deca 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -505,7 +505,7 @@ mod tests {
     use crate::cli::admin::info_log_non_zero_metrics;
     use crate::read::ReadHandle;
     use crate::tests::new_test_client;
-    use crate::{Diagnostics, DANGEROUS_ENABLE_SCHEMA_EVOLUTION};
+    use crate::Diagnostics;
 
     use super::*;
 
@@ -653,9 +653,7 @@ mod tests {
 
     #[mz_persist_proc::test(tokio::test)]
     #[cfg_attr(miri, ignore)]
-    async fn compare_and_evolve_schema(mut dyncfgs: ConfigUpdates) {
-        dyncfgs.add(&DANGEROUS_ENABLE_SCHEMA_EVOLUTION, true);
-
+    async fn compare_and_evolve_schema(dyncfgs: ConfigUpdates) {
         let client = new_test_client(&dyncfgs).await;
         let d = Diagnostics::for_tests();
         let shard_id = ShardId::new();
@@ -742,9 +740,7 @@ mod tests {
 
     #[mz_persist_proc::test(tokio::test)]
     #[cfg_attr(miri, ignore)]
-    async fn schema_evolution(mut dyncfgs: ConfigUpdates) {
-        dyncfgs.add(&DANGEROUS_ENABLE_SCHEMA_EVOLUTION, true);
-
+    async fn schema_evolution(dyncfgs: ConfigUpdates) {
         async fn snap_streaming(
             as_of: u64,
             read: &mut ReadHandle<Strings, (), u64, i64>,
diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs
index 3311395cff298..c6c9d5b9834f0 100644
--- a/src/repr/src/lib.rs
+++ b/src/repr/src/lib.rs
@@ -56,7 +56,7 @@ pub use crate::relation::{
     arb_relation_desc_diff, arb_row_for_relation, ColumnName, ColumnType, NotNullViolation,
     PropRelationDescDiff, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
     RelationDesc, RelationDescBuilder, RelationType, RelationVersion, RelationVersionSelector,
-    VersionedRelationDesc,
+    TypeDiff, VersionedRelationDesc,
 };
 pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder};
 pub use crate::row::iter::{IntoRowIterator, RowIterator};
diff --git a/src/storage-client/src/controller.rs b/src/storage-client/src/controller.rs
index 6458e318d6530..c8d9b61973217 100644
--- a/src/storage-client/src/controller.rs
+++ b/src/storage-client/src/controller.rs
@@ -393,6 +393,18 @@ pub trait StorageController: Debug {
     /// Disconnects the storage instance from the specified replica.
     fn drop_replica(&mut self, instance_id: StorageInstanceId, replica_id: ReplicaId);
 
+    /// Across versions of Materialize the nullability of columns for some objects can change based
+    /// on updates to our optimizer.
+    ///
+    /// During bootstrap we will register these new schemas with Persist.
+    ///
+    /// See: <https://github.com/MaterializeInc/database-issues/issues/2488>
+    async fn evolve_nullability_for_bootstrap(
+        &mut self,
+        storage_metadata: &StorageMetadata,
+        collections: Vec<(GlobalId, RelationDesc)>,
+    ) -> Result<(), StorageError<Self::Timestamp>>;
+
     /// Create the sources described in the individual RunIngestionCommand commands.
     ///
     /// Each command carries the source id, the source description, and any associated metadata
diff --git a/src/storage-controller/src/lib.rs b/src/storage-controller/src/lib.rs
index b29cf4dd9d8a4..0556a702e7868 100644
--- a/src/storage-controller/src/lib.rs
+++ b/src/storage-controller/src/lib.rs
@@ -487,6 +487,73 @@ where
             .drop_replica(replica_id);
     }
 
+    async fn evolve_nullability_for_bootstrap(
+        &mut self,
+        storage_metadata: &StorageMetadata,
+        collections: Vec<(GlobalId, RelationDesc)>,
+    ) -> Result<(), StorageError<Self::Timestamp>> {
+        let persist_client = self
+            .persist
+            .open(self.persist_location.clone())
+            .await
+            .unwrap();
+
+        for (global_id, relation_desc) in collections {
+            let shard_id = storage_metadata.get_collection_shard(global_id)?;
+            let diagnostics = Diagnostics {
+                shard_name: global_id.to_string(),
+                handle_purpose: "evolve nullability for bootstrap".to_string(),
+            };
+            let latest_schema = persist_client
+                .latest_schema::<SourceData, (), T, Diff>(shard_id, diagnostics)
+                .await
+                .expect("invalid persist usage");
+            let Some((schema_id, current_schema, _)) = latest_schema else {
+                tracing::debug!(?global_id, "no schema registered");
+                continue;
+            };
+            tracing::debug!(?global_id, ?current_schema, new_schema = ?relation_desc, "migrating schema");
+
+            let diagnostics = Diagnostics {
+                shard_name: global_id.to_string(),
+                handle_purpose: "evolve nullability for bootstrap".to_string(),
+            };
+            let evolve_result = persist_client
+                .compare_and_evolve_schema::<SourceData, (), T, Diff>(
+                    shard_id,
+                    schema_id,
+                    &relation_desc,
+                    &UnitSchema,
+                    diagnostics,
+                )
+                .await
+                .expect("invalid persist usage");
+            match evolve_result {
+                CaESchema::Ok(_) => (),
+                CaESchema::ExpectedMismatch {
+                    schema_id,
+                    key,
+                    val: _,
+                } => {
+                    return Err(StorageError::PersistSchemaEvolveRace {
+                        global_id,
+                        shard_id,
+                        schema_id,
+                        relation_desc: key,
+                    });
+                }
+                CaESchema::Incompatible => {
+                    return Err(StorageError::PersistInvalidSchemaEvolve {
+                        global_id,
+                        shard_id,
+                    });
+                }
+            };
+        }
+
+        Ok(())
+    }
+
     /// Create and "execute" the described collection.
     ///
     /// "Execute" is in scare quotes because what executing a collection means
diff --git a/src/storage-types/src/controller.rs b/src/storage-types/src/controller.rs
index 47d840dd67e4b..4bde9bf9b2695 100644
--- a/src/storage-types/src/controller.rs
+++ b/src/storage-types/src/controller.rs
@@ -15,6 +15,7 @@ use itertools::Itertools;
 use mz_ore::assert_none;
 use mz_ore::url::SensitiveUrl;
 use mz_persist_types::codec_impls::UnitSchema;
+use mz_persist_types::schema::SchemaId;
 use mz_persist_types::stats::PartStats;
 use mz_persist_types::txn::{TxnsCodec, TxnsEntry};
 use mz_persist_types::{PersistLocation, ShardId};
@@ -192,6 +193,18 @@ pub enum StorageError<T> {
     CollectionMetadataAlreadyExists(GlobalId),
     /// Some other collection is already writing to this persist shard.
     PersistShardAlreadyInUse(ShardId),
+    /// Raced with some other process while trying to evolve the schema of a Persist shard.
+    PersistSchemaEvolveRace {
+        global_id: GlobalId,
+        shard_id: ShardId,
+        schema_id: SchemaId,
+        relation_desc: RelationDesc,
+    },
+    /// We tried to evolve the schema of a Persist shard in an invalid way.
+    PersistInvalidSchemaEvolve {
+        global_id: GlobalId,
+        shard_id: ShardId,
+    },
     /// Txn WAL shard already exists.
     TxnWalShardAlreadyExists,
     /// The item that a subsource refers to is unexpectedly missing from the
@@ -231,6 +244,8 @@ impl<T: Debug + Display + 'static> Error for StorageError<T> {
             Self::ShuttingDown(_) => None,
             Self::CollectionMetadataAlreadyExists(_) => None,
             Self::PersistShardAlreadyInUse(_) => None,
+            Self::PersistSchemaEvolveRace { .. } => None,
+            Self::PersistInvalidSchemaEvolve { .. } => None,
             Self::TxnWalShardAlreadyExists => None,
             Self::MissingSubsourceReference { .. } => None,
             Self::RtrTimeout(_) => None,
@@ -304,6 +319,22 @@ impl<T: fmt::Display + 'static> fmt::Display for StorageError<T> {
             Self::PersistShardAlreadyInUse(shard) => {
                 write!(f, "persist shard already in use: {shard}")
             }
+            Self::PersistSchemaEvolveRace {
+                global_id,
+                shard_id,
+                ..
+            } => {
+                write!(f, "persist raced when trying to evolve the schema of a shard: {global_id}, {shard_id}")
+            }
+            Self::PersistInvalidSchemaEvolve {
+                global_id,
+                shard_id,
+            } => {
+                write!(
+                    f,
+                    "persist shard evolved in an invalid way: {global_id}, {shard_id}"
+                )
+            }
             Self::TxnWalShardAlreadyExists => {
                 write!(f, "txn WAL already exists")
             }

From 53295d80c516f2eab13da0a35ea18e8f3e35c023 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 16:52:30 -0400
Subject: [PATCH 5/9] add schema check to the catalog_upgrade check

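After initializing the catalog state, also verify that for every storage
collection (tables, sources, continual tasks, and materialized views) the
RelationDesc in the new catalog is backward compatible with the Arrow schema
currently registered in Persist. A rough sketch of the core of the check,
using the same names as the diff below:

    let persisted = mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
    let new = mz_persist_types::columnar::data_type::<SourceData>(item_desc)?;
    if mz_persist_types::schema::backward_compatible(&persisted, &new).is_none() {
        anyhow::bail!("invalid Persist schema migration!");
    }
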
---
 Cargo.lock                                  |  2 +
 src/adapter/src/catalog/state.rs            |  4 ++
 src/catalog-debug/BUILD.bazel               |  1 +
 src/catalog-debug/Cargo.toml                |  2 +
 src/catalog-debug/src/main.rs               | 67 +++++++++++++++++++--
 src/persist-client/src/internal/state.proto |  2 +-
 6 files changed, 72 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e54d927a0e0d1..51d1fb3c26fc5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4546,6 +4546,7 @@ name = "mz-catalog-debug"
 version = "0.127.0-dev.0"
 dependencies = [
  "anyhow",
+ "arrow",
  "clap",
  "futures",
  "mz-adapter",
@@ -4555,6 +4556,7 @@ dependencies = [
  "mz-orchestrator-tracing",
  "mz-ore",
  "mz-persist-client",
+ "mz-persist-types",
  "mz-repr",
  "mz-service",
  "mz-sql",
diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs
index 6ca5c83708126..e4de795430e3a 100644
--- a/src/adapter/src/catalog/state.rs
+++ b/src/adapter/src/catalog/state.rs
@@ -723,6 +723,10 @@ impl CatalogState {
         self.get_entry(item_id)
     }
 
+    pub fn get_entries(&self) -> impl Iterator<Item = (&GlobalId, &CatalogEntry)> + '_ {
+        self.entry_by_id.iter()
+    }
+
     pub fn get_temp_items(&self, conn: &ConnectionId) -> impl Iterator<Item = ObjectId> + '_ {
         let schema = self
             .temporary_schemas
diff --git a/src/catalog-debug/BUILD.bazel b/src/catalog-debug/BUILD.bazel
index ae0c33eefcbcd..b5dddc2684bec 100644
--- a/src/catalog-debug/BUILD.bazel
+++ b/src/catalog-debug/BUILD.bazel
@@ -38,6 +38,7 @@ rust_binary(
         "//src/orchestrator-tracing:mz_orchestrator_tracing",
         "//src/ore:mz_ore",
         "//src/persist-client:mz_persist_client",
+        "//src/persist-types:mz_persist_types",
         "//src/repr:mz_repr",
         "//src/service:mz_service",
         "//src/sql:mz_sql",
diff --git a/src/catalog-debug/Cargo.toml b/src/catalog-debug/Cargo.toml
index 50407e1ac420d..b294c8dbffdf3 100644
--- a/src/catalog-debug/Cargo.toml
+++ b/src/catalog-debug/Cargo.toml
@@ -11,6 +11,7 @@ workspace = true
 
 [dependencies]
 anyhow = "1.0.66"
+arrow = { version = "51.0.0", default-features = false }
 clap = { version = "3.2.24", features = ["derive", "env"] }
 futures = "0.3.25"
 mz-adapter = { path = "../adapter" }
@@ -21,6 +22,7 @@ mz-orchestrator-tracing = { path = "../orchestrator-tracing" }
 mz-ore = { path = "../ore" }
 mz-storage-types = { path = "../storage-types" }
 mz-persist-client = { path = "../persist-client" }
+mz-persist-types = { path = "../persist-types" }
 mz-tls-util = { path = "../tls-util" }
 mz-repr = { path = "../repr" }
 mz-service = { path = "../service" }
diff --git a/src/catalog-debug/src/main.rs b/src/catalog-debug/src/main.rs
index f4588d5888687..ac1100bcbda28 100644
--- a/src/catalog-debug/src/main.rs
+++ b/src/catalog-debug/src/main.rs
@@ -38,6 +38,7 @@ use mz_catalog::durable::debug::{
 use mz_catalog::durable::{
     persist_backed_catalog_state, BootstrapArgs, OpenableDurableCatalogState,
 };
+use mz_catalog::memory::objects::CatalogItem;
 use mz_cloud_resources::AwsExternalIdPrefix;
 use mz_orchestrator_tracing::{StaticTracingConfig, TracingCliArgs};
 use mz_ore::cli::{self, CliConfig};
@@ -49,11 +50,12 @@ use mz_ore::url::SensitiveUrl;
 use mz_persist_client::cache::PersistClientCache;
 use mz_persist_client::cfg::PersistConfig;
 use mz_persist_client::rpc::PubSubClientConnection;
-use mz_persist_client::{PersistClient, PersistLocation};
+use mz_persist_client::{Diagnostics, PersistClient, PersistLocation};
 use mz_repr::{Diff, Timestamp};
 use mz_service::secrets::SecretsReaderCliArgs;
 use mz_sql::catalog::EnvironmentId;
 use mz_storage_types::connections::ConnectionContext;
+use mz_storage_types::sources::SourceData;
 use serde::{Deserialize, Serialize};
 use tracing::{error, Instrument};
 
@@ -254,9 +256,9 @@ async fn run(args: Args) -> Result<(), anyhow::Error> {
             upgrade_check(
                 args,
                 openable_state,
+                persist_client,
                 secrets,
                 cluster_replica_sizes,
-                persist_client,
                 start,
             )
             .await
@@ -533,9 +535,9 @@ async fn epoch(
 async fn upgrade_check(
     args: Args,
     openable_state: Box<dyn OpenableDurableCatalogState>,
+    persist_client: PersistClient,
     secrets: SecretsReaderCliArgs,
     cluster_replica_sizes: ClusterReplicaSizeMap,
-    persist_client: PersistClient,
     start: Instant,
 ) -> Result<(), anyhow::Error> {
     let secrets_reader = secrets.load().await.context("loading secrets reader")?;
@@ -568,7 +570,7 @@ async fn upgrade_check(
     // get stored on the stack which is bad for runtime performance, and blow up our stack usage.
     // Because of that we purposefully move this Future onto the heap (i.e. Box it).
     let InitializeStateResult {
-        state: _state,
+        state,
         storage_collections_to_drop: _,
         migrated_storage_collections_0dt: _,
         new_builtin_collections: _,
@@ -609,7 +611,7 @@ async fn upgrade_check(
                 None,
             ),
             builtin_item_migration_config: BuiltinItemMigrationConfig::Legacy,
-            persist_client,
+            persist_client: persist_client.clone(),
             enable_expression_cache_override: None,
             enable_0dt_deployment: true,
             helm_chart_version: None,
@@ -628,6 +630,61 @@ async fn upgrade_check(
         dur.as_millis(),
     );
     println!("{msg}");
+
+    // Check that we can evolve the schema for all Persist shards.
+    let storage_entries = state
+        .get_entries()
+        .filter_map(|(_item_id, entry)| match entry.item() {
+            // TODO(alter_table): Handle multiple versions of tables.
+            CatalogItem::Table(table) => Some((table.global_id_writes(), &table.desc)),
+            CatalogItem::Source(source) => Some((source.global_id(), &source.desc)),
+            CatalogItem::ContinualTask(ct) => Some((ct.global_id(), &ct.desc)),
+            CatalogItem::MaterializedView(mv) => Some((mv.global_id(), &mv.desc)),
+            CatalogItem::Log(_)
+            | CatalogItem::View(_)
+            | CatalogItem::Sink(_)
+            | CatalogItem::Index(_)
+            | CatalogItem::Type(_)
+            | CatalogItem::Func(_)
+            | CatalogItem::Secret(_)
+            | CatalogItem::Connection(_) => None,
+        });
+    for (gid, item_desc) in storage_entries {
+        let shard_id = state
+            .storage_metadata()
+            .get_collection_shard::<Timestamp>(gid)
+            .context("getting shard_id")?;
+        let diagnostics = Diagnostics {
+            shard_name: gid.to_string(),
+            handle_purpose: "catalog upgrade check".to_string(),
+        };
+        let persisted_schema = persist_client
+            .latest_schema::<SourceData, (), Timestamp, Diff>(shard_id, diagnostics)
+            .await
+            .expect("invalid persist usage");
+        // We should always have a schema registered for a shard, unless the environment happened
+        // to crash after running DDL and hasn't come back up yet.
+        let Some((_schema_id, persisted_relation_desc, _)) = persisted_schema else {
+            anyhow::bail!("no schema found for {gid}, did their environment crash?");
+        };
+
+        let persisted_data_type =
+            mz_persist_types::columnar::data_type::<SourceData>(&persisted_relation_desc)?;
+        let new_data_type = mz_persist_types::columnar::data_type::<SourceData>(item_desc)?;
+
+        let migration =
+            mz_persist_types::schema::backward_compatible(&persisted_data_type, &new_data_type);
+        if migration.is_none() {
+            anyhow::bail!(
+                "invalid Persist schema migration!\npersisted: {:?}\n{:?}\nnew: {:?}\n{:?}",
+                persisted_relation_desc,
+                persisted_data_type,
+                item_desc,
+                new_data_type,
+            );
+        }
+    }
+
     Ok(())
 }
 
diff --git a/src/persist-client/src/internal/state.proto b/src/persist-client/src/internal/state.proto
index 0bbc8e52b0753..6064e2e3d6846 100644
--- a/src/persist-client/src/internal/state.proto
+++ b/src/persist-client/src/internal/state.proto
@@ -60,7 +60,7 @@ message ProtoHollowBatchPart {
   optional uint64 schema_id = 12;
 
   optional bytes key_stats = 536870906;
-  
+
   reserved 536870907 to 536870911;
 
   optional uint64 deprecated_schema_id = 9;

From 1f2400110459d87bec655529185b0a8040ca4f8b Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Fri, 25 Oct 2024 19:45:15 -0400
Subject: [PATCH 6/9] regenerate JSON snapshot

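Widen the proptest state generator from exactly 5 batches to 6..8 and
regenerate the checked-in state_serde.json snapshot, which now also covers
the deprecated_schema_id fields on part runs.
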
---
 src/persist-client/src/internal/state.rs      |   2 +-
 .../src/internal/state_serde.json             | 309 +++++-------------
 2 files changed, 89 insertions(+), 222 deletions(-)

diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index 60da56a5ce332..b5a0e4ac1bf4d 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -3708,7 +3708,7 @@ pub(crate) mod tests {
     fn state_inspect_serde_json() {
         const STATE_SERDE_JSON: &str = include_str!("state_serde.json");
         let mut runner = proptest::test_runner::TestRunner::deterministic();
-        let tree = any_state::<u64>(5..6).new_tree(&mut runner).unwrap();
+        let tree = any_state::<u64>(6..8).new_tree(&mut runner).unwrap();
         let json = serde_json::to_string_pretty(&tree.current()).unwrap();
         assert_eq!(
             json.trim(),
diff --git a/src/persist-client/src/internal/state_serde.json b/src/persist-client/src/internal/state_serde.json
index b33d95ece52db..45d25e3ff77be 100644
--- a/src/persist-client/src/internal/state_serde.json
+++ b/src/persist-client/src/internal/state_serde.json
@@ -468,10 +468,10 @@
     }
   },
   "since": [
-    14091139506241686208
+    15711141615835313697
   ],
   "upper": [
-    15793198891801274032
+    17363026058961705429
   ],
   "batches": [
     {
@@ -479,19 +479,31 @@
         0
       ],
       "upper": [
-        8237757120154893742
+        9886499534079125963
       ],
       "since": [
-        7834118575855137052
+        2978141131140526309
       ],
-      "len": 7,
+      "len": 1,
       "part_runs": [
         [
           {
             "order": null,
-            "schema": null
+            "schema": null,
+            "deprecated_schema": null
           },
           [
+            {
+              "type": "Inline",
+              "updates": {
+                "desc": null,
+                "index": 0,
+                "updates[len]": 0
+              },
+              "ts_rewrite": null,
+              "schema_id": "h13283873853425631561",
+              "deprecated_schema_id": "h5803370218574812925"
+            },
             {
               "type": "Inline",
               "updates": {
@@ -501,10 +513,11 @@
               },
               "ts_rewrite": {
                 "elements": [
-                  245054358183071630
+                  15039795360329996643
                 ]
               },
-              "schema_id": "h9875232680843580888"
+              "schema_id": null,
+              "deprecated_schema_id": "h6339905006779337159"
             }
           ]
         ]
@@ -512,195 +525,60 @@
     },
     {
       "lower": [
-        8237757120154893742
+        9886499534079125963
       ],
       "upper": [
-        9886499534079125963
+        10448793807825919347
       ],
       "since": [
-        7019220731930974776
+        4395599247033876229
       ],
-      "len": 8,
+      "len": 0,
       "part_runs": []
     },
     {
       "lower": [
-        9886499534079125963
+        10448793807825919347
       ],
       "upper": [
-        10092654255918715246
+        13690082948772716747
       ],
       "since": [
-        770458890179917555
+        2529543461855689013
       ],
-      "len": 2,
-      "part_runs": [
-        [
-          {
-            "order": null,
-            "schema": null
-          },
-          [
-            {
-              "type": "Inline",
-              "updates": {
-                "desc": null,
-                "index": 0,
-                "updates[len]": 0
-              },
-              "ts_rewrite": null,
-              "schema_id": "h9518259874590116324"
-            },
-            {
-              "type": "Inline",
-              "updates": {
-                "desc": null,
-                "index": 0,
-                "updates[len]": 0
-              },
-              "ts_rewrite": {
-                "elements": [
-                  17071391453325819086
-                ]
-              },
-              "schema_id": null
-            }
-          ]
-        ]
-      ]
+      "len": 8,
+      "part_runs": []
     },
     {
       "lower": [
-        10092654255918715246
+        13690082948772716747
       ],
       "upper": [
-        11170970954576416472
+        16742627657852255773
       ],
       "since": [
-        8543663013803204529
+        15711141615835313697
       ],
-      "len": 4,
-      "part_runs": [
-        [
-          {
-            "order": null,
-            "schema": null
-          },
-          [
-            {
-              "type": "Inline",
-              "updates": {
-                "desc": null,
-                "index": 0,
-                "updates[len]": 0
-              },
-              "ts_rewrite": {
-                "elements": [
-                  15961028640594888816
-                ]
-              },
-              "schema_id": null
-            },
-            {
-              "type": "Inline",
-              "updates": {
-                "desc": null,
-                "index": 0,
-                "updates[len]": 0
-              },
-              "ts_rewrite": null,
-              "schema_id": null
-            }
-          ]
-        ]
-      ]
+      "len": 0,
+      "part_runs": []
     },
     {
       "lower": [
-        11170970954576416472
+        16742627657852255773
       ],
       "upper": [
-        15793198891801274032
+        17363026058961705429
       ],
       "since": [
-        4522819118668558313
+        4441726535572007216
       ],
-      "len": 5,
-      "part_runs": [
-        [
-          {
-            "order": null,
-            "schema": null
-          },
-          [
-            {
-              "type": "Inline",
-              "updates": {
-                "desc": null,
-                "index": 0,
-                "updates[len]": 0
-              },
-              "ts_rewrite": {
-                "elements": [
-                  1990182459289640688
-                ]
-              },
-              "schema_id": null
-            },
-            {
-              "type": "Hollow",
-              "key": "?%ꞰOↁὖௗ𐨕-Ο{Ѩ%🕴{ßȺ:f🂱𑜾2f",
-              "encoded_size_bytes": 12336862283325747769,
-              "key_lower": "1b015efdf7021bcdb7d7b3094da7cca0278bb399cfa9ee82a0eea6c90cfa20a7bcbfd9b60b4c87d65cf28bbb51d48dec",
-              "structured_key_lower": null,
-              "stats": {
-                "len": 9216362869167982641,
-                "cols": {
-                  "*=<.~\\d+": {
-                    "len": 5879932233594401360,
-                    "ଁ)𖾛🕴$𝔻n`nય:⼖୴ဂ|ꧠA%🩩ോ勺[𝼧𑍧`": {
-                      "lower": false,
-                      "upper": true
-                    },
-                    "🕴ዅ/໖": {
-                      "len": 17168192427257351185,
-                      "<`�/୕𐾃=🡐Ѩ¥𑊊qᦆ\"k𲀭¥ନ{ȺW<u": {
-                        "G𑊅\"૾𐽺ࢼ𐓹h:": {
-                          "len": 1,
-                          "stats": {
-                            "lower": false,
-                            "upper": true
-                          }
-                        },
-                        "h/Ò?'Ã🝘5¯\\𐆠/𝒢==%𑙤Ѩ\\ᬑ": {
-                          "len": 1,
-                          "stats": "json_mixed"
-                        }
-                      }
-                    }
-                  },
-                  "Ⱥ૪b0": {
-                    "len": 5197454066737604612,
-                    "$G್𞺧:$᳒\\o\"🕴3\"Lꠊ๚𐬪¼:?ச?ᝥ&𐾽)Î𖿰>?𑊤𔖘": {
-                      "lower": "'xlㄑ🕴 D𐦇Ul",
-                      "upper": "ቄȺû"
-                    }
-                  }
-                }
-              },
-              "ts_rewrite": null,
-              "diffs_sum": -946081707879001155,
-              "format": null,
-              "schema_id": "h2260404778546976278"
-            }
-          ]
-        ]
-      ]
+      "len": 2,
+      "part_runs": []
     }
   ],
   "hollow_batches": {},
   "spine_batches": {
-    "0-2": {
+    "0-3": {
       "level": 4,
       "desc": {
         "lower": {
@@ -710,7 +588,7 @@
         },
         "upper": {
           "elements": [
-            9886499534079125963
+            13690082948772716747
           ]
         },
         "since": {
@@ -721,7 +599,8 @@
       },
       "parts": [
         "0-1",
-        "1-2"
+        "1-2",
+        "2-3"
       ],
       "descs": [
         {
@@ -732,131 +611,130 @@
           },
           "upper": {
             "elements": [
-              8237757120154893742
+              9886499534079125963
             ]
           },
           "since": {
             "elements": [
-              7834118575855137052
+              2978141131140526309
             ]
           }
         },
         {
           "lower": {
             "elements": [
-              8237757120154893742
+              9886499534079125963
             ]
           },
           "upper": {
             "elements": [
-              9886499534079125963
+              10448793807825919347
+            ]
+          },
+          "since": {
+            "elements": [
+              4395599247033876229
+            ]
+          }
+        },
+        {
+          "lower": {
+            "elements": [
+              10448793807825919347
+            ]
+          },
+          "upper": {
+            "elements": [
+              13690082948772716747
             ]
           },
           "since": {
             "elements": [
-              7019220731930974776
+              2529543461855689013
             ]
           }
         }
       ]
     },
-    "2-4": {
-      "level": 3,
+    "3-6": {
+      "level": 1,
       "desc": {
         "lower": {
           "elements": [
-            9886499534079125963
+            13690082948772716747
           ]
         },
         "upper": {
           "elements": [
-            11170970954576416472
+            16742627657852255773
           ]
         },
         "since": {
           "elements": [
-            14091139506241686208
+            15711141615835313697
           ]
         }
       },
       "parts": [
-        "2-3",
-        "3-4"
+        "3-6"
       ],
       "descs": [
         {
           "lower": {
             "elements": [
-              9886499534079125963
+              13690082948772716747
             ]
           },
           "upper": {
             "elements": [
-              10092654255918715246
+              16742627657852255773
             ]
           },
           "since": {
             "elements": [
-              770458890179917555
-            ]
-          }
-        },
-        {
-          "lower": {
-            "elements": [
-              10092654255918715246
-            ]
-          },
-          "upper": {
-            "elements": [
-              11170970954576416472
-            ]
-          },
-          "since": {
-            "elements": [
-              8543663013803204529
+              15711141615835313697
             ]
           }
         }
       ]
     },
-    "4-5": {
-      "level": 3,
+    "6-7": {
+      "level": 1,
       "desc": {
         "lower": {
           "elements": [
-            11170970954576416472
+            16742627657852255773
           ]
         },
         "upper": {
           "elements": [
-            15793198891801274032
+            17363026058961705429
           ]
         },
         "since": {
           "elements": [
-            4522819118668558313
+            4441726535572007216
           ]
         }
       },
       "parts": [
-        "4-5"
+        "6-7"
       ],
       "descs": [
         {
           "lower": {
             "elements": [
-              11170970954576416472
+              16742627657852255773
             ]
           },
           "upper": {
             "elements": [
-              15793198891801274032
+              17363026058961705429
             ]
           },
           "since": {
             "elements": [
-              4522819118668558313
+              4441726535572007216
             ]
           }
         }
@@ -864,18 +742,7 @@
     }
   },
   "merges": {
-    "0-2": {
-      "since": {
-        "elements": [
-          14091139506241686208
-        ]
-      },
-      "remaining_work": 0,
-      "active_compaction": {
-        "start_ms": 0
-      }
-    },
-    "2-4": {
+    "0-3": {
       "since": {
         "elements": [
           14091139506241686208
@@ -886,13 +753,13 @@
         "start_ms": 0
       }
     },
-    "2-5": {
+    "3-7": {
       "since": {
         "elements": [
-          14091139506241686208
+          15711141615835313697
         ]
       },
-      "remaining_work": 11,
+      "remaining_work": 2,
       "active_compaction": null
     }
   }

From 94675d81fdc2cd0d2fd3ec3fd4f6043a66b287df Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Wed, 27 Nov 2024 00:12:37 -0500
Subject: [PATCH 7/9] update 'backward_compatible_struct' logic to handle
 recursive nullability changes

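Previously we rejected any migration that both made a field nullable and
modified it in some other way. Now we allow that combination when the other
modification itself only recursively makes children nullable, which the
all-columns-nullable change earlier in this series requires. For example, a
nested nullability change like the following is now accepted (it mirrors one
of the new unit tests):

    let old = struct_([("0", struct_([("foo", Utf8, false)]), false)]);
    let new = struct_([("0", struct_([("foo", Utf8, true)]), true)]);
    assert!(backward_compatible_typ(&old, &new).is_some());
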
---
 src/persist-types/src/schema.rs | 257 ++++++++++++++++++++++++++++----
 1 file changed, 227 insertions(+), 30 deletions(-)

diff --git a/src/persist-types/src/schema.rs b/src/persist-types/src/schema.rs
index 9e57ef42d80dd..0ce80d84a3055 100644
--- a/src/persist-types/src/schema.rs
+++ b/src/persist-types/src/schema.rs
@@ -345,14 +345,47 @@ fn backward_compatible_struct(old: &Fields, new: &Fields) -> Option<ArrayMigrati
                 });
             }
             Some(NoOp) => continue,
-            // For now, don't support both making a field nullable and also
-            // modifying it in some other way. It doesn't seem that we need this for
-            // mz usage.
-            Some(_) if make_nullable => return None,
-            Some(migration) => field_migrations.push(Recurse {
-                name: n.name().clone(),
-                migration,
-            }),
+            Some(migration) => {
+                /// Checks if an [`ArrayMigration`] is only recursively making fields nullable.
+                fn recursively_all_nullable(migration: &ArrayMigration) -> bool {
+                    match migration {
+                        NoOp => true,
+                        List(_field, child) => recursively_all_nullable(child),
+                        Struct(children) => children.iter().all(|child| match child {
+                            AddFieldNullableAtEnd { .. } | DropField { .. } => false,
+                            AlterFieldNullable { .. } => true,
+                            Recurse { migration, .. } => recursively_all_nullable(migration),
+                        }),
+                    }
+                }
+
+                // We only support making a field nullable at the same time as other changes to
+                // that field if those other changes also only make children nullable. Otherwise
+                // we don't allow the migration.
+                //
+                // Note: Nothing fundamentally prevents us from supporting other combinations,
+                // but at the moment they're not needed in Materialize.
+                if make_nullable {
+                    if recursively_all_nullable(&migration) {
+                        field_migrations.extend([
+                            AlterFieldNullable {
+                                name: n.name().clone(),
+                            },
+                            Recurse {
+                                name: n.name().clone(),
+                                migration,
+                            },
+                        ]);
+                    } else {
+                        return None;
+                    }
+                } else {
+                    field_migrations.push(Recurse {
+                        name: n.name().clone(),
+                        migration,
+                    })
+                }
+            }
         }
     }
 
@@ -390,34 +423,34 @@ mod tests {
 
     use super::*;
 
+    #[track_caller]
+    fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
+        let migration = super::backward_compatible_typ(&old, &new);
+        let actual = migration.as_ref().map(|x| x.contains_drop());
+        assert_eq!(actual, expected);
+        // If it's backward compatible, make sure that the migration
+        // logic works.
+        if let Some(migration) = migration {
+            let (old, new) = (new_empty_array(&old), new_empty_array(&new));
+            let migrated = migration.migrate(old);
+            assert_eq!(new.data_type(), migrated.data_type());
+        }
+    }
+
+    fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
+        let fields = fields
+            .into_iter()
+            .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
+            .collect();
+        DataType::Struct(fields)
+    }
+
     // NB: We also have proptest coverage of all this, but it works on
     // RelationDesc+SourceData and so lives in src/storage-types.
     #[mz_ore::test]
     fn backward_compatible() {
         use DataType::*;
 
-        #[track_caller]
-        fn testcase(old: DataType, new: DataType, expected: Option<bool>) {
-            let migration = super::backward_compatible_typ(&old, &new);
-            let actual = migration.as_ref().map(|x| x.contains_drop());
-            assert_eq!(actual, expected);
-            // If it's backward compatible, make sure that the migration
-            // logic works.
-            if let Some(migration) = migration {
-                let (old, new) = (new_empty_array(&old), new_empty_array(&new));
-                let migrated = migration.migrate(old);
-                assert_eq!(new.data_type(), migrated.data_type());
-            }
-        }
-
-        fn struct_(fields: impl IntoIterator<Item = (&'static str, DataType, bool)>) -> DataType {
-            let fields = fields
-                .into_iter()
-                .map(|(name, typ, nullable)| Field::new(name, typ, nullable))
-                .collect();
-            DataType::Struct(fields)
-        }
-
         // Matching primitive types
         testcase(Boolean, Boolean, Some(false));
         testcase(Utf8, Utf8, Some(false));
@@ -584,5 +617,169 @@ mod tests {
             ),
             Some(false),
         );
+
+        // Nested nullability changes
+        testcase(
+            struct_([("0", struct_([("foo", Utf8, false)]), false)]),
+            struct_([("0", struct_([("foo", Utf8, true)]), true)]),
+            Some(false),
+        )
+    }
+
+    /// This is a regression test for a case we found when trying to merge [#30205].
+    ///
+    /// [#30205]: https://github.com/MaterializeInc/materialize/pull/30205
+    #[mz_ore::test]
+    fn backwards_compatible_nested_types() {
+        use DataType::*;
+
+        testcase(
+            struct_([
+                (
+                    "ok",
+                    struct_([
+                        (
+                            "0",
+                            List(
+                                Field::new_struct(
+                                    "map_entries",
+                                    vec![
+                                        Field::new("key", Utf8, false),
+                                        Field::new("val", Int32, true),
+                                    ],
+                                    false,
+                                )
+                                .into(),
+                            ),
+                            true,
+                        ),
+                        (
+                            "1",
+                            List(
+                                Field::new_struct(
+                                    "map_entries",
+                                    vec![
+                                        Field::new("key", Utf8, false),
+                                        Field::new("val", Int32, true),
+                                    ],
+                                    false,
+                                )
+                                .into(),
+                            ),
+                            false,
+                        ),
+                        (
+                            "2",
+                            List(
+                                Field::new_list("item", Field::new_list_field(Int32, true), true)
+                                    .into(),
+                            ),
+                            true,
+                        ),
+                        (
+                            "3",
+                            List(
+                                Field::new_list("item", Field::new_list_field(Int32, true), true)
+                                    .into(),
+                            ),
+                            false,
+                        ),
+                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
+                        (
+                            "5",
+                            struct_([("0", Int32, false), ("1", Utf8, false)]),
+                            false,
+                        ),
+                        ("6", Utf8, true),
+                        (
+                            "7",
+                            struct_([
+                                (
+                                    "dims",
+                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
+                                    true,
+                                ),
+                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
+                            ]),
+                            false,
+                        ),
+                    ]),
+                    true,
+                ),
+                ("err", Binary, true),
+            ]),
+            struct_([
+                (
+                    "ok",
+                    struct_([
+                        (
+                            "0",
+                            List(
+                                Field::new_struct(
+                                    "map_entries",
+                                    vec![
+                                        Field::new("key", Utf8, false),
+                                        Field::new("val", Int32, true),
+                                    ],
+                                    false,
+                                )
+                                .into(),
+                            ),
+                            true,
+                        ),
+                        (
+                            "1",
+                            List(
+                                Field::new_struct(
+                                    "map_entries",
+                                    vec![
+                                        Field::new("key", Utf8, false),
+                                        Field::new("val", Int32, true),
+                                    ],
+                                    false,
+                                )
+                                .into(),
+                            ),
+                            true,
+                        ),
+                        (
+                            "2",
+                            List(
+                                Field::new_list("item", Field::new_list_field(Int32, true), true)
+                                    .into(),
+                            ),
+                            true,
+                        ),
+                        (
+                            "3",
+                            List(
+                                Field::new_list("item", Field::new_list_field(Int32, true), true)
+                                    .into(),
+                            ),
+                            true,
+                        ),
+                        ("4", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
+                        ("5", struct_([("0", Int32, true), ("1", Utf8, true)]), true),
+                        ("6", Utf8, true),
+                        (
+                            "7",
+                            struct_([
+                                (
+                                    "dims",
+                                    List(Field::new_list_field(FixedSizeBinary(16), true).into()),
+                                    true,
+                                ),
+                                ("vals", List(Field::new_list_field(Utf8, true).into()), true),
+                            ]),
+                            true,
+                        ),
+                    ]),
+                    true,
+                ),
+                ("err", Binary, true),
+            ]),
+            // Should be able to migrate, should not contain any drops.
+            Some(false),
+        )
     }
 }

From 15a4b811c87478edd0cb1f7aab210e5008277cc0 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Wed, 4 Dec 2024 12:53:43 -0500
Subject: [PATCH 8/9] small changes to test flags

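Gate the continual task platform checks and the enable_continual_task_builtins
flag on versions newer than v0.127.0-dev, return CatalogItemId from
CatalogState::get_entries, and drop the unused TypeDiff re-export.
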
---
 .../materialize/checks/all_checks/continual_task.py   | 11 +++++++++++
 misc/python/materialize/mzcompose/__init__.py         |  4 +++-
 src/adapter/src/catalog/state.rs                      |  2 +-
 src/repr/src/lib.rs                                   |  2 +-
 4 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/misc/python/materialize/checks/all_checks/continual_task.py b/misc/python/materialize/checks/all_checks/continual_task.py
index 6e52fc97af899..7b523bb837d97 100644
--- a/misc/python/materialize/checks/all_checks/continual_task.py
+++ b/misc/python/materialize/checks/all_checks/continual_task.py
@@ -11,6 +11,8 @@
 from materialize.checks.actions import Testdrive
 from materialize.checks.checks import Check
 from materialize.checks.common import KAFKA_SCHEMA_WITH_SINGLE_STRING_FIELD
+from materialize.checks.executors import Executor
+from materialize.mz_version import MzVersion
 
 
 def schemas() -> str:
@@ -20,6 +22,9 @@ def schemas() -> str:
 class AuditLogCT(Check):
     """Continual Task for audit logging"""
 
+    def _can_run(self, e: Executor) -> bool:
+        return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
+
     def initialize(self) -> Testdrive:
         return Testdrive(
             schemas()
@@ -64,6 +69,9 @@ def validate(self) -> Testdrive:
 class StreamTableJoinCT(Check):
     """Continual Task for stream table join"""
 
+    def _can_run(self, e: Executor) -> bool:
+        return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
+
     def initialize(self) -> Testdrive:
         return Testdrive(
             schemas()
@@ -127,6 +135,9 @@ def validate(self) -> Testdrive:
 class UpsertCT(Check):
     """Continual Task for upserts"""
 
+    def _can_run(self, e: Executor) -> bool:
+        return self.base_version > MzVersion.parse_mz("v0.127.0-dev")
+
     def initialize(self) -> Testdrive:
         return Testdrive(
             schemas()
diff --git a/misc/python/materialize/mzcompose/__init__.py b/misc/python/materialize/mzcompose/__init__.py
index b0ec3b625f5ce..b74ad162f2e2f 100644
--- a/misc/python/materialize/mzcompose/__init__.py
+++ b/misc/python/materialize/mzcompose/__init__.py
@@ -93,7 +93,9 @@ def get_default_system_parameters(
         "enable_columnation_lgalloc": "true",
         "enable_compute_chunked_stack": "true",
         "enable_connection_validation_syntax": "true",
-        "enable_continual_task_builtins": "true",
+        "enable_continual_task_builtins": (
+            "true" if version > MzVersion.parse_mz("v0.127.0-dev") else "false"
+        ),
         "enable_continual_task_create": "true",
         "enable_continual_task_retain": "true",
         "enable_continual_task_transform": "true",
diff --git a/src/adapter/src/catalog/state.rs b/src/adapter/src/catalog/state.rs
index e4de795430e3a..6a78385f294a9 100644
--- a/src/adapter/src/catalog/state.rs
+++ b/src/adapter/src/catalog/state.rs
@@ -723,7 +723,7 @@ impl CatalogState {
         self.get_entry(item_id)
     }
 
-    pub fn get_entries(&self) -> impl Iterator<Item = (&GlobalId, &CatalogEntry)> + '_ {
+    pub fn get_entries(&self) -> impl Iterator<Item = (&CatalogItemId, &CatalogEntry)> + '_ {
         self.entry_by_id.iter()
     }
 
diff --git a/src/repr/src/lib.rs b/src/repr/src/lib.rs
index c6c9d5b9834f0..3311395cff298 100644
--- a/src/repr/src/lib.rs
+++ b/src/repr/src/lib.rs
@@ -56,7 +56,7 @@ pub use crate::relation::{
     arb_relation_desc_diff, arb_row_for_relation, ColumnName, ColumnType, NotNullViolation,
     PropRelationDescDiff, ProtoColumnName, ProtoColumnType, ProtoRelationDesc, ProtoRelationType,
     RelationDesc, RelationDescBuilder, RelationType, RelationVersion, RelationVersionSelector,
-    TypeDiff, VersionedRelationDesc,
+    VersionedRelationDesc,
 };
 pub use crate::row::encode::{preserves_order, RowColumnarDecoder, RowColumnarEncoder};
 pub use crate::row::iter::{IntoRowIterator, RowIterator};

From 93a5822db45ecfb0894a7b1982f1a1f8cdeb9486 Mon Sep 17 00:00:00 2001
From: Parker Timmerman <parker.timmerman@materialize.com>
Date: Mon, 9 Dec 2024 13:14:07 -0500
Subject: [PATCH 9/9] respond to feedback, tweak a name, add some metrics

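* thread SchemaMetrics through State::register_schema
* count runs of the one-time "more nullable" migration via a new
  mz_persist_one_time_migration_more_nullable metric
* rename encoded_data_type to encode_data_type
* bump the migration-removal TODO from v0.123 to v0.127
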
---
 src/persist-client/src/internal/machine.rs |  2 +-
 src/persist-client/src/internal/metrics.rs |  5 +++++
 src/persist-client/src/internal/state.rs   | 15 +++++++++------
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index db72e686e8594..228a5e5142de8 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -258,7 +258,7 @@ where
         let metrics = Arc::clone(&self.applier.metrics);
         let (_seqno, state, maintenance) = self
             .apply_unbatched_idempotent_cmd(&metrics.cmds.register, |_seqno, _cfg, state| {
-                state.register_schema::<K, V>(key_schema, val_schema)
+                state.register_schema::<K, V>(key_schema, val_schema, &metrics.schema)
             })
             .await;
         (state, maintenance)
diff --git a/src/persist-client/src/internal/metrics.rs b/src/persist-client/src/internal/metrics.rs
index f667b030c0966..79bae1903760f 100644
--- a/src/persist-client/src/internal/metrics.rs
+++ b/src/persist-client/src/internal/metrics.rs
@@ -3128,6 +3128,7 @@ pub struct SchemaMetrics {
     pub(crate) migration_new_count: IntCounter,
     pub(crate) migration_new_seconds: Counter,
     pub(crate) migration_migrate_seconds: Counter,
+    pub(crate) one_time_migration_more_nullable: IntCounter,
 }
 
 impl SchemaMetrics {
@@ -3199,6 +3200,10 @@ impl SchemaMetrics {
                 name: "mz_persist_schema_migration_migrate_seconds",
                 help: "seconds spent applying migration logic",
             )),
+            one_time_migration_more_nullable: registry.register(metric!(
+                name: "mz_persist_one_time_migration_more_nullable",
+                help: "count of running the onetime more nullable migration",
+            )),
         }
     }
 }
diff --git a/src/persist-client/src/internal/state.rs b/src/persist-client/src/internal/state.rs
index b5a0e4ac1bf4d..cc8c4fdc3923c 100644
--- a/src/persist-client/src/internal/state.rs
+++ b/src/persist-client/src/internal/state.rs
@@ -55,6 +55,7 @@ use crate::error::InvalidUsage;
 use crate::internal::encoding::{parse_id, LazyInlineBatchPart, LazyPartStats, LazyProto};
 use crate::internal::gc::GcReq;
 use crate::internal::machine::retry_external;
+use crate::internal::metrics::SchemaMetrics;
 use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
 use crate::internal::trace::{
     ActiveCompaction, ApplyMergeResult, FueledMergeReq, FueledMergeRes, Trace,
@@ -1310,8 +1311,9 @@ where
         &mut self,
         key_schema: &K::Schema,
         val_schema: &V::Schema,
+        metrics: &SchemaMetrics,
     ) -> ControlFlow<NoOpStateTransition<Option<SchemaId>>, Option<SchemaId>> {
-        fn encoded_data_type(data_type: &DataType) -> Bytes {
+        fn encode_data_type(data_type: &DataType) -> Bytes {
             let proto = data_type.into_proto();
             prost::Message::encode_to_vec(&proto).into()
         }
@@ -1344,8 +1346,8 @@ where
                 let new_v_datatype = mz_persist_types::columnar::data_type::<V>(val_schema)
                     .expect("valid val schema");
 
-                let new_k_encoded_datatype = encoded_data_type(&new_k_datatype);
-                let new_v_encoded_datatype = encoded_data_type(&new_v_datatype);
+                let new_k_encoded_datatype = encode_data_type(&new_k_datatype);
+                let new_v_encoded_datatype = encode_data_type(&new_v_datatype);
 
                 // Check if the generated Arrow DataTypes have changed.
                 if encoded_schemas.key_data_type != new_k_encoded_datatype
@@ -1366,7 +1368,7 @@ where
                     // If the Arrow DataType for `k` or `v` has changed, but it's only become more
                     // nullable, then we allow in-place re-writing of the schema.
                     match (k_atleast_as_nullable, v_atleast_as_nullable) {
-                        // TODO(parkmycar): Remove this one-time migration after v0.123 ships.
+                        // TODO(parkmycar): Remove this one-time migration after v0.127 ships.
                         (Ok(()), Ok(())) => {
                             let key = Bytes::clone(&encoded_schemas.key);
                             let val = Bytes::clone(&encoded_schemas.val);
@@ -1379,6 +1381,7 @@ where
                                     val_data_type: new_v_encoded_datatype,
                                 },
                             );
+                            metrics.one_time_migration_more_nullable.inc();
                             Continue(Some(schema_id))
                         }
                         (k_err, _) => {
@@ -1413,9 +1416,9 @@ where
                     id,
                     EncodedSchemas {
                         key: K::encode_schema(key_schema),
-                        key_data_type: encoded_data_type(&key_data_type),
+                        key_data_type: encode_data_type(&key_data_type),
                         val: V::encode_schema(val_schema),
-                        val_data_type: encoded_data_type(&val_data_type),
+                        val_data_type: encode_data_type(&val_data_type),
                     },
                 );
                 assert_eq!(prev, None);