Skip to content

Commit 079e1f5

Browse files
committed
refactor filter pushdown apis
1 parent 9730404 commit 079e1f5

10 files changed

Lines changed: 544 additions & 390 deletions

File tree

datafusion/core/tests/physical_optimizer/push_down_filter.rs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
4747
use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
4848
use datafusion_physical_optimizer::PhysicalOptimizerRule;
4949
use datafusion_physical_plan::filter_pushdown::{
50-
filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
51-
FilterPushdownSupport,
50+
FilterPushdownPropagation, FilterPushdowns,
5251
};
5352
use datafusion_physical_plan::{
5453
aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -154,29 +153,25 @@ impl FileSource for TestSource {
154153

155154
fn try_pushdown_filters(
156155
&self,
157-
mut fd: FilterDescription,
156+
filters: &[Arc<dyn PhysicalExpr>],
158157
config: &ConfigOptions,
159-
) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
158+
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
159+
let mut all_filters = filters.iter().map(Arc::clone).collect::<Vec<_>>();
160160
if self.support && config.execution.parquet.pushdown_filters {
161161
if let Some(internal) = self.predicate.as_ref() {
162-
fd.filters.push(Arc::clone(internal));
162+
all_filters.push(Arc::clone(internal));
163163
}
164-
let all_filters = fd.take_description();
165-
166-
Ok(FilterPushdownResult {
167-
support: FilterPushdownSupport::Supported {
168-
child_descriptions: vec![],
169-
op: Arc::new(TestSource {
170-
support: true,
171-
predicate: Some(conjunction(all_filters)),
172-
statistics: self.statistics.clone(), // should be updated in reality
173-
}),
174-
revisit: false,
175-
},
176-
remaining_description: FilterDescription::empty(),
164+
let new_node = Arc::new(TestSource {
165+
support: true,
166+
predicate: Some(conjunction(all_filters.clone())),
167+
statistics: self.statistics.clone(), // should be updated in reality
168+
});
169+
Ok(FilterPushdownPropagation {
170+
parent_filter_result: FilterPushdowns::all_supported(&all_filters),
171+
new_node: Some(new_node),
177172
})
178173
} else {
179-
Ok(filter_pushdown_not_supported(fd))
174+
Ok(FilterPushdownPropagation::unsupported(filters))
180175
}
181176
}
182177
}

datafusion/datasource/src/file.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,8 @@ use crate::file_stream::FileOpener;
2828
use arrow::datatypes::SchemaRef;
2929
use datafusion_common::config::ConfigOptions;
3030
use datafusion_common::{Result, Statistics};
31-
use datafusion_physical_expr::LexOrdering;
32-
use datafusion_physical_plan::filter_pushdown::{
33-
filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
34-
};
31+
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
32+
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
3533
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
3634
use datafusion_physical_plan::DisplayFormatType;
3735

@@ -104,9 +102,9 @@ pub trait FileSource: Send + Sync {
104102
/// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
105103
fn try_pushdown_filters(
106104
&self,
107-
fd: FilterDescription,
105+
filters: &[Arc<dyn PhysicalExpr>],
108106
_config: &ConfigOptions,
109-
) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
110-
Ok(filter_pushdown_not_supported(fd))
107+
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
108+
Ok(FilterPushdownPropagation::unsupported(filters))
111109
}
112110
}

datafusion/datasource/src/file_scan_config.rs

Lines changed: 58 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,12 @@ use datafusion_common::{DataFusionError, ScalarValue};
4848
use datafusion_execution::{
4949
object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
5050
};
51+
use datafusion_physical_expr::PhysicalExpr;
5152
use datafusion_physical_expr::{
5253
expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
5354
PhysicalSortExpr,
5455
};
55-
use datafusion_physical_plan::filter_pushdown::{
56-
filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
57-
FilterPushdownSupport,
58-
};
56+
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
5957
use datafusion_physical_plan::{
6058
display::{display_orderings, ProjectSchemaDisplay},
6159
metrics::ExecutionPlanMetricsSet,
@@ -596,43 +594,68 @@ impl DataSource for FileScanConfig {
596594

597595
fn try_pushdown_filters(
598596
&self,
599-
fd: FilterDescription,
597+
filters: &[Arc<dyn PhysicalExpr>],
600598
config: &ConfigOptions,
601-
) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
602-
let FilterPushdownResult {
603-
support,
604-
remaining_description,
605-
} = self.file_source.try_pushdown_filters(fd, config)?;
606-
607-
match support {
608-
FilterPushdownSupport::Supported {
609-
child_descriptions,
610-
op,
611-
revisit,
612-
} => {
613-
let new_data_source = Arc::new(
614-
FileScanConfigBuilder::from(self.clone())
615-
.with_source(op)
616-
.build(),
617-
);
618-
619-
debug_assert!(child_descriptions.is_empty());
620-
debug_assert!(!revisit);
621-
622-
Ok(FilterPushdownResult {
623-
support: FilterPushdownSupport::Supported {
624-
child_descriptions,
625-
op: new_data_source,
626-
revisit,
627-
},
628-
remaining_description,
599+
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
600+
let result = self.file_source.try_pushdown_filters(filters, config)?;
601+
match result.new_node {
602+
Some(new_node) => {
603+
let mut new_data_source = self.clone();
604+
new_data_source.file_source = new_node;
605+
Ok(FilterPushdownPropagation {
606+
parent_filter_result: result.parent_filter_result,
607+
new_node: Some(Arc::new(new_data_source) as _),
629608
})
630609
}
631-
FilterPushdownSupport::NotSupported => {
632-
Ok(filter_pushdown_not_supported(remaining_description))
610+
None => {
611+
// The file source produced no replacement node, so propagate its per-filter result and signal (via `new_node: None`) that this config is unchanged
612+
Ok(FilterPushdownPropagation {
613+
parent_filter_result: result.parent_filter_result,
614+
new_node: None,
615+
})
633616
}
634617
}
635618
}
619+
620+
// fn try_pushdown_filters(
621+
// &self,
622+
// parent_filters: &[Arc<dyn PhysicalExpr>],
623+
// config: &ConfigOptions,
624+
// ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
625+
// // let FilterPushdownResult {
626+
// // support,
627+
// // remaining_description,
628+
// // } = self.file_source.try_pushdown_filters(fd, config)?;
629+
630+
// // match support {
631+
// // FilterPushdownSupport::Supported {
632+
// // child_descriptions,
633+
// // op,
634+
// // revisit,
635+
// // } => {
636+
// // let new_data_source = Arc::new(
637+
// // FileScanConfigBuilder::from(self.clone())
638+
// // .with_source(op)
639+
// // .build(),
640+
// // );
641+
642+
// // debug_assert!(child_descriptions.is_empty());
643+
// // debug_assert!(!revisit);
644+
645+
// // Ok(FilterPushdownResult {
646+
// // support: FilterPushdownSupport::Supported {
647+
// // child_descriptions,
648+
// // op: new_data_source,
649+
// // revisit,
650+
// // },
651+
// // remaining_description,
652+
// // })
653+
// // }
654+
// // FilterPushdownSupport::NotSupported => {
655+
// // Ok(filter_pushdown_not_supported(remaining_description))
656+
// // }
657+
// // }
658+
// }
636659
}
637660

638661
impl FileScanConfig {

datafusion/datasource/src/source.rs

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ use crate::file_scan_config::FileScanConfig;
3333
use datafusion_common::config::ConfigOptions;
3434
use datafusion_common::{Constraints, Result, Statistics};
3535
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
36-
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
36+
use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
3737
use datafusion_physical_expr_common::sort_expr::LexOrdering;
3838
use datafusion_physical_plan::filter_pushdown::{
39-
filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
40-
FilterPushdownSupport,
39+
ChildPushdownResult, FilterPushdownPropagation,
4140
};
4241

4342
/// Common behaviors in Data Sources for both from Files and Memory.
@@ -87,10 +86,10 @@ pub trait DataSource: Send + Sync + Debug {
8786
/// See [`ExecutionPlan::try_pushdown_filters`] for more details.
8887
fn try_pushdown_filters(
8988
&self,
90-
fd: FilterDescription,
89+
filters: &[Arc<dyn PhysicalExpr>],
9190
_config: &ConfigOptions,
92-
) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
93-
Ok(filter_pushdown_not_supported(fd))
91+
) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
92+
Ok(FilterPushdownPropagation::unsupported(filters))
9493
}
9594
}
9695

@@ -206,39 +205,31 @@ impl ExecutionPlan for DataSourceExec {
206205
self.data_source.try_swapping_with_projection(projection)
207206
}
208207

209-
fn try_pushdown_filters(
208+
fn handle_child_pushdown_result(
210209
&self,
211-
fd: FilterDescription,
210+
child_pushdown_result: ChildPushdownResult,
212211
config: &ConfigOptions,
213-
) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
214-
let FilterPushdownResult {
215-
support,
216-
remaining_description,
217-
} = self.data_source.try_pushdown_filters(fd, config)?;
218-
219-
match support {
220-
FilterPushdownSupport::Supported {
221-
child_descriptions,
222-
op,
223-
revisit,
224-
} => {
225-
let new_exec = Arc::new(DataSourceExec::new(op));
226-
227-
debug_assert!(child_descriptions.is_empty());
228-
debug_assert!(!revisit);
229-
230-
Ok(FilterPushdownResult {
231-
support: FilterPushdownSupport::Supported {
232-
child_descriptions,
233-
op: new_exec,
234-
revisit,
235-
},
236-
remaining_description,
212+
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
213+
// Push any remaining filters into our data source
214+
let res = self.data_source.try_pushdown_filters(
215+
&child_pushdown_result.parent_filters.unpack(),
216+
config,
217+
)?;
218+
match res.new_node {
219+
Some(data_source) => {
220+
let mut new_node = self.clone();
221+
new_node.data_source = data_source;
222+
new_node.cache =
223+
Self::compute_properties(Arc::clone(&new_node.data_source));
224+
Ok(FilterPushdownPropagation {
225+
parent_filter_result: res.parent_filter_result,
226+
new_node: Some(Arc::new(new_node)),
237227
})
238228
}
239-
FilterPushdownSupport::NotSupported => {
240-
Ok(filter_pushdown_not_supported(remaining_description))
241-
}
229+
None => Ok(FilterPushdownPropagation {
230+
parent_filter_result: res.parent_filter_result,
231+
new_node: None,
232+
}),
242233
}
243234
}
244235
}

0 commit comments

Comments
 (0)