@@ -23,6 +23,7 @@ use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Stat
23
23
use datafusion:: error:: DataFusionError ;
24
24
use datafusion:: execution:: context:: TaskContext ;
25
25
use datafusion:: logical_expr:: { EmptyRelation , Expr , LogicalPlan , UserDefinedLogicalNodeCore } ;
26
+ use datafusion:: physical_plan:: expressions:: Column as ColumnExpr ;
26
27
use datafusion:: physical_plan:: metrics:: {
27
28
BaselineMetrics , Count , ExecutionPlanMetricsSet , MetricBuilder , MetricValue , MetricsSet ,
28
29
} ;
@@ -32,7 +33,6 @@ use datafusion::physical_plan::{
32
33
} ;
33
34
use datatypes:: arrow:: array:: TimestampMillisecondArray ;
34
35
use datatypes:: arrow:: datatypes:: SchemaRef ;
35
- use datatypes:: arrow:: error:: Result as ArrowResult ;
36
36
use datatypes:: arrow:: record_batch:: RecordBatch ;
37
37
use futures:: { ready, Stream , StreamExt } ;
38
38
use greptime_proto:: substrait_extension as pb;
@@ -55,6 +55,7 @@ pub struct SeriesNormalize {
55
55
offset : Millisecond ,
56
56
time_index_column_name : String ,
57
57
need_filter_out_nan : bool ,
58
+ tag_columns : Vec < String > ,
58
59
59
60
input : LogicalPlan ,
60
61
}
@@ -100,6 +101,7 @@ impl UserDefinedLogicalNodeCore for SeriesNormalize {
100
101
time_index_column_name : self . time_index_column_name . clone ( ) ,
101
102
need_filter_out_nan : self . need_filter_out_nan ,
102
103
input : inputs. into_iter ( ) . next ( ) . unwrap ( ) ,
104
+ tag_columns : self . tag_columns . clone ( ) ,
103
105
} )
104
106
}
105
107
}
@@ -109,12 +111,14 @@ impl SeriesNormalize {
109
111
offset : Millisecond ,
110
112
time_index_column_name : N ,
111
113
need_filter_out_nan : bool ,
114
+ tag_columns : Vec < String > ,
112
115
input : LogicalPlan ,
113
116
) -> Self {
114
117
Self {
115
118
offset,
116
119
time_index_column_name : time_index_column_name. as_ref ( ) . to_string ( ) ,
117
120
need_filter_out_nan,
121
+ tag_columns,
118
122
input,
119
123
}
120
124
}
@@ -129,6 +133,7 @@ impl SeriesNormalize {
129
133
time_index_column_name : self . time_index_column_name . clone ( ) ,
130
134
need_filter_out_nan : self . need_filter_out_nan ,
131
135
input : exec_input,
136
+ tag_columns : self . tag_columns . clone ( ) ,
132
137
metric : ExecutionPlanMetricsSet :: new ( ) ,
133
138
} )
134
139
}
@@ -138,6 +143,7 @@ impl SeriesNormalize {
138
143
offset : self . offset ,
139
144
time_index : self . time_index_column_name . clone ( ) ,
140
145
filter_nan : self . need_filter_out_nan ,
146
+ tag_columns : self . tag_columns . clone ( ) ,
141
147
}
142
148
. encode_to_vec ( )
143
149
}
@@ -152,6 +158,7 @@ impl SeriesNormalize {
152
158
pb_normalize. offset ,
153
159
pb_normalize. time_index ,
154
160
pb_normalize. filter_nan ,
161
+ pb_normalize. tag_columns ,
155
162
placeholder_plan,
156
163
) )
157
164
}
@@ -162,6 +169,7 @@ pub struct SeriesNormalizeExec {
162
169
offset : Millisecond ,
163
170
time_index_column_name : String ,
164
171
need_filter_out_nan : bool ,
172
+ tag_columns : Vec < String > ,
165
173
166
174
input : Arc < dyn ExecutionPlan > ,
167
175
metric : ExecutionPlanMetricsSet ,
@@ -177,7 +185,14 @@ impl ExecutionPlan for SeriesNormalizeExec {
177
185
}
178
186
179
187
fn required_input_distribution ( & self ) -> Vec < Distribution > {
180
- vec ! [ Distribution :: SinglePartition ]
188
+ let schema = self . input . schema ( ) ;
189
+ vec ! [ Distribution :: HashPartitioned (
190
+ self . tag_columns
191
+ . iter( )
192
+ // Safety: the tag column names are verified in the planning phase
193
+ . map( |tag| Arc :: new( ColumnExpr :: new_with_schema( tag, & schema) . unwrap( ) ) as _)
194
+ . collect( ) ,
195
+ ) ]
181
196
}
182
197
183
198
fn properties ( & self ) -> & PlanProperties {
@@ -198,6 +213,7 @@ impl ExecutionPlan for SeriesNormalizeExec {
198
213
time_index_column_name : self . time_index_column_name . clone ( ) ,
199
214
need_filter_out_nan : self . need_filter_out_nan ,
200
215
input : children[ 0 ] . clone ( ) ,
216
+ tag_columns : self . tag_columns . clone ( ) ,
201
217
metric : self . metric . clone ( ) ,
202
218
} ) )
203
219
}
@@ -288,31 +304,24 @@ impl SeriesNormalizeStream {
288
304
289
305
// bias the timestamp column by offset
290
306
let ts_column_biased = if self . offset == 0 {
291
- ts_column. clone ( )
307
+ Arc :: new ( ts_column. clone ( ) ) as _
292
308
} else {
293
- TimestampMillisecondArray :: from_iter (
309
+ Arc :: new ( TimestampMillisecondArray :: from_iter (
294
310
ts_column. iter ( ) . map ( |ts| ts. map ( |ts| ts + self . offset ) ) ,
295
- )
311
+ ) )
296
312
} ;
297
313
let mut columns = input. columns ( ) . to_vec ( ) ;
298
- columns[ self . time_index ] = Arc :: new ( ts_column_biased) ;
299
-
300
- // sort the record batch
301
- let ordered_indices = compute:: sort_to_indices ( & columns[ self . time_index ] , None , None ) ?;
302
- let ordered_columns = columns
303
- . iter ( )
304
- . map ( |array| compute:: take ( array, & ordered_indices, None ) )
305
- . collect :: < ArrowResult < Vec < _ > > > ( ) ?;
306
- let ordered_batch = RecordBatch :: try_new ( input. schema ( ) , ordered_columns) ?;
314
+ columns[ self . time_index ] = ts_column_biased;
307
315
316
+ let result_batch = RecordBatch :: try_new ( input. schema ( ) , columns) ?;
308
317
if !self . need_filter_out_nan {
309
- return Ok ( ordered_batch ) ;
318
+ return Ok ( result_batch ) ;
310
319
}
311
320
312
321
// TODO(ruihang): consider the "special NaN"
313
322
// filter out NaN
314
323
let mut filter = vec ! [ true ; input. num_rows( ) ] ;
315
- for column in ordered_batch . columns ( ) {
324
+ for column in result_batch . columns ( ) {
316
325
if let Some ( float_column) = column. as_any ( ) . downcast_ref :: < Float64Array > ( ) {
317
326
for ( i, flag) in filter. iter_mut ( ) . enumerate ( ) {
318
327
if float_column. value ( i) . is_nan ( ) {
@@ -322,7 +331,7 @@ impl SeriesNormalizeStream {
322
331
}
323
332
}
324
333
325
- let result = compute:: filter_record_batch ( & ordered_batch , & BooleanArray :: from ( filter) )
334
+ let result = compute:: filter_record_batch ( & result_batch , & BooleanArray :: from ( filter) )
326
335
. map_err ( |e| DataFusionError :: ArrowError ( e, None ) ) ?;
327
336
Ok ( result)
328
337
}
@@ -338,10 +347,10 @@ impl Stream for SeriesNormalizeStream {
338
347
type Item = DataFusionResult < RecordBatch > ;
339
348
340
349
fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
341
- let timer = std:: time:: Instant :: now ( ) ;
342
350
let poll = match ready ! ( self . input. poll_next_unpin( cx) ) {
343
351
Some ( Ok ( batch) ) => {
344
352
self . num_series . add ( 1 ) ;
353
+ let timer = std:: time:: Instant :: now ( ) ;
345
354
let result = Ok ( batch) . and_then ( |batch| self . normalize ( batch) ) ;
346
355
self . metric . elapsed_compute ( ) . add_elapsed ( timer) ;
347
356
Poll :: Ready ( Some ( result) )
@@ -399,6 +408,7 @@ mod test {
399
408
time_index_column_name : TIME_INDEX_COLUMN . to_string ( ) ,
400
409
need_filter_out_nan : true ,
401
410
input : memory_exec,
411
+ tag_columns : vec ! [ "path" . to_string( ) ] ,
402
412
metric : ExecutionPlanMetricsSet :: new ( ) ,
403
413
} ) ;
404
414
let session_context = SessionContext :: default ( ) ;
@@ -413,11 +423,11 @@ mod test {
413
423
"+---------------------+--------+------+\
414
424
\n | timestamp | value | path |\
415
425
\n +---------------------+--------+------+\
426
+ \n | 1970-01-01T00:01:00 | 0.0 | foo |\
427
+ \n | 1970-01-01T00:02:00 | 1.0 | foo |\
416
428
\n | 1970-01-01T00:00:00 | 10.0 | foo |\
417
429
\n | 1970-01-01T00:00:30 | 100.0 | foo |\
418
- \n | 1970-01-01T00:01:00 | 0.0 | foo |\
419
430
\n | 1970-01-01T00:01:30 | 1000.0 | foo |\
420
- \n | 1970-01-01T00:02:00 | 1.0 | foo |\
421
431
\n +---------------------+--------+------+",
422
432
) ;
423
433
@@ -428,11 +438,12 @@ mod test {
428
438
async fn test_offset_record_batch ( ) {
429
439
let memory_exec = Arc :: new ( prepare_test_data ( ) ) ;
430
440
let normalize_exec = Arc :: new ( SeriesNormalizeExec {
431
- offset : 1_000 , // offset 1s
441
+ offset : 1_000 ,
432
442
time_index_column_name : TIME_INDEX_COLUMN . to_string ( ) ,
433
443
need_filter_out_nan : true ,
434
444
input : memory_exec,
435
445
metric : ExecutionPlanMetricsSet :: new ( ) ,
446
+ tag_columns : vec ! [ "path" . to_string( ) ] ,
436
447
} ) ;
437
448
let session_context = SessionContext :: default ( ) ;
438
449
let result = datafusion:: physical_plan:: collect ( normalize_exec, session_context. task_ctx ( ) )
@@ -446,11 +457,11 @@ mod test {
446
457
"+---------------------+--------+------+\
447
458
\n | timestamp | value | path |\
448
459
\n +---------------------+--------+------+\
460
+ \n | 1970-01-01T00:01:01 | 0.0 | foo |\
461
+ \n | 1970-01-01T00:02:01 | 1.0 | foo |\
449
462
\n | 1970-01-01T00:00:01 | 10.0 | foo |\
450
463
\n | 1970-01-01T00:00:31 | 100.0 | foo |\
451
- \n | 1970-01-01T00:01:01 | 0.0 | foo |\
452
464
\n | 1970-01-01T00:01:31 | 1000.0 | foo |\
453
- \n | 1970-01-01T00:02:01 | 1.0 | foo |\
454
465
\n +---------------------+--------+------+",
455
466
) ;
456
467
0 commit comments