Skip to content

Commit a689cd2

Browse files
committed
[Breaking change] Fix calling Records::encode / Records::decode with None
1 parent 6f3dbd8 commit a689cd2

File tree

3 files changed

+60
-17
lines changed

3 files changed

+60
-17
lines changed

src/records.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
//! for topic in res.responses {
2828
//! for partition in topic.partitions {
2929
//! let mut records = partition.records.unwrap();
30-
//! let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap();
30+
//! let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap();
3131
//! }
3232
//! }
3333
//!
@@ -156,13 +156,34 @@ pub struct Record {
156156
const MAGIC_BYTE_OFFSET: usize = 16;
157157

158158
impl RecordBatchEncoder {
159+
/// Encode records into given buffer, using provided encoding options that select the encoding
160+
/// strategy based on version.
161+
pub fn encode<'a, B, I, CF>(
162+
buf: &mut B,
163+
records: I,
164+
options: &RecordEncodeOptions,
165+
) -> Result<()>
166+
where
167+
B: ByteBufMut,
168+
I: IntoIterator<Item = &'a Record>,
169+
I::IntoIter: Clone,
170+
CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>,
171+
{
172+
Self::encode_with_custom_compression(
173+
buf,
174+
records,
175+
options,
176+
None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>,
177+
)
178+
}
179+
159180
/// Encode records into given buffer, using provided encoding options that select the encoding
160181
/// strategy based on version.
161182
/// # Arguments
162183
/// * `compressor` - A function that compresses the given batch of records.
163184
///
164185
/// If `None`, the right compression algorithm will automatically be selected and applied.
165-
pub fn encode<'a, B, I, CF>(
186+
pub fn encode_with_custom_compression<'a, B, I, CF>(
166187
buf: &mut B,
167188
records: I,
168189
options: &RecordEncodeOptions,
@@ -485,12 +506,26 @@ impl RecordBatchEncoder {
485506
}
486507

487508
impl RecordBatchDecoder {
509+
/// Decode the provided buffer into a vec of records.
510+
pub fn decode<B: ByteBuf, F>(buf: &mut B) -> Result<Vec<Record>>
511+
where
512+
F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
513+
{
514+
Self::decode_with_custom_compression(
515+
buf,
516+
None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>,
517+
)
518+
}
519+
488520
/// Decode the provided buffer into a vec of records.
489521
/// # Arguments
490522
/// * `decompressor` - A function that decompresses the given batch of records.
491523
///
492524
/// If `None`, the right decompression algorithm will automatically be selected and applied.
493-
pub fn decode<B: ByteBuf, F>(buf: &mut B, decompressor: Option<F>) -> Result<Vec<Record>>
525+
pub fn decode_with_custom_compression<B: ByteBuf, F>(
526+
buf: &mut B,
527+
decompressor: Option<F>,
528+
) -> Result<Vec<Record>>
494529
where
495530
F: Fn(&mut bytes::Bytes, Compression) -> Result<B>,
496531
{

tests/all_tests/fetch_response.rs

+15-9
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,11 @@ mod client_tests {
8686
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);
8787

8888
let mut records = partition.records.unwrap();
89-
let records =
90-
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
91-
.unwrap();
89+
let records = RecordBatchDecoder::decode_with_custom_compression(
90+
&mut records,
91+
Some(decompress_record_batch_data),
92+
)
93+
.unwrap();
9294
assert_eq!(records.len(), 1);
9395
for record in records {
9496
assert_eq!(
@@ -123,9 +125,11 @@ mod client_tests {
123125
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);
124126

125127
let mut records = partition.records.unwrap();
126-
let records =
127-
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
128-
.unwrap();
128+
let records = RecordBatchDecoder::decode_with_custom_compression(
129+
&mut records,
130+
Some(decompress_record_batch_data),
131+
)
132+
.unwrap();
129133
assert_eq!(records.len(), 1);
130134
for record in records {
131135
assert_eq!(
@@ -161,9 +165,11 @@ mod client_tests {
161165
assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0);
162166

163167
let mut records = partition.records.unwrap();
164-
let records =
165-
RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data))
166-
.unwrap();
168+
let records = RecordBatchDecoder::decode_with_custom_compression(
169+
&mut records,
170+
Some(decompress_record_batch_data),
171+
)
172+
.unwrap();
167173
assert_eq!(records.len(), 1);
168174
}
169175
}

tests/all_tests/produce_fetch.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ fn record_batch_produce_fetch() {
3232
];
3333

3434
let mut encoded = BytesMut::new();
35-
RecordBatchEncoder::encode(
35+
RecordBatchEncoder::encode_with_custom_compression(
3636
&mut encoded,
3737
&records,
3838
&RecordEncodeOptions {
@@ -67,7 +67,7 @@ fn message_set_v1_produce_fetch() {
6767
];
6868

6969
let mut encoded = BytesMut::new();
70-
RecordBatchEncoder::encode(
70+
RecordBatchEncoder::encode_with_custom_compression(
7171
&mut encoded,
7272
&records,
7373
&RecordEncodeOptions {
@@ -196,9 +196,11 @@ fn fetch_records(
196196
);
197197

198198
let mut fetched_records = partition_response.records.clone().unwrap();
199-
let fetched_records =
200-
RecordBatchDecoder::decode(&mut fetched_records, Some(decompress_record_batch_data))
201-
.unwrap();
199+
let fetched_records = RecordBatchDecoder::decode_with_custom_compression(
200+
&mut fetched_records,
201+
Some(decompress_record_batch_data),
202+
)
203+
.unwrap();
202204

203205
eprintln!("{expected:#?}");
204206
eprintln!("{fetched_records:#?}");

0 commit comments

Comments
 (0)