Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
326 changes: 326 additions & 0 deletions parquet/src/arrow/push_decoder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::ProjectionMask;
use crate::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
RowFilter, RowSelectionPolicy,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
Expand Down Expand Up @@ -374,6 +376,100 @@ impl ParquetPushDecoder {
pub fn clear_all_ranges(&mut self) {
self.state.clear_all_ranges();
}

/// True iff the decoder is at a row-group boundary and a
/// [`Self::swap_strategy`] call would succeed.
///
/// A boundary is "between row groups": the previous row group's
/// [`ParquetRecordBatchReader`] has been fully extracted (via
/// [`Self::try_next_reader`]) or fully drained (via [`Self::try_decode`]),
/// and the next row group has not yet been planned. While
/// [`Self::try_decode`] is iterating an active row group's reader this
/// returns `false`; with [`Self::try_next_reader`] there is a clean
/// window between two consecutive returns where this is `true`.
///
/// Note this only reports the current state: a subsequent `try_decode`
/// call may move the decoder off the boundary again.
pub fn can_swap_strategy(&self) -> bool {
    self.state.can_swap_strategy()
}

/// Number of row groups left to decode after the one currently in flight.
///
/// Useful as a "should I bother computing a new strategy?" signal:
/// when this returns 0 a [`Self::swap_strategy`] call would have no
/// row groups left to apply to. Returns 0 once the decoder is finished.
pub fn row_groups_remaining(&self) -> usize {
    self.state.row_groups_remaining()
}

/// Replace projection / row filter / row selection policy for subsequent
/// row groups.
///
/// # Errors
///
/// Returns `Err(ParquetError::General)` when called outside a row-group
/// boundary; check [`Self::can_swap_strategy`] first. An empty
/// [`StrategySwap`] (no fields set) is a no-op and always succeeds.
///
/// The decoder's internal `PushBuffers` are preserved across the swap.
/// Bytes that have already been fetched for columns the new strategy
/// still needs will not be re-requested. Bytes for columns the new
/// strategy no longer needs remain buffered until
/// [`Self::clear_all_ranges`] is called or the decoder is dropped.
///
/// Limit, offset, batch size, metadata, fields, and predicate-cache size
/// are unchanged by a swap.
pub fn swap_strategy(&mut self, swap: StrategySwap) -> Result<(), ParquetError> {
    self.state.swap_strategy(swap)
}
}

/// Description of a strategy swap to apply via
/// [`ParquetPushDecoder::swap_strategy`].
///
/// Each field is `Option`-wrapped so callers only override what they intend
/// to change. Fields left as `None` carry their previous value through the
/// swap.
///
/// [`Self::filter`] is doubly-`Option`-wrapped on purpose: the outer
/// `Option` is "do you want to change the filter?", the inner is "set or
/// clear?". `Some(None)` clears any previously-installed filter.
///
/// The struct is `#[non_exhaustive]`; construct it with
/// [`StrategySwap::new`] and the `with_*` builder methods.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct StrategySwap {
    /// New projection mask. `None` means "leave the projection alone".
    pub projection: Option<ProjectionMask>,
    /// New row filter. Outer `None` means "leave the filter alone";
    /// `Some(None)` clears the filter; `Some(Some(filter))` installs a new
    /// one.
    pub filter: Option<Option<RowFilter>>,
    /// New row selection policy. `None` means "leave the policy alone".
    pub row_selection_policy: Option<RowSelectionPolicy>,
}

impl StrategySwap {
/// Empty swap. No fields will be modified — chain `with_*` setters to
/// configure.
pub fn new() -> Self {
Self::default()
}

/// Set a new projection mask for subsequent row groups.
pub fn with_projection(mut self, projection: ProjectionMask) -> Self {
self.projection = Some(projection);
self
}

/// Install a new row filter (or `None` to clear) for subsequent row
/// groups.
pub fn with_filter(mut self, filter: Option<RowFilter>) -> Self {
self.filter = Some(filter);
self
}

/// Set a new row selection policy for subsequent row groups.
pub fn with_row_selection_policy(mut self, policy: RowSelectionPolicy) -> Self {
self.row_selection_policy = Some(policy);
self
}

/// True iff every field is `None`. A no-op swap returns successfully but
/// has no effect.
pub fn is_empty(&self) -> bool {
self.projection.is_none() && self.filter.is_none() && self.row_selection_policy.is_none()
}
}

/// Internal state machine for the [`ParquetPushDecoder`]
Expand Down Expand Up @@ -596,6 +692,75 @@ impl ParquetDecoderState {
ParquetDecoderState::Finished => {}
}
}

fn can_swap_strategy(&self) -> bool {
match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.is_at_row_group_boundary(),
// Mid-row-group: the active reader holds an `ArrayReader` and
// `ReadPlan` keyed to the *current* projection/filter; swapping
// would require throwing that work away.
ParquetDecoderState::DecodingRowGroup { .. } => false,
ParquetDecoderState::Finished => false,
}
}

fn row_groups_remaining(&self) -> usize {
match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups.row_groups_remaining(),
ParquetDecoderState::DecodingRowGroup {
remaining_row_groups,
..
} => remaining_row_groups.row_groups_remaining(),
ParquetDecoderState::Finished => 0,
}
}

fn swap_strategy(&mut self, swap: StrategySwap) -> Result<(), ParquetError> {
if swap.is_empty() {
return Ok(());
}
let remaining = match self {
ParquetDecoderState::ReadingRowGroup {
remaining_row_groups,
} => remaining_row_groups,
ParquetDecoderState::DecodingRowGroup { .. } => {
return Err(ParquetError::General(
"swap_strategy called while a row group is being decoded; \
check can_swap_strategy() first"
.to_string(),
));
}
ParquetDecoderState::Finished => {
return Err(ParquetError::General(
"swap_strategy called on a finished decoder".to_string(),
));
}
};
if !remaining.is_at_row_group_boundary() {
return Err(ParquetError::General(
"swap_strategy called mid-row-group; check can_swap_strategy() first".to_string(),
));
}
let StrategySwap {
projection,
filter,
row_selection_policy,
} = swap;
if let Some(projection) = projection {
remaining.set_projection(projection)?;
}
if let Some(filter) = filter {
remaining.set_filter(filter)?;
}
if let Some(policy) = row_selection_policy {
remaining.set_row_selection_policy(policy)?;
}
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -1451,6 +1616,167 @@ mod test {
expect_finished(decoder.try_decode());
}

/// `swap_strategy` between row groups installs a new filter for the
/// next row group while leaving the just-decoded row group's results
/// untouched.
///
/// Adaptive callers should drive the decoder with `try_next_reader`
/// rather than `try_decode`: `try_next_reader` returns once per row
/// group, giving the caller a clean window between two consecutive
/// returns to inspect stats and call `swap_strategy`. `try_decode`
/// barrels through row-group boundaries and is unsuitable for in-flight
/// strategy changes.
#[test]
fn test_swap_strategy_installs_filter_between_row_groups() {
    let metadata = test_file_parquet_metadata();
    let schema_descr = metadata.file_metadata().schema_descr_ptr();

    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
        .unwrap()
        .with_batch_size(1024)
        .build()
        .unwrap();

    // Push the whole file up front so no I/O needs to be interleaved
    // with the swap; PushBuffers carries the bytes across it.
    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();

    // Row group 0 decodes under the default (filter-free) strategy.
    let rg0_reader = expect_data(decoder.try_next_reader());
    let rg0_batches: Vec<_> = rg0_reader.collect::<Result<_, _>>().unwrap();
    let rg0 = concat_batches(&TEST_BATCH.schema(), &rg0_batches).unwrap();
    assert_eq!(rg0.num_rows(), 200);
    assert_eq!(rg0, TEST_BATCH.slice(0, 200));

    // At the row-group boundary, install a predicate on column "a".
    assert!(decoder.can_swap_strategy());
    let predicate =
        ArrowPredicateFn::new(ProjectionMask::columns(&schema_descr, ["a"]), |batch| {
            gt(batch.column(0), &Int64Array::new_scalar(250))
        });
    let swap =
        StrategySwap::new().with_filter(Some(RowFilter::new(vec![Box::new(predicate)])));
    decoder.swap_strategy(swap).unwrap();

    // Row group 1 holds a = 200..399; `a > 250` keeps 251..399 = 149 rows.
    let rg1_reader = expect_data(decoder.try_next_reader());
    let rg1_batches: Vec<_> = rg1_reader.collect::<Result<_, _>>().unwrap();
    let rg1 = concat_batches(&TEST_BATCH.schema(), &rg1_batches).unwrap();
    assert_eq!(rg1, TEST_BATCH.slice(251, 149));
    expect_finished(decoder.try_next_reader());
}

/// `swap_strategy` is rejected while a row group's reader is being
/// drained, and accepted again the moment we cross the boundary.
#[test]
fn test_swap_strategy_rejected_mid_row_group() {
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
        .unwrap()
        .with_batch_size(50)
        .build()
        .unwrap();

    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();

    // Pulling one batch puts us inside `DecodingRowGroup`: the active
    // reader is still alive, so this is not a swap point.
    let _ = expect_data(decoder.try_decode());
    assert!(!decoder.can_swap_strategy());

    // A non-empty swap mid-row-group must fail, and the error message
    // should point the caller at `can_swap_strategy`.
    let swap =
        StrategySwap::new().with_row_selection_policy(super::RowSelectionPolicy::Mask);
    let err = decoder.swap_strategy(swap).unwrap_err();
    let rendered = err.to_string();
    assert!(
        rendered.contains("can_swap_strategy"),
        "unexpected error: {rendered}"
    );

    // An empty swap is always a no-op, even mid-row-group.
    decoder.swap_strategy(StrategySwap::new()).unwrap();
}

/// `try_next_reader` hands the active reader off to the caller and
/// transitions the decoder back to `ReadingRowGroup` — so the caller
/// can call `swap_strategy` even while still iterating the returned
/// reader. (The handed-off reader has no link back to the decoder's
/// projection/filter; it has its own `ArrayReader` and `ReadPlan`.)
#[test]
fn test_swap_strategy_allowed_while_iterating_handed_off_reader() {
    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
        .unwrap()
        .with_batch_size(1024)
        .build()
        .unwrap();
    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();

    let handed_off = expect_data(decoder.try_next_reader());
    // The decoder gave the reader away, so from its point of view it is
    // already "between row groups" — swapping is allowed even though
    // the caller has not iterated `handed_off` yet.
    assert!(decoder.can_swap_strategy());
    // Draining the handed-off reader is independent of the decoder.
    let collected: Vec<_> = handed_off.collect::<Result<_, _>>().unwrap();
    let combined = concat_batches(&TEST_BATCH.schema(), &collected).unwrap();
    assert_eq!(combined, TEST_BATCH.slice(0, 200));
}

/// Bytes already buffered for a column survive a projection narrowing —
/// the next row group should not request any new data for the column
/// that was *already* in the projection.
#[test]
fn test_swap_strategy_preserves_buffered_bytes() {
    let metadata = test_file_parquet_metadata();
    let schema_descr = metadata.file_metadata().schema_descr_ptr();

    let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(metadata)
        .unwrap()
        .with_batch_size(1024)
        .build()
        .unwrap();

    // One up-front push of the whole file.
    decoder
        .push_range(test_file_range(), TEST_FILE_DATA.clone())
        .unwrap();
    assert_eq!(decoder.buffered_bytes(), test_file_len());

    // Decode row group 0 under the full projection via try_next_reader.
    let rg0_reader = expect_data(decoder.try_next_reader());
    let _: Vec<_> = rg0_reader.collect::<Result<_, _>>().unwrap();

    // Narrow the projection to just column "a" for row group 1.
    assert!(decoder.can_swap_strategy());
    let swap =
        StrategySwap::new().with_projection(ProjectionMask::columns(&schema_descr, ["a"]));
    decoder.swap_strategy(swap).unwrap();

    // Column "a"'s bytes for RG1 arrived with the original wide
    // projection's prefetch, so the narrowed strategy needs no more data.
    let rg1_reader = expect_data(decoder.try_next_reader());
    let rg1_batches: Vec<_> = rg1_reader.collect::<Result<_, _>>().unwrap();
    // The output schema is now just "a"; RG1 carries values 200..399.
    let expected = TEST_BATCH.slice(200, 200).project(&[0]).unwrap();
    let combined = concat_batches(&rg1_batches[0].schema(), &rg1_batches).unwrap();
    assert_eq!(combined, expected);
    expect_finished(decoder.try_next_reader());
}

/// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
///
/// Note c is a different types (so the data page sizes will be different)
Expand Down
Loading
Loading