Skip to content

Commit 7c0c5ae

Browse files
committed
use an enum?
1 parent 94131dd commit 7c0c5ae

File tree

12 files changed

+78
-121
lines changed

12 files changed

+78
-121
lines changed

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ async fn check_stats_precision_with_filter_pushdown() {
8383
Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
8484
as Arc<dyn ExecutionPlan>;
8585

86-
let optimized_exec = FilterPushdown::new_static()
86+
let optimized_exec = FilterPushdown::new_pre_optimization()
8787
.optimize(filtered_exec, &options)
8888
.unwrap();
8989

datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ fn test_pushdown_into_scan() {
6363

6464
// expect the predicate to be pushed down into the DataSource
6565
insta::assert_snapshot!(
66-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
66+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
6767
@r"
6868
OptimizationTest:
6969
input:
@@ -87,7 +87,7 @@ fn test_pushdown_into_scan_with_config_options() {
8787
insta::assert_snapshot!(
8888
OptimizationTest::new(
8989
Arc::clone(&plan),
90-
FilterPushdown::new_static(),
90+
FilterPushdown::new_pre_optimization(),
9191
false
9292
),
9393
@r"
@@ -106,7 +106,7 @@ fn test_pushdown_into_scan_with_config_options() {
106106
insta::assert_snapshot!(
107107
OptimizationTest::new(
108108
plan,
109-
FilterPushdown::new_static(),
109+
FilterPushdown::new_pre_optimization(),
110110
true
111111
),
112112
@r"
@@ -131,7 +131,7 @@ fn test_filter_collapse() {
131131
let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
132132

133133
insta::assert_snapshot!(
134-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
134+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
135135
@r"
136136
OptimizationTest:
137137
input:
@@ -159,7 +159,7 @@ fn test_filter_with_projection() {
159159

160160
// expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
161161
insta::assert_snapshot!(
162-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
162+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
163163
@r"
164164
OptimizationTest:
165165
input:
@@ -182,7 +182,7 @@ fn test_filter_with_projection() {
182182
.unwrap(),
183183
);
184184
insta::assert_snapshot!(
185-
OptimizationTest::new(plan, FilterPushdown::new_static(),true),
185+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(),true),
186186
@r"
187187
OptimizationTest:
188188
input:
@@ -211,7 +211,7 @@ fn test_push_down_through_transparent_nodes() {
211211

212212
// expect the predicate to be pushed down into the DataSource
213213
insta::assert_snapshot!(
214-
OptimizationTest::new(plan, FilterPushdown::new_static(),true),
214+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(),true),
215215
@r"
216216
OptimizationTest:
217217
input:
@@ -275,7 +275,7 @@ fn test_no_pushdown_through_aggregates() {
275275

276276
// expect the predicate to be pushed down into the DataSource
277277
insta::assert_snapshot!(
278-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
278+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
279279
@r"
280280
OptimizationTest:
281281
input:
@@ -306,7 +306,7 @@ fn test_node_handles_child_pushdown_result() {
306306
let predicate = col_lit_predicate("a", "foo", &schema());
307307
let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
308308
insta::assert_snapshot!(
309-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
309+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
310310
@r"
311311
OptimizationTest:
312312
input:
@@ -325,7 +325,7 @@ fn test_node_handles_child_pushdown_result() {
325325
let predicate = col_lit_predicate("a", "foo", &schema());
326326
let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
327327
insta::assert_snapshot!(
328-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
328+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
329329
@r"
330330
OptimizationTest:
331331
input:
@@ -345,7 +345,7 @@ fn test_node_handles_child_pushdown_result() {
345345
let predicate = col_lit_predicate("a", "foo", &schema());
346346
let plan = Arc::new(TestNode::new(false, Arc::clone(&scan), predicate));
347347
insta::assert_snapshot!(
348-
OptimizationTest::new(plan, FilterPushdown::new_static(), true),
348+
OptimizationTest::new(plan, FilterPushdown::new_pre_optimization(), true),
349349
@r"
350350
OptimizationTest:
351351
input:
@@ -393,7 +393,7 @@ async fn test_topk_dynamic_filter_pushdown() {
393393

394394
// expect the predicate to be pushed down into the DataSource
395395
insta::assert_snapshot!(
396-
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_dynamic(), true),
396+
OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
397397
@r"
398398
OptimizationTest:
399399
input:
@@ -409,7 +409,7 @@ async fn test_topk_dynamic_filter_pushdown() {
409409
// Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
410410
let mut config = ConfigOptions::default();
411411
config.execution.parquet.pushdown_filters = true;
412-
let plan = FilterPushdown::new_dynamic()
412+
let plan = FilterPushdown::new_post_optimization()
413413
.optimize(plan, &config)
414414
.unwrap();
415415
let config = SessionConfig::new().with_batch_size(2);

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ impl ExecutionPlan for TestNode {
508508
unimplemented!("TestInsertExec is a stub for testing.")
509509
}
510510

511-
fn gather_static_filters_for_pushdown(
511+
fn gather_filters_for_pushdown(
512512
&self,
513513
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
514514
_config: &ConfigOptions,

datafusion/datasource/src/source.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
3636
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
3737
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3838
use datafusion_physical_plan::filter_pushdown::{
39-
ChildPushdownResult, FilterPushdownPropagation,
39+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation
4040
};
4141
use datafusion_physical_plan::yield_stream::wrap_yield_stream;
4242

@@ -318,6 +318,7 @@ impl ExecutionPlan for DataSourceExec {
318318

319319
fn handle_child_pushdown_result(
320320
&self,
321+
_phase: FilterPushdownPhase,
321322
child_pushdown_result: ChildPushdownResult,
322323
config: &ConfigOptions,
323324
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {

datafusion/physical-optimizer/src/filter_pushdown.rs

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ use crate::PhysicalOptimizerRule;
2222
use datafusion_common::{config::ConfigOptions, Result};
2323
use datafusion_physical_expr::PhysicalExpr;
2424
use datafusion_physical_plan::filter_pushdown::{
25-
ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, PredicateSupports,
25+
ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
26+
PredicateSupport, PredicateSupports,
2627
};
2728
use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
2829

@@ -363,19 +364,19 @@ use itertools::izip;
363364
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
364365
#[derive(Debug)]
365366
pub struct FilterPushdown {
366-
phase: PushdownPhase,
367+
phase: FilterPushdownPhase,
367368
}
368369

369370
impl FilterPushdown {
370-
pub fn new_static() -> Self {
371+
pub fn new_pre_optimization() -> Self {
371372
Self {
372-
phase: PushdownPhase::Static,
373+
phase: FilterPushdownPhase::BeforOptimization,
373374
}
374375
}
375376

376-
pub fn new_dynamic() -> Self {
377+
pub fn new_post_optimization() -> Self {
377378
Self {
378-
phase: PushdownPhase::Dynamic,
379+
phase: FilterPushdownPhase::AfterOptimization,
379380
}
380381
}
381382
}
@@ -415,7 +416,7 @@ fn push_down_filters(
415416
node: Arc<dyn ExecutionPlan>,
416417
parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
417418
config: &ConfigOptions,
418-
phase: PushdownPhase,
419+
phase: FilterPushdownPhase,
419420
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
420421
// If the node has any child, these will be rewritten as supported or unsupported
421422
let mut parent_predicates_pushdown_states =
@@ -424,16 +425,8 @@ fn push_down_filters(
424425
let mut new_children = Vec::with_capacity(node.children().len());
425426

426427
let children = node.children();
427-
let filter_description = match phase {
428-
PushdownPhase::Static => {
429-
// Gather filters for static pushdown
430-
node.gather_static_filters_for_pushdown(parent_predicates.clone(), config)?
431-
}
432-
PushdownPhase::Dynamic => {
433-
// Gather filters for dynamic pushdown
434-
node.gather_dynamic_filters_for_pushdown(parent_predicates.clone(), config)?
435-
}
436-
};
428+
let filter_description =
429+
node.gather_filters_for_pushdown(phase, parent_predicates.clone(), config)?;
437430

438431
for (child, parent_filters, self_filters) in izip!(
439432
children,
@@ -543,6 +536,7 @@ fn push_down_filters(
543536
// `ExecutionPlan` implementation will not change the plan itself.
544537
// Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
545538
let mut res = updated_node.handle_child_pushdown_result(
539+
phase,
546540
ChildPushdownResult {
547541
parent_filters: parent_pushdown_result,
548542
self_filters: self_filters_pushdown_supports,
@@ -556,17 +550,3 @@ fn push_down_filters(
556550
}
557551
Ok(res)
558552
}
559-
560-
/// Which phase of pushdown we are in.
561-
/// We have two phases:
562-
/// 1. Static: We are pushing down filters that have no references to plan nodes and thus the plan can be modified after they are pushed down.
563-
/// This phase also gives nodes the opportunity to rewrite the plan itself since we are early on in the optimization process.
564-
/// In practice this means we call [`ExecutionPlan::gather_filters_for_pushdown`] and [`ExecutionPlan::handle_child_pushdown_result`].
565-
/// 2. Dynamic: We are pushing down filters that have references to plan nodes and thus the plan cannot be modified after they are pushed down.
566-
/// This phase does not permit nodes to rewrite the plan itself since we are late in the optimization process.
567-
/// In practice this means we call [`ExecutionPlan::gather_dynamic_filters_for_pushdown`] but not [`ExecutionPlan::handle_child_pushdown_result`].
568-
#[derive(Debug, Clone, Copy)]
569-
enum PushdownPhase {
570-
Static,
571-
Dynamic,
572-
}

datafusion/physical-optimizer/src/optimizer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl PhysicalOptimizer {
9999
// The FilterPushdown rule tries to push down filters as far as it can.
100100
// For example, it will push down filtering from a `FilterExec` to
101101
// a `DataSourceExec`.
102-
Arc::new(FilterPushdown::new_static()),
102+
Arc::new(FilterPushdown::new_pre_optimization()),
103103
// The EnforceDistribution rule is for adding essential repartitioning to satisfy distribution
104104
// requirements. Please make sure that the whole plan tree is determined before this rule.
105105
// This rule increases parallelism if doing so is beneficial to the physical plan; i.e. at
@@ -132,7 +132,7 @@ impl PhysicalOptimizer {
132132
// past operators that support limit pushdown.
133133
Arc::new(LimitPushdown::new()),
134134
// This FilterPushdown handles dynamic filters that may have references to the source ExecutionPlan
135-
Arc::new(FilterPushdown::new_dynamic()),
135+
Arc::new(FilterPushdown::new_post_optimization()),
136136
// The ProjectionPushdown rule tries to push projections towards
137137
// the sources in the execution plan. As a result of this process,
138138
// a projection can disappear if it reaches the source providers, and

datafusion/physical-plan/src/coalesce_batches.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ use datafusion_physical_expr::PhysicalExpr;
3737
use crate::coalesce::{BatchCoalescer, CoalescerState};
3838
use crate::execution_plan::CardinalityEffect;
3939
use crate::filter_pushdown::{
40-
ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
40+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
41+
FilterPushdownPropagation,
4142
};
4243
use datafusion_common::config::ConfigOptions;
4344
use futures::ready;
@@ -227,17 +228,9 @@ impl ExecutionPlan for CoalesceBatchesExec {
227228
CardinalityEffect::Equal
228229
}
229230

230-
fn gather_static_filters_for_pushdown(
231-
&self,
232-
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
233-
_config: &ConfigOptions,
234-
) -> Result<FilterDescription> {
235-
Ok(FilterDescription::new_with_child_count(1)
236-
.all_parent_filters_supported(parent_filters))
237-
}
238-
239-
fn gather_dynamic_filters_for_pushdown(
231+
fn gather_filters_for_pushdown(
240232
&self,
233+
_phase: FilterPushdownPhase,
241234
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
242235
_config: &ConfigOptions,
243236
) -> Result<FilterDescription> {
@@ -247,6 +240,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
247240

248241
fn handle_child_pushdown_result(
249242
&self,
243+
_phase: FilterPushdownPhase,
250244
child_pushdown_result: ChildPushdownResult,
251245
_config: &ConfigOptions,
252246
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {

datafusion/physical-plan/src/execution_plan.rs

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
1919
use crate::filter_pushdown::{
20-
ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
20+
ChildPushdownResult, FilterDescription, FilterPushdownPhase,
21+
FilterPushdownPropagation,
2122
};
2223
pub use crate::metrics::Metric;
2324
pub use crate::ordering::InputOrderMode;
@@ -491,41 +492,6 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
491492
Ok(None)
492493
}
493494

494-
/// Collect any *dynamic* filters that this node can push down to its children.
495-
/// The return value is expected a vector of vectors, where each inner vector
496-
/// represents a set of filters that can be pushed down to a single child and the length of the outer vector
497-
/// is equal to the number of children of this node.
498-
///
499-
/// Dynamic filters are filters that have references to this node's internal state, or otherwise
500-
/// depend on the shape the the execution plan tree.
501-
/// This is different than static filters (see [`ExecutionPlan::gather_filters_for_pushdown`])
502-
/// which can be pushed down from a node and that node is later removed from the plan, or where the plan tree may be
503-
/// arbitrarily modified by other optimizations.
504-
/// Dynamic filter pushdown does not afford the ability to later edit the plan tree (e.g. to remove the current node) so it is allowed
505-
/// for filters to have references to specific nodes in the plan tree.
506-
/// Combinations of these requirements (e.g. pushing down dynamic filters and editing the plan tree or manipulating parent filters)
507-
/// are not supported.
508-
///
509-
/// The default implementation returns an empty vector, meaning that no dynamic filters are pushed down.
510-
///
511-
/// This is currently used by `SortExec` when it is used to implement a `TopK` operation to
512-
/// push down a filter representing the current state of the heap for early pruning during the scan phase.
513-
/// It is planned to be used by `HashJoinExec` to push down a filter representing the hash table
514-
/// to the scan side of the join.
515-
///
516-
/// Since this does not perform any modifications to the plan tree it is called late in the optimization phase
517-
/// once the plan tree is finalized.
518-
fn gather_dynamic_filters_for_pushdown(
519-
&self,
520-
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
521-
_config: &ConfigOptions,
522-
) -> Result<FilterDescription> {
523-
Ok(
524-
FilterDescription::new_with_child_count(self.children().len())
525-
.all_parent_filters_unsupported(parent_filters),
526-
)
527-
}
528-
529495
/// Collect filters that this node can push down to its children.
530496
/// Filters that are being pushed down from parents are passed in,
531497
/// and the node may generate additional filters to push down.
@@ -547,8 +513,9 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
547513
///
548514
/// Since this may perform deep modifications to the plan tree it is called early in the optimization phase
549515
/// and is not expected to be called multiple times on the same plan.
550-
fn gather_static_filters_for_pushdown(
516+
fn gather_filters_for_pushdown(
551517
&self,
518+
_phase: FilterPushdownPhase,
552519
parent_filters: Vec<Arc<dyn PhysicalExpr>>,
553520
_config: &ConfigOptions,
554521
) -> Result<FilterDescription> {
@@ -590,6 +557,7 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
590557
/// [`PredicateSupports::new_with_supported_check`]: crate::filter_pushdown::PredicateSupports::new_with_supported_check
591558
fn handle_child_pushdown_result(
592559
&self,
560+
_phase: FilterPushdownPhase,
593561
child_pushdown_result: ChildPushdownResult,
594562
_config: &ConfigOptions,
595563
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {

0 commit comments

Comments
 (0)