Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/persist-client/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ fn main() {
.btree_map(["."])
.bytes([
".mz_persist_client.internal.diff.ProtoStateFieldDiffs",
".mz_persist_client.internal.service.ProtoPushDiff",
".mz_persist_client.internal.state.ProtoEncodedSchemas",
".mz_persist_client.internal.state.ProtoHollowBatchPart",
".mz_persist_client.internal.state.ProtoRunMeta",
".mz_persist_client.internal.state.ProtoVersionedData",
".mz_persist_client.internal.state.ProtoEncodedSchemas",
".mz_persist_client.internal.service.ProtoPushDiff",
]);

// Setting `emit_rerun_if_changed(false)` below causes tonic to entirely
Expand Down
8 changes: 7 additions & 1 deletion src/persist-client/src/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,9 @@ use crate::async_runtime::IsolatedRuntime;
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
use crate::error::InvalidUsage;
use crate::internal::compact::{CompactConfig, Compactor};
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::encoding::{
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
};
use crate::internal::machine::retry_external;
use crate::internal::merge::{MergeTree, Pending};
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
Expand Down Expand Up @@ -715,6 +717,7 @@ where
} else {
None
},
meta: MetadataMap::default(),
});
run_parts.extend(parts);
}
Expand Down Expand Up @@ -860,6 +863,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
} else {
None
},
meta: MetadataMap::default(),
},
completed_run.parts,
)
Expand Down Expand Up @@ -1279,8 +1283,10 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
stats
});

let meta = MetadataMap::default();
BatchPart::Hollow(HollowBatchPart {
key: partial_key,
meta,
encoded_size_bytes: payload_len,
key_lower,
structured_key_lower,
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/internal/datadriven.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl<'a> DirectiveArgs<'a> {
.map(|x| {
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
key: PartialBatchKey((*x).to_owned()),
meta: Default::default(),
encoded_size_bytes: 0,
key_lower: vec![],
structured_key_lower: None,
Expand Down
131 changes: 129 additions & 2 deletions src/persist-client/src/internal/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use bytes::{Buf, Bytes};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use mz_ore::cast::CastInto;
use mz_ore::{assert_none, halt};
use mz_ore::{assert_none, halt, soft_panic_or_log};
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{SeqNo, VersionedData};
use mz_persist::metrics::ColumnarMetrics;
Expand All @@ -32,7 +32,7 @@ use proptest::strategy::Strategy;
use prost::Message;
use semver::Version;
use serde::ser::SerializeStruct;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use timely::progress::{Antichain, Timestamp};
use uuid::Uuid;

Expand Down Expand Up @@ -192,6 +192,105 @@ impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
}
}

/// Our Proto implementation, Prost, cannot handle unrecognized fields. This means that unexpected
/// data will be dropped at deserialization time, which means that we can't reliably roundtrip data
/// from future versions of the code, which causes trouble during upgrades and at other times.
///
/// This type works around the issue by defining an unstructured metadata map. Keys are expected to
/// be well-known strings defined in the code; values are bytes, expected to be encoded protobuf.
/// (The association between the two is lightly enforced with the affiliated [MetadataKey] type.)
/// It's safe to add new metadata keys in new versions, since even unrecognized keys can be losslessly
/// roundtripped. However, if the metadata is not safe for the old version to ignore -- perhaps it
/// needs to be kept in sync with some other part of the struct -- you will need to use a more
/// heavyweight migration for it.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);

/// Associating a field name and an associated Proto message type, for lookup in a metadata map.
///
/// It is an error to reuse key names, or to change the type associated with a particular name.
/// It is polite to choose short names, since they get serialized alongside every struct.
#[allow(unused)]
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
name: &'static str,
type_: PhantomData<(V, P)>,
}

impl<V, P> MetadataKey<V, P> {
#[allow(unused)]
pub(crate) const fn new(name: &'static str) -> Self {
MetadataKey {
name,
type_: PhantomData,
}
}
}

impl serde::Serialize for MetadataMap {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.collect_map(self.0.iter())
}
}

impl MetadataMap {
/// Returns true iff no metadata keys have been set.
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Serialize and insert a new key into the map, replacing any existing value for the key.
#[allow(unused)]
pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
self.0.insert(
String::from(key.name),
Bytes::from(value.into_proto_owned().encode_to_vec()),
);
}

/// Deserialize a key from the map, if it is present.
#[allow(unused)]
pub fn get<V: RustType<P>, P: prost::Message + Default>(
&self,
key: MetadataKey<V, P>,
) -> Option<V> {
let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
Ok(decoded) => decoded,
Err(err) => {
// This should be impossible unless one of the MetadataKey invariants are broken.
soft_panic_or_log!(
"error when decoding {key}; was it redefined? {err}",
key = key.name
);
return None;
}
};

match proto.into_rust() {
Ok(proto) => Some(proto),
Err(err) => {
// This should be impossible unless one of the MetadataKey invariants are broken.
soft_panic_or_log!(
"error when decoding {key}; was it redefined? {err}",
key = key.name
);
None
}
}
}
}
impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
fn into_proto(&self) -> BTreeMap<String, Bytes> {
self.0.clone()
}
fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
Ok(MetadataMap(proto))
}
}

pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
let uuid_encoded = match encoded.strip_prefix(id_prefix) {
Some(x) => x,
Expand Down Expand Up @@ -1362,6 +1461,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
parts.extend(proto.deprecated_keys.into_iter().map(|key| {
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
key: PartialBatchKey(key),
meta: Default::default(),
encoded_size_bytes: 0,
key_lower: vec![],
structured_key_lower: None,
Expand Down Expand Up @@ -1418,6 +1518,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
deprecated_schema_id: self.deprecated_schema.into_proto(),
id: self.id.into_proto(),
len: self.len.into_proto(),
meta: self.meta.into_proto(),
}
}

Expand All @@ -1434,6 +1535,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
deprecated_schema: proto.deprecated_schema_id.into_rust()?,
id: proto.id.into_rust()?,
len: proto.len.into_rust()?,
meta: proto.meta.into_rust()?,
})
}
}
Expand Down Expand Up @@ -1472,6 +1574,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
schema_id: None,
structured_key_lower: self.structured_key_lower.into_proto(),
deprecated_schema_id: None,
metadata: BTreeMap::default(),
};
part
}
Expand Down Expand Up @@ -1509,6 +1612,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
format: x.format.map(|f| f.into_proto()),
schema_id: x.schema_id.into_proto(),
deprecated_schema_id: x.deprecated_schema_id.into_proto(),
metadata: BTreeMap::default(),
},
BatchPart::Inline {
updates,
Expand All @@ -1526,6 +1630,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
format: None,
schema_id: schema_id.into_proto(),
deprecated_schema_id: deprecated_schema_id.into_proto(),
metadata: BTreeMap::default(),
},
}
}
Expand All @@ -1541,6 +1646,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
Some(proto_hollow_batch_part::Kind::Key(key)) => {
Ok(BatchPart::Hollow(HollowBatchPart {
key: key.into_rust()?,
meta: proto.metadata.into_rust()?,
encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
key_lower: proto.key_lower.into(),
structured_key_lower: proto.structured_key_lower.into_rust()?,
Expand Down Expand Up @@ -1839,6 +1945,25 @@ mod tests {

use super::*;

#[mz_ore::test]
fn metadata_map() {
const COUNT: MetadataKey<u64> = MetadataKey::new("count");

let mut map = MetadataMap::default();
map.set(COUNT, 100);
let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
assert_eq!(map.get(COUNT), Some(100));

const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
MetadataKey::new("antichain");
assert_none!(map.get(ANTICHAIN));

map.set(ANTICHAIN, Antichain::from_elem(30));
let map = MetadataMap::from_proto(map.into_proto()).unwrap();
assert_eq!(map.get(COUNT), Some(100));
assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
}

#[mz_ore::test]
fn applier_version_state() {
let v1 = semver::Version::new(1, 0, 0);
Expand Down Expand Up @@ -1916,6 +2041,7 @@ mod tests {
),
vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
key: PartialBatchKey("a".into()),
meta: Default::default(),
encoded_size_bytes: 5,
key_lower: vec![],
structured_key_lower: None,
Expand Down Expand Up @@ -1943,6 +2069,7 @@ mod tests {
.parts
.push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
key: PartialBatchKey("b".into()),
meta: Default::default(),
encoded_size_bytes: 0,
key_lower: vec![],
structured_key_lower: None,
Expand Down
4 changes: 4 additions & 0 deletions src/persist-client/src/internal/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ message ProtoHollowBatchPart {
}
optional uint64 schema_id = 12;

map<string, bytes> metadata = 14;

optional bytes key_stats = 536870906;

reserved 536870907 to 536870911;
Expand Down Expand Up @@ -93,6 +95,8 @@ message ProtoRunMeta {
optional uint64 deprecated_schema_id = 2;
optional string id = 4;
optional uint64 len = 5;

map<string, bytes> meta = 6;
}

message ProtoHollowRun {
Expand Down
17 changes: 16 additions & 1 deletion src/persist-client/src/internal/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ use uuid::Uuid;

use crate::critical::CriticalReaderId;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
use crate::internal::encoding::{
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
};
use crate::internal::gc::GcReq;
use crate::internal::machine::retry_external;
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
Expand Down Expand Up @@ -818,13 +820,20 @@ pub struct RunMeta {

/// The number of updates in this run, or `None` if the number is unknown.
pub(crate) len: Option<usize>,

/// Additional unstructured metadata.
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
pub(crate) meta: MetadataMap,
}

/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct HollowBatchPart<T> {
/// Pointer usable to retrieve the updates.
pub key: PartialBatchKey,
/// Miscellaneous metadata.
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
pub meta: MetadataMap,
/// The encoded size of this part.
pub encoded_size_bytes: usize,
/// A lower bound on the keys in the part. (By default, this the minimum
Expand Down Expand Up @@ -1213,6 +1222,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
// are added.
let HollowBatchPart {
key: self_key,
meta: self_meta,
encoded_size_bytes: self_encoded_size_bytes,
key_lower: self_key_lower,
structured_key_lower: self_structured_key_lower,
Expand All @@ -1225,6 +1235,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
} = self;
let HollowBatchPart {
key: other_key,
meta: other_meta,
encoded_size_bytes: other_encoded_size_bytes,
key_lower: other_key_lower,
structured_key_lower: other_structured_key_lower,
Expand All @@ -1237,6 +1248,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
} = other;
(
self_key,
self_meta,
self_encoded_size_bytes,
self_key_lower,
self_structured_key_lower,
Expand All @@ -1249,6 +1261,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
)
.cmp(&(
other_key,
other_meta,
other_encoded_size_bytes,
other_key_lower,
other_structured_key_lower,
Expand Down Expand Up @@ -2998,6 +3011,7 @@ pub(crate) mod tests {
)| {
HollowBatchPart {
key,
meta: Default::default(),
encoded_size_bytes,
key_lower,
structured_key_lower: None,
Expand Down Expand Up @@ -3171,6 +3185,7 @@ pub(crate) mod tests {
.map(|x| {
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
key: PartialBatchKey((*x).to_owned()),
meta: Default::default(),
encoded_size_bytes: 0,
key_lower: vec![],
structured_key_lower: None,
Expand Down
1 change: 1 addition & 0 deletions src/persist-client/src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1273,6 +1273,7 @@ mod tests {
key: PartialBatchKey(
"n0000000/p00000000-0000-0000-0000-000000000000".into(),
),
meta: Default::default(),
encoded_size_bytes,
key_lower: vec![],
structured_key_lower: None,
Expand Down