Skip to content

Commit 4e25c50

Browse files
authored
Merge pull request #34093 from bkirwi/metadata-map
[persist] Add a safely serializable metadata map to some Persist structs
2 parents a31bf15 + f9b426e commit 4e25c50

File tree

7 files changed

+161
-6
lines changed

7 files changed

+161
-6
lines changed

src/persist-client/build.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ fn main() {
5353
.btree_map(["."])
5454
.bytes([
5555
".mz_persist_client.internal.diff.ProtoStateFieldDiffs",
56+
".mz_persist_client.internal.service.ProtoPushDiff",
57+
".mz_persist_client.internal.state.ProtoEncodedSchemas",
5658
".mz_persist_client.internal.state.ProtoHollowBatchPart",
59+
".mz_persist_client.internal.state.ProtoRunMeta",
5760
".mz_persist_client.internal.state.ProtoVersionedData",
58-
".mz_persist_client.internal.state.ProtoEncodedSchemas",
59-
".mz_persist_client.internal.service.ProtoPushDiff",
6061
]);
6162

6263
// Setting `emit_rerun_if_changed(false)` below causes tonic to entirely

src/persist-client/src/batch.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ use crate::async_runtime::IsolatedRuntime;
5151
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
5252
use crate::error::InvalidUsage;
5353
use crate::internal::compact::{CompactConfig, Compactor};
54-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54+
use crate::internal::encoding::{
55+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56+
};
5557
use crate::internal::machine::retry_external;
5658
use crate::internal::merge::{MergeTree, Pending};
5759
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
@@ -715,6 +717,7 @@ where
715717
} else {
716718
None
717719
},
720+
meta: MetadataMap::default(),
718721
});
719722
run_parts.extend(parts);
720723
}
@@ -860,6 +863,7 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
860863
} else {
861864
None
862865
},
866+
meta: MetadataMap::default(),
863867
},
864868
completed_run.parts,
865869
)
@@ -1279,8 +1283,10 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
12791283
stats
12801284
});
12811285

1286+
let meta = MetadataMap::default();
12821287
BatchPart::Hollow(HollowBatchPart {
12831288
key: partial_key,
1289+
meta,
12841290
encoded_size_bytes: payload_len,
12851291
key_lower,
12861292
structured_key_lower,

src/persist-client/src/internal/datadriven.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl<'a> DirectiveArgs<'a> {
104104
.map(|x| {
105105
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
106106
key: PartialBatchKey((*x).to_owned()),
107+
meta: Default::default(),
107108
encoded_size_bytes: 0,
108109
key_lower: vec![],
109110
structured_key_lower: None,

src/persist-client/src/internal/encoding.rs

Lines changed: 129 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use bytes::{Buf, Bytes};
1919
use differential_dataflow::lattice::Lattice;
2020
use differential_dataflow::trace::Description;
2121
use mz_ore::cast::CastInto;
22-
use mz_ore::{assert_none, halt};
22+
use mz_ore::{assert_none, halt, soft_panic_or_log};
2323
use mz_persist::indexed::encoding::{BatchColumnarFormat, BlobTraceBatchPart, BlobTraceUpdates};
2424
use mz_persist::location::{SeqNo, VersionedData};
2525
use mz_persist::metrics::ColumnarMetrics;
@@ -32,7 +32,7 @@ use proptest::strategy::Strategy;
3232
use prost::Message;
3333
use semver::Version;
3434
use serde::ser::SerializeStruct;
35-
use serde::{Deserialize, Serialize};
35+
use serde::{Deserialize, Serialize, Serializer};
3636
use timely::progress::{Antichain, Timestamp};
3737
use uuid::Uuid;
3838

@@ -192,6 +192,105 @@ impl<T: Message + Default> RustType<Bytes> for LazyProto<T> {
192192
}
193193
}
194194

195+
/// Our Proto implementation, Prost, cannot handle unrecognized fields. This means that unexpected
196+
/// data will be dropped at deserialization time, which means that we can't reliably roundtrip data
197+
/// from future versions of the code, which causes trouble during upgrades and at other times.
198+
///
199+
/// This type works around the issue by defining an unstructured metadata map. Keys are expected to
200+
/// be well-known strings defined in the code; values are bytes, expected to be encoded protobuf.
201+
/// (The association between the two is lightly enforced with the affiliated [MetadataKey] type.)
202+
/// It's safe to add new metadata keys in new versions, since even unrecognized keys can be losslessly
203+
/// roundtripped. However, if the metadata is not safe for the old version to ignore -- perhaps it
204+
/// needs to be kept in sync with some other part of the struct -- you will need to use a more
205+
/// heavyweight migration for it.
206+
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
207+
pub(crate) struct MetadataMap(BTreeMap<String, Bytes>);
208+
209+
/// Associates a field name with a Rust value type `V` and its Proto message type `P`, for lookup
/// in a metadata map. `P` defaults to `V`.
///
/// It is an error to reuse key names, or to change the type associated with a particular name.
/// It is polite to choose short names, since they get serialized alongside every struct.
///
/// Keys are always `Copy`, whatever `V` and `P` are: a key only stores a name, and the type
/// parameters are carried purely as a marker.
// NOTE(review): the `allow` is presumably here because no non-test code defines a key yet --
// confirm and drop it once one does.
#[allow(unused)]
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub(crate) struct MetadataKey<V, P = V> {
    /// The string under which the value is stored in the map.
    name: &'static str,
    /// Marker tying the key to its value and Proto types; holds no data.
    type_: PhantomData<(V, P)>,
}

// `Clone` and `Copy` are implemented by hand rather than derived: the derives would add
// `V: Clone`/`V: Copy` (and the same for `P`) bounds, so a key for a non-`Copy` value type
// would not itself be `Copy`, even though it never holds a `V` or a `P`.
impl<V, P> Clone for MetadataKey<V, P> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<V, P> Copy for MetadataKey<V, P> {}
219+
220+
impl<V, P> MetadataKey<V, P> {
221+
#[allow(unused)]
222+
pub(crate) const fn new(name: &'static str) -> Self {
223+
MetadataKey {
224+
name,
225+
type_: PhantomData,
226+
}
227+
}
228+
}
229+
230+
impl serde::Serialize for MetadataMap {
231+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
232+
where
233+
S: Serializer,
234+
{
235+
serializer.collect_map(self.0.iter())
236+
}
237+
}
238+
239+
impl MetadataMap {
240+
/// Returns true iff no metadata keys have been set.
241+
pub fn is_empty(&self) -> bool {
242+
self.0.is_empty()
243+
}
244+
245+
/// Serialize and insert a new key into the map, replacing any existing value for the key.
246+
#[allow(unused)]
247+
pub fn set<V: RustType<P>, P: prost::Message>(&mut self, key: MetadataKey<V, P>, value: V) {
248+
self.0.insert(
249+
String::from(key.name),
250+
Bytes::from(value.into_proto_owned().encode_to_vec()),
251+
);
252+
}
253+
254+
/// Deserialize a key from the map, if it is present.
255+
#[allow(unused)]
256+
pub fn get<V: RustType<P>, P: prost::Message + Default>(
257+
&self,
258+
key: MetadataKey<V, P>,
259+
) -> Option<V> {
260+
let proto = match P::decode(self.0.get(key.name)?.as_ref()) {
261+
Ok(decoded) => decoded,
262+
Err(err) => {
263+
// This should be impossible unless one of the MetadataKey invariants are broken.
264+
soft_panic_or_log!(
265+
"error when decoding {key}; was it redefined? {err}",
266+
key = key.name
267+
);
268+
return None;
269+
}
270+
};
271+
272+
match proto.into_rust() {
273+
Ok(proto) => Some(proto),
274+
Err(err) => {
275+
// This should be impossible unless one of the MetadataKey invariants are broken.
276+
soft_panic_or_log!(
277+
"error when decoding {key}; was it redefined? {err}",
278+
key = key.name
279+
);
280+
None
281+
}
282+
}
283+
}
284+
}
285+
impl RustType<BTreeMap<String, Bytes>> for MetadataMap {
286+
fn into_proto(&self) -> BTreeMap<String, Bytes> {
287+
self.0.clone()
288+
}
289+
fn from_proto(proto: BTreeMap<String, Bytes>) -> Result<Self, TryFromProtoError> {
290+
Ok(MetadataMap(proto))
291+
}
292+
}
293+
195294
pub(crate) fn parse_id(id_prefix: &str, id_type: &str, encoded: &str) -> Result<[u8; 16], String> {
196295
let uuid_encoded = match encoded.strip_prefix(id_prefix) {
197296
Some(x) => x,
@@ -1362,6 +1461,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
13621461
parts.extend(proto.deprecated_keys.into_iter().map(|key| {
13631462
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
13641463
key: PartialBatchKey(key),
1464+
meta: Default::default(),
13651465
encoded_size_bytes: 0,
13661466
key_lower: vec![],
13671467
structured_key_lower: None,
@@ -1418,6 +1518,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
14181518
deprecated_schema_id: self.deprecated_schema.into_proto(),
14191519
id: self.id.into_proto(),
14201520
len: self.len.into_proto(),
1521+
meta: self.meta.into_proto(),
14211522
}
14221523
}
14231524

@@ -1434,6 +1535,7 @@ impl RustType<ProtoRunMeta> for RunMeta {
14341535
deprecated_schema: proto.deprecated_schema_id.into_rust()?,
14351536
id: proto.id.into_rust()?,
14361537
len: proto.len.into_rust()?,
1538+
meta: proto.meta.into_rust()?,
14371539
})
14381540
}
14391541
}
@@ -1472,6 +1574,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
14721574
schema_id: None,
14731575
structured_key_lower: self.structured_key_lower.into_proto(),
14741576
deprecated_schema_id: None,
1577+
metadata: BTreeMap::default(),
14751578
};
14761579
part
14771580
}
@@ -1509,6 +1612,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
15091612
format: x.format.map(|f| f.into_proto()),
15101613
schema_id: x.schema_id.into_proto(),
15111614
deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1615+
metadata: BTreeMap::default(),
15121616
},
15131617
BatchPart::Inline {
15141618
updates,
@@ -1526,6 +1630,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
15261630
format: None,
15271631
schema_id: schema_id.into_proto(),
15281632
deprecated_schema_id: deprecated_schema_id.into_proto(),
1633+
metadata: BTreeMap::default(),
15291634
},
15301635
}
15311636
}
@@ -1541,6 +1646,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
15411646
Some(proto_hollow_batch_part::Kind::Key(key)) => {
15421647
Ok(BatchPart::Hollow(HollowBatchPart {
15431648
key: key.into_rust()?,
1649+
meta: proto.metadata.into_rust()?,
15441650
encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
15451651
key_lower: proto.key_lower.into(),
15461652
structured_key_lower: proto.structured_key_lower.into_rust()?,
@@ -1839,6 +1945,25 @@ mod tests {
18391945

18401946
use super::*;
18411947

1948+
#[mz_ore::test]
1949+
fn metadata_map() {
1950+
const COUNT: MetadataKey<u64> = MetadataKey::new("count");
1951+
1952+
let mut map = MetadataMap::default();
1953+
map.set(COUNT, 100);
1954+
let mut map = MetadataMap::from_proto(map.into_proto()).unwrap();
1955+
assert_eq!(map.get(COUNT), Some(100));
1956+
1957+
const ANTICHAIN: MetadataKey<Antichain<u64>, ProtoU64Antichain> =
1958+
MetadataKey::new("antichain");
1959+
assert_none!(map.get(ANTICHAIN));
1960+
1961+
map.set(ANTICHAIN, Antichain::from_elem(30));
1962+
let map = MetadataMap::from_proto(map.into_proto()).unwrap();
1963+
assert_eq!(map.get(COUNT), Some(100));
1964+
assert_eq!(map.get(ANTICHAIN), Some(Antichain::from_elem(30)));
1965+
}
1966+
18421967
#[mz_ore::test]
18431968
fn applier_version_state() {
18441969
let v1 = semver::Version::new(1, 0, 0);
@@ -1916,6 +2041,7 @@ mod tests {
19162041
),
19172042
vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
19182043
key: PartialBatchKey("a".into()),
2044+
meta: Default::default(),
19192045
encoded_size_bytes: 5,
19202046
key_lower: vec![],
19212047
structured_key_lower: None,
@@ -1943,6 +2069,7 @@ mod tests {
19432069
.parts
19442070
.push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
19452071
key: PartialBatchKey("b".into()),
2072+
meta: Default::default(),
19462073
encoded_size_bytes: 0,
19472074
key_lower: vec![],
19482075
structured_key_lower: None,

src/persist-client/src/internal/state.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message ProtoHollowBatchPart {
6060
}
6161
optional uint64 schema_id = 12;
6262

63+
map<string, bytes> metadata = 14;
64+
6365
optional bytes key_stats = 536870906;
6466

6567
reserved 536870907 to 536870911;
@@ -93,6 +95,8 @@ message ProtoRunMeta {
9395
optional uint64 deprecated_schema_id = 2;
9496
optional string id = 4;
9597
optional uint64 len = 5;
98+
99+
map<string, bytes> meta = 6;
96100
}
97101

98102
message ProtoHollowRun {

src/persist-client/src/internal/state.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ use uuid::Uuid;
5656

5757
use crate::critical::CriticalReaderId;
5858
use crate::error::InvalidUsage;
59-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
59+
use crate::internal::encoding::{
60+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61+
};
6062
use crate::internal::gc::GcReq;
6163
use crate::internal::machine::retry_external;
6264
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
@@ -818,13 +820,20 @@ pub struct RunMeta {
818820

819821
/// The number of updates in this run, or `None` if the number is unknown.
820822
pub(crate) len: Option<usize>,
823+
824+
/// Additional unstructured metadata.
825+
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
826+
pub(crate) meta: MetadataMap,
821827
}
822828

823829
/// A subset of a [HollowBatch] corresponding 1:1 to a blob.
824830
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
825831
pub struct HollowBatchPart<T> {
826832
/// Pointer usable to retrieve the updates.
827833
pub key: PartialBatchKey,
834+
/// Miscellaneous metadata.
835+
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
836+
pub meta: MetadataMap,
828837
/// The encoded size of this part.
829838
pub encoded_size_bytes: usize,
830839
/// A lower bound on the keys in the part. (By default, this is the minimum
@@ -1213,6 +1222,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12131222
// are added.
12141223
let HollowBatchPart {
12151224
key: self_key,
1225+
meta: self_meta,
12161226
encoded_size_bytes: self_encoded_size_bytes,
12171227
key_lower: self_key_lower,
12181228
structured_key_lower: self_structured_key_lower,
@@ -1225,6 +1235,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12251235
} = self;
12261236
let HollowBatchPart {
12271237
key: other_key,
1238+
meta: other_meta,
12281239
encoded_size_bytes: other_encoded_size_bytes,
12291240
key_lower: other_key_lower,
12301241
structured_key_lower: other_structured_key_lower,
@@ -1237,6 +1248,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12371248
} = other;
12381249
(
12391250
self_key,
1251+
self_meta,
12401252
self_encoded_size_bytes,
12411253
self_key_lower,
12421254
self_structured_key_lower,
@@ -1249,6 +1261,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12491261
)
12501262
.cmp(&(
12511263
other_key,
1264+
other_meta,
12521265
other_encoded_size_bytes,
12531266
other_key_lower,
12541267
other_structured_key_lower,
@@ -2998,6 +3011,7 @@ pub(crate) mod tests {
29983011
)| {
29993012
HollowBatchPart {
30003013
key,
3014+
meta: Default::default(),
30013015
encoded_size_bytes,
30023016
key_lower,
30033017
structured_key_lower: None,
@@ -3171,6 +3185,7 @@ pub(crate) mod tests {
31713185
.map(|x| {
31723186
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
31733187
key: PartialBatchKey((*x).to_owned()),
3188+
meta: Default::default(),
31743189
encoded_size_bytes: 0,
31753190
key_lower: vec![],
31763191
structured_key_lower: None,

src/persist-client/src/iter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,7 @@ mod tests {
12731273
key: PartialBatchKey(
12741274
"n0000000/p00000000-0000-0000-0000-000000000000".into(),
12751275
),
1276+
meta: Default::default(),
12761277
encoded_size_bytes,
12771278
key_lower: vec![],
12781279
structured_key_lower: None,

0 commit comments

Comments
 (0)