 //! Parquet writer.

 use std::future::Future;
+use std::mem;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};

+use common_telemetry::debug;
 use common_time::Timestamp;
 use datatypes::arrow::datatypes::SchemaRef;
 use object_store::{FuturesAsyncWriter, ObjectStore};
@@ -59,7 +61,7 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     /// Indexer builder that can create indexers for multiple files.
     indexer_builder: I,
     /// Current active indexer.
-    current_indexer: Indexer,
+    current_indexer: Option<Indexer>,
     bytes_written: Arc<AtomicUsize>,
 }
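Making `current_indexer` an `Option` defers its creation until the first batch is written, mirroring the already-lazy `writer` field. A minimal standalone sketch of the invariant the later `unwrap()` calls rely on (toy types, not the real mito2 API): both halves are initialized together, so whenever one is `Some` the other is too.

    // Toy sketch: writer and indexer are created together on first use and
    // consumed together on finish, so both live behind `Option`.
    struct LazyPair<W, I> {
        writer: Option<W>,
        indexer: Option<I>,
    }

    impl<W, I> LazyPair<W, I> {
        fn get_or_init(&mut self, make: impl FnOnce() -> (W, I)) -> (&mut W, &mut I) {
            if self.writer.is_none() {
                let (w, i) = make();
                self.writer = Some(w);
                self.indexer = Some(i);
            }
            // Both fields were initialized together above, so unwrap is safe.
            (self.writer.as_mut().unwrap(), self.indexer.as_mut().unwrap())
        }
    }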
@@ -121,8 +123,6 @@ where
         path_provider: P,
     ) -> ParquetWriter<F, I, P> {
         let init_file = FileId::random();
-        let index_file_path = path_provider.build_index_file_path(init_file);
-        let indexer = indexer_builder.build(init_file, index_file_path).await;

         ParquetWriter {
             path_provider,
@@ -131,22 +131,50 @@ where
             writer_factory: factory,
             metadata,
             indexer_builder,
-            current_indexer: indexer,
+            current_indexer: None,
             bytes_written: Arc::new(AtomicUsize::new(0)),
         }
     }

-    async fn roll_to_next_file(&mut self) {
-        self.current_file = FileId::random();
-        let index_file_path = self.path_provider.build_index_file_path(self.current_file);
-        let indexer = self
-            .indexer_builder
-            .build(self.current_file, index_file_path)
-            .await;
-        self.current_indexer = indexer;
-
+    /// Finishes the current SST file and its index file.
+    async fn finish_current_file(
+        &mut self,
+        ssts: &mut SstInfoArray,
+        stats: &mut SourceStats,
+    ) -> Result<()> {
         // maybe_init_writer will re-create a new file.
-        self.writer = None;
+        if let Some(mut current_writer) = mem::take(&mut self.writer) {
+            let stats = mem::take(stats);
+            // At least one row has been written.
+            assert!(stats.num_rows > 0);
+
+            // Finish the indexer and the writer.
+            // Safety: the writer and the indexer are either both present or both absent.
+            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
+            current_writer.flush().await.context(WriteParquetSnafu)?;
+
+            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
+            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
+
+            // Safety: num_rows > 0, so we must have min/max timestamps.
+            let time_range = stats.time_range.unwrap();
+
+            // Converts FileMetaData to ParquetMetaData.
+            let parquet_metadata = parse_parquet_metadata(file_meta)?;
+            ssts.push(SstInfo {
+                file_id: self.current_file,
+                time_range,
+                file_size,
+                num_rows: stats.num_rows,
+                num_row_groups: parquet_metadata.num_row_groups() as u64,
+                file_metadata: Some(Arc::new(parquet_metadata)),
+                index_metadata: index_output,
+            });
+            self.current_file = FileId::random();
+            self.bytes_written.store(0, Ordering::Relaxed);
+        }
+
+        Ok(())
     }
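The `if let Some(...) = mem::take(&mut self.writer)` idiom above consumes the current writer while leaving `None` behind, and `mem::take(stats)` resets the per-file statistics to their default so the next file starts counting from zero. A tiny standalone sketch of the same pattern (toy types, not the real writer):

    use std::mem;

    #[derive(Default)]
    struct Stats {
        num_rows: usize,
    }

    fn finish(writer: &mut Option<String>, stats: &mut Stats) {
        // Takes the writer out (leaving `None`) and resets the stats in one move.
        if let Some(w) = mem::take(writer) {
            let per_file = mem::take(stats);
            println!("finished {w}: {} rows", per_file.num_rows);
        }
    }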

     /// Iterates source and writes all rows to Parquet file.
@@ -158,6 +186,7 @@ where
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
         opts: &WriteOptions,
     ) -> Result<SstInfoArray> {
+        let mut results = smallvec![];
         let write_format =
             WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
@@ -170,52 +199,31 @@ where
             match res {
                 Ok(batch) => {
                     stats.update(&batch);
-                    self.current_indexer.update(&batch).await;
+                    // Safety: self.current_indexer must be set once the first batch has been written.
+                    self.current_indexer.as_mut().unwrap().update(&batch).await;
                     if self.bytes_written.load(Ordering::Relaxed) > opts.max_file_size {
-                        self.roll_to_next_file().await;
+                        debug!(
+                            "Finishing current file {}, file size: {}, max file size: {}",
+                            self.current_file,
+                            self.bytes_written.load(Ordering::Relaxed),
+                            opts.max_file_size
+                        );
+                        self.finish_current_file(&mut results, &mut stats).await?;
                     }
                 }
                 Err(e) => {
-                    self.current_indexer.abort().await;
+                    if let Some(indexer) = &mut self.current_indexer {
+                        indexer.abort().await;
+                    }
                     return Err(e);
                 }
             }
         }

-        let index_output = self.current_indexer.finish().await;
-
-        if stats.num_rows == 0 {
-            return Ok(smallvec![]);
-        }
-
-        let Some(mut arrow_writer) = self.writer.take() else {
-            // No batch actually written.
-            return Ok(smallvec![]);
-        };
-
-        arrow_writer.flush().await.context(WriteParquetSnafu)?;
-
-        let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
-        let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
-
-        // Safety: num rows > 0 so we must have min/max.
-        let time_range = stats.time_range.unwrap();
-
-        // convert FileMetaData to ParquetMetaData
-        let parquet_metadata = parse_parquet_metadata(file_meta)?;
-
-        let file_id = self.current_file;
+        self.finish_current_file(&mut results, &mut stats).await?;

         // object_store.write will make sure all bytes are written or an error is raised.
-        Ok(smallvec![SstInfo {
-            file_id,
-            time_range,
-            file_size,
-            num_rows: stats.num_rows,
-            num_row_groups: parquet_metadata.num_row_groups() as u64,
-            file_metadata: Some(Arc::new(parquet_metadata)),
-            index_metadata: index_output,
-        }])
+        Ok(results)
     }
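With the per-batch size check above, `write_all` can now emit several `SstInfo` entries per call instead of at most one. A standalone sketch of the size-based roll-over logic under simplified assumptions (a plain counter instead of the shared `AtomicUsize`, and file sizes standing in for `SstInfo`):

    // Toy sketch of rolling to a new file once the size threshold is crossed.
    struct Roller {
        bytes_written: usize,
        max_file_size: usize,
        finished_files: Vec<usize>,
    }

    impl Roller {
        fn write(&mut self, batch_bytes: usize) {
            self.bytes_written += batch_bytes;
            if self.bytes_written > self.max_file_size {
                // Finish the current file and reset the counter for the next one.
                self.finished_files.push(self.bytes_written);
                self.bytes_written = 0;
            }
        }
    }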

     /// Customizes per-column config according to schema and maybe column cardinality.
@@ -286,6 +294,14 @@ where
             AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                 .context(WriteParquetSnafu)?;
         self.writer = Some(arrow_writer);
+
+        let index_file_path = self.path_provider.build_index_file_path(self.current_file);
+        let indexer = self
+            .indexer_builder
+            .build(self.current_file, index_file_path)
+            .await;
+        self.current_indexer = Some(indexer);
+
         // Safety: self.writer is assigned above.
         Ok(self.writer.as_mut().unwrap())
     }
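Building the indexer here, right next to the arrow writer, restores the both-present-or-both-absent invariant that the `unwrap()` in `finish_current_file` depends on. For the `bytes_written` counter that drives the roll-over, a hypothetical wrapper illustrates how a shared `Arc<AtomicUsize>` can observe bytes flowing through an inner writer; this is an assumed design for illustration, not the crate's actual type:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    // Hypothetical wrapper: counts every byte written through the inner writer
    // into a shared atomic counter that the caller can read concurrently.
    struct CountingWriter<W> {
        inner: W,
        written: Arc<AtomicUsize>,
    }

    impl<W: std::io::Write> std::io::Write for CountingWriter<W> {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let n = self.inner.write(buf)?;
            self.written.fetch_add(n, Ordering::Relaxed);
            Ok(n)
        }

        fn flush(&mut self) -> std::io::Result<()> {
            self.inner.flush()
        }
    }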