Skip to content

Commit 172f42f

Browse files
committed
Add a metadata map to the hollow batch part
1 parent 370d355 commit 172f42f

File tree

6 files changed

+28
-2
lines changed

6 files changed

+28
-2
lines changed

src/persist-client/src/batch.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ use crate::async_runtime::IsolatedRuntime;
5151
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
5252
use crate::error::InvalidUsage;
5353
use crate::internal::compact::{CompactConfig, Compactor};
54-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54+
use crate::internal::encoding::{
55+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56+
};
5557
use crate::internal::machine::retry_external;
5658
use crate::internal::merge::{MergeTree, Pending};
5759
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
@@ -1279,8 +1281,10 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
12791281
stats
12801282
});
12811283

1284+
let meta = MetadataMap::default();
12821285
BatchPart::Hollow(HollowBatchPart {
12831286
key: partial_key,
1287+
meta,
12841288
encoded_size_bytes: payload_len,
12851289
key_lower,
12861290
structured_key_lower,

src/persist-client/src/internal/datadriven.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl<'a> DirectiveArgs<'a> {
104104
.map(|x| {
105105
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
106106
key: PartialBatchKey((*x).to_owned()),
107+
meta: Default::default(),
107108
encoded_size_bytes: 0,
108109
key_lower: vec![],
109110
structured_key_lower: None,

src/persist-client/src/internal/encoding.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1444,6 +1444,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
14441444
parts.extend(proto.deprecated_keys.into_iter().map(|key| {
14451445
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
14461446
key: PartialBatchKey(key),
1447+
meta: Default::default(),
14471448
encoded_size_bytes: 0,
14481449
key_lower: vec![],
14491450
structured_key_lower: None,
@@ -1554,6 +1555,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
15541555
schema_id: None,
15551556
structured_key_lower: self.structured_key_lower.into_proto(),
15561557
deprecated_schema_id: None,
1558+
metadata: BTreeMap::default(),
15571559
};
15581560
part
15591561
}
@@ -1591,6 +1593,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
15911593
format: x.format.map(|f| f.into_proto()),
15921594
schema_id: x.schema_id.into_proto(),
15931595
deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1596+
metadata: BTreeMap::default(),
15941597
},
15951598
BatchPart::Inline {
15961599
updates,
@@ -1608,6 +1611,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16081611
format: None,
16091612
schema_id: schema_id.into_proto(),
16101613
deprecated_schema_id: deprecated_schema_id.into_proto(),
1614+
metadata: BTreeMap::default(),
16111615
},
16121616
}
16131617
}
@@ -1623,6 +1627,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16231627
Some(proto_hollow_batch_part::Kind::Key(key)) => {
16241628
Ok(BatchPart::Hollow(HollowBatchPart {
16251629
key: key.into_rust()?,
1630+
meta: proto.metadata.into_rust()?,
16261631
encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
16271632
key_lower: proto.key_lower.into(),
16281633
structured_key_lower: proto.structured_key_lower.into_rust()?,
@@ -2008,6 +2013,7 @@ mod tests {
20082013
),
20092014
vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20102015
key: PartialBatchKey("a".into()),
2016+
meta: Default::default(),
20112017
encoded_size_bytes: 5,
20122018
key_lower: vec![],
20132019
structured_key_lower: None,
@@ -2035,6 +2041,7 @@ mod tests {
20352041
.parts
20362042
.push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20372043
key: PartialBatchKey("b".into()),
2044+
meta: Default::default(),
20382045
encoded_size_bytes: 0,
20392046
key_lower: vec![],
20402047
structured_key_lower: None,

src/persist-client/src/internal/state.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message ProtoHollowBatchPart {
6060
}
6161
optional uint64 schema_id = 12;
6262

63+
map<string, bytes> metadata = 14;
64+
6365
optional bytes key_stats = 536870906;
6466

6567
reserved 536870907 to 536870911;

src/persist-client/src/internal/state.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ use uuid::Uuid;
5656

5757
use crate::critical::CriticalReaderId;
5858
use crate::error::InvalidUsage;
59-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
59+
use crate::internal::encoding::{
60+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61+
};
6062
use crate::internal::gc::GcReq;
6163
use crate::internal::machine::retry_external;
6264
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
@@ -825,6 +827,9 @@ pub struct RunMeta {
825827
pub struct HollowBatchPart<T> {
826828
/// Pointer usable to retrieve the updates.
827829
pub key: PartialBatchKey,
830+
/// Miscellaneous metadata.
831+
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
832+
pub meta: MetadataMap,
828833
/// The encoded size of this part.
829834
pub encoded_size_bytes: usize,
830835
/// A lower bound on the keys in the part. (By default, this the minimum
@@ -1213,6 +1218,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12131218
// are added.
12141219
let HollowBatchPart {
12151220
key: self_key,
1221+
meta: self_meta,
12161222
encoded_size_bytes: self_encoded_size_bytes,
12171223
key_lower: self_key_lower,
12181224
structured_key_lower: self_structured_key_lower,
@@ -1225,6 +1231,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12251231
} = self;
12261232
let HollowBatchPart {
12271233
key: other_key,
1234+
meta: other_meta,
12281235
encoded_size_bytes: other_encoded_size_bytes,
12291236
key_lower: other_key_lower,
12301237
structured_key_lower: other_structured_key_lower,
@@ -1237,6 +1244,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12371244
} = other;
12381245
(
12391246
self_key,
1247+
self_meta,
12401248
self_encoded_size_bytes,
12411249
self_key_lower,
12421250
self_structured_key_lower,
@@ -1249,6 +1257,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12491257
)
12501258
.cmp(&(
12511259
other_key,
1260+
other_meta,
12521261
other_encoded_size_bytes,
12531262
other_key_lower,
12541263
other_structured_key_lower,
@@ -2998,6 +3007,7 @@ pub(crate) mod tests {
29983007
)| {
29993008
HollowBatchPart {
30003009
key,
3010+
meta: Default::default(),
30013011
encoded_size_bytes,
30023012
key_lower,
30033013
structured_key_lower: None,
@@ -3171,6 +3181,7 @@ pub(crate) mod tests {
31713181
.map(|x| {
31723182
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
31733183
key: PartialBatchKey((*x).to_owned()),
3184+
meta: Default::default(),
31743185
encoded_size_bytes: 0,
31753186
key_lower: vec![],
31763187
structured_key_lower: None,

src/persist-client/src/iter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,7 @@ mod tests {
12731273
key: PartialBatchKey(
12741274
"n0000000/p00000000-0000-0000-0000-000000000000".into(),
12751275
),
1276+
meta: Default::default(),
12761277
encoded_size_bytes,
12771278
key_lower: vec![],
12781279
structured_key_lower: None,

0 commit comments

Comments
 (0)