Skip to content

Commit 9d3aafb

Browse files
authored
perf(scan): intra-file decode parallelism — sub-split large chunk spans (#8400)
## Summary

`SplitBy::Layout` now sub-divides any span between adjacent chunk boundaries wider than `IDEAL_SPLIT_SIZE` (100k rows) into evenly sized row-range splits, so a file with few large chunks (e.g. a single flat layout, or byte-targeted int columns that coalesce to ~262k rows/chunk) decodes across multiple cores instead of one.

## Correctness

Subdivision only inserts points strictly between existing adjacent boundaries — it never moves or removes one — so the half-open ranges consumers derive (`tuple_windows`) remain a contiguous, non-overlapping, exact partition of the same rows. Spans at or below the cap pass through untouched (fast-path no-op). All boundary consumers (`RepeatedScan`, `VortexFile::splits()`, the DataFusion repartitioner) operate on arbitrary ranges; sub-chunk ranges were already exercised by `SplitBy::RowCount`. The arithmetic saturates at `u64::MAX`.

## API/Observable behavior

For files whose merged (projected-column) chunk boundaries leave spans > 100k rows, `VortexFile::splits()` (incl. Python bindings) returns more, smaller ranges; scans emit smaller batches; DataFusion gets real repartitioning where a single-chunk file previously collapsed to one partition. Fine-grained files (e.g. ~8k-row string chunks from the default 1 MiB block target) are untouched.

## Testing

- Unit/property/overflow tests for `subdivide_large_spans` (no-op, large single chunk, mixed gaps, exact-coverage property, `u64::MAX` boundary).
- E2E: 250k-row single flat chunk → splits all ≤ the cap, contiguous, exact endpoints; full + filtered scans match the unsplit data.
- E2E (rstest): fixed-size `SplitBy::RowCount` scans (unaligned 33,333 and exceeds-file 300,000 cases).
- E2E: ~120-byte string column via the default write strategy keeps its natural fine-grained chunk splits (bounded relative to the cap, not to writer defaults).
- `cargo nextest run -p vortex-layout -p vortex-file` — 174 passed on this branch.
--------- Signed-off-by: Luke Kim <80174+lukekim@users.noreply.github.com>
1 parent 51752c8 commit 9d3aafb

4 files changed

Lines changed: 286 additions & 2 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-file/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ vortex-zigzag = { workspace = true }
5959
vortex-zstd = { workspace = true, optional = true }
6060

6161
[dev-dependencies]
62+
rstest = { workspace = true }
6263
tokio = { workspace = true, features = ["full"] }
6364
vortex-array = { workspace = true, features = ["_test-harness"] }
6465
vortex-io = { workspace = true, features = ["tokio"] }

vortex-file/src/tests.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use bytes::Bytes;
1010
use futures::StreamExt;
1111
use futures::TryStreamExt;
1212
use futures::pin_mut;
13+
use rstest::rstest;
1314
use vortex_array::ArrayRef;
1415
use vortex_array::IntoArray;
1516
use vortex_array::VortexSessionExecute;
@@ -64,9 +65,11 @@ use vortex_buffer::buffer;
6465
use vortex_error::VortexResult;
6566
use vortex_io::session::RuntimeSession;
6667
use vortex_layout::Layout;
68+
use vortex_layout::layouts::flat::writer::FlatLayoutStrategy;
6769
use vortex_layout::layouts::zoned::LegacyStats;
6870
use vortex_layout::layouts::zoned::Zoned;
6971
use vortex_layout::scan::scan_builder::ScanBuilder;
72+
use vortex_layout::scan::split_by::SplitBy;
7073
use vortex_layout::session::LayoutSession;
7174
use vortex_session::VortexSession;
7275

@@ -1812,6 +1815,144 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) {
18121815
}
18131816
}
18141817

1818+
/// Widest split, in rows, that `SplitBy::Layout` is expected to produce in these tests.
///
/// Hand-copied from the private `IDEAL_SPLIT_SIZE` threshold that the layout splitter uses when
/// it sub-divides wide chunk-boundary spans; the two values must be kept in sync manually.
const MAX_SPLIT_ROWS: u64 = 100_000;
1821+
1822+
#[tokio::test]
1823+
#[cfg_attr(miri, ignore)]
1824+
async fn test_large_flat_chunk_scan_subdivides_splits() -> VortexResult<()> {
1825+
// A single flat (unchunked) 250k-row layout spans the 100k sub-split threshold, so the scan
1826+
// must decode it as multiple row-range splits.
1827+
let mut ctx = SESSION.create_execution_ctx();
1828+
const N_ROWS: u64 = 250_000;
1829+
let values =
1830+
Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array();
1831+
1832+
let mut buf = ByteBufferMut::empty();
1833+
SESSION
1834+
.write_options()
1835+
.with_strategy(Arc::new(FlatLayoutStrategy::default()))
1836+
.write(&mut buf, values.to_array_stream())
1837+
.await?;
1838+
1839+
let file = SESSION.open_options().open_buffer(buf)?;
1840+
1841+
// Sub-division caps each split at MAX_SPLIT_ROWS while tiling the file exactly.
1842+
let splits = file.splits()?;
1843+
assert!(splits.len() > 1, "expected sub-divided splits: {splits:?}");
1844+
assert!(splits.iter().all(|r| r.end - r.start <= MAX_SPLIT_ROWS));
1845+
assert_eq!(splits.first().map(|r| r.start), Some(0));
1846+
assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS));
1847+
assert!(splits.windows(2).all(|w| w[0].end == w[1].start));
1848+
1849+
// A full scan across the sub-splits returns the original rows.
1850+
let result = file.scan()?.into_array_stream()?.read_all().await?;
1851+
assert_arrays_eq!(result, values, &mut ctx);
1852+
1853+
// A filtered scan crossing sub-split boundaries selects exactly the matching rows.
1854+
let result = file
1855+
.scan()?
1856+
.with_filter(gt(root(), lit(0i32)))
1857+
.into_array_stream()?
1858+
.read_all()
1859+
.await?;
1860+
let expected =
1861+
Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array();
1862+
assert_arrays_eq!(result, expected, &mut ctx);
1863+
1864+
Ok(())
1865+
}
1866+
1867+
#[rstest]
1868+
#[case::unaligned(33_333)]
1869+
#[case::exceeds_file(300_000)]
1870+
#[tokio::test]
1871+
#[cfg_attr(miri, ignore)]
1872+
async fn test_flat_chunk_scan_with_row_count_splits(
1873+
#[case] rows_per_split: usize,
1874+
) -> VortexResult<()> {
1875+
// Fixed-size splits ignore chunk boundaries entirely, so scans must produce identical
1876+
// results whether the split size straddles the chunk arbitrarily or exceeds the file's
1877+
// row count (a single split).
1878+
let mut ctx = SESSION.create_execution_ctx();
1879+
const N_ROWS: u64 = 250_000;
1880+
let values =
1881+
Buffer::from_iter((0..N_ROWS as i32).map(|i| if i % 2 == 0 { i } else { -i })).into_array();
1882+
1883+
let mut buf = ByteBufferMut::empty();
1884+
SESSION
1885+
.write_options()
1886+
.with_strategy(Arc::new(FlatLayoutStrategy::default()))
1887+
.write(&mut buf, values.to_array_stream())
1888+
.await?;
1889+
1890+
let file = SESSION.open_options().open_buffer(buf)?;
1891+
1892+
let result = file
1893+
.scan()?
1894+
.with_split_by(SplitBy::RowCount(rows_per_split))
1895+
.into_array_stream()?
1896+
.read_all()
1897+
.await?;
1898+
assert_arrays_eq!(result, values, &mut ctx);
1899+
1900+
let result = file
1901+
.scan()?
1902+
.with_split_by(SplitBy::RowCount(rows_per_split))
1903+
.with_filter(gt(root(), lit(0i32)))
1904+
.into_array_stream()?
1905+
.read_all()
1906+
.await?;
1907+
let expected =
1908+
Buffer::from_iter((0..N_ROWS as i32).filter(|i| i % 2 == 0 && *i > 0)).into_array();
1909+
assert_arrays_eq!(result, expected, &mut ctx);
1910+
1911+
Ok(())
1912+
}
1913+
1914+
#[tokio::test]
1915+
#[cfg_attr(miri, ignore)]
1916+
async fn test_string_chunks_stay_fine_grained_under_split_cap() -> VortexResult<()> {
1917+
// Default writing targets ~1MiB uncompressed blocks, so ~120-byte strings chunk at a few
1918+
// thousand rows (~8k with today's defaults). These natural boundaries sit far below the
1919+
// sub-split cap, and SplitBy::Layout must pass them through untouched.
1920+
let mut ctx = SESSION.create_execution_ctx();
1921+
const N_ROWS: usize = 40_000;
1922+
let strings = VarBinArray::from_iter(
1923+
(0..N_ROWS).map(|i| Some(format!("{i:0>120}"))),
1924+
DType::Utf8(Nullability::Nullable),
1925+
)
1926+
.into_array();
1927+
let st = StructArray::from_fields(&[("s", strings)])?.into_array();
1928+
1929+
let mut buf = ByteBufferMut::empty();
1930+
SESSION
1931+
.write_options()
1932+
.write(&mut buf, st.to_array_stream())
1933+
.await?;
1934+
1935+
let file = SESSION.open_options().open_buffer(buf)?;
1936+
1937+
let splits = file.splits()?;
1938+
assert!(
1939+
splits.len() > 1,
1940+
"expected multiple natural chunks: {splits:?}"
1941+
);
1942+
assert!(
1943+
splits.iter().all(|r| r.end - r.start < MAX_SPLIT_ROWS / 4),
1944+
"string chunks should stay fine-grained, nowhere near the split cap: {splits:?}"
1945+
);
1946+
assert_eq!(splits.first().map(|r| r.start), Some(0));
1947+
assert_eq!(splits.last().map(|r| r.end), Some(N_ROWS as u64));
1948+
assert!(splits.windows(2).all(|w| w[0].end == w[1].start));
1949+
1950+
let result = file.scan()?.into_array_stream()?.read_all().await?;
1951+
assert_arrays_eq!(result, st, &mut ctx);
1952+
1953+
Ok(())
1954+
}
1955+
18151956
#[tokio::test]
18161957
#[cfg_attr(miri, ignore)]
18171958
async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {

vortex-layout/src/scan/split_by.rs

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,29 @@ use std::iter::once;
55
use std::ops::Range;
66

77
use vortex_array::dtype::FieldMask;
8+
use vortex_error::VortexExpect;
89
use vortex_error::VortexResult;
910

1011
use crate::LayoutReader;
1112
use crate::RowSplits;
1213
use crate::SplitRange;
14+
use crate::scan::IDEAL_SPLIT_SIZE;
15+
16+
/// Chunk-boundary spans wider than this are sub-divided into multiple row-range splits so that a
17+
/// file with few, large chunks can be decoded across multiple cores rather than one.
18+
///
19+
/// Reuses [`IDEAL_SPLIT_SIZE`] as the target span per split.
20+
const MAX_SPLIT_ROWS: u64 = IDEAL_SPLIT_SIZE;
1321

1422
/// Defines how the Vortex file is split into batches for reading.
1523
///
1624
/// Note that each split must fit into the platform's maximum usize.
1725
#[derive(Default, Copy, Clone, Debug)]
1826
pub enum SplitBy {
1927
#[default]
20-
/// Splits any time there is a chunk boundary in the file.
28+
/// Splits any time there is a chunk boundary in the file. Spans between adjacent boundaries
29+
/// wider than `MAX_SPLIT_ROWS` are further sub-divided so that a file with few, large chunks
30+
/// can still be decoded across multiple cores.
2131
Layout,
2232
/// Splits every n rows.
2333
RowCount(usize),
@@ -44,7 +54,7 @@ impl SplitBy {
4454
&SplitRange::root(row_range.clone())?,
4555
&mut row_splits,
4656
)?;
47-
row_splits.into_sorted_deduped()
57+
subdivide_large_spans(row_splits.into_sorted_deduped(), MAX_SPLIT_ROWS)
4858
}
4959
SplitBy::RowCount(n) => row_range
5060
.clone()
@@ -55,6 +65,60 @@ impl SplitBy {
5565
}
5666
}
5767

68+
/// Sub-divide any gap between adjacent split boundaries that is wider than `max_span` into evenly
69+
/// sized row-range sub-splits.
70+
///
71+
/// `boundaries` is the sorted, deduplicated list of split points produced by the layout (chunk
72+
/// boundaries). Downstream consumers turn this list into half-open ranges by pairing adjacent
73+
/// entries (`tuple_windows().map(|(s, e)| s..e)`), so the row coverage is fully determined by the
74+
/// boundary set. This function only *inserts* points that lie strictly between two existing
75+
/// adjacent boundaries; it never moves or removes a boundary. Splitting `[lo, hi)` at an interior
76+
/// point `m` (with `lo < m < hi`) yields exactly `[lo, m) + [m, hi)`, so the union of ranges is
77+
/// unchanged: the rows are still partitioned contiguously, with no gaps and no overlaps, covering
78+
/// every row exactly once. The output remains sorted and deduplicated.
79+
fn subdivide_large_spans(boundaries: Vec<u64>, max_span: u64) -> Vec<u64> {
80+
debug_assert!(boundaries.is_sorted(), "boundaries must be sorted");
81+
debug_assert!(max_span > 0, "max_span must be non-zero");
82+
83+
// Fast path: nothing to split (also covers the empty / single-boundary cases).
84+
if boundaries.len() < 2 || boundaries.windows(2).all(|w| w[1] - w[0] <= max_span) {
85+
return boundaries;
86+
}
87+
88+
let mut out = Vec::with_capacity(boundaries.len() * 2);
89+
for window in boundaries.windows(2) {
90+
let lo = window[0];
91+
let hi = window[1];
92+
// Always emit the lower boundary; the final `hi` is appended once after the loop.
93+
out.push(lo);
94+
95+
let span = hi - lo;
96+
if span > max_span {
97+
// Number of sub-ranges so that each is <= max_span. `span > max_span` and
98+
// `max_span >= 1` guarantee `sub_count >= 2`.
99+
let sub_count = span.div_ceil(max_span);
100+
// Even sub-range size (rounded up); the last sub-range absorbs any remainder and is
101+
// bounded by `hi`. Inserted points `lo + k*sub_size` are strictly in `(lo, hi)`.
102+
let sub_size = span.div_ceil(sub_count);
103+
let mut point = lo + sub_size;
104+
while point < hi {
105+
out.push(point);
106+
// Saturating: a sum past u64::MAX can never be < `hi`, so the loop exits.
107+
point = point.saturating_add(sub_size);
108+
}
109+
}
110+
}
111+
// Append the final boundary (the `hi` of the last window).
112+
out.push(*boundaries.last().vortex_expect("len >= 2 checked above"));
113+
114+
debug_assert!(out.is_sorted(), "subdivided boundaries must stay sorted");
115+
debug_assert!(
116+
out.windows(2).all(|w| w[0] < w[1]),
117+
"subdivided boundaries must stay strictly increasing (deduped)"
118+
);
119+
out
120+
}
121+
58122
#[cfg(test)]
59123
mod test {
60124
use std::any::Any;
@@ -212,4 +276,81 @@ mod test {
212276
.unwrap();
213277
assert_eq!(splits, vec![0u64, 5, 10]);
214278
}
279+
280+
#[test]
fn subdivide_below_threshold_is_noop() {
    // No gap exceeds `max_span` (or there is no gap at all), so each input must come back
    // exactly as it went in.
    let untouched: [Vec<u64>; 4] = [vec![0, 5, 10], vec![0, 100], Vec::new(), vec![7]];
    for boundaries in untouched {
        assert_eq!(subdivide_large_spans(boundaries.clone(), 100), boundaries);
    }
}
291+
292+
#[test]
fn subdivide_near_u64_max_does_not_overflow() {
    // After the single interior point `top - 1`, stepping forward by 2 again would exceed
    // `u64::MAX`; the walk has to stop there instead of overflowing.
    let top = u64::MAX;
    let subdivided = subdivide_large_spans(vec![top - 3, top], 2);
    assert_eq!(subdivided, [top - 3, top - 1, top]);
}
299+
300+
#[test]
fn subdivide_splits_large_single_chunk() {
    // A single chunk covering [0, 1000) with a 100-row cap becomes ten back-to-back 100-row
    // ranges, i.e. a boundary at every multiple of 100 from 0 through 1000.
    let every_hundred: Vec<u64> = (0..=1000).step_by(100).collect();
    assert_eq!(subdivide_large_spans(vec![0, 1000], 100), every_hundred);
}
309+
310+
#[test]
fn subdivide_only_large_gaps() {
    // Only the middle gap [50, 350) is wider than the 100-row cap, so it alone gains interior
    // points (150 and 250); the narrow gaps [0, 50) and [350, 360) are left whole.
    let input = vec![0, 50, 350, 360];
    let subdivided = subdivide_large_spans(input, 100);
    assert_eq!(subdivided, [0, 50, 150, 250, 350, 360]);
}
316+
317+
/// Checks, over a handful of sorted and deduplicated boundary lists, that subdivision keeps the
/// first and last boundary, yields a strictly increasing list, and leaves row coverage intact:
/// the widths of the resulting ranges add up to the original span and no original boundary
/// disappears.
#[test]
fn subdivide_preserves_exact_coverage() {
    let cases: [&[u64]; 5] = [
        &[0, 1000],
        &[0, 7, 250_001],
        &[0, 5, 10, 15, 20, 25, 30],
        &[3, 1_000_003],
        &[0, 99_999, 100_000, 300_000],
    ];
    for boundaries in cases {
        let out = subdivide_large_spans(boundaries.to_vec(), MAX_SPLIT_ROWS);

        // Both endpoints survive.
        assert_eq!(out.first(), boundaries.first());
        assert_eq!(out.last(), boundaries.last());

        // Strictly increasing means sorted with no duplicates.
        assert!(
            out.windows(2).all(|w| w[0] < w[1]),
            "not strictly increasing: {out:?}"
        );

        // The ranges derived from `out` span exactly as many rows as the input did.
        let expected_total = boundaries.last().unwrap() - boundaries.first().unwrap();
        let total: u64 = out
            .iter()
            .zip(out.iter().skip(1))
            .map(|(lo, hi)| hi - lo)
            .sum();
        assert_eq!(
            total, expected_total,
            "coverage span changed for {boundaries:?}"
        );

        // Every input boundary is still present, so the original ranges were only cut into
        // smaller ones, never merged or shifted.
        for b in boundaries {
            assert!(
                out.contains(b),
                "original boundary {b} dropped from {out:?}"
            );
        }
    }
}
215356
}

0 commit comments

Comments
 (0)