33// SPDX-License-Identifier: Apache-2.0
44// SPDX-FileCopyrightText: Copyright the Vortex contributors
55
6+ use std:: num:: NonZeroUsize ;
67use std:: sync:: Arc ;
78
89use async_trait:: async_trait;
@@ -26,9 +27,11 @@ use vortex_array::aggregate_fn::fns::nan_count::NanCount;
2627use vortex_array:: aggregate_fn:: fns:: null_count:: NullCount ;
2728use vortex_array:: aggregate_fn:: fns:: sum:: Sum ;
2829use vortex_array:: dtype:: DType ;
30+ use vortex_error:: VortexError ;
2931use vortex_error:: VortexResult ;
30- use vortex_error :: vortex_ensure ;
32+ use vortex_io :: session :: RuntimeSessionExt ;
3133use vortex_session:: VortexSession ;
34+ use vortex_utils:: parallelism:: get_available_parallelism;
3235
3336use crate :: IntoLayout ;
3437use crate :: LayoutRef ;
@@ -50,18 +53,23 @@ use crate::sequence::SequentialStreamExt;
5053/// possibly the final partial zone.
5154pub struct ZonedLayoutOptions {
5255 /// The size of a statistics block; the `NonZeroUsize` type rules out a zero-sized block.
53- pub block_size : usize ,
56+ pub block_size : NonZeroUsize ,
5457 /// The aggregate partials to collect for each block.
5558 ///
5659 /// If unset (`None`), the writer chooses pruning aggregates from the input dtype.
5760 pub aggregate_fns : Option < Arc < [ AggregateFnRef ] > > ,
61+ /// Upper bound on the number of chunks whose aggregate partials are computed at the same time.
62+ pub concurrency : NonZeroUsize ,
5863}
5964
6065impl Default for ZonedLayoutOptions {
6166 fn default ( ) -> Self {
6267 Self {
63- block_size : 8192 ,
68+ block_size : unsafe { NonZeroUsize :: new_unchecked ( 8192 ) } ,
6469 aggregate_fns : None ,
70+ concurrency : unsafe {
71+ NonZeroUsize :: new_unchecked ( get_available_parallelism ( ) . unwrap_or ( 1 ) )
72+ } ,
6573 }
6674 }
6775}
@@ -97,38 +105,44 @@ impl LayoutStrategy for ZonedStrategy {
97105 mut eof : SequencePointer ,
98106 session : & VortexSession ,
99107 ) -> VortexResult < LayoutRef > {
100- vortex_ensure ! (
101- self . options. block_size > 0 ,
102- "ZonedStrategy requires block_size > 0 when writing"
103- ) ;
104-
105108 let aggregate_fns = self
106109 . options
107110 . aggregate_fns
108111 . clone ( )
109112 . unwrap_or_else ( || default_zoned_aggregate_fns ( stream. dtype ( ) ) ) ;
110- let session = session. clone ( ) ;
113+ let compute_session = session. clone ( ) ;
111114
112115 let stats_accumulator = Arc :: new ( Mutex :: new ( AggregateStatsAccumulator :: new (
113116 stream. dtype ( ) ,
114117 & aggregate_fns,
115118 ) ) ) ;
116119 let aggregate_fns = stats_accumulator. lock ( ) . aggregate_fns ( ) ;
117120
121+ let stream_dtype = stream. dtype ( ) . clone ( ) ;
122+ let concurrency = self . options . concurrency . get ( ) ;
123+ let stream = stream
124+ . map ( move |item| {
125+ let aggregate_fns = Arc :: clone ( & aggregate_fns) ;
126+ let session = compute_session. clone ( ) ;
127+ session. handle ( ) . spawn_cpu ( move || {
128+ let ( sequence_id, chunk) = item?;
129+ let partials = aggregate_partials (
130+ & chunk,
131+ & aggregate_fns,
132+ & mut session. create_execution_ctx ( ) ,
133+ ) ?;
134+ Ok :: < _ , VortexError > ( ( sequence_id, chunk, partials) )
135+ } )
136+ } )
137+ . buffered ( concurrency) ;
138+
118139 // Accumulate zone stats in stream order so the auxiliary table stays aligned with the
119140 // data child.
120141 let stats_accumulator2 = Arc :: clone ( & stats_accumulator) ;
121- let aggregate_fns2 = Arc :: clone ( & aggregate_fns) ;
122- let compute_session = session. clone ( ) ;
123142 let stream = SequentialStreamAdapter :: new (
124- stream . dtype ( ) . clone ( ) ,
143+ stream_dtype ,
125144 stream. map ( move |item| {
126- let ( sequence_id, chunk) = item?;
127- let partials = aggregate_partials (
128- & chunk,
129- & aggregate_fns2,
130- & mut compute_session. create_execution_ctx ( ) ,
131- ) ?;
145+ let ( sequence_id, chunk, partials) = item?;
132146 stats_accumulator2. lock ( ) . push_partials ( partials) ?;
133147 Ok ( ( sequence_id, chunk) )
134148 } ) ,
@@ -146,7 +160,7 @@ impl LayoutStrategy for ZonedStrategy {
146160 Arc :: clone ( & segment_sink) ,
147161 stream,
148162 data_eof,
149- & session,
163+ session,
150164 )
151165 . await ?;
152166
@@ -164,7 +178,7 @@ impl LayoutStrategy for ZonedStrategy {
164178 . sequenced ( eof. split_off ( ) ) ;
165179 let zones_layout = self
166180 . stats
167- . write_stream ( ctx, Arc :: clone ( & segment_sink) , stats_stream, eof, & session)
181+ . write_stream ( ctx, Arc :: clone ( & segment_sink) , stats_stream, eof, session)
168182 . await ?;
169183
170184 Ok (
0 commit comments