Skip to content

Commit bf57aaf

Browse files
committed
Add a metadata map to the hollow batch part
1 parent c72b511 commit bf57aaf

File tree

6 files changed

+27
-2
lines changed

6 files changed

+27
-2
lines changed

src/persist-client/src/batch.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ use crate::async_runtime::IsolatedRuntime;
5151
use crate::cfg::{BATCH_BUILDER_MAX_OUTSTANDING_PARTS, MiB};
5252
use crate::error::InvalidUsage;
5353
use crate::internal::compact::{CompactConfig, Compactor};
54-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54+
use crate::internal::encoding::{
55+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, Schemas,
56+
};
5557
use crate::internal::machine::retry_external;
5658
use crate::internal::merge::{MergeTree, Pending};
5759
use crate::internal::metrics::{BatchWriteMetrics, Metrics, RetryMetrics, ShardMetrics};
@@ -1279,8 +1281,10 @@ impl<T: Timestamp + Codec64> BatchParts<T> {
12791281
stats
12801282
});
12811283

1284+
let meta = MetadataMap::default();
12821285
BatchPart::Hollow(HollowBatchPart {
12831286
key: partial_key,
1287+
meta,
12841288
encoded_size_bytes: payload_len,
12851289
key_lower,
12861290
structured_key_lower,

src/persist-client/src/internal/datadriven.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ impl<'a> DirectiveArgs<'a> {
104104
.map(|x| {
105105
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
106106
key: PartialBatchKey((*x).to_owned()),
107+
meta: Default::default(),
107108
encoded_size_bytes: 0,
108109
key_lower: vec![],
109110
structured_key_lower: None,

src/persist-client/src/internal/encoding.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatch> for HollowBatch<T> {
14391439
parts.extend(proto.deprecated_keys.into_iter().map(|key| {
14401440
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
14411441
key: PartialBatchKey(key),
1442+
meta: Default::default(),
14421443
encoded_size_bytes: 0,
14431444
key_lower: vec![],
14441445
structured_key_lower: None,
@@ -1549,6 +1550,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for HollowRunRef<T>
15491550
schema_id: None,
15501551
structured_key_lower: self.structured_key_lower.into_proto(),
15511552
deprecated_schema_id: None,
1553+
metadata: BTreeMap::default(),
15521554
};
15531555
part
15541556
}
@@ -1586,6 +1588,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
15861588
format: x.format.map(|f| f.into_proto()),
15871589
schema_id: x.schema_id.into_proto(),
15881590
deprecated_schema_id: x.deprecated_schema_id.into_proto(),
1591+
metadata: BTreeMap::default(),
15891592
},
15901593
BatchPart::Inline {
15911594
updates,
@@ -1603,6 +1606,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16031606
format: None,
16041607
schema_id: schema_id.into_proto(),
16051608
deprecated_schema_id: deprecated_schema_id.into_proto(),
1609+
metadata: BTreeMap::default(),
16061610
},
16071611
}
16081612
}
@@ -1618,6 +1622,7 @@ impl<T: Timestamp + Codec64> RustType<ProtoHollowBatchPart> for BatchPart<T> {
16181622
Some(proto_hollow_batch_part::Kind::Key(key)) => {
16191623
Ok(BatchPart::Hollow(HollowBatchPart {
16201624
key: key.into_rust()?,
1625+
meta: proto.metadata.into_rust()?,
16211626
encoded_size_bytes: proto.encoded_size_bytes.into_rust()?,
16221627
key_lower: proto.key_lower.into(),
16231628
structured_key_lower: proto.structured_key_lower.into_rust()?,
@@ -2003,6 +2008,7 @@ mod tests {
20032008
),
20042009
vec![RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20052010
key: PartialBatchKey("a".into()),
2011+
meta: Default::default(),
20062012
encoded_size_bytes: 5,
20072013
key_lower: vec![],
20082014
structured_key_lower: None,
@@ -2030,6 +2036,7 @@ mod tests {
20302036
.parts
20312037
.push(RunPart::Single(BatchPart::Hollow(HollowBatchPart {
20322038
key: PartialBatchKey("b".into()),
2039+
meta: Default::default(),
20332040
encoded_size_bytes: 0,
20342041
key_lower: vec![],
20352042
structured_key_lower: None,

src/persist-client/src/internal/state.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ message ProtoHollowBatchPart {
6060
}
6161
optional uint64 schema_id = 12;
6262

63+
map<string, bytes> metadata = 14;
64+
6365
optional bytes key_stats = 536870906;
6466

6567
reserved 536870907 to 536870911;

src/persist-client/src/internal/state.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ use uuid::Uuid;
5656

5757
use crate::critical::CriticalReaderId;
5858
use crate::error::InvalidUsage;
59-
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, parse_id};
59+
use crate::internal::encoding::{
60+
LazyInlineBatchPart, LazyPartStats, LazyProto, MetadataMap, parse_id,
61+
};
6062
use crate::internal::gc::GcReq;
6163
use crate::internal::machine::retry_external;
6264
use crate::internal::paths::{BlobKey, PartId, PartialBatchKey, PartialRollupKey, WriterKey};
@@ -825,6 +827,8 @@ pub struct RunMeta {
825827
pub struct HollowBatchPart<T> {
826828
/// Pointer usable to retrieve the updates.
827829
pub key: PartialBatchKey,
830+
/// Miscellaneous metadata.
831+
pub meta: MetadataMap,
828832
/// The encoded size of this part.
829833
pub encoded_size_bytes: usize,
830834
/// A lower bound on the keys in the part. (By default, this the minimum
@@ -1213,6 +1217,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12131217
// are added.
12141218
let HollowBatchPart {
12151219
key: self_key,
1220+
meta: self_meta,
12161221
encoded_size_bytes: self_encoded_size_bytes,
12171222
key_lower: self_key_lower,
12181223
structured_key_lower: self_structured_key_lower,
@@ -1225,6 +1230,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12251230
} = self;
12261231
let HollowBatchPart {
12271232
key: other_key,
1233+
meta: other_meta,
12281234
encoded_size_bytes: other_encoded_size_bytes,
12291235
key_lower: other_key_lower,
12301236
structured_key_lower: other_structured_key_lower,
@@ -1237,6 +1243,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12371243
} = other;
12381244
(
12391245
self_key,
1246+
self_meta,
12401247
self_encoded_size_bytes,
12411248
self_key_lower,
12421249
self_structured_key_lower,
@@ -1249,6 +1256,7 @@ impl<T: Ord> Ord for HollowBatchPart<T> {
12491256
)
12501257
.cmp(&(
12511258
other_key,
1259+
other_meta,
12521260
other_encoded_size_bytes,
12531261
other_key_lower,
12541262
other_structured_key_lower,
@@ -2998,6 +3006,7 @@ pub(crate) mod tests {
29983006
)| {
29993007
HollowBatchPart {
30003008
key,
3009+
meta: Default::default(),
30013010
encoded_size_bytes,
30023011
key_lower,
30033012
structured_key_lower: None,
@@ -3171,6 +3180,7 @@ pub(crate) mod tests {
31713180
.map(|x| {
31723181
RunPart::Single(BatchPart::Hollow(HollowBatchPart {
31733182
key: PartialBatchKey((*x).to_owned()),
3183+
meta: Default::default(),
31743184
encoded_size_bytes: 0,
31753185
key_lower: vec![],
31763186
structured_key_lower: None,

src/persist-client/src/iter.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,6 +1273,7 @@ mod tests {
12731273
key: PartialBatchKey(
12741274
"n0000000/p00000000-0000-0000-0000-000000000000".into(),
12751275
),
1276+
meta: Default::default(),
12761277
encoded_size_bytes,
12771278
key_lower: vec![],
12781279
structured_key_lower: None,

0 commit comments

Comments
 (0)