Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,77 @@ impl ParquetAccessPlan {

PreparedAccessPlan::new(row_group_indexes, row_selection)
}

/// Split this plan into an ordered list of sub-plans ("chunks"), each of
/// which represents a contiguous prefix of work packed together.
///
/// Each returned plan has the same `len()` as `self`. Row groups outside
/// the chunk are set to [`RowGroupAccess::Skip`]; row groups inside the
/// chunk keep their original [`RowGroupAccess`].
///
/// Chunks are formed by walking `self.row_groups` in order and grouping
/// consecutive entries with `should_scan() == true`. A new chunk is started
/// whenever adding the next scannable row group would push the accumulated
/// row count past `max_rows` or compressed byte size past `max_bytes`. A
/// single row group that already exceeds either limit becomes its own
/// chunk (no sub-row-group split is performed).
///
/// [`RowGroupAccess::Skip`] entries are carried silently in whichever chunk
/// is active at that point; they contribute no rows or bytes.
///
/// If there are no scannable row groups, the result is empty.
pub(crate) fn split_into_chunks(
self,
row_group_meta_data: &[RowGroupMetaData],
max_rows: u64,
max_bytes: u64,
) -> Vec<ParquetAccessPlan> {
assert_eq!(self.row_groups.len(), row_group_meta_data.len());

let len = self.row_groups.len();
let mut chunks: Vec<ParquetAccessPlan> = Vec::new();
let mut current: Option<(ParquetAccessPlan, u64, u64)> = None;

for (idx, access) in self.row_groups.into_iter().enumerate() {
if !access.should_scan() {
// Skip entries are attached to the currently open chunk (if
// any) so they do not force a chunk boundary. They contribute
// zero rows/bytes.
if let Some((plan, _, _)) = current.as_mut() {
plan.row_groups[idx] = access;
}
continue;
}

let rg_meta = &row_group_meta_data[idx];
let rg_rows = rg_meta.num_rows().max(0) as u64;
let rg_bytes = rg_meta.compressed_size().max(0) as u64;

if let Some((plan, acc_rows, acc_bytes)) = current.as_mut() {
let exceeds = acc_rows.saturating_add(rg_rows) > max_rows
|| acc_bytes.saturating_add(rg_bytes) > max_bytes;
if exceeds {
chunks.push(current.take().unwrap().0);
} else {
plan.row_groups[idx] = access;
*acc_rows += rg_rows;
*acc_bytes += rg_bytes;
continue;
}
}

// Start a new chunk with this row group.
let mut plan = ParquetAccessPlan::new_none(len);
plan.row_groups[idx] = access;
current = Some((plan, rg_rows, rg_bytes));
}

if let Some((plan, _, _)) = current {
chunks.push(plan);
}

chunks
}
}

/// Represents a prepared, fully resolved [`ParquetAccessPlan`]
Expand Down Expand Up @@ -600,6 +671,180 @@ mod test {
.collect()
});

/// Build metadata for row groups with the given `(num_rows, compressed_bytes)`
/// pairs. Returned metadata has one `BYTE_ARRAY` column per row group.
fn row_groups_with_bytes(specs: &[(i64, i64)]) -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr();
specs
.iter()
.map(|(num_rows, compressed)| {
let column = ColumnChunkMetaData::builder(schema_descr.column(0))
.set_num_values(*num_rows)
.set_total_compressed_size(*compressed)
.build()
.unwrap();

RowGroupMetaData::builder(schema_descr.clone())
.set_num_rows(*num_rows)
.set_column_metadata(vec![column])
.build()
.unwrap()
})
.collect()
}

fn access_kinds(plan: &ParquetAccessPlan) -> Vec<&'static str> {
plan.inner()
.iter()
.map(|rg| match rg {
RowGroupAccess::Skip => "skip",
RowGroupAccess::Scan => "scan",
RowGroupAccess::Selection(_) => "sel",
})
.collect()
}

#[test]
fn test_split_into_chunks_empty() {
let plan = ParquetAccessPlan::new(vec![]);
let chunks = plan.split_into_chunks(&[], 1000, 1000);
assert!(chunks.is_empty());
}

#[test]
fn test_split_into_chunks_all_skip() {
let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000)]);
let plan = ParquetAccessPlan::new_none(2);
let chunks = plan.split_into_chunks(&meta, 1000, 10_000);
assert!(chunks.is_empty());
}

#[test]
fn test_split_into_chunks_one_per_row_group() {
// Each row group is already at the per-morsel limit, so each becomes
// its own chunk.
let meta = row_groups_with_bytes(&[(100, 1_000), (100, 1_000), (100, 1_000)]);
let plan = ParquetAccessPlan::new_all(3);
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
assert_eq!(chunks.len(), 3);
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "skip"]);
assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]);
}

#[test]
fn test_split_into_chunks_packs_small() {
// Three small row groups fit within one chunk by rows AND bytes.
let meta = row_groups_with_bytes(&[(30, 100), (30, 100), (30, 100)]);
let plan = ParquetAccessPlan::new_all(3);
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
assert_eq!(chunks.len(), 1);
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "scan", "scan"]);
}

#[test]
fn test_split_into_chunks_oversized_single() {
// First row group alone exceeds max_rows; still becomes its own chunk
// (no sub-row-group split).
let meta = row_groups_with_bytes(&[(1_000, 100), (10, 100), (10, 100)]);
let plan = ParquetAccessPlan::new_all(3);
let chunks = plan.split_into_chunks(&meta, 100, 10_000);
assert_eq!(chunks.len(), 2);
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "scan", "scan"]);
}

#[test]
fn test_split_into_chunks_respects_bytes() {
// All row groups are small in rows but the second one is big enough
// that it must start a new chunk on byte budget alone.
let meta = row_groups_with_bytes(&[(10, 500), (10, 600), (10, 100), (10, 100)]);
let plan = ParquetAccessPlan::new_all(4);
let chunks = plan.split_into_chunks(&meta, 1_000_000, 1_000);
assert_eq!(chunks.len(), 2);
assert_eq!(
access_kinds(&chunks[0]),
vec!["scan", "skip", "skip", "skip"]
);
assert_eq!(
access_kinds(&chunks[1]),
vec!["skip", "scan", "scan", "scan"]
);
}

#[test]
fn test_split_into_chunks_with_skip_preserved() {
// Skip entries are carried by whichever chunk is currently being
// grown and never contribute to the row/byte budget, so here all
// three scan row groups fit together despite the wide skip in the
// middle.
let meta =
row_groups_with_bytes(&[(30, 100), (1_000, 500), (30, 100), (30, 100)]);
let plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Skip,
RowGroupAccess::Scan,
RowGroupAccess::Scan,
]);
let chunks = plan.split_into_chunks(&meta, 100, 1_000);
assert_eq!(chunks.len(), 1);
assert_eq!(
access_kinds(&chunks[0]),
vec!["scan", "skip", "scan", "scan"]
);
}

#[test]
fn test_split_into_chunks_skip_between_chunks() {
// When a chunk closes on budget, a following Skip is picked up by the
// next chunk rather than creating an empty one.
let meta = row_groups_with_bytes(&[(50, 100), (50, 100), (50, 100), (50, 100)]);
let plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Scan,
RowGroupAccess::Skip,
RowGroupAccess::Scan,
]);
let chunks = plan.split_into_chunks(&meta, 100, 10_000);
assert_eq!(chunks.len(), 2);
assert_eq!(
access_kinds(&chunks[0]),
vec!["scan", "scan", "skip", "skip"]
);
// rg2's Skip still lives in chunk 0 because chunk 0 was still open
// when we hit rg2; chunk 1 only covers rg3.
assert_eq!(
access_kinds(&chunks[1]),
vec!["skip", "skip", "skip", "scan"]
);
}

#[test]
fn test_split_into_chunks_preserves_selection() {
let meta = row_groups_with_bytes(&[(10, 100), (20, 100), (30, 100)]);
let selection: RowSelection =
vec![RowSelector::select(5), RowSelector::skip(15)].into();
let plan = ParquetAccessPlan::new(vec![
RowGroupAccess::Scan,
RowGroupAccess::Selection(selection),
RowGroupAccess::Scan,
]);
// Budget forces each row group into its own chunk.
let chunks = plan.split_into_chunks(&meta, 15, 10_000);
assert_eq!(chunks.len(), 3);
assert_eq!(access_kinds(&chunks[0]), vec!["scan", "skip", "skip"]);
assert_eq!(access_kinds(&chunks[1]), vec!["skip", "sel", "skip"]);
assert_eq!(access_kinds(&chunks[2]), vec!["skip", "skip", "scan"]);
// The Selection must be preserved verbatim in its chunk.
let RowGroupAccess::Selection(sel) = &chunks[1].inner()[1] else {
panic!("expected Selection preserved in chunk");
};
let selectors: Vec<_> = sel.clone().into();
assert_eq!(selectors.len(), 2);
assert_eq!((selectors[0].skip, selectors[0].row_count), (false, 5));
assert_eq!((selectors[1].skip, selectors[1].row_count), (true, 15));
}

/// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
fn get_test_schema_descr() -> SchemaDescPtr {
use parquet::basic::Type as PhysicalType;
Expand Down
Loading
Loading