Skip to content

Commit e9cb59b

Browse files
committed
move filter order
1 parent cc8ae9a commit e9cb59b

File tree

3 files changed

+12
-16
lines changed

3 files changed

+12
-16
lines changed

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ impl PhysicalOptimizer {
9595
// as that rule may inject other operations in between the different AggregateExecs.
9696
// Applying the rule early means only directly-connected AggregateExecs must be examined.
9797
Arc::new(LimitedDistinctAggregation::new()),
98+
// The FilterPushdown rule tries to push down filters as far as it can.
99+
// For example, it will push down filtering from a `FilterExec` to
100+
// a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
101+
// Since this may remove `FilterExec` nodes (or insert/swap other nodes)
102+
// it must run before `EnforceDistribution`.
103+
Arc::new(PushdownFilter::new()),
98104
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
99105
// requirements. Please make sure that the whole plan tree is determined before this rule.
100106
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
@@ -122,10 +128,6 @@ impl PhysicalOptimizer {
122128
// into an `order by max(x) limit y`. In this case it will copy the limit value down
123129
// to the aggregation, allowing it to use only y number of accumulators.
124130
Arc::new(TopKAggregation::new()),
125-
// The FilterPushdown rule tries to push down filters as far as it can.
126-
// For example, it will push down filtering from a `FilterExec` to
127-
// a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
128-
Arc::new(PushdownFilter::new()),
129131
// The LimitPushdown rule tries to push limits down as far as possible,
130132
// replacing operators with fetching variants, or adding limits
131133
// past operators that support limit pushdown.

datafusion/sqllogictest/test_files/explain.slt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ physical_plan after OutputRequirements
229229
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
230230
physical_plan after join_selection SAME TEXT AS ABOVE
231231
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
232+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
232233
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
233234
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
234235
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -237,7 +238,6 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
237238
physical_plan after coalesce_batches SAME TEXT AS ABOVE
238239
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true
239240
physical_plan after LimitAggregation SAME TEXT AS ABOVE
240-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
241241
physical_plan after LimitPushdown SAME TEXT AS ABOVE
242242
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
243243
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -304,6 +304,7 @@ physical_plan after OutputRequirements
304304
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
305305
physical_plan after join_selection SAME TEXT AS ABOVE
306306
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
307+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
307308
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
308309
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
309310
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -314,7 +315,6 @@ physical_plan after OutputRequirements
314315
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
315316
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
316317
physical_plan after LimitAggregation SAME TEXT AS ABOVE
317-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
318318
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
319319
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
320320
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
@@ -345,6 +345,7 @@ physical_plan after OutputRequirements
345345
physical_plan after aggregate_statistics SAME TEXT AS ABOVE
346346
physical_plan after join_selection SAME TEXT AS ABOVE
347347
physical_plan after LimitedDistinctAggregation SAME TEXT AS ABOVE
348+
physical_plan after PushdownFilter SAME TEXT AS ABOVE
348349
physical_plan after EnforceDistribution SAME TEXT AS ABOVE
349350
physical_plan after CombinePartialFinalAggregate SAME TEXT AS ABOVE
350351
physical_plan after EnforceSorting SAME TEXT AS ABOVE
@@ -355,7 +356,6 @@ physical_plan after OutputRequirements
355356
01)GlobalLimitExec: skip=0, fetch=10
356357
02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
357358
physical_plan after LimitAggregation SAME TEXT AS ABOVE
358-
physical_plan after PushdownFilter SAME TEXT AS ABOVE
359359
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet
360360
physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
361361
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE

datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,7 @@ logical_plan
8787
physical_plan
8888
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
8989
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
90-
03)----CoalesceBatchesExec: target_batch_size=8192
91-
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
92-
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
90+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND b@1 > 2, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2, required_guarantees=[]
9391

9492

9593
# When filter pushdown *is* enabled, ParquetExec can filter exactly,
@@ -137,9 +135,7 @@ logical_plan
137135
physical_plan
138136
01)SortPreservingMergeExec: [a@0 ASC NULLS LAST]
139137
02)--SortExec: expr=[a@0 ASC NULLS LAST], preserve_partitioning=[true]
140-
03)----CoalesceBatchesExec: target_batch_size=8192
141-
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
142-
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL AND b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[]
138+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], file_type=parquet, predicate=b@1 > 2 AND a@0 IS NOT NULL AND b@1 > 2 AND a@0 IS NOT NULL, pruning_predicate=b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2 AND b_null_count@1 != row_count@2 AND b_max@0 > 2 AND a_null_count@3 != row_count@2, required_guarantees=[]
143139

144140

145141
query I
@@ -158,9 +154,7 @@ logical_plan
158154
physical_plan
159155
01)SortPreservingMergeExec: [b@0 ASC NULLS LAST]
160156
02)--SortExec: expr=[b@0 ASC NULLS LAST], preserve_partitioning=[true]
161-
03)----CoalesceBatchesExec: target_batch_size=8192
162-
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
163-
05)--------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar AND a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1 AND a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)]
157+
03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[b], file_type=parquet, predicate=a@0 = bar AND a@0 = bar, pruning_predicate=a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1 AND a_null_count@2 != row_count@3 AND a_min@0 <= bar AND bar <= a_max@1, required_guarantees=[a in (bar)]
164158

165159
## cleanup
166160
statement ok

0 commit comments

Comments
 (0)