diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs
index 4a0f63675aaf3..1a46ee4f8f192 100644
--- a/src/persist-client/src/batch.rs
+++ b/src/persist-client/src/batch.rs
@@ -1042,6 +1042,9 @@ impl<T: Timestamp + Lattice + Codec64> BatchParts<T> {
                         .iter()
                         .map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
                         .collect();
+                    // We just wrote these runs, so use the current `FullSchemas` instead of
+                    // fetching from the schema registry.
+                    let compaction_schemas = write_schemas.to_current_full_schemas();
                     let output_batch = Compactor::<K, V, T, D>::compact_runs(
                         &cfg,
@@ -1052,7 +1055,7 @@ impl<T: Timestamp + Lattice + Codec64> BatchParts<T> {
                         metrics,
                         shard_metrics,
                         isolated_runtime,
-                        write_schemas,
+                        compaction_schemas,
                     )
                     .await
                     .expect("successful compaction");
@@ -1179,7 +1182,9 @@ impl<T: Timestamp + Lattice + Codec64> BatchParts<T> {
         let (name, write_future) = if updates.goodbytes() < inline_threshold {
             let metrics = Arc::clone(&self.metrics);
-            let write_schemas = write_schemas.clone();
+            // We just wrote this data, so it's safe to use the current mapping
+            // of `Codec::Schema` to `arrow::datatypes::DataType`.
+            let write_schemas = write_schemas.to_current_full_schemas();
             let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
             (
@@ -1193,7 +1198,9 @@ impl<T: Timestamp + Lattice + Codec64> BatchParts<T> {
                     .measure_part_build(|| {
                         updates.get_or_make_structured::<K, V>(
                             write_schemas.key.as_ref(),
+                            &write_schemas.key_dt,
                             write_schemas.val.as_ref(),
+                            &write_schemas.val_dt,
                         )
                     })
                     .clone();
diff --git a/src/persist-client/src/cli/admin.rs b/src/persist-client/src/cli/admin.rs
index e27c5ae9dd9f5..d2e859bfb6164 100644
--- a/src/persist-client/src/cli/admin.rs
+++ b/src/persist-client/src/cli/admin.rs
@@ -495,7 +495,7 @@ where
         Arc::clone(&machine.applier.shard_metrics),
         Arc::new(IsolatedRuntime::default()),
         req,
-        schemas,
+        schemas.to_current_full_schemas(),
     )
     .await?;
     metrics.compaction.admin_count.inc();
diff --git a/src/persist-client/src/internal/apply.rs b/src/persist-client/src/internal/apply.rs
index 3a9c306c2965d..3444c6f58b06e 100644
--- a/src/persist-client/src/internal/apply.rs
+++ b/src/persist-client/src/internal/apply.rs
@@ -15,17 +15,21 @@ use std::ops::ControlFlow::{self, Break, Continue};
 use std::sync::Arc;
 use std::time::Instant;
 
+use bytes::Bytes;
 use differential_dataflow::difference::Semigroup;
 use differential_dataflow::lattice::Lattice;
 use mz_ore::cast::CastFrom;
 use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::{Codec, Codec64};
+use mz_proto::ProtoType;
+use prost::Message;
 use timely::progress::{Antichain, Timestamp};
 use tracing::debug;
 
 use crate::cache::{LockingTypedState, StateCache};
 use crate::error::{CodecMismatch, InvalidUsage};
+use crate::internal::encoding::FullSchemas;
 use crate::internal::gc::GcReq;
 use crate::internal::maintenance::RoutineMaintenance;
 use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
@@ -221,11 +225,29 @@ where
     }
 
     /// See [crate::PersistClient::get_schema].
-    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
+    pub fn get_schema(&self, schema_id: SchemaId) -> Option<FullSchemas<K, V>> {
         self.state
             .read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
                 let x = state.collections.schemas.get(&schema_id)?;
-                Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
+                let key_dt =
+                    mz_persist_types::arrow::ProtoDataType::decode(Bytes::clone(&x.key_data_type))
+                        .expect("key ProtoDataType to roundtrip")
+                        .into_rust()
+                        .expect("key ProtoDataType to rust");
+                let val_dt =
+                    mz_persist_types::arrow::ProtoDataType::decode(Bytes::clone(&x.val_data_type))
+                        .expect("val ProtoDataType to roundtrip")
+                        .into_rust()
+                        .expect("val ProtoDataType to rust");
+
+                let schemas = FullSchemas {
+                    id: Some(schema_id),
+                    key: Arc::new(K::decode_schema(&x.key)),
+                    key_dt,
+                    val: Arc::new(V::decode_schema(&x.val)),
+                    val_dt,
+                };
+                Some(schemas)
             })
     }
diff --git a/src/persist-client/src/internal/compact.rs b/src/persist-client/src/internal/compact.rs
index 0ef4327311541..e27dc957e7961 100644
--- a/src/persist-client/src/internal/compact.rs
+++ b/src/persist-client/src/internal/compact.rs
@@ -34,7 +34,7 @@ use crate::async_runtime::IsolatedRuntime;
 use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
 use crate::cfg::MiB;
 use crate::fetch::FetchBatchFilter;
-use crate::internal::encoding::Schemas;
+use crate::internal::encoding::{FullSchemas, Schemas};
 use crate::internal::gc::GarbageCollector;
 use crate::internal::machine::Machine;
 use crate::internal::maintenance::RoutineMaintenance;
@@ -307,29 +307,23 @@ where
             // It's an invariant that SchemaIds are ordered.
             .max();
         let maybe_compaction_schema = match compaction_schema_id {
-            Some(id) => machine
-                .get_schema(id)
-                .map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
+            Some(id) => machine.get_schema(id),
             None => None,
         };
         let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);
 
         let compaction_schema = match maybe_compaction_schema {
-            Some((id, key_schema, val_schema)) if use_most_recent_schema => {
+            Some(compaction_schemas) if use_most_recent_schema => {
                 metrics.compaction.schema_selection.recent_schema.inc();
-                Schemas {
-                    id: Some(id),
-                    key: Arc::new(key_schema),
-                    val: Arc::new(val_schema),
-                }
+                compaction_schemas
             }
             Some(_) => {
                 metrics.compaction.schema_selection.disabled.inc();
-                write_schemas
+                write_schemas.to_current_full_schemas()
             }
             None => {
                 metrics.compaction.schema_selection.no_schema.inc();
-                write_schemas
+                write_schemas.to_current_full_schemas()
             }
         };
@@ -464,7 +458,7 @@ where
         shard_metrics: Arc<ShardMetrics>,
         isolated_runtime: Arc<IsolatedRuntime>,
         req: CompactReq<T>,
-        write_schemas: Schemas<K, V>,
+        schemas: FullSchemas<K, V>,
     ) -> Result<CompactRes<T>, anyhow::Error> {
         let () = Self::validate_req(&req)?;
@@ -531,7 +525,7 @@ where
                     Arc::clone(&metrics),
                     Arc::clone(&shard_metrics),
                     Arc::clone(&isolated_runtime),
-                    write_schemas.clone(),
+                    schemas.clone(),
                 )
                 .await?;
                 let (parts, run_splits, run_meta, updates) =
@@ -720,7 +714,7 @@ where
         metrics: Arc<Metrics>,
         shard_metrics: Arc<ShardMetrics>,
         isolated_runtime: Arc<IsolatedRuntime>,
-        write_schemas: Schemas<K, V>,
+        compaction_schemas: FullSchemas<K, V>,
     ) -> Result<HollowBatch<T>, anyhow::Error> {
         // TODO: Figure out a more principled way to allocate our memory budget.
         // Currently, we give any excess budget to write parallelism. If we had
@@ -756,7 +750,7 @@ where
             cfg.batch.clone(),
             parts,
             Arc::clone(&metrics),
-            write_schemas.clone(),
+            compaction_schemas.to_schemas(),
             desc.lower().clone(),
             Arc::clone(&blob),
             shard_id.clone(),
@@ -778,7 +772,7 @@ where
                 desc.upper().elements()
             ),
             *shard_id,
-            StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
+            StructuredSort::<K, V, T, D>::new(compaction_schemas.clone()),
             blob,
             Arc::clone(&metrics),
             shard_metrics,
@@ -829,8 +823,10 @@ where
             // In the hopefully-common case of a single chunk, this will not copy.
             let updates = BlobTraceUpdates::concat::<K, V>(
                 chunks,
-                write_schemas.key.as_ref(),
-                write_schemas.val.as_ref(),
+                compaction_schemas.key.as_ref(),
+                &compaction_schemas.key_dt,
+                compaction_schemas.val.as_ref(),
+                &compaction_schemas.val_dt,
                 &metrics.columnar,
             )?;
             batch.flush_many(updates).await?;
@@ -878,7 +874,13 @@ where
                     key_vec.extend_from_slice(k);
                     val_vec.clear();
                     val_vec.extend_from_slice(v);
-                    crate::batch::validate_schema(&write_schemas, &key_vec, &val_vec, None, None);
+                    crate::batch::validate_schema(
+                        &compaction_schemas.to_schemas(),
+                        &key_vec,
+                        &val_vec,
+                        None,
+                        None,
+                    );
                     batch.add(&key_vec, &val_vec, &t, &d).await?;
                 }
                 tokio::task::yield_now().await;
@@ -899,7 +901,7 @@ where
             &cfg.batch,
             &metrics.compaction.batch,
             &isolated_runtime,
-            &write_schemas,
+            &compaction_schemas.to_schemas(),
         )
         .await;
     }
@@ -1028,7 +1030,7 @@ mod tests {
             write.metrics.shards.shard(&write.machine.shard_id(), ""),
             Arc::new(IsolatedRuntime::default()),
             req.clone(),
-            schemas.clone(),
+            schemas.to_current_full_schemas(),
         )
         .await
         .expect("compaction failed");
@@ -1105,7 +1107,7 @@ mod tests {
             write.metrics.shards.shard(&write.machine.shard_id(), ""),
             Arc::new(IsolatedRuntime::default()),
             req.clone(),
-            schemas.clone(),
+            schemas.to_current_full_schemas(),
         )
         .await
         .expect("compaction failed");
diff --git a/src/persist-client/src/internal/encoding.rs b/src/persist-client/src/internal/encoding.rs
index bf8fca490951b..6f1f2d7197f9f 100644
--- a/src/persist-client/src/internal/encoding.rs
+++ b/src/persist-client/src/internal/encoding.rs
@@ -23,6 +23,7 @@ use mz_persist::indexed::columnar::ColumnarRecords;
 use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
 use mz_persist::location::{SeqNo, VersionedData};
 use mz_persist::metrics::ColumnarMetrics;
+use mz_persist_types::columnar::data_type;
 use mz_persist_types::schema::SchemaId;
 use mz_persist_types::stats::{PartStats, ProtoStructStats};
 use mz_persist_types::{Codec, Codec64};
@@ -70,6 +71,20 @@ pub struct Schemas<K: Codec, V: Codec> {
     pub val: Arc<V::Schema>,
 }
 
+impl<K: Codec, V: Codec> Schemas<K, V> {
+    /// Returns a [`FullSchemas`] using the [`Codec`] to [`arrow::datatypes::DataType`]
+    /// mapping for the current version of Materialize.
+    pub fn to_current_full_schemas(&self) -> FullSchemas<K, V> {
+        FullSchemas {
+            id: self.id.clone(),
+            key: Arc::clone(&self.key),
+            key_dt: data_type::<K>(self.key.as_ref()).expect("valid key schema"),
+            val: Arc::clone(&self.val),
+            val_dt: data_type::<V>(self.val.as_ref()).expect("valid val schema"),
+        }
+    }
+}
+
 impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
     fn clone(&self) -> Self {
         Self {
@@ -80,6 +95,47 @@
     }
 }
 
+/// Set of schemas including the corresponding [Arrow `DataType`].
+///
+/// Some processes, e.g. compaction, operate mainly on the Arrow `DataType` as
+/// opposed to a generic [`Codec`] type. For these processes we thread through
+/// the full schema data that's encoded in the registry in case the mapping of
+/// [`Codec::Schema`] to [Arrow `DataType`] changes between two releases.
+///
+/// [Arrow `DataType`]: arrow::datatypes::DataType
+#[derive(Debug)]
+pub struct FullSchemas<K: Codec, V: Codec> {
+    // TODO: Remove the Option once this finishes rolling out and all shards
+    // have a registered schema.
+    pub id: Option<SchemaId>,
+    pub key: Arc<K::Schema>,
+    pub key_dt: arrow::datatypes::DataType,
+    pub val: Arc<V::Schema>,
+    pub val_dt: arrow::datatypes::DataType,
+}
+
+impl<K: Codec, V: Codec> FullSchemas<K, V> {
+    pub fn to_schemas(&self) -> Schemas<K, V> {
+        Schemas {
+            id: self.id.clone(),
+            key: Arc::clone(&self.key),
+            val: Arc::clone(&self.val),
+        }
+    }
+}
+
+impl<K: Codec, V: Codec> Clone for FullSchemas<K, V> {
+    fn clone(&self) -> Self {
+        Self {
+            id: self.id,
+            key: Arc::clone(&self.key),
+            key_dt: self.key_dt.clone(),
+            val: Arc::clone(&self.val),
+            val_dt: self.val_dt.clone(),
+        }
+    }
+}
+
 /// A proto that is decoded on use.
 ///
 /// Because of the way prost works, decoding a large protobuf may result in a
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index c390043bbd6f0..4d2d161fc8e8d 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -44,6 +44,7 @@ use crate::critical::CriticalReaderId;
 use crate::error::{CodecMismatch, InvalidUsage};
 use crate::internal::apply::Applier;
 use crate::internal::compact::CompactReq;
+use crate::internal::encoding::FullSchemas;
 use crate::internal::gc::GarbageCollector;
 use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
 use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
@@ -759,7 +760,7 @@ where
     }
 
     /// See [crate::PersistClient::get_schema].
-    pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
+    pub fn get_schema(&self, schema_id: SchemaId) -> Option<FullSchemas<K, V>> {
         self.applier.get_schema(schema_id)
     }
 
@@ -2005,7 +2006,7 @@ pub mod datadriven {
             Arc::clone(&datadriven.machine.applier.shard_metrics),
             Arc::clone(&datadriven.client.isolated_runtime),
             req,
-            schemas,
+            schemas.to_current_full_schemas(),
         )
         .await?;
diff --git a/src/persist-client/src/iter.rs b/src/persist-client/src/iter.rs
index 4443bdd002ac9..99958bc7352c0 100644
--- a/src/persist-client/src/iter.rs
+++ b/src/persist-client/src/iter.rs
@@ -38,7 +38,7 @@ use timely::progress::Timestamp;
 use tracing::{debug_span, Instrument};
 
 use crate::fetch::{EncodedPart, FetchBatchFilter};
-use crate::internal::encoding::Schemas;
+use crate::internal::encoding::FullSchemas;
 use crate::internal::metrics::{ReadMetrics, ShardMetrics};
 use crate::internal::paths::WriterKey;
 use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
@@ -213,13 +213,13 @@ pub struct StructuredUpdates {
 
 /// Sort parts ordered by the codec-encoded key and value columns.
 #[derive(Debug)]
 pub struct StructuredSort<K: Codec, V: Codec, T, D> {
-    schemas: Schemas<K, V>,
+    schemas: FullSchemas<K, V>,
     _time_diff: PhantomData<fn(T, D)>,
 }
 
 impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
     /// A sort for structured data with the given schema.
-    pub fn new(schemas: Schemas<K, V>) -> Self {
+    pub fn new(schemas: FullSchemas<K, V>) -> Self {
         Self {
             schemas,
             _time_diff: Default::default(),
         }
     }
@@ -241,8 +241,12 @@ impl RowSort<T, D> for StructuredSort<K, V, T, D>
     }
 
     fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> Self::Updates {
-        let structured = updates
-            .get_or_make_structured::<K, V>(self.schemas.key.as_ref(), self.schemas.val.as_ref());
+        let structured = updates.get_or_make_structured::<K, V>(
+            self.schemas.key.as_ref(),
+            &self.schemas.key_dt,
+            self.schemas.val.as_ref(),
+            &self.schemas.val_dt,
+        );
         let key_ord = ArrayOrd::new(structured.key.as_ref());
         let val_ord = ArrayOrd::new(structured.val.as_ref());
         StructuredUpdates {
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 80bb28e64d3bf..9c9f35273cf29 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -556,7 +556,7 @@ impl PersistClient {
         shard_id: ShardId,
         schema_id: SchemaId,
         diagnostics: Diagnostics,
-    ) -> Result<Option<(K::Schema, V::Schema)>, InvalidUsage<T>>
+    ) -> Result<Option<(Arc<K::Schema>, Arc<V::Schema>)>, InvalidUsage<T>>
     where
         K: Debug + Codec,
         V: Debug + Codec,
@@ -566,7 +566,9 @@ impl PersistClient {
         let machine = self
             .make_machine::<K, V, T, D>(shard_id, diagnostics)
             .await?;
-        Ok(machine.get_schema(schema_id))
+        Ok(machine
+            .get_schema(schema_id)
+            .map(|schemas| (schemas.key, schemas.val)))
     }
 
     /// Returns the latest schema registered at the current state.
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index 022156d1cea00..b6debe47d5226 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -42,7 +42,7 @@ use uuid::Uuid;
 use crate::batch::{BLOB_TARGET_SIZE, STRUCTURED_ORDER, STRUCTURED_ORDER_UNTIL_SHARD};
 use crate::cfg::RetryParameters;
 use crate::fetch::{fetch_leased_part, FetchBatchFilter, FetchedPart, Lease, LeasedBatchPart};
-use crate::internal::encoding::Schemas;
+use crate::internal::encoding::{FullSchemas, Schemas};
 use crate::internal::machine::{ExpireFn, Machine};
 use crate::internal::metrics::Metrics;
 use crate::internal::state::{BatchPart, HollowBatch};
@@ -919,7 +919,7 @@ pub(crate) struct UnexpiredReadHandleState {
 pub struct Cursor<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> {
     consolidator: CursorConsolidator<K, V, T, D>,
     _lease: Lease,
-    read_schemas: Schemas<K, V>,
+    read_schemas: FullSchemas<K, V>,
 }
 
 #[derive(Debug)]
@@ -957,7 +957,9 @@ where
                 .expect("fetching a leased part")?;
             let structured = iter.get_or_make_structured::<K, V>(
                 self.read_schemas.key.as_ref(),
+                &self.read_schemas.key_dt,
                 self.read_schemas.val.as_ref(),
+                &self.read_schemas.val_dt,
             );
             let key_decoder = self
                 .read_schemas
@@ -1062,6 +1064,17 @@ where
             as_of: as_of.clone(),
         };
         let lease = self.lease_seqno();
+        // Get the Arrow DataTypes for this schema from the registry.
+        let maybe_full_schema = self
+            .read_schemas
+            .id
+            .as_ref()
+            .and_then(|schema_id| self.machine.get_schema(*schema_id));
+        let read_schemas = match maybe_full_schema {
+            Some(schema) => schema,
+            // TODO(parkmycar): We should remove this branch once schema_ids are required.
+            None => self.read_schemas.to_current_full_schemas(),
+        };
 
         let structured_order = STRUCTURED_ORDER.get(&self.cfg) && {
             self.shard_id().to_string() < STRUCTURED_ORDER_UNTIL_SHARD.get(&self.cfg)
@@ -1070,7 +1083,7 @@ where
         let mut consolidator = Consolidator::new(
             context,
             self.shard_id(),
-            StructuredSort::new(self.read_schemas.clone()),
+            StructuredSort::new(read_schemas.clone()),
             Arc::clone(&self.blob),
             Arc::clone(&self.metrics),
             Arc::clone(&self.machine.applier.shard_metrics),
@@ -1126,7 +1139,7 @@ where
         Ok(Cursor {
             consolidator,
             _lease: lease,
-            read_schemas: self.read_schemas.clone(),
+            read_schemas,
         })
     }
diff --git a/src/persist-types/src/arrow.rs b/src/persist-types/src/arrow.rs
index be85a958a2145..e6cbd9f34078e 100644
--- a/src/persist-types/src/arrow.rs
+++ b/src/persist-types/src/arrow.rs
@@ -39,7 +39,7 @@ mod proto {
     include!(concat!(env!("OUT_DIR"), "/mz_persist_types.arrow.rs"));
 }
 use crate::arrow::proto::data_type;
-pub use proto::ProtoArrayData;
+pub use proto::{DataType as ProtoDataType, ProtoArrayData};
 
 /// Extract the list of fields for our recursive datatypes.
 pub fn fields_for_type(data_type: &DataType) -> &[FieldRef] {
diff --git a/src/persist/src/indexed/encoding.rs b/src/persist/src/indexed/encoding.rs
index d6fc6a1f22e5e..20551bd2104e6 100644
--- a/src/persist/src/indexed/encoding.rs
+++ b/src/persist/src/indexed/encoding.rs
@@ -25,7 +25,7 @@ use mz_ore::bytes::SegmentedBytes;
 use mz_ore::cast::CastFrom;
 use mz_ore::collections::CollectionExt;
 use mz_ore::soft_panic_or_log;
-use mz_persist_types::columnar::{codec_to_schema2, data_type};
+use mz_persist_types::columnar::codec_to_schema2;
 use mz_persist_types::parquet::EncodingConfig;
 use mz_persist_types::schema::backward_compatible;
 use mz_persist_types::{Codec, Codec64};
@@ -231,10 +231,15 @@ impl BlobTraceUpdates {
     }
 
     /// Return the [`ColumnarRecordsStructuredExt`] of the blob.
+    ///
+    /// It's up to the caller to guarantee that `key_datatype` and `val_datatype` are accurate
+    /// mappings of [`Codec::Schema`] to [`arrow::datatypes::DataType`].
     pub fn get_or_make_structured<K: Codec, V: Codec>(
         &mut self,
         key_schema: &K::Schema,
+        key_datatype: &arrow::datatypes::DataType,
         val_schema: &V::Schema,
+        val_datatype: &arrow::datatypes::DataType,
     ) -> &ColumnarRecordsStructuredExt {
         match self {
             BlobTraceUpdates::Row(records) => {
@@ -245,18 +250,23 @@
                     ColumnarRecordsStructuredExt { key, val },
                 );
                 // Recurse at most once, since this data is now structured.
-                self.get_or_make_structured::<K, V>(key_schema, val_schema)
+                self.get_or_make_structured::<K, V>(
+                    key_schema,
+                    key_datatype,
+                    val_schema,
+                    val_datatype,
+                )
             }
             BlobTraceUpdates::Both(_, structured) => {
                 // If the types don't match, attempt to migrate the array to the new type.
                 // We expect this to succeed, since this should only be called with backwards-
                 // compatible schemas... but if it fails we only log, and let some higher-level
                 // code signal the error if it cares.
-                let migrate = |array: &mut ArrayRef, to_type: DataType| {
+                let migrate = |array: &mut ArrayRef, to_type: &DataType| {
                     // TODO: Plumb down the SchemaCache and use it here for the array migrations.
                    let from_type = array.data_type().clone();
-                    if from_type != to_type {
-                        if let Some(migration) = backward_compatible(&from_type, &to_type) {
+                    if from_type != *to_type {
+                        if let Some(migration) = backward_compatible(&from_type, to_type) {
                             *array = migration.migrate(Arc::clone(array));
                         } else {
                             error!(
@@ -267,14 +277,8 @@
                         }
                     }
                 };
-                migrate(
-                    &mut structured.key,
-                    data_type::<K>(key_schema).expect("valid key schema"),
-                );
-                migrate(
-                    &mut structured.val,
-                    data_type::<V>(val_schema).expect("valid value schema"),
-                );
+                migrate(&mut structured.key, key_datatype);
+                migrate(&mut structured.val, val_datatype);
 
                 structured
             }
@@ -285,7 +289,9 @@
     pub fn concat<K: Codec, V: Codec>(
         mut updates: Vec<BlobTraceUpdates>,
         key_schema: &K::Schema,
+        key_datatype: &arrow::datatypes::DataType,
         val_schema: &V::Schema,
+        val_datatype: &arrow::datatypes::DataType,
         metrics: &ColumnarMetrics,
     ) -> anyhow::Result<Self> {
         match updates.len() {
@@ -300,7 +306,12 @@
             let mut keys = Vec::with_capacity(records.len());
             let mut vals = Vec::with_capacity(records.len());
             for updates in &mut updates {
-                let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
+                let structured = updates.get_or_make_structured::<K, V>(
+                    key_schema,
+                    key_datatype,
+                    val_schema,
+                    val_datatype,
+                );
                 keys.push(structured.key.as_ref());
                 vals.push(structured.val.as_ref());
             }
diff --git a/src/txn-wal/src/txns.rs b/src/txn-wal/src/txns.rs
index db2ab0c52f58b..43f6a12bbefc9 100644
--- a/src/txn-wal/src/txns.rs
+++ b/src/txn-wal/src/txns.rs
@@ -880,12 +880,7 @@ where
             .expect("id must have been registered to create this batch");
         let new_data_write = self
             .client
-            .open_writer(
-                self.shard_id(),
-                Arc::new(key_schema),
-                Arc::new(val_schema),
-                diagnostics,
-            )
+            .open_writer(self.shard_id(), key_schema, val_schema, diagnostics)
            .await
            .expect("codecs shouldn't change");
         tracing::info!(
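
Reviewer sketch (not part of the patch): the schema-selection pattern the diff converges on is "prefer the `FullSchemas` recorded in the registry, and only fall back to the current build's `Codec`-to-`DataType` mapping for data the caller just wrote or for shards without a registered schema". The helper name below is invented, the bounds mirror the usual `Machine` bounds in persist-client, and the `COMPACTION_USE_MOST_RECENT_SCHEMA` gating is omitted for brevity.

use std::fmt::Debug;

use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_persist_types::{Codec, Codec64};
use timely::progress::Timestamp;

use crate::internal::encoding::{FullSchemas, Schemas};
use crate::internal::machine::Machine;

/// Illustrative only: pick the schemas a compaction-style caller should use.
fn select_schemas<K, V, T, D>(
    machine: &Machine<K, V, T, D>,
    write_schemas: &Schemas<K, V>,
) -> FullSchemas<K, V>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64,
{
    write_schemas
        .id
        // The registry stores the arrow `DataType` that was in effect when the
        // schema was registered, so old columns keep decoding the same way
        // even if a later release changes the mapping.
        .and_then(|schema_id| machine.get_schema(schema_id))
        // Shards without a registered schema fall back to the mapping of the
        // current build, matching the `None` arm in the compaction change above.
        .unwrap_or_else(|| write_schemas.to_current_full_schemas())
}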
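
Reviewer sketch (not part of the patch): the new call shape for `BlobTraceUpdates::get_or_make_structured`, which now takes the Arrow `DataType`s explicitly instead of recomputing them via `data_type::<K>()` / `data_type::<V>()` at the call site. Import paths assume the sketch lives inside persist-client, and it relies on `ColumnarRecordsStructuredExt` being cloneable (the patch itself clones the returned reference in batch.rs).

use mz_persist::indexed::columnar::ColumnarRecordsStructuredExt;
use mz_persist::indexed::encoding::BlobTraceUpdates;
use mz_persist_types::Codec;

use crate::internal::encoding::FullSchemas;

/// Illustrative only: decode (or migrate) the structured columns of a blob
/// using the DataTypes carried by `FullSchemas`.
fn structured_columns<K: Codec, V: Codec>(
    updates: &mut BlobTraceUpdates,
    schemas: &FullSchemas<K, V>,
) -> ColumnarRecordsStructuredExt {
    updates
        .get_or_make_structured::<K, V>(
            schemas.key.as_ref(),
            // These DataTypes come from the registry (or from
            // `to_current_full_schemas()` for freshly written data), so a build
            // whose `Codec::Schema` -> `DataType` mapping changed still migrates
            // old arrays via `backward_compatible` instead of reinterpreting them.
            &schemas.key_dt,
            schemas.val.as_ref(),
            &schemas.val_dt,
        )
        .clone()
}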