diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index ca4d097c37a44..32d9795d605de 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -349,6 +349,77 @@ impl ParquetAccessPlan { PreparedAccessPlan::new(row_group_indexes, row_selection) } + + /// Split this plan into an ordered list of sub-plans ("chunks"), each of + /// which represents a contiguous prefix of work packed together. + /// + /// Each returned plan has the same `len()` as `self`. Row groups outside + /// the chunk are set to [`RowGroupAccess::Skip`]; row groups inside the + /// chunk keep their original [`RowGroupAccess`]. + /// + /// Chunks are formed by walking `self.row_groups` in order and grouping + /// consecutive entries with `should_scan() == true`. A new chunk is started + /// whenever adding the next scannable row group would push the accumulated + /// row count past `max_rows` or compressed byte size past `max_bytes`. A + /// single row group that already exceeds either limit becomes its own + /// chunk (no sub-row-group split is performed). + /// + /// [`RowGroupAccess::Skip`] entries are carried silently in whichever chunk + /// is active at that point; they contribute no rows or bytes. + /// + /// If there are no scannable row groups, the result is empty. + pub(crate) fn split_into_chunks( + self, + row_group_meta_data: &[RowGroupMetaData], + max_rows: u64, + max_bytes: u64, + ) -> Vec { + assert_eq!(self.row_groups.len(), row_group_meta_data.len()); + + let len = self.row_groups.len(); + let mut chunks: Vec = Vec::new(); + let mut current: Option<(ParquetAccessPlan, u64, u64)> = None; + + for (idx, access) in self.row_groups.into_iter().enumerate() { + if !access.should_scan() { + // Skip entries are attached to the currently open chunk (if + // any) so they do not force a chunk boundary. They contribute + // zero rows/bytes. 
+ if let Some((plan, _, _)) = current.as_mut() { + plan.row_groups[idx] = access; + } + continue; + } + + let rg_meta = &row_group_meta_data[idx]; + let rg_rows = rg_meta.num_rows().max(0) as u64; + let rg_bytes = rg_meta.compressed_size().max(0) as u64; + + if let Some((plan, acc_rows, acc_bytes)) = current.as_mut() { + let exceeds = acc_rows.saturating_add(rg_rows) > max_rows + || acc_bytes.saturating_add(rg_bytes) > max_bytes; + if exceeds { + chunks.push(current.take().unwrap().0); + } else { + plan.row_groups[idx] = access; + *acc_rows += rg_rows; + *acc_bytes += rg_bytes; + continue; + } + } + + // Start a new chunk with this row group. + let mut plan = ParquetAccessPlan::new_none(len); + plan.row_groups[idx] = access; + current = Some((plan, rg_rows, rg_bytes)); + } + + if let Some((plan, _, _)) = current { + chunks.push(plan); + } + + chunks + } } /// Represents a prepared, fully resolved [`ParquetAccessPlan`] @@ -600,6 +671,180 @@ mod test { .collect() }); + /// Build metadata for row groups with the given `(num_rows, compressed_bytes)` + /// pairs. Returned metadata has one `BYTE_ARRAY` column per row group. 
+ fn row_groups_with_bytes(specs: &[(i64, i64)]) -> Vec { + let schema_descr = get_test_schema_descr(); + specs + .iter() + .map(|(num_rows, compressed)| { + let column = ColumnChunkMetaData::builder(schema_descr.column(0)) + .set_num_values(*num_rows) + .set_total_compressed_size(*compressed) + .build() + .unwrap(); + + RowGroupMetaData::builder(schema_descr.clone()) + .set_num_rows(*num_rows) + .set_column_metadata(vec![column]) + .build() + .unwrap() + }) + .collect() + } + + fn access_kinds(plan: &ParquetAccessPlan) -> Vec<&'static str> { + plan.inner() + .iter() + .map(|rg| match rg { + RowGroupAccess::Skip => "skip", + RowGroupAccess::Scan => "scan", + RowGroupAccess::Selection(_) => "sel", + }) + .collect() + } + + #[test] + fn test_split_into_chunks_empty() { + let plan = ParquetAccessPlan::new(vec![]); + let chunks = plan.split_into_chunks(&[], 1000, 1000); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_into_chunks_all_skip() { + let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000)]); + let plan = ParquetAccessPlan::new_none(2); + let chunks = plan.split_into_chunks(&meta, 1000, 10_000); + assert!(chunks.is_empty()); + } + + #[test] + fn test_split_into_chunks_one_per_row_group() { + // Each row group is already at the per-morsel limit, so each becomes + // its own chunk. + let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000), (100, 1_000)]); + let plan = ParquetAccessPlan::new_all(3); + let chunks = plan.split_into_chunks(&meta, 100, 1_000); + assert_eq!(chunks.len(), 3); + assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]); + assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "skip"]); + assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]); + } + + #[test] + fn test_split_into_chunks_packs_small() { + // Three small row groups fit within one chunk by rows AND bytes. 
+ let meta = row_groups_with_bytes(&[(30, 100), (30, 100), (30, 100)]); + let plan = ParquetAccessPlan::new_all(3); + let chunks = plan.split_into_chunks(&meta, 100, 1_000); + assert_eq!(chunks.len(), 1); + assert_eq!(access_kinds(&chunks[0]), vec!["scan", "scan", "scan"]); + } + + #[test] + fn test_split_into_chunks_oversized_single() { + // First row group alone exceeds max_rows; still becomes its own chunk + // (no sub-row-group split). + let meta = row_groups_with_bytes(&[(1_000, 100), (10, 100), (10, 100)]); + let plan = ParquetAccessPlan::new_all(3); + let chunks = plan.split_into_chunks(&meta, 100, 10_000); + assert_eq!(chunks.len(), 2); + assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]); + assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "scan"]); + } + + #[test] + fn test_split_into_chunks_respects_bytes() { + // All row groups are small in rows but the second one is big enough + // that it must start a new chunk on byte budget alone. + let meta = row_groups_with_bytes(&[(10, 500), (10, 600), (10, 100), (10, 100)]); + let plan = ParquetAccessPlan::new_all(4); + let chunks = plan.split_into_chunks(&meta, 1_000_000, 1_000); + assert_eq!(chunks.len(), 2); + assert_eq!( + access_kinds(&chunks[0]), + vec!["scan", "skip", "skip", "skip"] + ); + assert_eq!( + access_kinds(&chunks[1]), + vec!["skip", "scan", "scan", "scan"] + ); + } + + #[test] + fn test_split_into_chunks_with_skip_preserved() { + // Skip entries are carried by whichever chunk is currently being + // grown and never contribute to the row/byte budget, so here all + // three scan row groups fit together despite the wide skip in the + // middle. 
+ let meta = + row_groups_with_bytes(&[(30, 100), (1_000, 500), (30, 100), (30, 100)]); + let plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, + RowGroupAccess::Skip, + RowGroupAccess::Scan, + RowGroupAccess::Scan, + ]); + let chunks = plan.split_into_chunks(&meta, 100, 1_000); + assert_eq!(chunks.len(), 1); + assert_eq!( + access_kinds(&chunks[0]), + vec!["scan", "skip", "scan", "scan"] + ); + } + + #[test] + fn test_split_into_chunks_skip_between_chunks() { + // A Skip encountered while the current chunk is still open stays in + // that chunk; closing on budget never creates an empty chunk. + let meta = row_groups_with_bytes(&[(50, 100), (50, 100), (50, 100), (50, 100)]); + let plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, + RowGroupAccess::Scan, + RowGroupAccess::Skip, + RowGroupAccess::Scan, + ]); + let chunks = plan.split_into_chunks(&meta, 100, 10_000); + assert_eq!(chunks.len(), 2); + assert_eq!( + access_kinds(&chunks[0]), + vec!["scan", "scan", "skip", "skip"] + ); + // rg2's Skip still lives in chunk 0 because chunk 0 was still open + // when we hit rg2; chunk 1 only covers rg3. + assert_eq!( + access_kinds(&chunks[1]), + vec!["skip", "skip", "skip", "scan"] + ); + } + + #[test] + fn test_split_into_chunks_preserves_selection() { + let meta = row_groups_with_bytes(&[(10, 100), (20, 100), (30, 100)]); + let selection: RowSelection = + vec![RowSelector::select(5), RowSelector::skip(15)].into(); + let plan = ParquetAccessPlan::new(vec![ + RowGroupAccess::Scan, + RowGroupAccess::Selection(selection), + RowGroupAccess::Scan, + ]); + // Budget forces each row group into its own chunk. + let chunks = plan.split_into_chunks(&meta, 15, 10_000); + assert_eq!(chunks.len(), 3); + assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]); + assert_eq!(access_kinds(&chunks[1]), vec!["skip", "sel", "skip"]); + assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]); + // The Selection must be preserved verbatim in its chunk. 
+ let RowGroupAccess::Selection(sel) = &chunks[1].inner()[1] else { + panic!("expected Selection preserved in chunk"); + }; + let selectors: Vec<_> = sel.clone().into(); + assert_eq!(selectors.len(), 2); + assert_eq!((selectors[0].skip, selectors[0].row_count), (false, 5)); + assert_eq!((selectors[1].skip, selectors[1].row_count), (true, 15)); + } + /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String` fn get_test_schema_descr() -> SchemaDescPtr { use parquet::basic::Type as PhysicalType; diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index bad1c684b47f5..b548f1f2bde36 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -75,7 +75,16 @@ use parquet::arrow::parquet_column; use parquet::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder}; use parquet::basic::Type; use parquet::bloom_filter::Sbbf; -use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader}; +use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader}; + +/// Default soft upper bound on the number of rows packed into a single +/// row-group morsel. Adjacent row groups are coalesced until this limit would +/// be exceeded. A single oversized row group still becomes its own morsel. +pub(crate) const DEFAULT_MORSEL_MAX_ROWS: u64 = 100_000; + +/// Default soft upper bound on the compressed byte size of a single row-group +/// morsel. See [`DEFAULT_MORSEL_MAX_ROWS`]. +pub(crate) const DEFAULT_MORSEL_MAX_COMPRESSED_BYTES: u64 = 64 * 1024 * 1024; /// Stateless Parquet morselizer implementation. /// @@ -136,6 +145,15 @@ pub(super) struct ParquetMorselizer { pub max_predicate_cache_size: Option, /// Whether to read row groups in reverse order pub reverse_row_groups: bool, + /// Upper bound on the number of rows coalesced into a single morsel. 
+ /// + /// Row groups are packed greedily until the next row group would push + /// the total past this limit; a single oversized row group still becomes + /// its own morsel. + pub morsel_max_rows: u64, + /// Upper bound on the compressed byte size coalesced into a single + /// morsel. See [`Self::morsel_max_rows`]. + pub morsel_max_compressed_bytes: u64, } impl fmt::Debug for ParquetMorselizer { @@ -228,8 +246,15 @@ enum ParquetOpenState { /// /// TODO: split state as this currently does both I/O and CPU work. BuildStream(Box), - /// Terminal state: the final opened stream is ready to return. - Ready(BoxStream<'static, Result>), + /// Terminal state: one or more per-morsel lazy builders are ready to + /// return. + /// + /// Each morsel corresponds to one row-group-sized chunk of the file. + /// Morsels defer row-filter compilation, decoder construction, and + /// reader acquisition until [`Morsel::into_stream`] is actually + /// invoked — so construction work for a morsel only happens when the + /// scheduler picks it up. + Ready(Vec>), /// Terminal state: reading complete Done, } @@ -287,6 +312,8 @@ struct PreparedParquetOpen { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + morsel_max_rows: u64, + morsel_max_compressed_bytes: u64, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, } @@ -399,7 +426,7 @@ impl ParquetOpenState { ParquetOpenState::BuildStream(prepared) => { Ok(ParquetOpenState::Ready(prepared.build_stream()?)) } - ParquetOpenState::Ready(stream) => Ok(ParquetOpenState::Ready(stream)), + ParquetOpenState::Ready(streams) => Ok(ParquetOpenState::Ready(streams)), ParquetOpenState::Done => { panic!("ParquetOpenFuture polled after completion"); } @@ -407,27 +434,200 @@ impl ParquetOpenState { } } -/// Implements the Morsel API -struct ParquetStreamMorsel { - stream: BoxStream<'static, Result>, +/// File-level state shared across every lazy morsel from a single file open. 
+/// +/// Each [`ParquetLazyMorsel`] holds an `Arc` to one of these so the +/// expensive-to-clone pieces (metadata, schemas, metrics, Arc predicates) +/// are not duplicated. The only non-shareable resource is the +/// [`FilePruner`], which is held on chunk 0's morsel because it's +/// `!Clone`. +struct LazyMorselShared { + partition_index: usize, + partitioned_file: PartitionedFile, + metadata_size_hint: Option, + metrics: ExecutionPlanMetricsSet, + file_metrics: ParquetFileMetrics, + baseline_metrics: BaselineMetrics, + parquet_file_reader_factory: Arc, + batch_size: usize, + physical_file_schema: SchemaRef, + output_schema: SchemaRef, + projection: ProjectionExprs, + predicate: Option>, + pushdown_filters: bool, + force_filter_selections: bool, + reorder_predicates: bool, + limit: Option, + max_predicate_cache_size: Option, + reverse_row_groups: bool, + reader_metadata: ArrowReaderMetadata, + file_metadata: Arc, } -impl ParquetStreamMorsel { - fn new(stream: BoxStream<'static, Result>) -> Self { - Self { stream } - } +/// Lazy per-morsel builder. +/// +/// Holds everything needed to construct the parquet decoder stream for a +/// single chunk of row groups, but defers the actual construction — +/// `build_row_filter`, decoder build, reader acquisition — to +/// [`Morsel::into_stream`]. This means a file's morsel construction cost +/// is paid only as each morsel is scheduled, not all-at-once at +/// `build_stream` time. +struct ParquetLazyMorsel { + shared: Arc, + chunk_plan: ParquetAccessPlan, + chunk_idx: usize, + /// The file-level [`FilePruner`] used for dynamic-filter early-stop. + /// `FilePruner` is not `Clone` and holds stateful predicate-generation + /// counters, so it's attached only to chunk 0's stream. 
+ file_pruner: Option, } -impl fmt::Debug for ParquetStreamMorsel { +impl fmt::Debug for ParquetLazyMorsel { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("ParquetStreamMorsel") + f.debug_struct("ParquetLazyMorsel") + .field("chunk_idx", &self.chunk_idx) .finish_non_exhaustive() } } -impl Morsel for ParquetStreamMorsel { +impl Morsel for ParquetLazyMorsel { fn into_stream(self: Box) -> BoxStream<'static, Result> { - self.stream + match (*self).build_stream_now() { + Ok(stream) => stream, + Err(e) => futures::stream::once(async move { Err(e) }).boxed(), + } + } +} + +impl ParquetLazyMorsel { + fn build_stream_now(self) -> Result>> { + let ParquetLazyMorsel { + shared, + chunk_plan, + chunk_idx, + file_pruner, + } = self; + + let rg_metadata = shared.file_metadata.row_groups(); + let mut prepared_plan = chunk_plan.prepare(rg_metadata)?; + if shared.reverse_row_groups { + prepared_plan = prepared_plan.reverse(shared.file_metadata.as_ref())?; + } + + // `RowFilter` is not `Clone` because it owns `Box`s, + // so a fresh filter has to be built per chunk. 
+ let row_filter = if let Some(predicate) = shared + .pushdown_filters + .then_some(shared.predicate.clone()) + .flatten() + { + match row_filter::build_row_filter( + &predicate, + &shared.physical_file_schema, + shared.file_metadata.as_ref(), + shared.reorder_predicates, + &shared.file_metrics, + ) { + Ok(Some(filter)) => Some(filter), + Ok(None) => None, + Err(e) => { + debug!("Ignoring error building row filter for '{predicate:?}': {e}"); + None + } + } + } else { + None + }; + + let arrow_reader_metrics = ArrowReaderMetrics::enabled(); + let read_plan = build_projection_read_plan( + shared.projection.expr_iter(), + &shared.physical_file_schema, + shared.reader_metadata.parquet_schema(), + ); + + let mut decoder_builder = + ParquetPushDecoderBuilder::new_with_metadata(shared.reader_metadata.clone()) + .with_projection(read_plan.projection_mask) + .with_batch_size(shared.batch_size) + .with_metrics(arrow_reader_metrics.clone()); + + if let Some(row_filter) = row_filter { + decoder_builder = decoder_builder.with_row_filter(row_filter); + } + if shared.force_filter_selections { + decoder_builder = + decoder_builder.with_row_selection_policy(RowSelectionPolicy::Selectors); + } + if let Some(row_selection) = prepared_plan.row_selection { + decoder_builder = decoder_builder.with_row_selection(row_selection); + } + decoder_builder = + decoder_builder.with_row_groups(prepared_plan.row_group_indexes); + // `ScanState.remain` enforces the true outer limit across all + // morsels; passing the per-chunk limit here is a conservative + // per-chunk cap that bounds wasted decode once the outer cap is hit. 
+ if let Some(limit) = shared.limit { + decoder_builder = decoder_builder.with_limit(limit); + } + if let Some(max_predicate_cache_size) = shared.max_predicate_cache_size { + decoder_builder = + decoder_builder.with_max_predicate_cache_size(max_predicate_cache_size); + } + + let decoder = decoder_builder.build()?; + + let reader = shared.parquet_file_reader_factory.create_reader( + shared.partition_index, + shared.partitioned_file.clone(), + shared.metadata_size_hint, + &shared.metrics, + )?; + + let stream_schema = read_plan.projected_schema; + let replace_schema = stream_schema != shared.output_schema; + let projection = shared + .projection + .clone() + .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; + let projector = projection.make_projector(&stream_schema)?; + + let predicate_cache_inner_records = + shared.file_metrics.predicate_cache_inner_records.clone(); + let predicate_cache_records = shared.file_metrics.predicate_cache_records.clone(); + + let stream = futures::stream::unfold( + PushDecoderStreamState { + decoder, + reader, + projector, + output_schema: Arc::clone(&shared.output_schema), + replace_schema, + arrow_reader_metrics, + predicate_cache_inner_records, + predicate_cache_records, + baseline_metrics: shared.baseline_metrics.clone(), + }, + |state| async move { state.transition().await }, + ) + .fuse(); + + // Attach `FilePruner` only to chunk 0 so the whole file scan can + // still early-stop when a dynamic filter narrows. 
+ let boxed: BoxStream<'static, Result> = if chunk_idx == 0 + && let Some(pruner) = file_pruner + { + EarlyStoppingStream::new( + stream.boxed(), + pruner, + shared.file_metrics.files_ranges_pruned_statistics.clone(), + ) + .boxed() + } else { + stream.boxed() + }; + + Ok(boxed) + } } @@ -515,9 +715,12 @@ impl MorselPlanner for ParquetMorselPlanner { ))) }))) } - ParquetOpenState::Ready(stream) => { - let morsels: Vec> = - vec![Box::new(ParquetStreamMorsel::new(stream))]; + ParquetOpenState::Ready(morsels) => { + if morsels.is_empty() { + // No row groups survived pruning, so there's nothing to + // feed the executor — terminate this file's planner. + return Ok(None); + } Ok(Some(MorselPlan::new().with_morsels(morsels))) } ParquetOpenState::Done => Ok(None), } @@ -656,6 +859,8 @@ impl ParquetMorselizer { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, preserve_order: self.preserve_order, + morsel_max_rows: self.morsel_max_rows, + morsel_max_compressed_bytes: self.morsel_max_compressed_bytes, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, }) @@ -1055,8 +1260,15 @@ impl BloomFiltersLoadedParquetOpen { } impl RowGroupsPrunedParquetOpen { - /// Build the final parquet stream once all pruning work is complete. - fn build_stream(self) -> Result>> { + /// Build one or more per-morsel streams once all pruning work is complete. + /// + /// Row groups are packed into chunks of up to `morsel_max_rows` rows and + /// `morsel_max_compressed_bytes` compressed bytes. Each chunk becomes an + /// independent lazy stream behind a [`ParquetLazyMorsel`], + /// letting the driver interleave row-group work with other operators and + /// unblocking the follow-on work of sharing row-group-level work across + /// sibling `FileStream`s. 
+ fn build_stream(self) -> Result>> { let RowGroupsPrunedParquetOpen { prepared, mut row_groups, @@ -1075,32 +1287,6 @@ impl RowGroupsPrunedParquetOpen { let file_metadata = Arc::clone(reader_metadata.metadata()); let rg_metadata = file_metadata.row_groups(); - // Filter pushdown: evaluate predicates during scan - let row_filter = if let Some(predicate) = prepared - .pushdown_filters - .then_some(prepared.predicate.clone()) - .flatten() - { - let row_filter = row_filter::build_row_filter( - &predicate, - &prepared.physical_file_schema, - file_metadata.as_ref(), - prepared.reorder_predicates, - &prepared.file_metrics, - ); - - match row_filter { - Ok(Some(filter)) => Some(filter), - Ok(None) => None, - Err(e) => { - debug!("Ignoring error building row filter for '{predicate:?}': {e}"); - None - } - } - } else { - None - }; - // Prune by limit if limit is set and limit order is not sensitive if let (Some(limit), false) = (prepared.limit, prepared.preserve_order) { row_groups.prune_by_limit(limit, rg_metadata, &prepared.file_metrics); @@ -1123,98 +1309,72 @@ impl RowGroupsPrunedParquetOpen { ); } - // Prepare the access plan (extract row groups and row selection) - let mut prepared_plan = access_plan.prepare(rg_metadata)?; - - // Potentially reverse the access plan for performance. - // See `ParquetSource::try_pushdown_sort` for the rationale. 
- if prepared.reverse_row_groups { - prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; + if access_plan.row_group_index_iter().next().is_none() { + return Ok(Vec::new()); } - let arrow_reader_metrics = ArrowReaderMetrics::enabled(); - let read_plan = build_projection_read_plan( - prepared.projection.expr_iter(), - &prepared.physical_file_schema, - reader_metadata.parquet_schema(), + let mut chunk_plans = access_plan.split_into_chunks( + rg_metadata, + prepared.morsel_max_rows, + prepared.morsel_max_compressed_bytes, ); - let mut decoder_builder = - ParquetPushDecoderBuilder::new_with_metadata(reader_metadata) - .with_projection(read_plan.projection_mask) - .with_batch_size(prepared.batch_size) - .with_metrics(arrow_reader_metrics.clone()); - - if let Some(row_filter) = row_filter { - decoder_builder = decoder_builder.with_row_filter(row_filter); - } - if prepared.force_filter_selections { - decoder_builder = - decoder_builder.with_row_selection_policy(RowSelectionPolicy::Selectors); - } - if let Some(row_selection) = prepared_plan.row_selection { - decoder_builder = decoder_builder.with_row_selection(row_selection); - } - decoder_builder = - decoder_builder.with_row_groups(prepared_plan.row_group_indexes); - if let Some(limit) = prepared.limit { - decoder_builder = decoder_builder.with_limit(limit); - } - if let Some(max_predicate_cache_size) = prepared.max_predicate_cache_size { - decoder_builder = - decoder_builder.with_max_predicate_cache_size(max_predicate_cache_size); + // Reverse chunk order so that, when `reverse_row_groups` is set, the + // first emitted morsel corresponds to the file's last row groups. + // Each chunk's `PreparedAccessPlan` is also reversed below so that + // within a chunk the row-group read order mirrors the file-wide + // reversal. See `ParquetSource::try_pushdown_sort` for the rationale. 
+ if prepared.reverse_row_groups { + chunk_plans.reverse(); } - let decoder = decoder_builder.build()?; - - let predicate_cache_inner_records = - prepared.file_metrics.predicate_cache_inner_records.clone(); - let predicate_cache_records = - prepared.file_metrics.predicate_cache_records.clone(); - - // Check if we need to replace the schema to handle things like differing nullability or metadata. - // See note below about file vs. output schema. - let stream_schema = read_plan.projected_schema; - let replace_schema = stream_schema != prepared.output_schema; - - // Rebase column indices to match the narrowed stream schema. - // The projection expressions have indices based on physical_file_schema, - // but the stream only contains the columns selected by the ProjectionMask. - let projection = prepared - .projection - .try_map_exprs(|expr| reassign_expr_columns(expr, &stream_schema))?; - let projector = projection.make_projector(&stream_schema)?; - let output_schema = Arc::clone(&prepared.output_schema); - let files_ranges_pruned_statistics = - prepared.file_metrics.files_ranges_pruned_statistics.clone(); - let stream = futures::stream::unfold( - PushDecoderStreamState { - decoder, - reader: prepared.async_file_reader, - projector, - output_schema, - replace_schema, - arrow_reader_metrics, - predicate_cache_inner_records, - predicate_cache_records, - baseline_metrics: prepared.baseline_metrics, - }, - |state| async move { state.transition().await }, - ) - .fuse(); + // `prepared.async_file_reader` served metadata / page-index / + // bloom-filter loads and is dropped here: each morsel mints its + // own reader via the factory at `into_stream` time. Built-in + // factories wrap only `Arc` (HTTP/connection + // pool already shared) or an `Arc`, so the + // "warm cache" benefit of reusing a reader is negligible. 
+ let mut file_pruner = prepared.file_pruner; + + let shared = Arc::new(LazyMorselShared { + partition_index: prepared.partition_index, + partitioned_file: prepared.partitioned_file, + metadata_size_hint: prepared.metadata_size_hint, + metrics: prepared.metrics, + file_metrics: prepared.file_metrics, + baseline_metrics: prepared.baseline_metrics, + parquet_file_reader_factory: prepared.parquet_file_reader_factory, + batch_size: prepared.batch_size, + physical_file_schema: prepared.physical_file_schema, + output_schema: prepared.output_schema, + projection: prepared.projection, + predicate: prepared.predicate, + pushdown_filters: prepared.pushdown_filters, + force_filter_selections: prepared.force_filter_selections, + reorder_predicates: prepared.reorder_predicates, + limit: prepared.limit, + max_predicate_cache_size: prepared.max_predicate_cache_size, + reverse_row_groups: prepared.reverse_row_groups, + reader_metadata, + file_metadata, + }); + + // `FilePruner` is `!Clone`, so `take` hands it to the first morsel + // and leaves `None` for the rest. + let morsels: Vec> = chunk_plans + .into_iter() + .enumerate() + .map(|(chunk_idx, chunk_plan)| { + Box::new(ParquetLazyMorsel { + shared: Arc::clone(&shared), + chunk_plan, + chunk_idx, + file_pruner: file_pruner.take(), + }) as Box + }) + .collect(); - // Wrap the stream so a dynamic filter can stop the file scan early. 
- if let Some(file_pruner) = prepared.file_pruner { - let stream = stream.boxed(); - Ok(EarlyStoppingStream::new( - stream, - file_pruner, - files_ranges_pruned_statistics, - ) - .boxed()) - } else { - Ok(stream.boxed()) - } + Ok(morsels) } } @@ -1629,8 +1789,7 @@ mod test { use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use bytes::{BufMut, BytesMut}; use datafusion_common::{ - ColumnStatistics, ScalarValue, Statistics, internal_err, record_batch, - stats::Precision, + ColumnStatistics, ScalarValue, Statistics, record_batch, stats::Precision, }; use datafusion_datasource::morsel::{Morsel, Morselizer}; use datafusion_datasource::{PartitionedFile, TableSchema}; @@ -1676,6 +1835,8 @@ mod test { max_predicate_cache_size: Option, reverse_row_groups: bool, preserve_order: bool, + morsel_max_rows: u64, + morsel_max_compressed_bytes: u64, } impl ParquetMorselizerBuilder { @@ -1702,6 +1863,8 @@ mod test { max_predicate_cache_size: None, reverse_row_groups: false, preserve_order: false, + morsel_max_rows: DEFAULT_MORSEL_MAX_ROWS, + morsel_max_compressed_bytes: DEFAULT_MORSEL_MAX_COMPRESSED_BYTES, } } @@ -1765,6 +1928,19 @@ mod test { self } + /// Override the per-morsel row budget. + fn with_morsel_max_rows(mut self, limit: u64) -> Self { + self.morsel_max_rows = limit; + self + } + + /// Override the per-morsel compressed byte budget. + #[expect(dead_code)] + fn with_morsel_max_compressed_bytes(mut self, limit: u64) -> Self { + self.morsel_max_compressed_bytes = limit; + self + } + /// Build the ParquetMorselizer instance. 
/// /// # Panics @@ -1816,6 +1992,8 @@ mod test { encryption_factory: None, max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, + morsel_max_rows: self.morsel_max_rows, + morsel_max_compressed_bytes: self.morsel_max_compressed_bytes, } } } @@ -1830,32 +2008,49 @@ mod test { morselizer: &ParquetMorselizer, file: PartitionedFile, ) -> Result>> { - let mut planners = VecDeque::from([morselizer.plan_file(file)?]); - let mut morsels: VecDeque> = VecDeque::new(); + let morsels = collect_all_morsels(morselizer, file).await?; + if let Some(first) = morsels.into_iter().next() { + Ok(Box::pin(first.into_stream())) + } else { + Ok(Box::pin(futures::stream::empty())) + } + } - loop { - if let Some(morsel) = morsels.pop_front() { - return Ok(Box::pin(morsel.into_stream())); - } + /// Drives the morselizer to completion and returns every morsel it + /// produced, in order. Useful for asserting how a file is split into + /// row-group morsels. + async fn collect_all_morsels( + morselizer: &ParquetMorselizer, + file: PartitionedFile, + ) -> Result>> { + let mut planners = VecDeque::from([morselizer.plan_file(file)?]); + let mut morsels: Vec> = Vec::new(); - let Some(planner) = planners.pop_front() else { - return Ok(Box::pin(futures::stream::empty())); + while let Some(planner) = planners.pop_front() { + let Some(mut plan) = planner.plan()? else { + continue; }; + morsels.extend(plan.take_morsels()); + planners.extend(plan.take_ready_planners()); - if let Some(mut plan) = planner.plan()? 
{ - morsels.extend(plan.take_morsels()); - planners.extend(plan.take_ready_planners()); + if let Some(pending_planner) = plan.take_pending_planner() { + planners.push_front(pending_planner.await?); + } + } - if let Some(pending_planner) = plan.take_pending_planner() { - planners.push_front(pending_planner.await?); - continue; - } + Ok(morsels) + } - if morsels.is_empty() && planners.is_empty() { - return internal_err!("planner returned an empty morsel plan"); - } - } + /// Concatenate all batches produced by `streams`, returning the int32 + /// values from the first column of each batch. + async fn collect_int32_values_across( + streams: Vec>>, + ) -> Vec { + let mut values = vec![]; + for stream in streams { + values.extend(collect_int32_values(stream).await); } + values } fn constant_int_stats() -> (Statistics, SchemaRef) { @@ -2651,6 +2846,216 @@ mod test { ); } + /// A multi-row-group file whose pruned access plan exceeds the per-morsel + /// row budget produces multiple morsels, and their concatenated output + /// matches the single-morsel reference. + #[tokio::test] + async fn test_row_group_split_produces_multiple_morsels() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + // Three row groups of 3 rows each. Packing stops at 3 rows/morsel, so + // we expect three morsels. 
+ let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(Arc::clone(&schema)) + .with_projection_indices(&[0]) + .with_morsel_max_rows(3) + .build(); + let morsels = collect_all_morsels(&morselizer, file.clone()) + .await + .unwrap(); + assert_eq!(morsels.len(), 3, "one morsel per row group"); + + let streams = morsels + .into_iter() + .map(|m| Box::pin(m.into_stream()) as BoxStream<_>) + .collect(); + let values = collect_int32_values_across(streams).await; + assert_eq!(values, vec![1, 2, 3, 4, 5, 6, 7, 8, 9]); + + // Reference: default budget keeps everything in one morsel. + let reference_morselizer = ParquetMorselizerBuilder::new() + .with_store(Arc::clone(&store)) + .with_schema(schema) + .with_projection_indices(&[0]) + .build(); + let reference_stream = open_file(&reference_morselizer, file).await.unwrap(); + assert_eq!( + collect_int32_values(reference_stream).await, + vec![1, 2, 3, 4, 5, 6, 7, 8, 9] + ); + } + + /// When adjacent row groups fit inside the morsel budget they should be + /// packed together rather than emitted one-per-morsel. 
+ #[tokio::test] + async fn test_row_group_split_packs_within_budget() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + // Budget fits exactly 2 row groups; expect two morsels: [0+1], [2]. + let morselizer = ParquetMorselizerBuilder::new() + .with_store(store) + .with_schema(schema) + .with_projection_indices(&[0]) + .with_morsel_max_rows(6) + .build(); + let morsels = collect_all_morsels(&morselizer, file).await.unwrap(); + assert_eq!(morsels.len(), 2); + } + + /// A user-supplied access plan with a `Skip` entry between scanned row + /// groups should preserve the skip across chunking. 
+ #[tokio::test] + async fn test_row_group_split_honors_user_skip() { + use crate::ParquetAccessPlan; + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + let schema = batch1.schema(); + + let mut access_plan = ParquetAccessPlan::new_all(3); + access_plan.skip(1); + + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ) + .with_extensions(Arc::new(access_plan)); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(store) + .with_schema(schema) + .with_projection_indices(&[0]) + .with_morsel_max_rows(3) + .build(); + let morsels = collect_all_morsels(&morselizer, file).await.unwrap(); + let streams = morsels + .into_iter() + .map(|m| Box::pin(m.into_stream()) as BoxStream<_>) + .collect(); + let values = collect_int32_values_across(streams).await; + assert_eq!(values, vec![1, 2, 3, 7, 8, 9], "row group 1 is skipped"); + } + + /// When `reverse_row_groups` is set the per-morsel split should preserve + /// the reverse output order: the first morsel emits the file's last row + /// group. 
+ #[tokio::test] + async fn test_row_group_split_with_reverse() { + use parquet::file::properties::WriterProperties; + + let store = Arc::new(InMemory::new()) as Arc; + + let batch1 = + record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); + let batch2 = + record_batch!(("a", Int32, vec![Some(4), Some(5), Some(6)])).unwrap(); + let batch3 = + record_batch!(("a", Int32, vec![Some(7), Some(8), Some(9)])).unwrap(); + + let props = WriterProperties::builder() + .set_max_row_group_row_count(Some(3)) + .build(); + let data_len = write_parquet_batches( + Arc::clone(&store), + "test.parquet", + vec![batch1.clone(), batch2, batch3], + Some(props), + ) + .await; + let schema = batch1.schema(); + let file = PartitionedFile::new( + "test.parquet".to_string(), + u64::try_from(data_len).unwrap(), + ); + + let morselizer = ParquetMorselizerBuilder::new() + .with_store(store) + .with_schema(schema) + .with_projection_indices(&[0]) + .with_morsel_max_rows(3) + .with_reverse_row_groups(true) + .build(); + let morsels = collect_all_morsels(&morselizer, file).await.unwrap(); + assert_eq!(morsels.len(), 3); + + // First morsel should emit the originally-last row group. + let streams: Vec<_> = morsels + .into_iter() + .map(|m| Box::pin(m.into_stream()) as BoxStream<_>) + .collect(); + let values = collect_int32_values_across(streams).await; + assert_eq!(values, vec![7, 8, 9, 4, 5, 6, 1, 2, 3]); + } + /// Test that page pruning predicates are only built and applied when `enable_page_index` is true. /// /// The file has a single row group with 10 pages (10 rows each, values 1..100). 
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index a014c8b2726e7..0d0840655bf26 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -580,6 +580,9 @@ impl FileSource for ParquetSource { encryption_factory: self.get_encryption_factory_with_config(), max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, + morsel_max_rows: crate::opener::DEFAULT_MORSEL_MAX_ROWS, + morsel_max_compressed_bytes: + crate::opener::DEFAULT_MORSEL_MAX_COMPRESSED_BYTES, })) }