From 734f6de887191db5301e3b020e6850905f13b5f0 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 27 Apr 2026 16:52:19 -0500 Subject: [PATCH 1/2] feat(parquet): add ParquetPushDecoder::swap_strategy for adaptive scans MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a small surface to the push decoder so callers can swap the RowFilter, ProjectionMask, and/or RowSelectionPolicy at row-group boundaries without rebuilding the decoder: - new pub fn `can_swap_strategy() -> bool` — true between row groups (outer state `ReadingRowGroup`, inner state `Finished`) - new pub fn `swap_strategy(StrategySwap) -> Result<()>` — rejected with `ParquetError::General` when called mid-row-group - new `pub struct StrategySwap` (`#[non_exhaustive]`) with builder methods `with_projection`, `with_filter`, `with_row_selection_policy` - new pub fn `row_groups_remaining() -> usize` for diagnostics `PushBuffers` carries through the swap, so bytes already fetched for columns that survive into the new strategy are reused — only bytes the new strategy needs but that aren't already buffered get requested via `NeedsData`. Adaptive callers should drive the decoder with `try_next_reader` rather than `try_decode`: handing the active reader off transitions the decoder back to `ReadingRowGroup` immediately, giving the caller a clean swap window between two consecutive returns. `try_decode` loops past row-group boundaries internally and is unsuitable for in-flight strategy changes. Tests cover: filter swap between row groups, mid-row-group rejection, swap-while-iterating-handed-off-reader, and projection narrowing that reuses already-buffered bytes. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/arrow/push_decoder/mod.rs | 328 ++++++++++++++++++ .../arrow/push_decoder/reader_builder/mod.rs | 57 +++ parquet/src/arrow/push_decoder/remaining.rs | 39 ++- 3 files changed, 423 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index 4c667e534366..ad5c90dcbeaa 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -22,8 +22,10 @@ mod reader_builder; mod remaining; use crate::DecodeResult; +use crate::arrow::ProjectionMask; use crate::arrow::arrow_reader::{ ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader, + RowFilter, RowSelectionPolicy, }; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; @@ -374,6 +376,102 @@ impl ParquetPushDecoder { pub fn clear_all_ranges(&mut self) { self.state.clear_all_ranges(); } + + /// True iff the decoder is at a row-group boundary and a + /// [`Self::swap_strategy`] call would succeed. + /// + /// A boundary is "between row groups": the previous row group's + /// [`ParquetRecordBatchReader`] has been fully extracted (via + /// [`Self::try_next_reader`]) or fully drained (via [`Self::try_decode`]), + /// and the next row group has not yet been planned. While + /// [`Self::try_decode`] is iterating an active row group's reader this + /// returns `false`; with [`Self::try_next_reader`] there is a clean + /// window between two consecutive returns where this is `true`. + pub fn can_swap_strategy(&self) -> bool { + self.state.can_swap_strategy() + } + + /// Number of row groups left to decode after the one currently in flight. + /// Useful as a "should I bother computing a new strategy?" signal. + pub fn row_groups_remaining(&self) -> usize { + self.state.row_groups_remaining() + } + + /// Replace projection / row filter / row selection policy for subsequent + /// row groups. 
+ /// + /// Returns `Err(ParquetError::General)` when called outside a row-group + /// boundary; check [`Self::can_swap_strategy`] first. + /// + /// The decoder's [`PushBuffers`] are preserved across the swap. Bytes + /// that have already been fetched for columns the new strategy still + /// needs will not be re-requested. Bytes for columns the new strategy + /// no longer needs remain buffered until [`Self::clear_all_ranges`] is + /// called or the decoder is dropped. + /// + /// Limit, offset, batch size, metadata, fields, and predicate-cache size + /// are unchanged by a swap. + /// + /// [`PushBuffers`]: crate::util::push_buffers::PushBuffers + pub fn swap_strategy(&mut self, swap: StrategySwap) -> Result<(), ParquetError> { + self.state.swap_strategy(swap) + } +} + +/// Description of a strategy swap to apply via +/// [`ParquetPushDecoder::swap_strategy`]. +/// +/// Each field is `Option`-wrapped so callers only override what they intend +/// to change. Fields left as `None` carry their previous value through the +/// swap. +/// +/// [`Self::filter`] is doubly-`Option`-wrapped on purpose: the outer +/// `Option` is "do you want to change the filter?", the inner is "set or +/// clear?". `Some(None)` clears any previously-installed filter. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct StrategySwap { + /// New projection mask. `None` means "leave the projection alone". + pub projection: Option<ProjectionMask>, + /// New row filter. Outer `None` means "leave the filter alone"; + /// `Some(None)` clears the filter; `Some(Some(filter))` installs a new + /// one. + pub filter: Option<Option<RowFilter>>, + /// New row selection policy. `None` means "leave the policy alone". + pub row_selection_policy: Option<RowSelectionPolicy>, +} + +impl StrategySwap { + /// Empty swap. No fields will be modified — chain `with_*` setters to + /// configure. + pub fn new() -> Self { + Self::default() + } + + /// Set a new projection mask for subsequent row groups. 
+ pub fn with_projection(mut self, projection: ProjectionMask) -> Self { + self.projection = Some(projection); + self + } + + /// Install a new row filter (or `None` to clear) for subsequent row + /// groups. + pub fn with_filter(mut self, filter: Option<RowFilter>) -> Self { + self.filter = Some(filter); + self + } + + /// Set a new row selection policy for subsequent row groups. + pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self { + self.row_selection_policy = Some(policy); + self + } + + /// True iff every field is `None`. A no-op swap returns successfully but + /// has no effect. + pub fn is_empty(&self) -> bool { + self.projection.is_none() && self.filter.is_none() && self.row_selection_policy.is_none() + } } /// Internal state machine for the [`ParquetPushDecoder`] @@ -596,6 +694,75 @@ impl ParquetDecoderState { ParquetDecoderState::Finished => {} } } + + fn can_swap_strategy(&self) -> bool { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.is_at_row_group_boundary(), + // Mid-row-group: the active reader holds an `ArrayReader` and + // `ReadPlan` keyed to the *current* projection/filter; swapping + // would require throwing that work away. + ParquetDecoderState::DecodingRowGroup { .. } => false, + ParquetDecoderState::Finished => false, + } + } + + fn row_groups_remaining(&self) -> usize { + match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups.row_groups_remaining(), + ParquetDecoderState::DecodingRowGroup { + remaining_row_groups, + .. + } => remaining_row_groups.row_groups_remaining(), + ParquetDecoderState::Finished => 0, + } + } + + fn swap_strategy(&mut self, swap: StrategySwap) -> Result<(), ParquetError> { + if swap.is_empty() { + return Ok(()); + } + let remaining = match self { + ParquetDecoderState::ReadingRowGroup { + remaining_row_groups, + } => remaining_row_groups, + ParquetDecoderState::DecodingRowGroup { .. 
} => { + return Err(ParquetError::General( + "swap_strategy called while a row group is being decoded; \ + check can_swap_strategy() first" + .to_string(), + )); + } + ParquetDecoderState::Finished => { + return Err(ParquetError::General( + "swap_strategy called on a finished decoder".to_string(), + )); + } + }; + if !remaining.is_at_row_group_boundary() { + return Err(ParquetError::General( + "swap_strategy called mid-row-group; check can_swap_strategy() first".to_string(), + )); + } + let StrategySwap { + projection, + filter, + row_selection_policy, + } = swap; + if let Some(projection) = projection { + remaining.set_projection(projection)?; + } + if let Some(filter) = filter { + remaining.set_filter(filter)?; + } + if let Some(policy) = row_selection_policy { + remaining.set_row_selection_policy(policy)?; + } + Ok(()) + } } #[cfg(test)] @@ -1451,6 +1618,167 @@ mod test { expect_finished(decoder.try_decode()); } + /// `swap_strategy` between row groups installs a new filter for the + /// next row group while leaving the just-decoded row group's results + /// untouched. + /// + /// Adaptive callers should drive the decoder with `try_next_reader` + /// rather than `try_decode`: `try_next_reader` returns once per row + /// group, giving the caller a clean window between two consecutive + /// returns to inspect stats and call `swap_strategy`. `try_decode` + /// barrels through row-group boundaries and is unsuitable for in-flight + /// strategy changes. + #[test] + fn test_swap_strategy_installs_filter_between_row_groups() { + let metadata = test_file_parquet_metadata(); + let schema_descr = metadata.file_metadata().schema_descr_ptr(); + + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata) + .unwrap() + .with_batch_size(1024) + .build() + .unwrap(); + + // Prefetch the entire file so we don't have to interleave I/O with + // the swap. PushBuffers carries the bytes through the swap. 
+ decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + // Reader for row group 0 — no filter. + let reader0 = expect_data(decoder.try_next_reader()); + let batches0: Vec<_> = reader0.collect::<Result<Vec<_>, _>>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches0).unwrap(); + assert_eq!(batch0.num_rows(), 200); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + + // We're between row groups now. Swap in a filter on column "a". + assert!(decoder.can_swap_strategy()); + let filter = + ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| { + gt(batch.column(0), &Int64Array::new_scalar(250)) + }); + decoder + .swap_strategy( + StrategySwap::new().with_filter(Some(RowFilter::new(vec![Box::new(filter)]))), + ) + .unwrap(); + + // Reader for row group 1 — filter applied. Column "a" in RG1 has + // values 200..399; `a > 250` keeps 251..399 = 149 rows. + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::<Result<Vec<_>, _>>().unwrap(); + let batch1 = concat_batches(&TEST_BATCH.schema(), &batches1).unwrap(); + let expected1 = TEST_BATCH.slice(251, 149); + assert_eq!(batch1, expected1); + expect_finished(decoder.try_next_reader()); + } + + /// `swap_strategy` is rejected while a row group's reader is being + /// drained, and accepted again the moment we cross the boundary. + #[test] + fn test_swap_strategy_rejected_mid_row_group() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(50) + .build() + .unwrap(); + + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + // After getting the first batch, we're inside `DecodingRowGroup`: + // an active reader is still alive. Mid-reader is not a swap point. + let _ = expect_data(decoder.try_decode()); + assert!(!decoder.can_swap_strategy()); + + // Trying to swap mid-row-group is an error. 
+ let err = decoder + .swap_strategy( + StrategySwap::new().with_row_selection_policy(super::RowSelectionPolicy::Mask), + ) + .unwrap_err(); + let err_msg = format!("{err}"); + assert!( + err_msg.contains("can_swap_strategy"), + "unexpected error: {err_msg}" + ); + + // Empty swap is a no-op even mid-row-group. + decoder.swap_strategy(StrategySwap::new()).unwrap(); + } + + /// `try_next_reader` hands the active reader off to the caller and + /// transitions the decoder back to `ReadingRowGroup` — so the caller + /// can call `swap_strategy` even while still iterating the returned + /// reader. (The handed-off reader has no link back to the decoder's + /// projection/filter; it has its own `ArrayReader` and `ReadPlan`.) + #[test] + fn test_swap_strategy_allowed_while_iterating_handed_off_reader() { + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()) + .unwrap() + .with_batch_size(1024) + .build() + .unwrap(); + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + + let reader0 = expect_data(decoder.try_next_reader()); + // Decoder no longer owns the reader, so it considers itself + // "between row groups" — swap is allowed even though the caller + // hasn't iterated `reader0` yet. + assert!(decoder.can_swap_strategy()); + // Iterating `reader0` is independent of the decoder's state. + let batches: Vec<_> = reader0.collect::<Result<Vec<_>, _>>().unwrap(); + let batch0 = concat_batches(&TEST_BATCH.schema(), &batches).unwrap(); + assert_eq!(batch0, TEST_BATCH.slice(0, 200)); + } + + /// Bytes already buffered for a column survive a projection narrowing — + /// the next row group should not request any new data for the column + /// that was *already* in the projection. 
+ #[test] + fn test_swap_strategy_preserves_buffered_bytes() { + let metadata = test_file_parquet_metadata(); + let schema_descr = metadata.file_metadata().schema_descr_ptr(); + + let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata) + .unwrap() + .with_batch_size(1024) + .build() + .unwrap(); + + // Prefetch the whole file once. + decoder + .push_range(test_file_range(), TEST_FILE_DATA.clone()) + .unwrap(); + assert_eq!(decoder.buffered_bytes(), test_file_len()); + + // Drain RG0 with full projection via try_next_reader. + let reader0 = expect_data(decoder.try_next_reader()); + let _: Vec<_> = reader0.collect::<Result<Vec<_>, _>>().unwrap(); + + // Narrow projection to just column "a" for RG1. + assert!(decoder.can_swap_strategy()); + decoder + .swap_strategy( + StrategySwap::new().with_projection(ProjectionMask::columns(&schema_descr, ["a"])), + ) + .unwrap(); + + // Column "a" bytes for RG1 were part of the original wide + // projection's prefetch; we should not need additional data. + let reader1 = expect_data(decoder.try_next_reader()); + let batches1: Vec<_> = reader1.collect::<Result<Vec<_>, _>>().unwrap(); + // Schema is now just "a"; RG1 has values 200..399. + let expected1 = TEST_BATCH.slice(200, 200).project(&[0]).unwrap(); + let batch1 = concat_batches(&batches1[0].schema(), &batches1).unwrap(); + assert_eq!(batch1, expected1); + expect_finished(decoder.try_next_reader()); + } + + /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c" + /// + /// Note c is a different types (so the data page sizes will be different) diff --git a/parquet/src/arrow/push_decoder/reader_builder/mod.rs b/parquet/src/arrow/push_decoder/reader_builder/mod.rs index 60e50d29524e..b0a2a3f6d3ae 100644 --- a/parquet/src/arrow/push_decoder/reader_builder/mod.rs +++ b/parquet/src/arrow/push_decoder/reader_builder/mod.rs @@ -208,6 +208,63 @@ impl RowGroupReaderBuilder { self.buffers.push_ranges(ranges, buffers); } + /// True iff the inner state is `Finished`. 
This is the only state in + /// which it is safe to swap projection / filter / row selection policy + /// because no `RowGroupInfo`, `FilterInfo`, or in-flight `DataRequest` + /// is referencing the previous values. + pub(crate) fn is_finished(&self) -> bool { + matches!(self.state, Some(RowGroupDecoderState::Finished)) + } + + /// Replace the projection mask used for subsequent row groups. + /// + /// Must only be called when [`Self::is_finished`]. `PushBuffers` are + /// preserved; bytes already fetched for columns that survive into the + /// new mask are reused. + pub(crate) fn set_projection( + &mut self, + projection: ProjectionMask, + ) -> Result<(), ParquetError> { + if !self.is_finished() { + return Err(ParquetError::General( + "RowGroupReaderBuilder::set_projection: state must be Finished".to_string(), + )); + } + self.projection = projection; + Ok(()) + } + + /// Replace the row filter used for subsequent row groups. + /// + /// Must only be called when [`Self::is_finished`]. Pass `None` to clear + /// the filter, `Some(filter)` to install a new one. + pub(crate) fn set_filter(&mut self, filter: Option<RowFilter>) -> Result<(), ParquetError> { + if !self.is_finished() { + return Err(ParquetError::General( + "RowGroupReaderBuilder::set_filter: state must be Finished".to_string(), + )); + } + self.filter = filter; + Ok(()) + } + + /// Replace the row selection policy used for subsequent row groups. + /// + /// Must only be called when [`Self::is_finished`]. 
+ pub(crate) fn set_row_selection_policy( + &mut self, + policy: RowSelectionPolicy, + ) -> Result<(), ParquetError> { + if !self.is_finished() { + return Err(ParquetError::General( + "RowGroupReaderBuilder::set_row_selection_policy: state must be Finished" + .to_string(), + )); + } + self.row_selection_policy = policy; + Ok(()) + } + /// Returns the total number of buffered bytes available pub fn buffered_bytes(&self) -> u64 { self.buffers.buffered_bytes() diff --git a/parquet/src/arrow/push_decoder/remaining.rs b/parquet/src/arrow/push_decoder/remaining.rs index 2986ca0da8d8..b99b6dc82080 100644 --- a/parquet/src/arrow/push_decoder/remaining.rs +++ b/parquet/src/arrow/push_decoder/remaining.rs @@ -16,7 +16,10 @@ // under the License. use crate::DecodeResult; -use crate::arrow::arrow_reader::{ParquetRecordBatchReader, RowSelection}; +use crate::arrow::ProjectionMask; +use crate::arrow::arrow_reader::{ + ParquetRecordBatchReader, RowFilter, RowSelection, RowSelectionPolicy, +}; use crate::arrow::push_decoder::reader_builder::RowGroupReaderBuilder; use crate::errors::ParquetError; use crate::file::metadata::ParquetMetaData; @@ -75,6 +78,40 @@ impl RemainingRowGroups { self.row_group_reader_builder.clear_all_ranges(); } + /// True iff the inner row-group reader is between row groups (state + /// `Finished`). Forward to [`RowGroupReaderBuilder::is_finished`]. + pub fn is_at_row_group_boundary(&self) -> bool { + self.row_group_reader_builder.is_finished() + } + + /// Number of row groups remaining (not including the one currently + /// being decoded). + pub fn row_groups_remaining(&self) -> usize { + self.row_groups.len() + } + + /// Replace the projection. Must be called when + /// [`Self::is_at_row_group_boundary`]. + pub fn set_projection(&mut self, projection: ProjectionMask) -> Result<(), ParquetError> { + self.row_group_reader_builder.set_projection(projection) + } + + /// Replace the row filter. Must be called when + /// [`Self::is_at_row_group_boundary`]. 
+ pub fn set_filter(&mut self, filter: Option<RowFilter>) -> Result<(), ParquetError> { + self.row_group_reader_builder.set_filter(filter) + } + + /// Replace the row selection policy. Must be called when + /// [`Self::is_at_row_group_boundary`]. + pub fn set_row_selection_policy( + &mut self, + policy: RowSelectionPolicy, + ) -> Result<(), ParquetError> { + self.row_group_reader_builder + .set_row_selection_policy(policy) + } + /// returns [`ParquetRecordBatchReader`] suitable for reading the next /// group of rows from the Parquet data, or the list of data ranges still /// needed to proceed From 403183a87964e8d0b4825b03ccbc24c365ab49a8 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 27 Apr 2026 23:20:46 -0500 Subject: [PATCH 2/2] fix(parquet): drop private intra-doc link from swap_strategy docs `PushBuffers` is `pub(crate)` in the arrow-rs workspace, so the intra-doc link `[\`PushBuffers\`]: crate::util::push_buffers::PushBuffers` in `ParquetPushDecoder::swap_strategy`'s public docs failed the `-D rustdoc::private-intra-doc-links` check on `cargo +nightly doc --document-private-items`. The rest of the doc is unchanged; the prose now refers to the type by name without a link. Co-Authored-By: Claude Opus 4.7 (1M context) --- parquet/src/arrow/push_decoder/mod.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/parquet/src/arrow/push_decoder/mod.rs b/parquet/src/arrow/push_decoder/mod.rs index ad5c90dcbeaa..343a936908c0 100644 --- a/parquet/src/arrow/push_decoder/mod.rs +++ b/parquet/src/arrow/push_decoder/mod.rs @@ -403,16 +403,14 @@ impl ParquetPushDecoder { /// Returns `Err(ParquetError::General)` when called outside a row-group /// boundary; check [`Self::can_swap_strategy`] first. /// - /// The decoder's [`PushBuffers`] are preserved across the swap. Bytes - /// that have already been fetched for columns the new strategy still - /// needs will not be re-requested. 
Bytes for columns the new strategy - /// no longer needs remain buffered until [`Self::clear_all_ranges`] is - /// called or the decoder is dropped. + /// The decoder's internal `PushBuffers` are preserved across the swap. + /// Bytes that have already been fetched for columns the new strategy + /// still needs will not be re-requested. Bytes for columns the new + /// strategy no longer needs remain buffered until + /// [`Self::clear_all_ranges`] is called or the decoder is dropped. /// /// Limit, offset, batch size, metadata, fields, and predicate-cache size /// are unchanged by a swap. - /// - /// [`PushBuffers`]: crate::util::push_buffers::PushBuffers pub fn swap_strategy(&mut self, swap: StrategySwap) -> Result<(), ParquetError> { self.state.swap_strategy(swap) }