From a689cd2a8da49fbd2217e786b422fb57e67dbdc4 Mon Sep 17 00:00:00 2001 From: Lucas Kent <rubickent@gmail.com> Date: Mon, 25 Nov 2024 14:06:48 +1100 Subject: [PATCH] [Breaking change] Fix calling Records::encode / Records::decode with None --- src/records.rs | 41 ++++++++++++++++++++++++++++--- tests/all_tests/fetch_response.rs | 24 +++++++++++------- tests/all_tests/produce_fetch.rs | 12 +++++---- 3 files changed, 60 insertions(+), 17 deletions(-) diff --git a/src/records.rs b/src/records.rs index 1cf0690..6b3645e 100644 --- a/src/records.rs +++ b/src/records.rs @@ -27,7 +27,7 @@ //! for topic in res.responses { //! for partition in topic.partitions { //! let mut records = partition.records.unwrap(); -//! let records = RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)).unwrap(); +//! let records = RecordBatchDecoder::decode_with_custom_compression(&mut records, Some(decompress_record_batch_data)).unwrap(); //! } //! } //! @@ -156,13 +156,34 @@ pub struct Record { const MAGIC_BYTE_OFFSET: usize = 16; impl RecordBatchEncoder { + /// Encode records into given buffer, using provided encoding options that select the encoding + /// strategy based on version. + pub fn encode<'a, B, I, CF>( + buf: &mut B, + records: I, + options: &RecordEncodeOptions, + ) -> Result<()> + where + B: ByteBufMut, + I: IntoIterator<Item = &'a Record>, + I::IntoIter: Clone, + CF: Fn(&mut BytesMut, &mut B, Compression) -> Result<()>, + { + Self::encode_with_custom_compression( + buf, + records, + options, + None::<fn(&mut BytesMut, &mut B, Compression) -> Result<()>>, + ) + } + /// Encode records into given buffer, using provided encoding options that select the encoding /// strategy based on version. /// # Arguments /// * `compressor` - A function that compresses the given batch of records. /// /// If `None`, the right compression algorithm will automatically be selected and applied. - pub fn encode<'a, B, I, CF>( + pub fn encode_with_custom_compression<'a, B, I, CF>( buf: &mut B, records: I, options: &RecordEncodeOptions, @@ -485,12 +506,26 @@ impl RecordBatchEncoder { } impl RecordBatchDecoder { + /// Decode the provided buffer into a vec of records. + pub fn decode<B: ByteBuf, F>(buf: &mut B) -> Result<Vec<Record>> + where + F: Fn(&mut bytes::Bytes, Compression) -> Result<B>, + { + Self::decode_with_custom_compression( + buf, + None::<fn(&mut bytes::Bytes, Compression) -> Result<B>>, + ) + } + /// Decode the provided buffer into a vec of records. /// # Arguments /// * `decompressor` - A function that decompresses the given batch of records. /// /// If `None`, the right decompression algorithm will automatically be selected and applied. - pub fn decode<B: ByteBuf, F>(buf: &mut B, decompressor: Option<F>) -> Result<Vec<Record>> + pub fn decode_with_custom_compression<B: ByteBuf, F>( + buf: &mut B, + decompressor: Option<F>, + ) -> Result<Vec<Record>> where F: Fn(&mut bytes::Bytes, Compression) -> Result<B>, { diff --git a/tests/all_tests/fetch_response.rs b/tests/all_tests/fetch_response.rs index cf11c4f..6806769 100644 --- a/tests/all_tests/fetch_response.rs +++ b/tests/all_tests/fetch_response.rs @@ -86,9 +86,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); for record in records { assert_eq!( @@ -123,9 +125,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); for record in records { assert_eq!( @@ -161,9 +165,11 @@ mod client_tests { assert_eq!(partition.aborted_transactions.as_ref().unwrap().len(), 0); let mut records = partition.records.unwrap(); - let records = - RecordBatchDecoder::decode(&mut records, Some(decompress_record_batch_data)) - .unwrap(); + let records = RecordBatchDecoder::decode_with_custom_compression( + &mut records, + Some(decompress_record_batch_data), + ) + .unwrap(); assert_eq!(records.len(), 1); } } diff --git a/tests/all_tests/produce_fetch.rs b/tests/all_tests/produce_fetch.rs index 13114b8..d04878f 100644 --- a/tests/all_tests/produce_fetch.rs +++ b/tests/all_tests/produce_fetch.rs @@ -32,7 +32,7 @@ fn record_batch_produce_fetch() { ]; let mut encoded = BytesMut::new(); - RecordBatchEncoder::encode( + RecordBatchEncoder::encode_with_custom_compression( &mut encoded, &records, &RecordEncodeOptions { @@ -67,7 +67,7 @@ fn message_set_v1_produce_fetch() { ]; let mut encoded = BytesMut::new(); - RecordBatchEncoder::encode( + RecordBatchEncoder::encode_with_custom_compression( &mut encoded, &records, &RecordEncodeOptions { @@ -196,9 +196,11 @@ fn fetch_records( ); let mut fetched_records = partition_response.records.clone().unwrap(); - let fetched_records = - RecordBatchDecoder::decode(&mut fetched_records, Some(decompress_record_batch_data)) - .unwrap(); + let fetched_records = RecordBatchDecoder::decode_with_custom_compression( + &mut fetched_records, + Some(decompress_record_batch_data), + ) + .unwrap(); eprintln!("{expected:#?}"); eprintln!("{fetched_records:#?}");