Skip to content

Commit a4f4b17

Browse files
Fix constant window for evaluate stateful (#16430)
* Fix constant window for evaluate stateful
* Add test
* Add ticket reference to test

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 8b03e5e commit a4f4b17

File tree

3 files changed

+96
-0
lines changed

3 files changed

+96
-0
lines changed

datafusion/core/tests/physical_optimizer/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,3 +30,4 @@ mod projection_pushdown;
3030
mod replace_with_order_preserving_variants;
3131
mod sanity_checker;
3232
mod test_utils;
33+
mod window_optimize;
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#[cfg(test)]
mod test {
    use arrow::array::{Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_datasource::memory::MemorySourceConfig;
    use datafusion_datasource::source::DataSourceExec;
    use datafusion_execution::TaskContext;
    use datafusion_expr::WindowFrame;
    use datafusion_functions_aggregate::count::count_udaf;
    use datafusion_physical_expr::aggregate::AggregateExprBuilder;
    use datafusion_physical_expr::expressions::{col, Column};
    use datafusion_physical_expr::window::PlainAggregateWindowExpr;
    use datafusion_physical_plan::windows::BoundedWindowAggExec;
    use datafusion_physical_plan::{common, ExecutionPlan, InputOrderMode};
    use std::sync::Arc;

    /// Test case for <https://github.com/apache/datafusion/issues/16308>
    ///
    /// Runs a `count` aggregate over column `b`, partitioned by column `a`
    /// with no ordering expressions, through a [`BoundedWindowAggExec`] in
    /// [`InputOrderMode::Linear`]. The collected batches are discarded: the
    /// test only requires that planning and execution finish without error.
    #[tokio::test]
    async fn test_window_constant_aggregate() -> Result<()> {
        let input = mock_data()?;
        // Grab the schema before `input` is moved into the window operator.
        let input_schema = input.schema();

        // `count` over the column at index 1 ("b"), aliased as "t".
        let count_b = AggregateExprBuilder::new(
            count_udaf(),
            vec![Arc::new(Column::new("b", 1))],
        )
        .schema(Arc::clone(&input_schema))
        .alias("t")
        .build()?;

        let partition_by = [col("a", &input_schema)?];
        // NOTE(review): `WindowFrame::new(None)` together with the empty
        // ORDER BY slice is presumably what makes this window constant within
        // a partition (the case from the linked issue) -- confirm against the
        // `WindowFrame` documentation.
        let window_expr = PlainAggregateWindowExpr::new(
            Arc::new(count_b),
            &partition_by,
            &[],
            Arc::new(WindowFrame::new(None)),
        );

        let window_exec = BoundedWindowAggExec::try_new(
            vec![Arc::new(window_expr)],
            input,
            InputOrderMode::Linear,
            true,
        )?;

        // Execute partition 0 and drain the stream; any error fails the test.
        let stream = window_exec.execute(0, Arc::new(TaskContext::default()))?;
        common::collect(stream).await?;

        Ok(())
    }

    /// Builds an in-memory source holding a single five-row batch with two
    /// nullable `Int32` columns:
    ///
    /// | a | b |
    /// |---|---|
    /// | 1 | 1 |
    /// | 1 | 6 |
    /// | 3 | 2 |
    /// | 2 | 8 |
    /// | 1 | 9 |
    ///
    /// Every value is present (no nulls).
    pub fn mock_data() -> Result<Arc<DataSourceExec>> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Int32, true),
        ]));

        // Values are wrapped in `Some` so the arrays are built from optional
        // items, matching the nullable field declarations above.
        let a_values: Int32Array =
            [1_i32, 1, 3, 2, 1].into_iter().map(Some).collect();
        let b_values: Int32Array =
            [1_i32, 6, 2, 8, 9].into_iter().map(Some).collect();

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(a_values), Arc::new(b_values)],
        )?;

        // One partition containing the single batch; no projection.
        MemorySourceConfig::try_new_exec(&[vec![batch]], schema, None)
    }
}

datafusion/physical-expr/src/window/window_expr.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -277,6 +277,11 @@ pub trait AggregateWindowExpr: WindowExpr {
277277
let values = self.evaluate_args(record_batch)?;
278278

279279
if self.is_constant_in_partition() {
280+
if not_end {
281+
let field = self.field()?;
282+
let out_type = field.data_type();
283+
return Ok(new_empty_array(out_type));
284+
}
280285
accumulator.update_batch(&values)?;
281286
let value = accumulator.evaluate()?;
282287
return value.to_array_of_size(record_batch.num_rows());

0 commit comments

Comments
 (0)