2121//! The parquet column writer checks the data page byte limit only *after*
2222//! each mini-batch finishes writing. Mini-batches are sized in rows
2323//! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose
24- //! values are large (e.g. multi-MiB blobs), a single mini-batch can buffer
24+ //! values are large (e.g. multi-MiB blobs) a single mini-batch can buffer
2525//! GiB into one page before the limit is consulted.
2626//!
2727//! This module isolates the per-chunk decision that prevents that: given a
2828//! chunk's level data and the input values, pick the largest `sub_batch_size`
29- //! such that one mini-batch will fit in one page byte budget. For the
30- //! overwhelmingly common case (small values), the answer is just
31- //! `chunk_size` and the decision is O(1) on the column type. Only when the
32- //! input might overflow does the chunker consult the encoder's byte
33- //! estimate.
29+ //! such that one mini-batch fits in one page byte budget. For the
30+ //! overwhelmingly common case (small or fixed-width values) the answer is
31+ //! just `chunk_size` and the decision is O(1) on the column type — only
32+ //! when the input might overflow does the chunker consult the encoder's
33+ //! byte estimate.
3434
3535use crate :: basic:: Type ;
3636use crate :: column:: writer:: LevelDataRef ;
3737use crate :: column:: writer:: encoder:: ColumnValueEncoder ;
3838use crate :: file:: properties:: WriterProperties ;
3939use crate :: schema:: types:: ColumnDescriptor ;
4040
41- /// Strategy for counting how many values fall in a chunk's level range.
42- /// Computed once per `write_batch_internal` call rather than per chunk so
43- /// `partition_point` and `LevelDataRef::value_count` don't run when their
44- /// answer is statically known to be `chunk_size`.
45- #[ derive( Clone , Copy ) ]
46- pub ( crate ) enum ValueCountStrategy < ' a > {
47- /// Every level corresponds to a non-null value, so the answer is
48- /// always `chunk_size`. Either the column has `max_def_level == 0`
49- /// or the arrow caller's `non_null_indices.len() == num_levels`.
50- AllPresent ,
51- /// Flat (`max_rep_level == 0`) arrow nullable path: `non_null_indices`
52- /// hold row positions, which coincide with level offsets, so
53- /// `partition_point` over the chunk's level range counts values
54- /// directly. O(log n) per chunk.
55- Sorted ( & ' a [ usize ] ) ,
56- /// Scan the chunk's def-level slice for entries matching `max_def`.
57- /// O(n) per chunk. Used for the non-arrow nullable path and for
58- /// repeated/nested columns, where `value_indices` index into the
59- /// decoupled leaf values array rather than the level stream.
60- DefLevelScan ( i16 ) ,
61- }
62-
63- /// Per-column-open chunker that picks byte-budget-aware mini-batch sizes.
41+ /// Picks byte-budget-aware mini-batch sizes for one column.
6442pub ( crate ) struct ByteBudgetChunker {
6543 /// Configured data page byte limit for the column.
6644 page_byte_limit : usize ,
45+ /// Max definition level of the column; a level equal to this marks a
46+ /// present (non-null) leaf value. Used to count values per chunk.
47+ max_def_level : i16 ,
6748 /// `true` when no chunk of `base_batch_size` values can ever overflow
6849 /// `page_byte_limit` regardless of input. Set once at column open from
6950 /// the physical type's known per-value byte size; lets the per-chunk
@@ -93,142 +74,76 @@ impl ByteBudgetChunker {
9374 . unwrap_or ( false ) ;
9475 Self {
9576 page_byte_limit,
77+ max_def_level : descr. max_def_level ( ) ,
9678 static_always_fits,
9779 }
9880 }
9981
100- /// Pick the cheapest strategy for `vals_in_chunk` queries for this
101- /// `write_batch_internal` call. Computed once and reused per chunk so
102- /// we don't repeat the check on every iteration.
103- #[ inline]
104- pub ( crate ) fn value_count_strategy < ' a > (
105- descr : & ColumnDescriptor ,
106- value_indices : Option < & ' a [ usize ] > ,
107- num_levels : usize ,
108- ) -> ValueCountStrategy < ' a > {
109- match value_indices {
110- // Arrow path. If every level has a non-null value, the gather
111- // index is the trivial `0..num_levels` and we don't need to
112- // walk it per chunk — `vals_in_chunk == chunk_size` by
113- // construction.
114- Some ( idx) if idx. len ( ) == num_levels => ValueCountStrategy :: AllPresent ,
115- // Repeated/nested arrow columns: `value_indices` index into the
116- // leaf values array, which is decoupled from the rep/def level
117- // stream. A `partition_point` of those indices against a level
118- // offset is meaningless — it makes `vals_in_chunk` drift away
119- // from the true per-chunk value count (it grows without bound
120- // as empty-list / sub-`max_def` levels accumulate, eventually
121- // forcing spurious granular sub-batching). Count via def levels
122- // instead. The `Sorted` fast path is only valid for flat
123- // columns, where `non_null_indices` are row positions that
124- // coincide with level offsets.
125- Some ( _) if descr. max_rep_level ( ) > 0 => {
126- ValueCountStrategy :: DefLevelScan ( descr. max_def_level ( ) )
127- }
128- Some ( idx) => ValueCountStrategy :: Sorted ( idx) ,
129- // Non-arrow path. `max_def_level == 0` means the column has
130- // no nullability, so again `vals_in_chunk == chunk_size`.
131- None if descr. max_def_level ( ) == 0 => ValueCountStrategy :: AllPresent ,
132- None => ValueCountStrategy :: DefLevelScan ( descr. max_def_level ( ) ) ,
133- }
134- }
135-
136- /// Decide how many levels at the start of `chunk_def` belong in one
137- /// mini-batch.
138- ///
139- /// Returns `chunk_size` when the whole chunk fits in one page byte
140- /// budget. A smaller number triggers granular sub-batching in
141- /// `write_batch_internal`'s `write_granular_chunk` arm.
82+ /// Decide how many levels at the start of a chunk belong in one
83+ /// mini-batch. Returns `chunk_size` when the whole chunk fits in one
84+ /// page byte budget; a smaller value triggers granular sub-batching in
85+ /// `write_batch_internal`.
14286 ///
143- /// Bypasses:
144- /// - When `static_always_fits` is true (fixed-width type with a
145- /// safe `base_batch_size`), return `chunk_size`.
146- /// - When the encoder is currently dictionary-encoding,
147- /// `estimated_value_bytes` would return plain-encoded bytes while
148- /// the actual page only stores small RLE indices, so the budget
149- /// would shrink pages spuriously. Return `chunk_size` and let
150- /// dictionary fallback bound dict-encoded pages independently.
151- /// - When `chunk_size == 0`, there's nothing to size.
87+ /// Returns `chunk_size` immediately (no value inspection) when:
88+ /// - the column is a fixed-width type that statically cannot overflow
89+ /// (`static_always_fits`);
90+ /// - the encoder is currently dictionary-encoding — a dict-encoded data
91+ /// page only stores small RLE indices, so a plain-encoded byte
92+ /// estimate would shrink pages spuriously; dictionary fallback bounds
93+ /// those pages independently;
94+ /// - the chunk is empty.
15295 ///
153- /// Hot path: when one of the bypass conditions fires this returns
154- /// `chunk_size` with one struct-field load and one virtual call into
155- /// the encoder. Marked `#[inline(always)]` because LLVM's heuristic
156- /// would otherwise refuse to inline now that the slow path lives
157- /// nearby — the GKE bench showed a +80% regression on
158- /// `string_dictionary/*` when the hint was just `#[inline]`.
159- #[ allow( clippy:: too_many_arguments) ]
160- #[ inline( always) ]
96+ /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte
97+ /// inspection lives in the out-of-line `byte_budget_sub_batch_size`.
98+ #[ inline]
16199 pub ( crate ) fn pick_sub_batch_size < E : ColumnValueEncoder > (
162100 & self ,
163101 encoder : & E ,
164102 values : & E :: Values ,
165103 value_indices : Option < & [ usize ] > ,
166104 chunk_def : LevelDataRef < ' _ > ,
167- strategy : ValueCountStrategy < ' _ > ,
168105 values_offset : usize ,
169106 chunk_size : usize ,
170- end_offset : usize ,
171107 ) -> usize {
172108 if self . static_always_fits || encoder. has_dictionary ( ) || chunk_size == 0 {
173109 return chunk_size;
174110 }
175- self . byte_budget_sub_batch_size :: < E > (
176- values,
177- value_indices,
178- chunk_def,
179- strategy,
180- values_offset,
181- chunk_size,
182- end_offset,
183- )
111+ self . byte_budget_sub_batch_size :: < E > ( values, value_indices, chunk_def, values_offset, chunk_size)
184112 }
185113
186- /// Cold path: the encoder is plain-encoding and the bypass conditions
187- /// didn't fire, so we have to look at value sizes to decide whether
188- /// the chunk fits. Pulled out of `pick_sub_batch_size` and marked
189- /// `#[inline(never)]` + `#[cold]` so the inlined fast path stays
190- /// small and the dead-code placement signal pushes this body
191- /// physically away from the hot encoder loop's icache footprint.
192- #[ allow( clippy:: too_many_arguments) ]
114+ /// Inspect value sizes to decide how much of the chunk fits in a page.
115+ ///
116+ /// Reached once per chunk for variable-width (`BYTE_ARRAY`) columns
117+ /// while plain-encoding — numeric, bool and dictionary-encoded columns
118+ /// never get here, so it is `#[cold]` / `#[inline(never)]`: keeping it
119+ /// out of line keeps the hot `write_batch_internal` loop small.
193120 #[ inline( never) ]
194121 #[ cold]
195122 fn byte_budget_sub_batch_size < E : ColumnValueEncoder > (
196123 & self ,
197124 values : & E :: Values ,
198125 value_indices : Option < & [ usize ] > ,
199126 chunk_def : LevelDataRef < ' _ > ,
200- strategy : ValueCountStrategy < ' _ > ,
201127 values_offset : usize ,
202128 chunk_size : usize ,
203- end_offset : usize ,
204129 ) -> usize {
205- // Count how many values fall in this chunk's level range. The
206- // strategy was picked once per `write_batch_internal` call so
207- // the common all-non-null case (every level has a value) skips
208- // the per-chunk binary search and def-level scan entirely.
209- let vals_in_chunk = match strategy {
210- ValueCountStrategy :: AllPresent => chunk_size,
211- ValueCountStrategy :: Sorted ( idx) => {
212- idx[ values_offset..] . partition_point ( |& i| i < end_offset)
213- }
214- ValueCountStrategy :: DefLevelScan ( max_def) => chunk_def. value_count ( chunk_size, max_def) ,
215- } ;
130+ // How many of this chunk's levels carry an actual value. For a
131+ // non-nullable, unrepeated column every level is a value, so
132+ // `value_count` is O(1) (`Absent`/`Uniform` def levels); only
133+ // nullable or nested columns pay the O(chunk_size) def-level scan.
134+ let vals_in_chunk = chunk_def. value_count ( chunk_size, self . max_def_level ) ;
216135 if vals_in_chunk == 0 {
217136 return chunk_size;
218137 }
219- // Ask the encoder how many of the next values fit in one page
220- // byte budget. Dispatch on whether the caller supplied gather
221- // indices; this mirrors how `write_mini_batch` picks between
222- // `write_gather` and ` write`.
138+ // Ask the encoder how many of the next values fit in one page byte
139+ // budget. Dispatch on whether the caller supplied gather indices;
140+ // this mirrors how `write_mini_batch` picks `write_gather` vs
141+ // `write`.
223142 let fit = match value_indices {
224143 Some ( idx) => {
225144 let end = ( values_offset + vals_in_chunk) . min ( idx. len ( ) ) ;
226145 let start = values_offset. min ( end) ;
227- E :: count_values_within_byte_budget_gather (
228- values,
229- & idx[ start..end] ,
230- self . page_byte_limit ,
231- )
146+ E :: count_values_within_byte_budget_gather ( values, & idx[ start..end] , self . page_byte_limit )
232147 }
233148 None => E :: count_values_within_byte_budget (
234149 values,
@@ -240,10 +155,10 @@ impl ByteBudgetChunker {
240155 match fit {
241156 None => chunk_size,
242157 Some ( values_per_subbatch) => {
243- // Convert the value count from the encoder back into a
244- // level count. For non-nullable columns this is a no-op;
245- // for nullable, scale by the observed value-to-level
246- // ratio of the current chunk .
158+ // Convert the value count back into a level count. For a
159+ // non-nullable column this is a no-op; for nullable/nested
160+ // columns scale by the chunk's observed value-to-level
161+ // ratio.
247162 let levels_per_subbatch = if vals_in_chunk == chunk_size {
248163 values_per_subbatch
249164 } else {
@@ -256,64 +171,3 @@ impl ByteBudgetChunker {
256171 }
257172 }
258173}
259-
260- #[ cfg( test) ]
261- mod tests {
262- use super :: * ;
263- use crate :: basic:: Type as PhysicalType ;
264- use crate :: schema:: types:: { ColumnPath , Type as SchemaType } ;
265- use std:: sync:: Arc ;
266-
267- fn descr ( max_def_level : i16 , max_rep_level : i16 ) -> ColumnDescriptor {
268- let tpe = SchemaType :: primitive_type_builder ( "col" , PhysicalType :: BYTE_ARRAY )
269- . build ( )
270- . unwrap ( ) ;
271- ColumnDescriptor :: new (
272- Arc :: new ( tpe) ,
273- max_def_level,
274- max_rep_level,
275- ColumnPath :: from ( "col" ) ,
276- )
277- }
278-
279- #[ test]
280- fn value_count_strategy_uses_def_scan_for_repeated_columns ( ) {
281- // Regression: for a repeated/nested column the arrow `value_indices`
282- // index into the leaf values array, which is decoupled from the
283- // level stream. The `Sorted` strategy's `partition_point` against a
284- // level offset is meaningless there and makes `vals_in_chunk` drift
285- // without bound, spuriously triggering granular sub-batching.
286- // A repeated column with `idx.len() != num_levels` must resolve to
287- // `DefLevelScan`, never `Sorted`.
288- let d = descr ( 1 , 1 ) ;
289- let indices = [ 0usize , 1 , 2 , 3 ] ;
290- let strategy =
291- ByteBudgetChunker :: value_count_strategy ( & d, Some ( & indices) , /* num_levels */ 6 ) ;
292- assert ! (
293- matches!( strategy, ValueCountStrategy :: DefLevelScan ( 1 ) ) ,
294- "repeated column must count values via def levels, not the \
295- level-offset partition_point"
296- ) ;
297- }
298-
299- #[ test]
300- fn value_count_strategy_keeps_sorted_for_flat_nullable_columns ( ) {
301- // Flat (`max_rep_level == 0`) nullable columns keep the cheap
302- // `Sorted` strategy: there `non_null_indices` are row positions,
303- // which do coincide with level offsets.
304- let d = descr ( 1 , 0 ) ;
305- let indices = [ 0usize , 2 , 5 ] ;
306- let strategy =
307- ByteBudgetChunker :: value_count_strategy ( & d, Some ( & indices) , /* num_levels */ 8 ) ;
308- assert ! ( matches!( strategy, ValueCountStrategy :: Sorted ( _) ) ) ;
309- }
310-
311- #[ test]
312- fn value_count_strategy_all_present_when_every_level_has_a_value ( ) {
313- let d = descr ( 1 , 1 ) ;
314- let indices = [ 0usize , 1 , 2 , 3 ] ;
315- let strategy =
316- ByteBudgetChunker :: value_count_strategy ( & d, Some ( & indices) , /* num_levels */ 4 ) ;
317- assert ! ( matches!( strategy, ValueCountStrategy :: AllPresent ) ) ;
318- }
319- }
0 commit comments