Skip to content

Commit 6fd953a

Browse files
committed
Add a metadata map to the hollow batch part
1 parent c9cb669 commit 6fd953a

File tree

6 files changed

+28
-2
lines changed

6 files changed

+28
-2
lines changed

src/persist-client/src/batch.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ use crate::async_runtime::IsolatedRuntime;
5151
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
5252
use crate::error::InvalidUsage;
5353
use crate::internal::compact::{CompactConfig, Compactor};
54-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54+
use crate::internal::encoding::{
55+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56+
};
5557
use crate::internal::machine::retry_external;
5658
use crate::internal::merge::{MergeTree, Pending};
5759
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
@@ -1279,8 +1281,10 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
12791281
stats
12801282
});
12811283

1284+
let meta = MetadataMap::default();
12821285
BatchPart::Hollow(HollowBatchPart {
12831286
key: partial_key,
1287+
meta,
12841288
encoded_size_bytes: payload_len,
12851289
key_lower,
12861290
structured_key_lower,

src/persist-client/src/internal/datadriven.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl<'a> DirectiveArgs<'a> {
104104
.map(|x| {
105105
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
106106
key: PartialBatchKey((*x).to_owned()),
107+
meta: Default::default(),
107108
encoded_size_bytes: 0,
108109
key_lower: vec![],
109110
structured_key_lower: None,

src/persist-client/src/internal/encoding.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1461,6 +1461,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
14611461
parts.extend(proto.deprecated_keys.into_iter().map(|key| {
14621462
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
14631463
key: PartialBatchKey(key),
1464+
meta: Default::default(),
14641465
encoded_size_bytes: 0,
14651466
key_lower: vec![],
14661467
structured_key_lower: None,
@@ -1571,6 +1572,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
15711572
schema_id: None,
15721573
structured_key_lower: self.structured_key_lower.into_proto(),
15731574
deprecated_schema_id: None,
1575+
metadata: BTreeMap::default(),
15741576
};
15751577
part
15761578
}
@@ -1608,6 +1610,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16081610
format: x.format.map(|f| f.into_proto()),
16091611
schema_id: x.schema_id.into_proto(),
16101612
deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1613+
metadata: BTreeMap::default(),
16111614
},
16121615
BatchPart::Inline {
16131616
updates,
@@ -1625,6 +1628,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16251628
format: None,
16261629
schema_id: schema_id.into_proto(),
16271630
deprecated_schema_id: deprecated_schema_id.into_proto(),
1631+
metadata: BTreeMap::default(),
16281632
},
16291633
}
16301634
}
@@ -1640,6 +1644,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16401644
Some(proto_hollow_batch_part::Kind::Key(key)) => {
16411645
Ok(BatchPart::Hollow(HollowBatchPart {
16421646
key: key.into_rust()?,
1647+
meta: proto.metadata.into_rust()?,
16431648
encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
16441649
key_lower: proto.key_lower.into(),
16451650
structured_key_lower: proto.structured_key_lower.into_rust()?,
@@ -2034,6 +2039,7 @@ mod tests {
20342039
),
20352040
vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20362041
key: PartialBatchKey("a".into()),
2042+
meta: Default::default(),
20372043
encoded_size_bytes: 5,
20382044
key_lower: vec![],
20392045
structured_key_lower: None,
@@ -2061,6 +2067,7 @@ mod tests {
20612067
.parts
20622068
.push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20632069
key: PartialBatchKey("b".into()),
2070+
meta: Default::default(),
20642071
encoded_size_bytes: 0,
20652072
key_lower: vec![],
20662073
structured_key_lower: None,

src/persist-client/src/internal/state.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message ProtoHollowBatchPart {
6060
}
6161
optional uint64 schema_id = 12;
6262

63+
map<string, bytes> metadata = 14;
64+
6365
optional bytes key_stats = 536870906;
6466

6567
reserved 536870907 to 536870911;

src/persist-client/src/internal/state.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ use uuid::Uuid;
5656

5757
use crate::critical::CriticalReaderId;
5858
use crate::error::InvalidUsage;
59-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
59+
use crate::internal::encoding::{
60+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61+
};
6062
use crate::internal::gc::GcReq;
6163
use crate::internal::machine::retry_external;
6264
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
@@ -825,6 +827,9 @@ pub struct RunMeta {
825827
pub struct HollowBatchPart<T> {
826828
/// Pointer usable to retrieve the updates.
827829
pub key: PartialBatchKey,
830+
/// Miscellaneous metadata.
831+
#[serde(skip_serializing_if = "MetadataMap::is_empty")]
832+
pub meta: MetadataMap,
828833
/// The encoded size of this part.
829834
pub encoded_size_bytes: usize,
830835
/// A lower bound on the keys in the part. (By default, this the minimum
@@ -1213,6 +1218,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12131218
// are added.
12141219
let HollowBatchPart {
12151220
key: self_key,
1221+
meta: self_meta,
12161222
encoded_size_bytes: self_encoded_size_bytes,
12171223
key_lower: self_key_lower,
12181224
structured_key_lower: self_structured_key_lower,
@@ -1225,6 +1231,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12251231
} = self;
12261232
let HollowBatchPart {
12271233
key: other_key,
1234+
meta: other_meta,
12281235
encoded_size_bytes: other_encoded_size_bytes,
12291236
key_lower: other_key_lower,
12301237
structured_key_lower: other_structured_key_lower,
@@ -1237,6 +1244,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12371244
} = other;
12381245
(
12391246
self_key,
1247+
self_meta,
12401248
self_encoded_size_bytes,
12411249
self_key_lower,
12421250
self_structured_key_lower,
@@ -1249,6 +1257,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12491257
)
12501258
.cmp(&(
12511259
other_key,
1260+
other_meta,
12521261
other_encoded_size_bytes,
12531262
other_key_lower,
12541263
other_structured_key_lower,
@@ -2998,6 +3007,7 @@ pub(crate) mod tests {
29983007
)| {
29993008
HollowBatchPart {
30003009
key,
3010+
meta: Default::default(),
30013011
encoded_size_bytes,
30023012
key_lower,
30033013
structured_key_lower: None,
@@ -3171,6 +3181,7 @@ pub(crate) mod tests {
31713181
.map(|x| {
31723182
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
31733183
key: PartialBatchKey((*x).to_owned()),
3184+
meta: Default::default(),
31743185
encoded_size_bytes: 0,
31753186
key_lower: vec![],
31763187
structured_key_lower: None,

src/persist-client/src/iter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,7 @@ mod tests {
12731273
key: PartialBatchKey(
12741274
"n0000000/p00000000-0000-0000-0000-000000000000".into(),
12751275
),
1276+
meta: Default::default(),
12761277
encoded_size_bytes,
12771278
key_lower: vec![],
12781279
structured_key_lower: None,

0 commit comments

Comments
 (0)