persist: Plumb relevant arrow::DataTypes to compaction #30627

Closed
11 changes: 9 additions & 2 deletions src/persist-client/src/batch.rs
@@ -1042,6 +1042,9 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
.iter()
.map(|(meta, run)| (&compact_desc, meta, run.as_slice()))
.collect();
// We just wrote these runs, so use the current `FullSchemas` instead of
// fetching from the schema registry.
let compaction_schemas = write_schemas.to_current_full_schemas();

let output_batch = Compactor::<K, V, T, D>::compact_runs(
&cfg,
@@ -1052,7 +1055,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
metrics,
shard_metrics,
isolated_runtime,
write_schemas,
compaction_schemas,
)
.await
.expect("successful compaction");
@@ -1179,7 +1182,9 @@ impl<T: Timestamp + Codec64> BatchParts<T> {

let (name, write_future) = if updates.goodbytes() < inline_threshold {
let metrics = Arc::clone(&self.metrics);
let write_schemas = write_schemas.clone();
// We just wrote this data, so it's safe to use the current mapping
// of `Codec::Schema` to `arrow::datatypes::DataType`.
let write_schemas = write_schemas.to_current_full_schemas();

let span = debug_span!("batch::inline_part", shard = %self.shard_id).or_current();
(
@@ -1193,7 +1198,9 @@
.measure_part_build(|| {
updates.get_or_make_structured::<K, V>(
write_schemas.key.as_ref(),
&write_schemas.key_dt,
write_schemas.val.as_ref(),
&write_schemas.val_dt,
)
})
.clone();
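Both hunks above encode the same rule: data this process just wrote can safely pair its `Codec::Schema`s with Arrow `DataType`s derived from the running binary, because that binary is the one that encoded the data. A minimal sketch of the rule as a standalone helper (the function itself is hypothetical; `Schemas`, `FullSchemas`, and `to_current_full_schemas` are the types introduced by this PR):

use mz_persist_types::Codec;

use crate::internal::encoding::{FullSchemas, Schemas};

/// Hypothetical helper: bundle a writer's schemas with Arrow `DataType`s
/// derived from the current process's `Codec`-to-Arrow mapping. Only valid
/// for data this process wrote itself; data written by an older binary must
/// use the `DataType`s persisted in the schema registry instead (see the
/// `get_schema` change in apply.rs below).
fn full_schemas_for_fresh_write<K: Codec, V: Codec>(
    write_schemas: &Schemas<K, V>,
) -> FullSchemas<K, V> {
    write_schemas.to_current_full_schemas()
}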
2 changes: 1 addition & 1 deletion src/persist-client/src/cli/admin.rs
@@ -495,7 +495,7 @@ where
Arc::clone(&machine.applier.shard_metrics),
Arc::new(IsolatedRuntime::default()),
req,
schemas,
schemas.to_current_full_schemas(),
)
.await?;
metrics.compaction.admin_count.inc();
26 changes: 24 additions & 2 deletions src/persist-client/src/internal/apply.rs
@@ -15,17 +15,21 @@ use std::ops::ControlFlow::{self, Break, Continue};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use mz_ore::cast::CastFrom;
use mz_persist::location::{CaSResult, Indeterminate, SeqNo, VersionedData};
use mz_persist_types::schema::SchemaId;
use mz_persist_types::{Codec, Codec64};
use mz_proto::ProtoType;
use prost::Message;
use timely::progress::{Antichain, Timestamp};
use tracing::debug;

use crate::cache::{LockingTypedState, StateCache};
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::encoding::FullSchemas;
use crate::internal::gc::GcReq;
use crate::internal::maintenance::RoutineMaintenance;
use crate::internal::metrics::{CmdMetrics, Metrics, ShardMetrics};
@@ -221,11 +225,29 @@ where
}

/// See [crate::PersistClient::get_schema].
pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
pub fn get_schema(&self, schema_id: SchemaId) -> Option<FullSchemas<K, V>> {
self.state
.read_lock(&self.metrics.locks.applier_read_cacheable, |state| {
let x = state.collections.schemas.get(&schema_id)?;
Some((K::decode_schema(&x.key), V::decode_schema(&x.val)))
let key_dt =
    mz_persist_types::arrow::ProtoDataType::decode(Bytes::clone(&x.key_data_type))
        .expect("key ProtoDataType to roundtrip")
        .into_rust()
        .expect("key ProtoDataType to rust");
let val_dt =
    mz_persist_types::arrow::ProtoDataType::decode(Bytes::clone(&x.val_data_type))
        .expect("val ProtoDataType to roundtrip")
        .into_rust()
        .expect("val ProtoDataType to rust");

let schemas = FullSchemas {
id: Some(schema_id),
key: Arc::new(K::decode_schema(&x.key)),
key_dt,
val: Arc::new(V::decode_schema(&x.val)),
val_dt,
};
Some(schemas)
})
}

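The key and val decode blocks above are identical modulo field names; a sketch of the shared step as a helper (the function name and `anyhow` error handling are illustrative, but `ProtoDataType` and `into_rust` are the calls used above):

use bytes::Bytes;
use mz_persist_types::arrow::ProtoDataType;
use mz_proto::ProtoType;
use prost::Message;

/// Hypothetical helper: decode an Arrow `DataType` persisted in the schema
/// registry. The registry stores the `DataType` that was in effect when the
/// schema was registered, which may differ from what the current binary
/// would derive from the same `Codec::Schema`.
fn decode_registered_data_type(
    encoded: &Bytes,
) -> anyhow::Result<arrow::datatypes::DataType> {
    let proto = ProtoDataType::decode(Bytes::clone(encoded))?;
    Ok(proto.into_rust()?)
}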
48 changes: 25 additions & 23 deletions src/persist-client/src/internal/compact.rs
@@ -34,7 +34,7 @@ use crate::async_runtime::IsolatedRuntime;
use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
use crate::cfg::MiB;
use crate::fetch::FetchBatchFilter;
use crate::internal::encoding::Schemas;
use crate::internal::encoding::{FullSchemas, Schemas};
use crate::internal::gc::GarbageCollector;
use crate::internal::machine::Machine;
use crate::internal::maintenance::RoutineMaintenance;
@@ -307,29 +307,23 @@ where
// It's an invariant that SchemaIds are ordered.
.max();
let maybe_compaction_schema = match compaction_schema_id {
Some(id) => machine
.get_schema(id)
.map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
Some(id) => machine.get_schema(id),
None => None,
};
let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);

let compaction_schema = match maybe_compaction_schema {
Some((id, key_schema, val_schema)) if use_most_recent_schema => {
Some(compaction_schemas) if use_most_recent_schema => {
metrics.compaction.schema_selection.recent_schema.inc();
Schemas {
id: Some(id),
key: Arc::new(key_schema),
val: Arc::new(val_schema),
}
compaction_schemas
}
Some(_) => {
metrics.compaction.schema_selection.disabled.inc();
write_schemas
write_schemas.to_current_full_schemas()
}
None => {
metrics.compaction.schema_selection.no_schema.inc();
write_schemas
write_schemas.to_current_full_schemas()
}
};

@@ -464,7 +458,7 @@ where
shard_metrics: Arc<ShardMetrics>,
isolated_runtime: Arc<IsolatedRuntime>,
req: CompactReq<T>,
write_schemas: Schemas<K, V>,
schemas: FullSchemas<K, V>,
) -> Result<CompactRes<T>, anyhow::Error> {
let () = Self::validate_req(&req)?;

@@ -531,7 +525,7 @@
Arc::clone(&metrics),
Arc::clone(&shard_metrics),
Arc::clone(&isolated_runtime),
write_schemas.clone(),
schemas.clone(),
)
.await?;
let (parts, run_splits, run_meta, updates) =
@@ -720,7 +714,7 @@ where
metrics: Arc<Metrics>,
shard_metrics: Arc<ShardMetrics>,
isolated_runtime: Arc<IsolatedRuntime>,
write_schemas: Schemas<K, V>,
compaction_schemas: FullSchemas<K, V>,
) -> Result<HollowBatch<T>, anyhow::Error> {
// TODO: Figure out a more principled way to allocate our memory budget.
// Currently, we give any excess budget to write parallelism. If we had
@@ -756,7 +750,7 @@
cfg.batch.clone(),
parts,
Arc::clone(&metrics),
write_schemas.clone(),
compaction_schemas.to_schemas(),
desc.lower().clone(),
Arc::clone(&blob),
shard_id.clone(),
@@ -778,7 +772,7 @@
desc.upper().elements()
),
*shard_id,
StructuredSort::<K, V, T, D>::new(write_schemas.clone()),
StructuredSort::<K, V, T, D>::new(compaction_schemas.clone()),
blob,
Arc::clone(&metrics),
shard_metrics,
@@ -829,8 +823,10 @@
// In the hopefully-common case of a single chunk, this will not copy.
let updates = BlobTraceUpdates::concat::<K, V>(
chunks,
write_schemas.key.as_ref(),
write_schemas.val.as_ref(),
compaction_schemas.key.as_ref(),
&compaction_schemas.key_dt,
compaction_schemas.val.as_ref(),
&compaction_schemas.val_dt,
&metrics.columnar,
)?;
batch.flush_many(updates).await?;
@@ -878,7 +874,13 @@
key_vec.extend_from_slice(k);
val_vec.clear();
val_vec.extend_from_slice(v);
crate::batch::validate_schema(&write_schemas, &key_vec, &val_vec, None, None);
crate::batch::validate_schema(
&compaction_schemas.to_schemas(),
&key_vec,
&val_vec,
None,
None,
);
batch.add(&key_vec, &val_vec, &t, &d).await?;
}
tokio::task::yield_now().await;
@@ -899,7 +901,7 @@
&cfg.batch,
&metrics.compaction.batch,
&isolated_runtime,
&write_schemas,
&compaction_schemas.to_schemas(),
)
.await;
}
@@ -1028,7 +1030,7 @@ mod tests {
write.metrics.shards.shard(&write.machine.shard_id(), ""),
Arc::new(IsolatedRuntime::default()),
req.clone(),
schemas.clone(),
schemas.to_current_full_schemas(),
)
.await
.expect("compaction failed");
@@ -1105,7 +1107,7 @@
write.metrics.shards.shard(&write.machine.shard_id(), ""),
Arc::new(IsolatedRuntime::default()),
req.clone(),
schemas.clone(),
schemas.to_current_full_schemas(),
)
.await
.expect("compaction failed");
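Condensed, the schema-selection policy at the top of this file reads: use the most recently registered schema, with its persisted `DataType`s, when `COMPACTION_USE_MOST_RECENT_SCHEMA` is on; otherwise fall back to the writer's schemas under the current Arrow mapping. A sketch with the flag reduced to a plain `bool` and the per-branch metrics elided (the function itself is hypothetical):

use mz_persist_types::Codec;

use crate::internal::encoding::{FullSchemas, Schemas};

/// Hypothetical condensation of the selection logic in `compact` above.
fn select_compaction_schemas<K: Codec, V: Codec>(
    registered: Option<FullSchemas<K, V>>,
    write_schemas: Schemas<K, V>,
    use_most_recent_schema: bool,
) -> FullSchemas<K, V> {
    match registered {
        // The most recent registered schema, including the Arrow `DataType`s
        // persisted alongside it.
        Some(schemas) if use_most_recent_schema => schemas,
        // No registered schema, or the flag is off: derive `DataType`s from
        // the writer's schemas under the current mapping.
        _ => write_schemas.to_current_full_schemas(),
    }
}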
56 changes: 56 additions & 0 deletions src/persist-client/src/internal/encoding.rs
@@ -23,6 +23,7 @@ use mz_persist::indexed::columnar::ColumnarRecords;
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::columnar::data_type;
use mz_persist_types::schema::SchemaId;
use mz_persist_types::stats::{PartStats, ProtoStructStats};
use mz_persist_types::{Codec, Codec64};
@@ -70,6 +71,20 @@ pub struct Schemas<K: Codec, V: Codec> {
pub val: Arc<V::Schema>,
}

impl<K: Codec, V: Codec> Schemas<K, V> {
/// Returns a [`FullSchemas`] using the [`Codec::Schema`] to
/// [`arrow::datatypes::DataType`] mapping from the current version of
/// Materialize.
pub fn to_current_full_schemas(&self) -> FullSchemas<K, V> {
FullSchemas {
id: self.id.clone(),
key: Arc::clone(&self.key),
key_dt: data_type::<K>(self.key.as_ref()).expect("valid key schema"),
val: Arc::clone(&self.val),
val_dt: data_type::<V>(self.val.as_ref()).expect("valid val schema"),
}
}
}

impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
fn clone(&self) -> Self {
Self {
@@ -80,6 +95,47 @@ impl<K: Codec, V: Codec> Clone for Schemas<K, V> {
}
}

/// Set of schemas including the corresponding [Arrow `DataType`].
///
/// Some processes, e.g. compaction, operate mainly on the Arrow `DataType` as
/// opposed to a generic [`Codec`] type. For these processes we thread through
/// the full schema data that's encoded in the registry in case the mapping of
/// [`Codec::Schema`] to [Arrow `DataType`] changes between two releases.
///
/// [Arrow `DataType`]: arrow::datatypes::DataType
#[derive(Debug)]
pub struct FullSchemas<K: Codec, V: Codec> {
// TODO: Remove the Option once this finishes rolling out and all shards
// have a registered schema.
pub id: Option<SchemaId>,
pub key: Arc<K::Schema>,
pub key_dt: arrow::datatypes::DataType,
pub val: Arc<V::Schema>,
pub val_dt: arrow::datatypes::DataType,
}

impl<K: Codec, V: Codec> FullSchemas<K, V> {
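/// Returns the plain [`Schemas`], dropping the Arrow `DataType`s.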
pub fn to_schemas(&self) -> Schemas<K, V> {
Schemas {
id: self.id.clone(),
key: Arc::clone(&self.key),
val: Arc::clone(&self.val),
}
}
}

impl<K: Codec, V: Codec> Clone for FullSchemas<K, V> {
fn clone(&self) -> Self {
Self {
id: self.id,
key: Arc::clone(&self.key),
key_dt: self.key_dt.clone(),
val: Arc::clone(&self.val),
val_dt: self.val_dt.clone(),
}
}
}

/// A proto that is decoded on use.
///
/// Because of the way prost works, decoding a large protobuf may result in a
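Note the asymmetry between the two conversions: `to_schemas` simply drops the `DataType`s, so `Schemas` -> `FullSchemas` -> `Schemas` is lossless, while `FullSchemas` -> `Schemas` -> `FullSchemas` re-derives the `DataType`s under the current mapping and may therefore differ across releases. A sketch (the function is illustrative; the assertion assumes `SchemaId: Debug + PartialEq`, which holds for a plain id wrapper):

use mz_persist_types::Codec;

use crate::internal::encoding::Schemas;

/// Hypothetical illustration of the lossless direction of the roundtrip.
fn roundtrip<K: Codec, V: Codec>(schemas: &Schemas<K, V>) {
    let full = schemas.to_current_full_schemas();
    let narrow = full.to_schemas();
    // The id and Codec schemas survive; only the derived DataTypes are new.
    assert_eq!(narrow.id, full.id);
}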
5 changes: 3 additions & 2 deletions src/persist-client/src/internal/machine.rs
@@ -44,6 +44,7 @@ use crate::critical::CriticalReaderId;
use crate::error::{CodecMismatch, InvalidUsage};
use crate::internal::apply::Applier;
use crate::internal::compact::CompactReq;
use crate::internal::encoding::FullSchemas;
use crate::internal::gc::GarbageCollector;
use crate::internal::maintenance::{RoutineMaintenance, WriterMaintenance};
use crate::internal::metrics::{CmdMetrics, Metrics, MetricsRetryStream, RetryMetrics};
@@ -759,7 +760,7 @@
}

/// See [crate::PersistClient::get_schema].
pub fn get_schema(&self, schema_id: SchemaId) -> Option<(K::Schema, V::Schema)> {
pub fn get_schema(&self, schema_id: SchemaId) -> Option<FullSchemas<K, V>> {
self.applier.get_schema(schema_id)
}

@@ -2005,7 +2006,7 @@ pub mod datadriven {
Arc::clone(&datadriven.machine.applier.shard_metrics),
Arc::clone(&datadriven.client.isolated_runtime),
req,
schemas,
schemas.to_current_full_schemas(),
)
.await?;

14 changes: 9 additions & 5 deletions src/persist-client/src/iter.rs
@@ -38,7 +38,7 @@ use timely::progress::Timestamp;
use tracing::{debug_span, Instrument};

use crate::fetch::{EncodedPart, FetchBatchFilter};
use crate::internal::encoding::Schemas;
use crate::internal::encoding::FullSchemas;
use crate::internal::metrics::{ReadMetrics, ShardMetrics};
use crate::internal::paths::WriterKey;
use crate::internal::state::{HollowRun, RunMeta, RunOrder, RunPart};
@@ -213,13 +213,13 @@ pub struct StructuredUpdates {
/// Sort parts ordered by the codec-encoded key and value columns.
#[derive(Debug)]
pub struct StructuredSort<K: Codec, V: Codec, T, D> {
schemas: Schemas<K, V>,
schemas: FullSchemas<K, V>,
_time_diff: PhantomData<fn(T, D)>,
}

impl<K: Codec, V: Codec, T, D> StructuredSort<K, V, T, D> {
/// A sort for structured data with the given schema.
pub fn new(schemas: Schemas<K, V>) -> Self {
pub fn new(schemas: FullSchemas<K, V>) -> Self {
Self {
schemas,
_time_diff: Default::default(),
@@ -241,8 +241,12 @@ impl<K: Codec, V: Codec, T: Codec64, D: Codec64> RowSort<T, D> for StructuredSort<K, V, T, D> {
}

fn updates_from_blob(&self, mut updates: BlobTraceUpdates) -> Self::Updates {
let structured = updates
.get_or_make_structured::<K, V>(self.schemas.key.as_ref(), self.schemas.val.as_ref());
let structured = updates.get_or_make_structured::<K, V>(
self.schemas.key.as_ref(),
&self.schemas.key_dt,
self.schemas.val.as_ref(),
&self.schemas.val_dt,
);
let key_ord = ArrayOrd::new(structured.key.as_ref());
let val_ord = ArrayOrd::new(structured.val.as_ref());
StructuredUpdates {
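With this change, a compaction sort carries the persisted `DataType`s all the way into `get_or_make_structured`, so structured columns are decoded with the types that match the bytes on disk rather than ones re-derived by the current binary. A sketch of the new constructor contract (the wrapper function is hypothetical; `StructuredSort::new` is the call changed above):

use mz_persist_types::{Codec, Codec64};

use crate::internal::encoding::FullSchemas;
use crate::iter::StructuredSort;

/// Hypothetical wrapper making the new requirement explicit: building a sort
/// now takes FullSchemas, not just Schemas.
fn sort_for_compaction<K: Codec, V: Codec, T: Codec64, D: Codec64>(
    schemas: FullSchemas<K, V>,
) -> StructuredSort<K, V, T, D> {
    StructuredSort::new(schemas)
}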