
Commit 94131dd

show alternative dynamic filter pushdown approach
1 parent 10430c2 commit 94131dd

15 files changed (+158, -114 lines)

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 1 addition & 1 deletion
@@ -83,7 +83,7 @@ async fn check_stats_precision_with_filter_pushdown() {
         Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
             as Arc<dyn ExecutionPlan>;
 
-    let optimized_exec = FilterPushdown::new()
+    let optimized_exec = FilterPushdown::new_static()
         .optimize(filtered_exec, &options)
         .unwrap();

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 15 additions & 13 deletions
@@ -63,7 +63,7 @@ fn test_pushdown_into_scan() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -87,7 +87,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             Arc::clone(&plan),
-            FilterPushdown {},
+            FilterPushdown::new_static(),
             false
         ),
         @r"
@@ -106,7 +106,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             plan,
-            FilterPushdown {},
+            FilterPushdown::new_static(),
             true
         ),
         @r"
@@ -131,7 +131,7 @@ fn test_filter_collapse() {
     let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
 
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -159,7 +159,7 @@ fn test_filter_with_projection() {
 
     // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -182,7 +182,7 @@ fn test_filter_with_projection() {
         .unwrap(),
     );
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{},true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(),true),
         @r"
     OptimizationTest:
       input:
@@ -211,7 +211,7 @@ fn test_push_down_through_transparent_nodes() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{},true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(),true),
         @r"
     OptimizationTest:
       input:
@@ -275,7 +275,7 @@ fn test_no_pushdown_through_aggregates() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -306,7 +306,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -325,7 +325,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -345,7 +345,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(false, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new_static(), true),
         @r"
     OptimizationTest:
       input:
@@ -393,7 +393,7 @@ async fn test_topk_dynamic_filter_pushdown() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(Arc::clone(&plan), FilterPushdown{}, true),
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_dynamic(), true),
         @r"
     OptimizationTest:
       input:
@@ -409,7 +409,9 @@ async fn test_topk_dynamic_filter_pushdown() {
     // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
     let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
-    let plan = FilterPushdown::new().optimize(plan, &config).unwrap();
+    let plan = FilterPushdown::new_dynamic()
+        .optimize(plan, &config)
+        .unwrap();
     let config = SessionConfig::new().with_batch_size(2);
     let session_ctx = SessionContext::new_with_config(config);
     session_ctx.register_object_store(
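
The tests above now construct the rule in one of two explicit modes instead of the old `FilterPushdown {}` / `FilterPushdown::new()` forms. The diff only shows the `new_static()` and `new_dynamic()` call sites, not the rule's internals, so the following standalone sketch is an assumption: a hypothetical `FilterPushdownMode` enum and a simplified struct (not the real DataFusion types) illustrate one plausible way the two constructors could select which class of filters the pass handles.

```rust
use std::sync::Arc;

/// Hypothetical mode flag; the real rule may be structured differently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterPushdownMode {
    /// Predicates fully known at plan time (e.g. `a = 'foo'`).
    Static,
    /// Predicates refined while the query runs (e.g. TopK bounds).
    Dynamic,
}

/// Simplified stand-in for the optimizer rule, not the DataFusion type.
#[derive(Debug)]
struct FilterPushdown {
    mode: FilterPushdownMode,
}

impl FilterPushdown {
    fn new_static() -> Self {
        Self { mode: FilterPushdownMode::Static }
    }

    fn new_dynamic() -> Self {
        Self { mode: FilterPushdownMode::Dynamic }
    }

    /// Placeholder for `optimize`: a real rule would rewrite the plan here.
    fn optimize(&self, plan: Arc<str>) -> Arc<str> {
        // In this sketch the "plan" is just a string; we only show which mode runs.
        println!("running {:?} filter pushdown on {plan}", self.mode);
        plan
    }
}

fn main() {
    let plan: Arc<str> = Arc::from("DataSourceExec <- FilterExec");
    // The tests in this commit pick the mode explicitly rather than `FilterPushdown {}`.
    let _ = FilterPushdown::new_static().optimize(Arc::clone(&plan));
    let _ = FilterPushdown::new_dynamic().optimize(plan);
}
```

Splitting the rule this way lets static predicates be pushed during ordinary plan optimization while dynamic filters (such as the TopK bound in `test_topk_dynamic_filter_pushdown` above) are wired up by a separate invocation of the rule.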

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 11 additions & 1 deletion
@@ -508,7 +508,17 @@ impl ExecutionPlan for TestNode {
         unimplemented!("TestInsertExec is a stub for testing.")
     }
 
-    fn gather_filters_for_pushdown(
+    fn gather_static_filters_for_pushdown(
+        &self,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        Ok(FilterDescription::new_with_child_count(1)
+            .all_parent_filters_supported(parent_filters)
+            .with_self_filter(Arc::clone(&self.predicate)))
+    }
+
+    fn gather_dynamic_filters_for_pushdown(
         &self,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
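
The hunk above adds a static gather hook next to the dynamic one, but the body of `gather_dynamic_filters_for_pushdown` is cut off at the end of the hunk. The sketch below is a simplified, self-contained approximation rather than the real trait: string stand-ins replace `Arc<dyn PhysicalExpr>`, `ConfigOptions`, and `FilterDescription`, and the dynamic method's body is assumed. It shows a node offering its own predicate only on the static path while merely forwarding parent filters on the dynamic path.

```rust
use std::sync::Arc;

// Stand-ins for `Arc<dyn PhysicalExpr>` and `ConfigOptions`.
type PhysicalExpr = str;
struct ConfigOptions;
type Result<T> = std::result::Result<T, String>;

/// Simplified builder mirroring the calls used in the hunk above.
#[derive(Debug, Default)]
struct FilterDescription {
    child_count: usize,
    parent_filters: Vec<Arc<PhysicalExpr>>,
    self_filters: Vec<Arc<PhysicalExpr>>,
}

impl FilterDescription {
    fn new_with_child_count(child_count: usize) -> Self {
        Self {
            child_count,
            ..Default::default()
        }
    }

    fn all_parent_filters_supported(mut self, filters: Vec<Arc<PhysicalExpr>>) -> Self {
        self.parent_filters = filters;
        self
    }

    fn with_self_filter(mut self, filter: Arc<PhysicalExpr>) -> Self {
        self.self_filters.push(filter);
        self
    }
}

/// Stand-in for the test operator: it owns one predicate of its own.
struct TestNode {
    predicate: Arc<PhysicalExpr>,
}

impl TestNode {
    /// Static pass: accept all parent filters and also offer the node's own predicate.
    fn gather_static_filters_for_pushdown(
        &self,
        parent_filters: Vec<Arc<PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        Ok(FilterDescription::new_with_child_count(1)
            .all_parent_filters_supported(parent_filters)
            .with_self_filter(Arc::clone(&self.predicate)))
    }

    /// Dynamic pass (body assumed; the hunk truncates it): only forward parent filters.
    fn gather_dynamic_filters_for_pushdown(
        &self,
        parent_filters: Vec<Arc<PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        Ok(FilterDescription::new_with_child_count(1)
            .all_parent_filters_supported(parent_filters))
    }
}

fn main() -> Result<()> {
    let node = TestNode { predicate: Arc::from("a = 'foo'") };
    let parent: Vec<Arc<PhysicalExpr>> = vec![Arc::from("b > 5")];

    let static_desc = node.gather_static_filters_for_pushdown(parent.clone(), &ConfigOptions)?;
    let dynamic_desc = node.gather_dynamic_filters_for_pushdown(parent, &ConfigOptions)?;

    assert_eq!(static_desc.self_filters.len(), 1); // own predicate offered statically
    assert!(dynamic_desc.self_filters.is_empty()); // nothing extra on the dynamic path
    println!("{static_desc:?}\n{dynamic_desc:?}");
    Ok(())
}
```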

datafusion/physical-optimizer/src/enforce_distribution.rs

Lines changed: 0 additions & 1 deletion
@@ -1308,7 +1308,6 @@ pub fn ensure_distribution(
                     .downcast_ref::<OutputRequirementExec>()
                     .map(|output| output.fetch())
                     .unwrap_or(None),
-                None,
             )?;
         }
     }

datafusion/physical-optimizer/src/enforce_sorting/mod.rs

Lines changed: 8 additions & 15 deletions
@@ -57,7 +57,6 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::Result;
-use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;
 use datafusion_physical_expr::{Distribution, Partitioning};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -404,7 +403,7 @@ pub fn parallelize_sorts(
         && requirements.plan.output_partitioning().partition_count() <= 1
     {
         // Take the initial sort expressions and requirements
-        let (sort_exprs, fetch, filter) = get_sort_exprs(&requirements.plan)?;
+        let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
         let sort_reqs = LexRequirement::from(sort_exprs.clone());
         let sort_exprs = sort_exprs.clone();
 
@@ -418,7 +417,7 @@ pub fn parallelize_sorts(
         // deals with the children and their children and so on.
         requirements = requirements.children.swap_remove(0);
 
-        requirements = add_sort_above_with_check(requirements, sort_reqs, fetch, filter)?;
+        requirements = add_sort_above_with_check(requirements, sort_reqs, fetch)?;
 
         let spm =
             SortPreservingMergeExec::new(sort_exprs, Arc::clone(&requirements.plan));
@@ -514,7 +513,6 @@ pub fn ensure_sorting(
                 .downcast_ref::<OutputRequirementExec>()
                 .map(|output| output.fetch())
                 .unwrap_or(None),
-            None,
         );
         child = update_sort_ctx_children_data(child, true)?;
     }
@@ -646,7 +644,7 @@ fn adjust_window_sort_removal(
     // Satisfy the ordering requirement so that the window can run:
     let mut child_node = window_tree.children.swap_remove(0);
     if let Some(reqs) = reqs {
-        child_node = add_sort_above(child_node, reqs.into_single(), None, None);
+        child_node = add_sort_above(child_node, reqs.into_single(), None);
     }
     let child_plan = Arc::clone(&child_node.plan);
     window_tree.children.push(child_node);
@@ -805,20 +803,15 @@ fn remove_corresponding_sort_from_sub_plan(
     Ok(node)
 }
 
-/// Return type for get_sort_exprs function
-type SortExprsResult<'a> = (
-    &'a LexOrdering,
-    Option<usize>,
-    Option<Arc<DynamicFilterPhysicalExpr>>,
-);
-
 /// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when possible.
-fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> Result<SortExprsResult<'_>> {
+fn get_sort_exprs(
+    sort_any: &Arc<dyn ExecutionPlan>,
+) -> Result<(&LexOrdering, Option<usize>)> {
     if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
-        Ok((sort_exec.expr(), sort_exec.fetch(), sort_exec.filter()))
+        Ok((sort_exec.expr(), sort_exec.fetch()))
     } else if let Some(spm) = sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
     {
-        Ok((spm.expr(), spm.fetch(), None))
+        Ok((spm.expr(), spm.fetch()))
     } else {
         plan_err!("Given ExecutionPlan is not a SortExec or a SortPreservingMergeExec")
     }
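
With the dynamic filter removed from the sort machinery, `get_sort_exprs` shrinks from the three-field `SortExprsResult` to a plain `(&LexOrdering, Option<usize>)` pair. The following standalone sketch uses stand-in types (not the real DataFusion `SortExec`, `SortPreservingMergeExec`, or `LexOrdering`) to show the trimmed shape of the helper in isolation.

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in types; the real code uses DataFusion's LexOrdering and plan nodes.
#[derive(Debug)]
struct LexOrdering(Vec<String>);

trait ExecutionPlan {
    fn as_any(&self) -> &dyn Any;
}

struct SortExec {
    expr: LexOrdering,
    fetch: Option<usize>,
}
impl ExecutionPlan for SortExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct SortPreservingMergeExec {
    expr: LexOrdering,
    fetch: Option<usize>,
}
impl ExecutionPlan for SortPreservingMergeExec {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

type Result<T> = std::result::Result<T, String>;

/// Mirrors the new two-element return: ordering plus optional fetch, no filter.
fn get_sort_exprs(
    sort_any: &Arc<dyn ExecutionPlan>,
) -> Result<(&LexOrdering, Option<usize>)> {
    if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
        Ok((&sort_exec.expr, sort_exec.fetch))
    } else if let Some(spm) = sort_any.as_any().downcast_ref::<SortPreservingMergeExec>() {
        Ok((&spm.expr, spm.fetch))
    } else {
        Err("not a SortExec or SortPreservingMergeExec".to_string())
    }
}

fn main() -> Result<()> {
    let plan: Arc<dyn ExecutionPlan> = Arc::new(SortExec {
        expr: LexOrdering(vec!["a ASC".to_string()]),
        fetch: Some(10),
    });
    let (ordering, fetch) = get_sort_exprs(&plan)?;
    println!("ordering = {ordering:?}, fetch = {fetch:?}");
    Ok(())
}
```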

datafusion/physical-optimizer/src/enforce_sorting/sort_pushdown.rs

Lines changed: 1 addition & 50 deletions
@@ -26,7 +26,7 @@ use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{internal_err, HashSet, JoinSide, Result};
 use datafusion_expr::JoinType;
-use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr};
+use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
     add_offset_to_physical_sort_exprs, EquivalenceProperties,
@@ -57,7 +57,6 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 pub struct ParentRequirements {
     ordering_requirement: Option<OrderingRequirements>,
     fetch: Option<usize>,
-    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
 }
 
 pub type SortPushDown = PlanContext<ParentRequirements>;
@@ -71,8 +70,6 @@ pub fn assign_initial_requirements(sort_push_down: &mut SortPushDown) {
             // If the parent has a fetch value, assign it to the children
             // Or use the fetch value of the child.
             fetch: child.plan.fetch(),
-            // If the parent has a filter, assign it to the children
-            filter: sort_push_down.data.filter.clone(),
         };
     }
 }
@@ -98,7 +95,6 @@ fn pushdown_sorts_helper(
 ) -> Result<Transformed<SortPushDown>> {
     let plan = sort_push_down.plan;
     let parent_fetch = sort_push_down.data.fetch;
-    let parent_filter = sort_push_down.data.filter.clone();
 
     let Some(parent_requirement) = sort_push_down.data.ordering_requirement.clone()
     else {
@@ -118,18 +114,6 @@ fn pushdown_sorts_helper(
         sort_push_down.data.fetch = fetch;
         sort_push_down.data.ordering_requirement =
             Some(OrderingRequirements::from(sort_ordering));
-        let filter = plan
-            .as_any()
-            .downcast_ref::<SortExec>()
-            .and_then(|s| s.filter().clone());
-        match filter {
-            Some(filter) => {
-                sort_push_down.data.filter = Some(filter);
-            }
-            None => {
-                sort_push_down.data.filter = parent_filter.clone();
-            }
-        }
         // Recursive call to helper, so it doesn't transform_down and miss
         // the new node (previous child of sort):
         return pushdown_sorts_helper(sort_push_down);
@@ -147,20 +131,11 @@ fn pushdown_sorts_helper(
         return internal_err!("SortExec should have output ordering");
     };
 
-    let filter = plan
-        .as_any()
-        .downcast_ref::<SortExec>()
-        .and_then(|s| s.filter().clone());
-
     let sort_fetch = plan.fetch();
     let parent_is_stricter = eqp.requirements_compatible(
         parent_requirement.first().clone(),
         sort_ordering.clone().into(),
     );
-    let sort_filter = plan
-        .as_any()
-        .downcast_ref::<SortExec>()
-        .and_then(|s| s.filter().clone());
 
     // Remove the current sort as we are either going to prove that it is
     // unnecessary, or replace it with a stricter sort.
@@ -177,27 +152,17 @@ fn pushdown_sorts_helper(
             sort_push_down,
             parent_requirement.into_single(),
             parent_fetch,
-            filter.clone(),
         );
         // Update pushdown requirements:
         sort_push_down.children[0].data = ParentRequirements {
            ordering_requirement: Some(OrderingRequirements::from(sort_ordering)),
             fetch: sort_fetch,
-            filter,
         };
         return Ok(Transformed::yes(sort_push_down));
     } else {
         // Sort was unnecessary, just propagate the stricter fetch and
         // ordering requirements:
         sort_push_down.data.fetch = min_fetch(sort_fetch, parent_fetch);
-        match sort_filter {
-            Some(filter) => {
-                sort_push_down.data.filter = Some(filter);
-            }
-            None => {
-                sort_push_down.data.filter = parent_filter.clone();
-            }
-        }
         let current_is_stricter = eqp.requirements_compatible(
             sort_ordering.clone().into(),
             parent_requirement.first().clone(),
@@ -229,22 +194,9 @@ fn pushdown_sorts_helper(
         // For operators that can take a sort pushdown, continue with updated
         // requirements:
         let current_fetch = sort_push_down.plan.fetch();
-        let current_filter = sort_push_down
-            .plan
-            .as_any()
-            .downcast_ref::<SortExec>()
-            .and_then(|s| s.filter().clone());
        for (child, order) in sort_push_down.children.iter_mut().zip(adjusted) {
             child.data.ordering_requirement = order;
             child.data.fetch = min_fetch(current_fetch, parent_fetch);
-            match current_filter {
-                Some(ref filter) => {
-                    child.data.filter = Some(Arc::clone(filter));
-                }
-                None => {
-                    child.data.filter = parent_filter.clone();
-                }
-            }
         }
         sort_push_down.data.ordering_requirement = None;
     } else {
@@ -253,7 +205,6 @@ fn pushdown_sorts_helper(
             sort_push_down,
             parent_requirement.into_single(),
             parent_fetch,
-            parent_filter,
         );
         assign_initial_requirements(&mut sort_push_down);
     }
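
Taken together, these hunks strip the `filter` plumbing out of `ParentRequirements` and `pushdown_sorts_helper`, leaving sort pushdown to track only an ordering requirement and an optional fetch. A minimal standalone sketch of the trimmed per-node state follows; the types are simplified stand-ins, and while `min_fetch` matches the name used in the diff, its body here is assumed.

```rust
// Stand-in for the real OrderingRequirements type.
#[derive(Debug, Clone)]
struct OrderingRequirements(Vec<String>);

/// Trimmed per-node pushdown state: no dynamic filter field anymore.
#[derive(Debug, Clone)]
struct ParentRequirements {
    ordering_requirement: Option<OrderingRequirements>,
    fetch: Option<usize>,
    // The `filter: Option<Arc<DynamicFilterPhysicalExpr>>` field from the
    // previous approach is gone; dynamic filters are handled elsewhere.
}

/// Plausible helper matching the `min_fetch` name used in the diff: take the
/// smaller of two optional fetch limits.
fn min_fetch(a: Option<usize>, b: Option<usize>) -> Option<usize> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a.min(b)),
        (x, None) | (None, x) => x,
    }
}

fn main() {
    let parent = ParentRequirements {
        ordering_requirement: Some(OrderingRequirements(vec!["a ASC".into()])),
        fetch: Some(10),
    };
    // Propagating to a child that itself has a fetch of 5: only ordering and
    // fetch are forwarded, with the tighter fetch winning.
    let child = ParentRequirements {
        ordering_requirement: parent.ordering_requirement.clone(),
        fetch: min_fetch(parent.fetch, Some(5)),
    };
    assert_eq!(child.fetch, Some(5));
    println!("{child:?}");
}
```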
