Skip to content

Commit 73b800a

Browse files
committed
update slts
1 parent d39c36b commit 73b800a

File tree

7 files changed

+157
-30
lines changed

7 files changed

+157
-30
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 90 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,40 @@
1717

1818
use std::sync::{Arc, LazyLock};
1919

20-
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
20+
use arrow::{
21+
array::record_batch,
22+
datatypes::{DataType, Field, Schema, SchemaRef},
23+
};
24+
use arrow_schema::SortOptions;
2125
use datafusion::{
2226
logical_expr::Operator,
2327
physical_plan::{
2428
expressions::{BinaryExpr, Column, Literal},
2529
PhysicalExpr,
2630
},
31+
prelude::{SessionConfig, SessionContext},
2732
scalar::ScalarValue,
2833
};
2934
use datafusion_common::config::ConfigOptions;
35+
use datafusion_execution::object_store::ObjectStoreUrl;
3036
use datafusion_functions_aggregate::count::count_udaf;
31-
use datafusion_physical_expr::expressions::col;
3237
use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning};
33-
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
38+
use datafusion_physical_expr::{expressions::col, LexOrdering, PhysicalSortExpr};
39+
use datafusion_physical_optimizer::{
40+
filter_pushdown::FilterPushdown, PhysicalOptimizerRule,
41+
};
3442
use datafusion_physical_plan::{
3543
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
3644
coalesce_batches::CoalesceBatchesExec,
3745
filter::FilterExec,
3846
repartition::RepartitionExec,
47+
sorts::sort::SortExec,
48+
ExecutionPlan,
3949
};
4050

41-
use util::{OptimizationTest, TestNode, TestScanBuilder};
51+
use futures::StreamExt;
52+
use object_store::memory::InMemory;
53+
use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};
4254

4355
mod util;
4456

@@ -346,6 +358,80 @@ fn test_node_handles_child_pushdown_result() {
346358
);
347359
}
348360

361+
#[tokio::test]
362+
async fn test_topk_dynamic_filter_pushdown() {
363+
// This test is a bit of a hack, but it shows that we can push down dynamic filters
364+
// into the DataSourceExec. The test is not perfect because we don't have a real
365+
// implementation of the dynamic filter yet, so we just use a static filter.
366+
let batches = vec![
367+
record_batch!(
368+
("a", Utf8, ["aa", "ab"]),
369+
("b", Utf8, ["bd", "bc"]),
370+
("c", Float64, [1.0, 2.0])
371+
)
372+
.unwrap(),
373+
record_batch!(
374+
("a", Utf8, ["ac", "ad"]),
375+
("b", Utf8, ["bb", "ba"]),
376+
("c", Float64, [2.0, 1.0])
377+
)
378+
.unwrap(),
379+
];
380+
let scan = TestScanBuilder::new(schema())
381+
.with_support(true)
382+
.with_batches(batches)
383+
.build();
384+
let plan = Arc::new(
385+
SortExec::new(
386+
LexOrdering::new(vec![PhysicalSortExpr::new(
387+
col("b", &schema()).unwrap(),
388+
SortOptions::new(true, false), // descending, nulls_first
389+
)]),
390+
Arc::clone(&scan),
391+
)
392+
.with_fetch(Some(1)),
393+
) as Arc<dyn ExecutionPlan>;
394+
395+
// expect the predicate to be pushed down into the DataSource
396+
insta::assert_snapshot!(
397+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown{}, true),
398+
@r"
399+
OptimizationTest:
400+
input:
401+
- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
402+
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
403+
output:
404+
Ok:
405+
- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false]
406+
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
407+
"
408+
);
409+
410+
// Actually apply the optimization to the plan
411+
let mut config = ConfigOptions::default();
412+
config.execution.parquet.pushdown_filters = true;
413+
let plan = FilterPushdown {}.optimize(plan, &config).unwrap();
414+
let config = SessionConfig::new().with_batch_size(2);
415+
let session_ctx = SessionContext::new_with_config(config);
416+
session_ctx.register_object_store(
417+
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
418+
Arc::new(InMemory::new()),
419+
);
420+
let state = session_ctx.state();
421+
let task_ctx = state.task_ctx();
422+
let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
423+
// Iterate one batch
424+
stream.next().await.unwrap().unwrap();
425+
// Now check what our filter looks like
426+
insta::assert_snapshot!(
427+
format!("{}", format_plan_for_test(&plan)),
428+
@r"
429+
- SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], preserve_partitioning=[false], filter=[b@1 > bd]
430+
- DataSourceExec: file_groups={1 group: [[test.paqruet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@1 > bd ]
431+
"
432+
);
433+
}
434+
349435
/// Schema:
350436
/// a: String
351437
/// b: String

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,11 @@ impl TestScanBuilder {
266266
self
267267
}
268268

269+
pub fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
270+
self.batches = batches;
271+
self
272+
}
273+
269274
pub fn build(self) -> Arc<dyn ExecutionPlan> {
270275
let source = Arc::new(TestSource::new(self.support, self.batches));
271276
let base_config = FileScanConfigBuilder::new(
@@ -421,6 +426,15 @@ fn format_lines(s: &str) -> Vec<String> {
421426
s.trim().split('\n').map(|s| s.to_string()).collect()
422427
}
423428

429+
pub fn format_plan_for_test(plan: &Arc<dyn ExecutionPlan>) -> String {
430+
let mut out = String::new();
431+
for line in format_execution_plan(plan) {
432+
out.push_str(&format!(" - {line}\n"));
433+
}
434+
out.push('\n');
435+
out
436+
}
437+
424438
#[derive(Debug)]
425439
pub(crate) struct TestNode {
426440
inject_filter: bool,

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,6 @@ impl PhysicalOptimizer {
9595
// as that rule may inject other operations in between the different AggregateExecs.
9696
// Applying the rule early means only directly-connected AggregateExecs must be examined.
9797
Arc::new(LimitedDistinctAggregation::new()),
98-
// The FilterPushdown rule tries to push down filters as far as it can.
99-
// For example, it will push down filtering from a `FilterExec` to
100-
// a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
101-
Arc::new(FilterPushdown::new()),
10298
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
10399
// requirements. Please make sure that the whole plan tree is determined before this rule.
104100
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
@@ -137,6 +133,10 @@ impl PhysicalOptimizer {
137133
// are not present, the load of executors such as join or union will be
138134
// reduced by narrowing their input tables.
139135
Arc::new(ProjectionPushdown::new()),
136+
// The FilterPushdown rule tries to push down filters as far as it can.
137+
// For example, it will push down filtering from a `FilterExec` to
138+
// a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
139+
Arc::new(FilterPushdown::new()),
140140
// The SanityCheckPlan rule checks whether the order and
141141
// distribution requirements of each node in the plan
142142
// is satisfied. It will also reject non-runnable query

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,6 @@ physical_plan after OutputRequirements
229229
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
230230
physical_plan after join_selection SAME TEXT AS ABOVE
231231
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
232-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
233232
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
234233
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
235234
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -240,6 +239,7 @@ physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[W
240239
physical_plan after LimitAggregation SAME TEXT AS ABOVE
241240
physical_plan after LimitPushdown SAME TEXT AS ABOVE
242241
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
242+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
243243
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
244244
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
245245
physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -304,7 +304,6 @@ physical_plan after OutputRequirements
304304
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
305305
physical_plan after join_selection SAME TEXT AS ABOVE
306306
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
307-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
308307
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
309308
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
310309
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -317,6 +316,7 @@ physical_plan after OutputRequirements
317316
physical_plan after LimitAggregation SAME TEXT AS ABOVE
318317
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
319318
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
319+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
320320
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
321321
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
322322
physical_plan_with_schema DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N]
@@ -345,7 +345,6 @@ physical_plan after OutputRequirements
345345
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
346346
physical_plan after join_selection SAME TEXT AS ABOVE
347347
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
348-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
349348
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
350349
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
351350
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -358,6 +357,7 @@ physical_plan after OutputRequirements
358357
physical_plan after LimitAggregation SAME TEXT AS ABOVE
359358
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
360359
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
360+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
361361
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
362362
physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
363363
physical_plan_with_stats DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ logical_plan
8686
physical_plan
8787
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
8888
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
89-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
89+
03)----CoalesceBatchesExec: target_batch_size=8192
90+
04)------ProjectionExec: expr=[a@0 as a]
91+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
92+
06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
9093

9194

9295
# When filter pushdown *is* enabled, ParquetExec can filter exactly,
@@ -134,7 +137,10 @@ logical_plan
134137
physical_plan
135138
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
136139
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
137-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[]
140+
03)----CoalesceBatchesExec: target_batch_size=8192
141+
04)------ProjectionExec: expr=[a@0 as a]
142+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
143+
06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[]
138144

139145

140146
query I
@@ -153,7 +159,10 @@ logical_plan
153159
physical_plan
154160
01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
155161
02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
156-
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)]
162+
03)----CoalesceBatchesExec: target_batch_size=8192
163+
04)------ProjectionExec: expr=[b@1 as b]
164+
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
165+
06)----------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], file_type=parquet, predicate=a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)]
157166

158167
## cleanup
159168
statement ok
@@ -229,7 +238,10 @@ EXPLAIN select * from t_pushdown where val != 'c';
229238
logical_plan
230239
01)Filter: t_pushdown.val != Utf8("c")
231240
02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != Utf8("c")]
232-
physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]
241+
physical_plan
242+
01)CoalesceBatchesExec: target_batch_size=8192
243+
02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
244+
03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]
233245

234246
# If we have a mix of filters:
235247
# - The partition filters get evaluated during planning

0 commit comments

Comments
 (0)