diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 3ace3e14ec25..4b6d8f274932 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -179,6 +179,10 @@ name = "csv_load"
 harness = false
 name = "distinct_query_sql"
 
+[[bench]]
+harness = false
+name = "push_down_filter"
+
 [[bench]]
 harness = false
 name = "sort_limit_query_sql"
diff --git a/datafusion/core/benches/push_down_filter.rs b/datafusion/core/benches/push_down_filter.rs
new file mode 100644
index 000000000000..92de1711a9e8
--- /dev/null
+++ b/datafusion/core/benches/push_down_filter.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::RecordBatch;
+use arrow::datatypes::{DataType, Field, Schema};
+use bytes::{BufMut, BytesMut};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::config::ConfigOptions;
+use datafusion::prelude::{ParquetReadOptions, SessionContext};
+use datafusion_execution::object_store::ObjectStoreUrl;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::ExecutionPlan;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::ObjectStore;
+use parquet::arrow::ArrowWriter;
+use std::sync::Arc;
+
+async fn create_plan() -> Arc<dyn ExecutionPlan> {
+    let ctx = SessionContext::new();
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, true),
+        Field::new("name", DataType::Utf8, true),
+        Field::new("age", DataType::UInt16, true),
+        Field::new("salary", DataType::Float64, true),
+    ]));
+    let batch = RecordBatch::new_empty(schema);
+
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let mut out = BytesMut::new().writer();
+    {
+        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
+        writer.write(&batch).unwrap();
+        writer.finish().unwrap();
+    }
+    let data = out.into_inner().freeze();
+    store
+        .put(&Path::from("test.parquet"), data.into())
+        .await
+        .unwrap();
+    ctx.register_object_store(
+        ObjectStoreUrl::parse("memory://").unwrap().as_ref(),
+        store,
+    );
+
+    ctx.register_parquet("t", "memory:///", ParquetReadOptions::default())
+        .await
+        .unwrap();
+
+    let df = ctx
+        .sql(
+            r"
+            WITH brackets AS (
+                SELECT age % 10 AS age_bracket
+                FROM t
+                GROUP BY age % 10
+                HAVING COUNT(*) > 10
+            )
+            SELECT id, name, age, salary
+            FROM t
+            JOIN brackets ON t.age % 10 = brackets.age_bracket
+            WHERE age > 20 AND t.salary > 1000
+            ORDER BY t.salary DESC
+            LIMIT 100
+            ",
+        )
+        .await
+        .unwrap();
+
+    df.create_physical_plan().await.unwrap()
+}
+
+#[derive(Clone)]
+struct BenchmarkPlan {
+    plan: Arc<dyn ExecutionPlan>,
+    config: ConfigOptions,
+}
+
+impl std::fmt::Display for BenchmarkPlan {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "BenchmarkPlan")
+    }
+}
+
+fn bench_push_down_filter(c: &mut Criterion) {
+    // Create a relatively complex plan
+    let plan = tokio::runtime::Runtime::new()
+        .unwrap()
+        .block_on(create_plan());
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = BenchmarkPlan { plan, config };
+
+    c.bench_with_input(
+        BenchmarkId::new("push_down_filter", plan.clone()),
+        &plan,
+        |b, plan| {
+            b.iter(|| {
+                let optimizer = FilterPushdown::new();
+                optimizer
+                    .optimize(Arc::clone(&plan.plan), &plan.config)
+                    .unwrap();
+            });
+        },
+    );
+}
+
+criterion_group!(benches, bench_push_down_filter);
+criterion_main!(benches);
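The benchmark drives the rule in isolation: it builds the physical plan once, then times only `FilterPushdown::optimize`. It registers with Criterion's usual cargo harness, so it should be runnable on its own with something like `cargo bench -p datafusion --bench push_down_filter` (the bench target added in the `Cargo.toml` change above; adjust to your workspace layout).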
diff --git a/datafusion/core/tests/physical_optimizer/push_down_filter.rs b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
index b19144f1bcff..326a7b837e7a 100644
--- a/datafusion/core/tests/physical_optimizer/push_down_filter.rs
+++ b/datafusion/core/tests/physical_optimizer/push_down_filter.rs
@@ -44,11 +44,10 @@ use datafusion_physical_expr::{
     aggregate::AggregateExprBuilder, conjunction, Partitioning,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
+    FilterPushdownPropagation, PredicateSupports,
 };
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -154,29 +153,24 @@ impl FileSource for TestSource {
 
     fn try_pushdown_filters(
         &self,
-        mut fd: FilterDescription,
+        mut filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
         if self.support && config.execution.parquet.pushdown_filters {
             if let Some(internal) = self.predicate.as_ref() {
-                fd.filters.push(Arc::clone(internal));
+                filters.push(Arc::clone(internal));
             }
-            let all_filters = fd.take_description();
-
-            Ok(FilterPushdownResult {
-                support: FilterPushdownSupport::Supported {
-                    child_descriptions: vec![],
-                    op: Arc::new(TestSource {
-                        support: true,
-                        predicate: Some(conjunction(all_filters)),
-                        statistics: self.statistics.clone(), // should be updated in reality
-                    }),
-                    revisit: false,
-                },
-                remaining_description: FilterDescription::empty(),
+            let new_node = Arc::new(TestSource {
+                support: true,
+                predicate: Some(conjunction(filters.clone())),
+                statistics: self.statistics.clone(), // should be updated in reality
+            });
+            Ok(FilterPushdownPropagation {
+                filters: PredicateSupports::all_supported(filters),
+                updated_node: Some(new_node),
             })
         } else {
-            Ok(filter_pushdown_not_supported(fd))
+            Ok(FilterPushdownPropagation::unsupported(filters))
         }
     }
 }
@@ -201,7 +195,7 @@ fn test_pushdown_into_scan() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
     OptimizationTest:
       input:
@@ -225,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             Arc::clone(&plan),
-            PushdownFilter {},
+            FilterPushdown {},
             false
         ),
         @r"
@@ -244,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             plan,
-            PushdownFilter {},
+            FilterPushdown {},
             true
         ),
         @r"
    OptimizationTest:
      input:
@@ -269,7 +263,7 @@ fn test_filter_collapse() {
     let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
 
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
    OptimizationTest:
      input:
@@ -278,7 +272,7 @@ fn test_filter_collapse() {
         - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
     output:
       Ok:
-        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
     "
     );
 }
@@ -288,25 +282,28 @@ fn test_filter_with_projection() {
     let scan = test_scan(true);
     let projection = vec![1, 0];
     let predicate = col_lit_predicate("a", "foo", schema());
-    let plan = Arc::new(
-        FilterExec::try_new(predicate, Arc::clone(&scan))
+    let filter = Arc::new(
+        FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan))
             .unwrap()
             .with_projection(Some(projection))
             .unwrap(),
     );
+    let predicate = col_lit_predicate("b", "bar", &filter.schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, filter).unwrap());
 
     // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
     OptimizationTest:
       input:
-      - FilterExec: a@0 = foo, projection=[b@1, a@0]
-        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
+      - FilterExec: b@0 = bar
+        - FilterExec: a@0 = foo, projection=[b@1, a@0]
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
     output:
       Ok:
       - ProjectionExec: expr=[b@1 as b, a@0 as a]
-        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+        - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
     ",
     );
 
@@ -320,7 +317,7 @@ fn test_filter_with_projection() {
             .unwrap(),
     );
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
    OptimizationTest:
      input:
@@ -349,7 +346,7 @@ fn test_push_down_through_transparent_nodes() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
    OptimizationTest:
      input:
@@ -362,7 +359,7 @@ fn test_push_down_through_transparent_nodes() {
       Ok:
       - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
         - CoalesceBatchesExec: target_batch_size=1
-          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+          - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
     "
     );
 }
@@ -413,7 +410,7 @@ fn test_no_pushdown_through_aggregates() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
    OptimizationTest:
      input:
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 6bbe9e50d3b0..c9b5c416f0c0 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -28,10 +28,8 @@ use crate::file_stream::FileOpener;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -108,14 +106,14 @@ pub trait FileSource: Send + Sync {
     }
 
     /// Try to push down filters into this FileSource.
-    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
-    /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        Ok(FilterPushdownPropagation::unsupported(filters))
     }
 }
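A custom `FileSource` adopts the new contract by returning a `FilterPushdownPropagation` instead of the old `FilterDescription`-based result. A minimal sketch, modeled on the `TestSource` in the tests above — `MySource` and its `predicate` field are hypothetical, and all other required trait methods are elided:

```rust
use std::sync::Arc;

use datafusion_common::config::ConfigOptions;
use datafusion_common::Result;
use datafusion_datasource::file::FileSource;
use datafusion_physical_expr::{conjunction, PhysicalExpr};
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PredicateSupports,
};

impl FileSource for MySource {
    // ...remaining FileSource methods elided...

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        if !config.execution.parquet.pushdown_filters {
            // Decline everything; the filters stay with the caller.
            return Ok(FilterPushdownPropagation::unsupported(filters));
        }
        // Absorb the filters by AND-ing them into a single scan predicate.
        let new_source: Arc<dyn FileSource> = Arc::new(MySource {
            predicate: Some(conjunction(filters.clone())),
            ..self.clone()
        });
        Ok(FilterPushdownPropagation {
            // Report every filter as handled so parents can drop them.
            filters: PredicateSupports::all_supported(filters),
            updated_node: Some(new_source),
        })
    }
}
```

Note that the source reports per-filter support: a partial implementation could return a mix of `Supported` and `Unsupported` entries via `PredicateSupports::new` rather than all-or-nothing.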
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index fb756cc11fbb..ae94af5a7b26 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -48,14 +48,12 @@ use datafusion_common::{DataFusionError, ScalarValue};
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
+use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr::{
     expressions::Column, EquivalenceProperties, LexOrdering, Partitioning,
     PhysicalSortExpr,
 };
-use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
-};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     metrics::ExecutionPlanMetricsSet,
@@ -596,40 +594,29 @@ impl DataSource for FileScanConfig {
 
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
-        let FilterPushdownResult {
-            support,
-            remaining_description,
-        } = self.file_source.try_pushdown_filters(fd, config)?;
-
-        match support {
-            FilterPushdownSupport::Supported {
-                child_descriptions,
-                op,
-                revisit,
-            } => {
-                let new_data_source = Arc::new(
-                    FileScanConfigBuilder::from(self.clone())
-                        .with_source(op)
-                        .build(),
-                );
-
-                debug_assert!(child_descriptions.is_empty());
-                debug_assert!(!revisit);
-
-                Ok(FilterPushdownResult {
-                    support: FilterPushdownSupport::Supported {
-                        child_descriptions,
-                        op: new_data_source,
-                        revisit,
-                    },
-                    remaining_description,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
+        let result = self.file_source.try_pushdown_filters(filters, config)?;
+        match result.updated_node {
+            Some(new_file_source) => {
+                let file_scan_config = FileScanConfigBuilder::new(
+                    self.object_store_url.clone(),
+                    Arc::clone(&self.file_schema),
+                    new_file_source,
+                )
+                .build();
+                Ok(FilterPushdownPropagation {
+                    filters: result.filters,
+                    updated_node: Some(Arc::new(file_scan_config) as _),
                 })
             }
-            FilterPushdownSupport::NotSupported => {
-                Ok(filter_pushdown_not_supported(remaining_description))
+            None => {
+                // If the file source does not support filter pushdown, return the original config
+                Ok(FilterPushdownPropagation {
+                    filters: result.filters,
+                    updated_node: None,
+                })
             }
         }
     }
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 022f77f2e421..cf42347e3aba 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -33,11 +33,10 @@ use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
+    ChildPushdownResult, FilterPushdownPropagation,
 };
 
 /// A source of data, typically a list of files or memory
@@ -95,13 +94,15 @@ pub trait DataSource: Send + Sync + Debug {
         _projection: &ProjectionExec,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>>;
     /// Try to push down filters into this DataSource.
-    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
+    ///
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn DataSource>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn DataSource>>> {
+        Ok(FilterPushdownPropagation::unsupported(filters))
     }
 }
 
@@ -237,39 +238,31 @@ impl ExecutionPlan for DataSourceExec {
         self.data_source.try_swapping_with_projection(projection)
     }
 
-    fn try_pushdown_filters(
+    fn handle_child_pushdown_result(
         &self,
-        fd: FilterDescription,
+        child_pushdown_result: ChildPushdownResult,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        let FilterPushdownResult {
-            support,
-            remaining_description,
-        } = self.data_source.try_pushdown_filters(fd, config)?;
-
-        match support {
-            FilterPushdownSupport::Supported {
-                child_descriptions,
-                op,
-                revisit,
-            } => {
-                let new_exec = Arc::new(DataSourceExec::new(op));
-
-                debug_assert!(child_descriptions.is_empty());
-                debug_assert!(!revisit);
-
-                Ok(FilterPushdownResult {
-                    support: FilterPushdownSupport::Supported {
-                        child_descriptions,
-                        op: new_exec,
-                        revisit,
-                    },
-                    remaining_description,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // Push any remaining filters into our data source
+        let res = self.data_source.try_pushdown_filters(
+            child_pushdown_result.parent_filters.collect_all(),
+            config,
+        )?;
+        match res.updated_node {
+            Some(data_source) => {
+                let mut new_node = self.clone();
+                new_node.data_source = data_source;
+                new_node.cache =
+                    Self::compute_properties(Arc::clone(&new_node.data_source));
+                Ok(FilterPushdownPropagation {
+                    filters: res.filters,
+                    updated_node: Some(Arc::new(new_node)),
                 })
             }
-            FilterPushdownSupport::NotSupported => {
-                Ok(filter_pushdown_not_supported(remaining_description))
-            }
+            None => Ok(FilterPushdownPropagation {
+                filters: res.filters,
+                updated_node: None,
+            }),
        }
    }
 }
diff --git a/datafusion/physical-optimizer/src/push_down_filter.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs
similarity index 67%
rename from datafusion/physical-optimizer/src/push_down_filter.rs
rename to datafusion/physical-optimizer/src/filter_pushdown.rs
index 80201454d06d..6c445458b51b 100644
--- a/datafusion/physical-optimizer/src/push_down_filter.rs
+++ b/datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -19,25 +19,25 @@ use std::sync::Arc;
 
 use crate::PhysicalOptimizerRule;
-use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion};
 use datafusion_common::{config::ConfigOptions, Result};
-use datafusion_physical_expr::conjunction;
-use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::{
-    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+    ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, PredicateSupports,
 };
-use datafusion_physical_plan::tree_node::PlanContext;
-use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
+
+use itertools::izip;
 
 /// Attempts to recursively push given filters from the top of the tree into leaves.
 ///
 /// # Default Implementation
 ///
-/// The default implementation in [`ExecutionPlan::try_pushdown_filters`] is a no-op
-/// that assumes that:
+/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
+/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
 ///
-/// * Parent filters can't be passed onto children.
-/// * This node has no filters to contribute.
+/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
+/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
+/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
 ///
 /// # Example: Push filter into a `DataSourceExec`
 ///
@@ -240,7 +240,7 @@ use datafusion_physical_plan::ExecutionPlan;
 /// The point here is that:
 /// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
 ///    Any filters above the [`AggregateExec`] node are not pushed down.
-///    This is determined by calling [`ExecutionPlan::try_pushdown_filters`] on the [`AggregateExec`] node.
+///    This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
 /// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
 ///    down the `id=1` filter.
 ///
@@ -362,47 +362,29 @@ use datafusion_physical_plan::ExecutionPlan;
 /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
-pub struct PushdownFilter {}
+pub struct FilterPushdown {}
 
-impl Default for PushdownFilter {
+impl FilterPushdown {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for FilterPushdown {
     fn default() -> Self {
         Self::new()
     }
 }
 
-pub type FilterDescriptionContext = PlanContext<FilterDescription>;
-
-impl PhysicalOptimizerRule for PushdownFilter {
+impl PhysicalOptimizerRule for FilterPushdown {
     fn optimize(
         &self,
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let context = FilterDescriptionContext::new_default(plan);
-
-        context
-            .transform_up(|node| {
-                if node.plan.as_any().downcast_ref::<FilterExec>().is_some() {
-                    let initial_plan = Arc::clone(&node.plan);
-                    let mut accept_updated = false;
-                    let updated_node = node.transform_down(|filter_node| {
-                        Self::try_pushdown(filter_node, config, &mut accept_updated)
-                    });
-
-                    if accept_updated {
-                        updated_node
-                    } else {
-                        Ok(Transformed::no(FilterDescriptionContext::new_default(
-                            initial_plan,
-                        )))
-                    }
-                }
-                // Other filter introducing operators extends here
-                else {
-                    Ok(Transformed::no(node))
-                }
-            })
-            .map(|updated| updated.data.plan)
+        Ok(push_down_filters(Arc::clone(&plan), vec![], config)?
+            .updated_node
+            .unwrap_or(plan))
     }
 
     fn name(&self) -> &str {
@@ -414,122 +396,146 @@ impl PhysicalOptimizerRule for PushdownFilter {
     }
 }
 
-impl PushdownFilter {
-    pub fn new() -> Self {
-        Self {}
-    }
+/// Support state of each predicate for the children of the node.
+/// These predicates are coming from the parent node.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+enum ParentPredicateStates {
+    NoChildren,
+    Unsupported,
+    Supported,
+}
 
-    fn try_pushdown(
-        mut node: FilterDescriptionContext,
-        config: &ConfigOptions,
-        accept_updated: &mut bool,
-    ) -> Result<Transformed<FilterDescriptionContext>> {
-        let initial_description = FilterDescription {
-            filters: node.data.take_description(),
-        };
+fn push_down_filters(
+    node: Arc<dyn ExecutionPlan>,
+    parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
+    config: &ConfigOptions,
+) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+    // If the node has any child, these will be rewritten as supported or unsupported
+    let mut parent_predicates_pushdown_states =
+        vec![ParentPredicateStates::NoChildren; parent_predicates.len()];
+    let mut self_filters_pushdown_supports = vec![];
+    let mut new_children = vec![];
 
-        let FilterPushdownResult {
-            support,
-            remaining_description,
-        } = node
-            .plan
-            .try_pushdown_filters(initial_description, config)?;
+    let children = node.children();
+    let filter_description =
+        node.gather_filters_for_pushdown(parent_predicates.clone(), config)?;
 
-        match support {
-            FilterPushdownSupport::Supported {
-                mut child_descriptions,
-                op,
-                revisit,
-            } => {
-                if revisit {
-                    // This check handles cases where the current operator is entirely removed
-                    // from the plan and replaced with its child. In such cases, to not skip
-                    // over the new node, we need to explicitly re-apply this pushdown logic
-                    // to the new node.
-                    //
-                    // TODO: If TreeNodeRecursion supports a Revisit mechanism in the future,
-                    // this manual recursion could be removed.
+    for (child, parent_filters, self_filters) in izip!(
+        children,
+        filter_description.parent_filters(),
+        filter_description.self_filters()
+    ) {
+        // Here, `parent_filters` are the predicates provided by this node's parent that
+        // we try to push down over the child this loop iteration points at, and
+        // `self_filters` are the predicates provided by the current node itself, pushed
+        // down over the same child.
 
-                    // If the operator is removed, it should not leave any filters as remaining
-                    debug_assert!(remaining_description.filters.is_empty());
-                    // Operators having 2 children cannot be removed
-                    debug_assert_eq!(child_descriptions.len(), 1);
-                    debug_assert_eq!(node.children.len(), 1);
+        let num_self_filters = self_filters.len();
+        let mut parent_supported_predicate_indices = vec![];
+        let mut all_predicates = self_filters;
 
-                    node.plan = op;
-                    node.data = child_descriptions.swap_remove(0);
-                    node.children = node.children.swap_remove(0).children;
-                    Self::try_pushdown(node, config, accept_updated)
-                } else {
-                    if remaining_description.filters.is_empty() {
-                        // Filter can be pushed down safely
-                        node.plan = op;
-                        if node.children.is_empty() {
-                            *accept_updated = true;
-                        } else {
-                            for (child, descr) in
-                                node.children.iter_mut().zip(child_descriptions)
-                            {
-                                child.data = descr;
-                            }
-                        }
-                    } else {
-                        // Filter cannot be pushed down
-                        node = insert_filter_exec(
-                            node,
-                            child_descriptions,
-                            remaining_description,
-                        )?;
+        // Iterate over each predicate coming from the parent
+        for (idx, filter) in parent_filters.into_iter().enumerate() {
+            // Check if we can push this filter down to our child.
+            // These supports are defined in `gather_filters_for_pushdown()`
+            match filter {
+                PredicateSupport::Supported(predicate) => {
+                    // Queue this filter up for pushdown to this child
+                    all_predicates.push(predicate);
+                    parent_supported_predicate_indices.push(idx);
+                    // Mark this filter as supported by our children if no child has marked it as unsupported
+                    if parent_predicates_pushdown_states[idx]
+                        != ParentPredicateStates::Unsupported
+                    {
+                        parent_predicates_pushdown_states[idx] =
+                            ParentPredicateStates::Supported;
                     }
-                }
-            }
-            FilterPushdownSupport::NotSupported => {
-                if remaining_description.filters.is_empty() {
-                    Ok(Transformed {
-                        data: node,
-                        transformed: false,
-                        tnr: TreeNodeRecursion::Stop,
-                    })
-                } else {
-                    node = insert_filter_exec(
-                        node,
-                        vec![FilterDescription::empty(); 1],
-                        remaining_description,
-                    )?;
-                    Ok(Transformed {
-                        data: node,
-                        transformed: true,
-                        tnr: TreeNodeRecursion::Stop,
-                    })
+                PredicateSupport::Unsupported(_) => {
+                    // Mark as unsupported by our children
+                    parent_predicates_pushdown_states[idx] =
+                        ParentPredicateStates::Unsupported;
                 }
             }
         }
-    }
-}
 
-fn insert_filter_exec(
-    node: FilterDescriptionContext,
-    mut child_descriptions: Vec<FilterDescription>,
-    remaining_description: FilterDescription,
-) -> Result<FilterDescriptionContext> {
-    let mut new_child_node = node;
+        // Recurse into the child; any filters it cannot push down come back to us and
+        // are ultimately reported as unsupported to our parents
+        let result = push_down_filters(Arc::clone(child), all_predicates, config)?;
 
-    // Filter has one child
-    if !child_descriptions.is_empty() {
-        debug_assert_eq!(child_descriptions.len(), 1);
-        new_child_node.data = child_descriptions.swap_remove(0);
-    }
-    let new_plan = Arc::new(FilterExec::try_new(
-        conjunction(remaining_description.filters),
-        Arc::clone(&new_child_node.plan),
-    )?);
-    let new_children = vec![new_child_node];
-    let new_data = FilterDescription::empty();
+        if let Some(new_child) = result.updated_node {
+            // The child was rewritten during pushdown; adopt the new version
+            new_children.push(new_child);
+        } else {
+            // The child was not changed; keep the original node
+            new_children.push(Arc::clone(child));
+        }
+
+        // Our child doesn't know the difference between filters that were passed down
+        // from our parents and filters that the current node injected. We need to
+        // disentangle this since we do need to distinguish between them.
+        let mut all_filters = result.filters.into_inner();
+        let parent_predicates = all_filters.split_off(num_self_filters);
+        let self_predicates = all_filters;
+        self_filters_pushdown_supports.push(PredicateSupports::new(self_predicates));
 
-    Ok(FilterDescriptionContext::new(
-        new_plan,
-        new_data,
-        new_children,
-    ))
+        for (idx, result) in parent_supported_predicate_indices
+            .iter()
+            .zip(parent_predicates)
+        {
+            let current_node_state = match result {
+                PredicateSupport::Supported(_) => ParentPredicateStates::Supported,
+                PredicateSupport::Unsupported(_) => ParentPredicateStates::Unsupported,
+            };
+            match (current_node_state, parent_predicates_pushdown_states[*idx]) {
+                (r, ParentPredicateStates::NoChildren) => {
+                    // If we have no result yet, use the state from this child
+                    parent_predicates_pushdown_states[*idx] = r;
+                }
+                (ParentPredicateStates::Supported, ParentPredicateStates::Supported) => {
+                    // If the current child and all previous children support it,
+                    // the filter continues to be marked as supported
+                    parent_predicates_pushdown_states[*idx] =
+                        ParentPredicateStates::Supported;
+                }
+                _ => {
+                    // Either the current child or a previous child marked this filter as unsupported
+                    parent_predicates_pushdown_states[*idx] =
+                        ParentPredicateStates::Unsupported;
+                }
+            }
+        }
+    }
+    // Re-create this node with new children
+    let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
+    // Remap the result onto the parent filters as they were given to us.
+    // Any filters that were not pushed down to any children are marked as unsupported.
+    let parent_pushdown_result = PredicateSupports::new(
+        parent_predicates_pushdown_states
+            .into_iter()
+            .zip(parent_predicates)
+            .map(|(state, filter)| match state {
+                ParentPredicateStates::NoChildren => {
+                    PredicateSupport::Unsupported(filter)
+                }
+                ParentPredicateStates::Unsupported => {
+                    PredicateSupport::Unsupported(filter)
+                }
+                ParentPredicateStates::Supported => PredicateSupport::Supported(filter),
+            })
+            .collect(),
+    );
+    // Check what the current node wants to do given the result of pushdown to its children
+    let mut res = updated_node.handle_child_pushdown_result(
+        ChildPushdownResult {
+            parent_filters: parent_pushdown_result,
+            self_filters: self_filters_pushdown_supports,
+        },
+        config,
+    )?;
+    // Compare pointers for `updated_node` and `node`; if they differ we must replace
+    // ourselves because of changes in our children.
+    if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) {
+        res.updated_node = Some(updated_node)
+    }
+    Ok(res)
+}
diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs
index 57dac21b6eee..5a43d7118d63 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -29,6 +29,7 @@ pub mod coalesce_batches;
 pub mod combine_partial_final_agg;
 pub mod enforce_distribution;
 pub mod enforce_sorting;
+pub mod filter_pushdown;
 pub mod join_selection;
 pub mod limit_pushdown;
 pub mod limited_distinct_aggregation;
@@ -36,7 +37,6 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub mod pruning;
-pub mod push_down_filter;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index d4ff7d6b9e15..78d3e2ad8873 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -25,12 +25,12 @@ use crate::coalesce_batches::CoalesceBatches;
 use crate::combine_partial_final_agg::CombinePartialFinalAggregate;
 use crate::enforce_distribution::EnforceDistribution;
 use crate::enforce_sorting::EnforceSorting;
+use crate::filter_pushdown::FilterPushdown;
 use crate::join_selection::JoinSelection;
 use crate::limit_pushdown::LimitPushdown;
 use crate::limited_distinct_aggregation::LimitedDistinctAggregation;
 use crate::output_requirements::OutputRequirements;
 use crate::projection_pushdown::ProjectionPushdown;
-use crate::push_down_filter::PushdownFilter;
 use crate::sanity_checker::SanityCheckPlan;
 use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
@@ -125,7 +125,7 @@ impl PhysicalOptimizer {
             // The FilterPushdown rule tries to push down filters as far as it can.
             // For example, it will push down filtering from a `FilterExec` to
             // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`.
-            Arc::new(PushdownFilter::new()),
+            Arc::new(FilterPushdown::new()),
             // The LimitPushdown rule tries to push limits down as far as possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
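Although the renamed rule is registered in the default optimizer set, pushdown into Parquet scans remains gated by the existing config flag, exactly as the benchmark above sets it. A minimal sketch (the function name is illustrative):

```rust
use datafusion_common::config::ConfigOptions;

fn pushdown_enabled_config() -> ConfigOptions {
    let mut config = ConfigOptions::default();
    // Without this flag the Parquet source reports every filter as
    // unsupported, so a FilterExec remains above the scan.
    config.execution.parquet.pushdown_filters = true;
    config
}
```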
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs
index 34b3f1b0241b..a0dd7371b4a0 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -32,11 +32,12 @@ use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
 
 use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
 };
 use datafusion_common::config::ConfigOptions;
 use futures::ready;
@@ -226,14 +227,22 @@ impl ExecutionPlan for CoalesceBatchesExec {
         CardinalityEffect::Equal
     }
 
-    fn try_pushdown_filters(
+    fn gather_filters_for_pushdown(
         &self,
-        fd: FilterDescription,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
-            Arc::new(self.clone()),
-            fd,
+    ) -> Result<FilterDescription> {
+        Ok(FilterDescription::new_with_child_count(1)
+            .all_parent_filters_supported(parent_filters))
+    }
+
+    fn handle_child_pushdown_result(
+        &self,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        Ok(FilterPushdownPropagation::transparent(
+            child_pushdown_result,
         ))
     }
 }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 9551c2b1743e..b81b3c8beeac 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -17,7 +17,7 @@
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
 use crate::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
+    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
@@ -491,39 +491,60 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
         Ok(None)
     }
 
-    /// Attempts to recursively push given filters from the top of the tree into leafs.
-    ///
-    /// This is used for various optimizations, such as:
-    ///
-    /// * Pushing down filters into scans in general to minimize the amount of data that needs to be materialized.
-    /// * Pushing down dynamic filters from operators like TopK and Joins into scans.
-    ///
-    /// Generally the further down (closer to leaf nodes) that filters can be pushed, the better.
-    ///
-    /// Consider the case of a query such as `SELECT * FROM t WHERE a = 1 AND b = 2`.
-    /// With no filter pushdown the scan needs to read and materialize all the data from `t` and then filter based on `a` and `b`.
-    /// With filter pushdown into the scan it can first read only `a`, then `b` and keep track of
-    /// which rows match the filter.
-    /// Then only for rows that match the filter does it have to materialize the rest of the columns.
-    ///
-    /// # Default Implementation
-    ///
-    /// The default implementation assumes:
-    /// * Parent filters can't be passed onto children.
-    /// * This node has no filters to contribute.
-    ///
-    /// # Implementation Notes
-    ///
-    /// Most of the actual logic is implemented as a Physical Optimizer rule.
-    /// See [`PushdownFilter`] for more details.
-    ///
-    /// [`PushdownFilter`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/filter_pushdown/struct.PushdownFilter.html
-    fn try_pushdown_filters(
+    /// Collect filters that this node can push down to its children.
+    /// Filters that are being pushed down from parents are passed in,
+    /// and the node may generate additional filters to push down.
+    /// For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec,
+    /// what will happen is that we recurse down the plan calling `ExecutionPlan::gather_filters_for_pushdown`:
+    /// 1. `FilterExec::gather_filters_for_pushdown` is called with no parent
+    ///    filters so it only returns that `FilterExec` wants to push down its own predicate.
+    /// 2. `HashJoinExec::gather_filters_for_pushdown` is called with the filter from
+    ///    `FilterExec`, which it only allows to push down to one side of the join (unless it's on the join key)
+    ///    but it also adds its own filters (e.g. pushing down a bloom filter of the hash table to the scan side of the join).
+    /// 3. `DataSourceExec::gather_filters_for_pushdown` is called with both filters from `HashJoinExec`
+    ///    and `FilterExec`, however `DataSourceExec::gather_filters_for_pushdown` doesn't actually do anything
+    ///    since it has no children and no additional filters to push down.
+    ///    It's only once [`ExecutionPlan::handle_child_pushdown_result`] is called on `DataSourceExec` as we recurse
+    ///    up the plan that `DataSourceExec` can actually bind the filters.
+    ///
+    /// The default implementation bars all parent filters from being pushed down and adds no new filters.
+    /// This is the safest option, making filter pushdown opt-in on a per-node basis.
+    fn gather_filters_for_pushdown(
+        &self,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        _config: &ConfigOptions,
+    ) -> Result<FilterDescription> {
+        Ok(
+            FilterDescription::new_with_child_count(self.children().len())
+                .all_parent_filters_unsupported(parent_filters),
+        )
+    }
+
+    /// Handle the result of a child pushdown.
+    /// This is called as we recurse back up the plan tree after recursing down and calling [`ExecutionPlan::gather_filters_for_pushdown`].
+    /// Once we know the result of pushing down filters into our children, we ask the current node what it wants to do with that result.
+    /// A `DataSourceExec` may absorb the filters to apply them during the scan phase
+    /// (also known as late materialization).
+    /// A `FilterExec` may absorb any filters its children could not absorb, or if there are no filters left it
+    /// may remove itself from the plan altogether.
+    /// It combines both [`ChildPushdownResult::parent_filters`] and [`ChildPushdownResult::self_filters`] into a single
+    /// predicate and replaces its own predicate.
+    /// Then it passes [`PredicateSupport::Supported`] for each parent predicate to the parent.
+    /// A `HashJoinExec` may ignore the pushdown result since it needs to apply the filters as part of the join anyhow.
+    /// It passes [`ChildPushdownResult::parent_filters`] back up to its parents wrapped in [`FilterPushdownPropagation::transparent`],
+    /// and [`ChildPushdownResult::self_filters`] is discarded.
+    ///
+    /// The default implementation is a no-op that passes the result of pushdown from the children to its parent.
+    ///
+    /// [`PredicateSupport::Supported`]: crate::filter_pushdown::PredicateSupport::Supported
+    fn handle_child_pushdown_result(
         &self,
-        fd: FilterDescription,
+        child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        Ok(FilterPushdownPropagation::transparent(
+            child_pushdown_result,
+        ))
     }
 }
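To make the division of labor concrete, this is the whole pattern for a filter-transparent, single-child operator, as `CoalesceBatchesExec` above and `RepartitionExec` below implement it in this PR (shown out of context; the remaining `ExecutionPlan` methods are elided):

```rust
fn gather_filters_for_pushdown(
    &self,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterDescription> {
    // On the way down: let every parent filter through to our only child.
    Ok(FilterDescription::new_with_child_count(1)
        .all_parent_filters_supported(parent_filters))
}

fn handle_child_pushdown_result(
    &self,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    // On the way up: echo whatever the child decided back to our parent.
    Ok(FilterPushdownPropagation::transparent(child_pushdown_result))
}
```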
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index 6df3e236a0dd..9f5d9dc2984e 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 use std::any::Any;
+use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{ready, Context, Poll};
@@ -27,7 +28,7 @@ use super::{
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    FilterDescription, FilterPushdownResult, FilterPushdownSupport,
+    ChildPushdownResult, FilterDescription, FilterPushdownPropagation, PredicateSupport,
 };
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
@@ -44,24 +45,29 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::stats::Precision;
+use datafusion_common::tree_node::{
+    Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
+};
 use datafusion_common::{
     internal_err, plan_err, project_schema, DataFusionError, Result, ScalarValue,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
-use datafusion_physical_expr::expressions::{BinaryExpr, Column};
+use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
 use datafusion_physical_expr::intervals::utils::check_support;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{
-    analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr,
-    ExprBoundaries, PhysicalExpr,
+    analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
+    ConstExpr, ExprBoundaries, PhysicalExpr,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use futures::stream::{Stream, StreamExt};
 use log::trace;
 
+const FILTER_EXEC_DEFAULT_SELECTIVITY: u8 = 20;
+
 /// FilterExec evaluates a boolean predicate against all input batches to determine which rows to
 /// include in its output batches.
 #[derive(Debug, Clone)]
@@ -88,7 +94,7 @@ impl FilterExec {
     ) -> Result<Self> {
         match predicate.data_type(input.schema().as_ref())? {
             DataType::Boolean => {
-                let default_selectivity = 20;
+                let default_selectivity = FILTER_EXEC_DEFAULT_SELECTIVITY;
                 let cache = Self::compute_properties(
                     &input,
                     &predicate,
@@ -448,54 +454,126 @@ impl ExecutionPlan for FilterExec {
         try_embed_projection(projection, self)
     }
 
-    fn try_pushdown_filters(
+    fn gather_filters_for_pushdown(
         &self,
-        mut fd: FilterDescription,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        // Extend the filter descriptions
-        fd.filters.push(Arc::clone(&self.predicate));
-
-        // Extract the information
-        let child_descriptions = vec![fd];
-        let remaining_description = FilterDescription { filters: vec![] };
-        let filter_input = Arc::clone(self.input());
+    ) -> Result<FilterDescription> {
+        let self_filter = Arc::clone(&self.predicate);
 
-        if let Some(projection_indices) = self.projection.as_ref() {
-            // Push the filters down, but leave a ProjectionExec behind, instead of the FilterExec
-            let filter_child_schema = filter_input.schema();
-            let proj_exprs = projection_indices
+        let parent_filters = if let Some(projection_indices) = self.projection.as_ref() {
+            // We need to invert the projection on any referenced columns in the filter
+            // Create a mapping from the output columns to the input columns (the inverse of the projection)
+            let inverse_projection = projection_indices
                 .iter()
-                .map(|p| {
-                    let field = filter_child_schema.field(*p).clone();
-                    (
-                        Arc::new(Column::new(field.name(), *p)) as Arc<dyn PhysicalExpr>,
-                        field.name().to_string(),
-                    )
+                .enumerate()
+                .map(|(i, &p)| (p, i))
+                .collect::<HashMap<_, _>>();
+            parent_filters
+                .into_iter()
+                .map(|f| {
+                    f.transform_up(|expr| {
+                        let mut res =
+                            if let Some(col) = expr.as_any().downcast_ref::<Column>() {
+                                let index = col.index();
+                                let index_in_input_schema =
+                                    inverse_projection.get(&index).ok_or_else(|| {
+                                        DataFusionError::Internal(format!(
+                                            "Column {} not found in projection",
+                                            index
+                                        ))
+                                    })?;
+                                Transformed::yes(Arc::new(Column::new(
+                                    col.name(),
+                                    *index_in_input_schema,
+                                )) as _)
+                            } else {
+                                Transformed::no(expr)
+                            };
+                        // Columns can only exist in the leaves, no need to try all nodes
+                        res.tnr = TreeNodeRecursion::Jump;
+                        Ok(res)
+                    })
+                    .data()
                 })
-                .collect::<Vec<_>>();
-            let projection_exec =
-                Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?) as _;
-
-            Ok(FilterPushdownResult {
-                support: FilterPushdownSupport::Supported {
-                    child_descriptions,
-                    op: projection_exec,
-                    revisit: false,
-                },
-                remaining_description,
-            })
+                .collect::<Result<Vec<_>>>()?
         } else {
-            // Pull out the FilterExec, and inform the rule as it should be re-run
-            Ok(FilterPushdownResult {
-                support: FilterPushdownSupport::Supported {
-                    child_descriptions,
-                    op: filter_input,
-                    revisit: true,
-                },
-                remaining_description,
-            })
+            parent_filters
+        };
+
+        Ok(FilterDescription::new_with_child_count(1)
+            .all_parent_filters_supported(parent_filters)
+            .with_self_filter(self_filter))
+    }
+
+    fn handle_child_pushdown_result(
+        &self,
+        mut child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        assert_eq!(
+            child_pushdown_result.self_filters.len(),
+            1,
+            "FilterExec should only have one child"
+        );
+        assert_eq!(
+            child_pushdown_result.self_filters[0].len(),
+            1,
+            "FilterExec produces only one filter"
+        );
+
+        // We absorb any parent filters that were not handled by our children
+        let mut unhandled_filters =
+            child_pushdown_result.parent_filters.collect_unsupported();
+
+        let self_filters = child_pushdown_result
+            .self_filters
+            .swap_remove(0)
+            .into_inner()
+            .swap_remove(0);
+        if let PredicateSupport::Unsupported(expr) = self_filters {
+            unhandled_filters.push(expr);
         }
+
+        // If we have unhandled filters, we need to create a new FilterExec
+        let filter_input = Arc::clone(self.input());
+        let new_predicate = conjunction(unhandled_filters);
+        let new_exec = if new_predicate.eq(&lit(true)) {
+            // FilterExec is no longer needed, but we may need to leave a projection in place
+            match self.projection() {
+                Some(projection_indices) => {
+                    let filter_child_schema = filter_input.schema();
+                    let proj_exprs = projection_indices
+                        .iter()
+                        .map(|p| {
+                            let field = filter_child_schema.field(*p).clone();
+                            (
+                                Arc::new(Column::new(field.name(), *p))
+                                    as Arc<dyn PhysicalExpr>,
+                                field.name().to_string(),
+                            )
+                        })
+                        .collect::<Vec<_>>();
+                    Arc::new(ProjectionExec::try_new(proj_exprs, filter_input)?)
+                        as Arc<dyn ExecutionPlan>
+                }
+                None => {
+                    // No projection needed, just return the input
+                    filter_input
+                }
+            }
+        } else {
+            // Create a new FilterExec with the new predicate
+            Arc::new(
+                FilterExec::try_new(new_predicate, filter_input)?
+                    .with_default_selectivity(self.default_selectivity())?
+                    .with_projection(self.projection().cloned())?,
+            )
+        };
+        Ok(FilterPushdownPropagation {
+            filters: child_pushdown_result.parent_filters.make_supported(),
+            updated_node: Some(new_exec),
+        })
     }
 }
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs b/datafusion/physical-plan/src/filter_pushdown.rs
index 38f5aef5923e..0003fc9d7277 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -16,80 +16,285 @@
 // under the License.
 
 use std::sync::Arc;
+use std::vec::IntoIter;
 
-use crate::ExecutionPlan;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
-#[derive(Clone, Debug)]
-pub struct FilterDescription {
-    /// Expressions coming from the parent nodes
-    pub filters: Vec<Arc<dyn PhysicalExpr>>,
+/// The result of a plan for pushing down a filter into a child node.
+/// This contains references to filters so that nodes can mutate a filter
+/// before pushing it down to a child node (e.g. to adjust a projection)
+/// or can directly take ownership of `Unsupported` filters that their children
+/// could not handle.
+#[derive(Debug, Clone)]
+pub enum PredicateSupport {
+    Supported(Arc<dyn PhysicalExpr>),
+    Unsupported(Arc<dyn PhysicalExpr>),
 }
 
-impl Default for FilterDescription {
-    fn default() -> Self {
-        Self::empty()
+/// A thin wrapper around [`PredicateSupport`]s that allows for easy collection of
+/// supported and unsupported filters. The inner vector stores one entry per
+/// predicate for a single node.
+#[derive(Debug, Clone)]
+pub struct PredicateSupports(Vec<PredicateSupport>);
+
+impl PredicateSupports {
+    /// Create a new [`PredicateSupports`] with the given filters and their pushdown status.
+    pub fn new(pushdowns: Vec<PredicateSupport>) -> Self {
+        Self(pushdowns)
     }
-}
 
-impl FilterDescription {
-    /// Takes the filters out of the struct, leaving an empty vector in its place.
-    pub fn take_description(&mut self) -> Vec<Arc<dyn PhysicalExpr>> {
-        std::mem::take(&mut self.filters)
+    /// Create a new [`PredicateSupports`] with all filters as supported.
+    pub fn all_supported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+        let pushdowns = filters
+            .into_iter()
+            .map(PredicateSupport::Supported)
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Create a new [`PredicateSupports`] with all filters as unsupported.
+    pub fn all_unsupported(filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+        let pushdowns = filters
+            .into_iter()
+            .map(PredicateSupport::Unsupported)
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Transform all filters to supported, returning a new [`PredicateSupports`].
+    /// This consumes the original [`PredicateSupports`].
+    pub fn make_supported(self) -> Self {
+        let pushdowns = self
+            .0
+            .into_iter()
+            .map(|f| match f {
+                PredicateSupport::Supported(expr) => PredicateSupport::Supported(expr),
+                PredicateSupport::Unsupported(expr) => PredicateSupport::Supported(expr),
+            })
+            .collect();
+        Self::new(pushdowns)
+    }
+
+    /// Collect unsupported filters into a Vec, without removing them from the original
+    /// [`PredicateSupports`].
+    pub fn collect_unsupported(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .iter()
+            .filter_map(|f| match f {
+                PredicateSupport::Unsupported(expr) => Some(Arc::clone(expr)),
+                PredicateSupport::Supported(_) => None,
+            })
+            .collect()
+    }
+
+    /// Collect all filters into a Vec, consuming the original
+    /// [`PredicateSupports`].
+    pub fn collect_all(self) -> Vec<Arc<dyn PhysicalExpr>> {
+        self.0
+            .into_iter()
+            .map(|f| match f {
+                PredicateSupport::Supported(expr)
+                | PredicateSupport::Unsupported(expr) => expr,
+            })
+            .collect()
+    }
+
+    pub fn into_inner(self) -> Vec<PredicateSupport> {
+        self.0
     }
 
-    pub fn empty() -> FilterDescription {
-        Self { filters: vec![] }
+    /// Return an iterator over the inner `Vec<PredicateSupport>`.
+    pub fn iter(&self) -> impl Iterator<Item = &PredicateSupport> {
+        self.0.iter()
+    }
+
+    /// Return the number of filters in the inner `Vec<PredicateSupport>`.
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Check if the inner `Vec<PredicateSupport>` is empty.
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
+impl IntoIterator for PredicateSupports {
+    type Item = PredicateSupport;
+    type IntoIter = IntoIter<PredicateSupport>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
 }
 
-#[derive(Debug)]
-pub enum FilterPushdownSupport<T> {
-    Supported {
-        // Filter predicates which can be pushed down through the operator.
-        // NOTE that these are not placed into any operator.
-        child_descriptions: Vec<FilterDescription>,
-        // Possibly updated new operator
-        op: T,
-        // Whether the node is removed from the plan and the rule should be re-run manually
-        // on the new node.
-        // TODO: If TreeNodeRecursion supports a Revisit mechanism, this flag can be removed
-        revisit: bool,
-    },
-    NotSupported,
+/// The result of pushing down filters into a child node.
+/// This is the result provided to nodes in [`ExecutionPlan::handle_child_pushdown_result`].
+/// Nodes process this result and convert it into a [`FilterPushdownPropagation`]
+/// that is returned to their parent.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+#[derive(Debug, Clone)]
+pub struct ChildPushdownResult {
+    /// The combined result of pushing down each parent filter into each child.
+    /// For example, given the filters `[a, b]` and children `[1, 2, 3]` the matrix of responses:
+    ///
+    // | filter | child 1     | child 2   | child 3   | result      |
+    // |--------|-------------|-----------|-----------|-------------|
+    // | a      | Supported   | Supported | Supported | Supported   |
+    // | b      | Unsupported | Supported | Supported | Unsupported |
+    ///
+    /// That is: if any child marks a filter as unsupported, or if the filter was not pushed
+    /// down into any child, then the result is unsupported.
+    /// If at least one child receives the filter and all children that received it mark it
+    /// as supported, then the result is supported.
+    pub parent_filters: PredicateSupports,
+    /// The result of pushing down each filter this node provided into each of its children.
+    /// This is not combined with the parent filters so that nodes can treat each child independently.
+    pub self_filters: Vec<PredicateSupports>,
 }
 
-#[derive(Debug)]
-pub struct FilterPushdownResult<T> {
-    pub support: FilterPushdownSupport<T>,
-    // Filters which cannot be pushed down through the operator.
-    // NOTE that caller of try_pushdown_filters() should handle these remaining predicates,
-    // possibly introducing a FilterExec on top of this operator.
-    pub remaining_description: FilterDescription,
+/// The result of pushing down filters into a node that it returns to its parent.
+/// This is what nodes return from [`ExecutionPlan::handle_child_pushdown_result`] to communicate
+/// to the optimizer:
+///
+/// 1. What to do with any parent filters that were not completely handled by the children.
+/// 2. If the node needs to be replaced in the execution plan with a new node or not.
+///
+/// [`ExecutionPlan::handle_child_pushdown_result`]: crate::ExecutionPlan::handle_child_pushdown_result
+#[derive(Debug, Clone)]
+pub struct FilterPushdownPropagation<T> {
+    pub filters: PredicateSupports,
+    pub updated_node: Option<T>,
 }
 
-pub fn filter_pushdown_not_supported<T>(
-    remaining_description: FilterDescription,
-) -> FilterPushdownResult<T> {
-    FilterPushdownResult {
-        support: FilterPushdownSupport::NotSupported,
-        remaining_description,
+impl<T> FilterPushdownPropagation<T> {
+    /// Create a new [`FilterPushdownPropagation`] that echoes back up to the
+    /// parent the result of pushing down the filters into the children.
+    pub fn transparent(child_pushdown_result: ChildPushdownResult) -> Self {
+        Self {
+            filters: child_pushdown_result.parent_filters,
+            updated_node: None,
+        }
     }
+
+    /// Create a new [`FilterPushdownPropagation`] that tells the parent node
+    /// that none of the parent filters were pushed down.
+    pub fn unsupported(parent_filters: Vec<Arc<dyn PhysicalExpr>>) -> Self {
+        let unsupported = PredicateSupports::all_unsupported(parent_filters);
+        Self {
+            filters: unsupported,
+            updated_node: None,
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+struct ChildFilterDescription {
+    /// Description of which parent filters can be pushed down into this node.
+    /// Since we need to transmit filter pushdown results back to this node's parent
+    /// we need to track each parent filter for each child, even those that are unsupported / won't be pushed down.
+    /// We do this using a [`PredicateSupports`] which simplifies manipulating supported/unsupported filters.
+    parent_filters: PredicateSupports,
+    /// Description of which filters this node is pushing down to its children.
+    /// Since this is not transmitted back to the parents we can have variable sized inner arrays
+    /// instead of having to track supported/unsupported.
+    self_filters: Vec<Arc<dyn PhysicalExpr>>,
}
 
-pub fn filter_pushdown_transparent<T>(
-    plan: Arc<dyn ExecutionPlan>,
-    fd: FilterDescription,
-) -> FilterPushdownResult<Arc<dyn ExecutionPlan>> {
-    let child_descriptions = vec![fd];
-    let remaining_description = FilterDescription::empty();
-
-    FilterPushdownResult {
-        support: FilterPushdownSupport::Supported {
-            child_descriptions,
-            op: plan,
-            revisit: false,
-        },
-        remaining_description,
+impl ChildFilterDescription {
+    fn new() -> Self {
+        Self {
+            parent_filters: PredicateSupports::new(vec![]),
+            self_filters: vec![],
+        }
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct FilterDescription {
+    /// A filter description for each child.
+    /// This includes which parent filters and which self filters (from the node in question)
+    /// will get pushed down to each child.
+    child_filter_descriptions: Vec<ChildFilterDescription>,
+}
+
+impl FilterDescription {
+    pub fn new_with_child_count(num_children: usize) -> Self {
+        Self {
+            child_filter_descriptions: vec![ChildFilterDescription::new(); num_children],
+        }
+    }
+
+    pub fn parent_filters(&self) -> Vec<PredicateSupports> {
+        self.child_filter_descriptions
+            .iter()
+            .map(|d| &d.parent_filters)
+            .cloned()
+            .collect()
+    }
+
+    pub fn self_filters(&self) -> Vec<Vec<Arc<dyn PhysicalExpr>>> {
+        self.child_filter_descriptions
+            .iter()
+            .map(|d| &d.self_filters)
+            .cloned()
+            .collect()
+    }
+
+    /// Mark all parent filters as supported for all children.
+    /// This is the case if the node allows filters to be pushed down through it
+    /// without any modification.
+    /// This broadcasts the parent filters to all children.
+    /// If handling of parent filters is different for each child then you should set the
+    /// field directly.
+    /// For example, nodes like [`RepartitionExec`] that let filters pass through them transparently
+    /// use this to mark all parent filters as supported.
+    ///
+    /// [`RepartitionExec`]: crate::repartition::RepartitionExec
+    pub fn all_parent_filters_supported(
+        mut self,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Self {
+        let supported = PredicateSupports::all_supported(parent_filters);
+        for child in &mut self.child_filter_descriptions {
+            child.parent_filters = supported.clone();
+        }
+        self
+    }
+
+    /// Mark all parent filters as unsupported for all children.
+    /// This is the case if the node does not allow filters to be pushed down through it.
+    /// This broadcasts the parent filters to all children.
+    /// If handling of parent filters is different for each child then you should set the
+    /// field directly.
+    /// For example, the default implementation of filter pushdown in [`ExecutionPlan`]
+    /// assumes that filters cannot be pushed down to children.
+    ///
+    /// [`ExecutionPlan`]: crate::ExecutionPlan
+    pub fn all_parent_filters_unsupported(
+        mut self,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Self {
+        let unsupported = PredicateSupports::all_unsupported(parent_filters);
+        for child in &mut self.child_filter_descriptions {
+            child.parent_filters = unsupported.clone();
+        }
+        self
+    }
+
+    /// Add a filter generated / owned by the current node to be pushed down to all children.
+    /// This assumes that there is a single filter that gets pushed down to all children
+    /// equally.
+    /// If there are multiple filters or pushdown to children is not homogeneous then
+    /// you should set the field directly.
+    /// For example:
+    /// - `TopK` pushes down a single filter to all children, so it can use this method.
+    /// - `HashJoinExec` pushes down a filter only to the probe side, so it cannot use this method.
+    pub fn with_self_filter(mut self, predicate: Arc<dyn PhysicalExpr>) -> Self {
+        for child in &mut self.child_filter_descriptions {
+            child.self_filters = vec![Arc::clone(&predicate)];
+        }
+        self
     }
 }
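A small illustration of the `PredicateSupports` helper in the pattern `FilterExec` uses above: inspect what the children declined, then report everything as handled upward. The two `lit` placeholders stand in for real column predicates:

```rust
use std::sync::Arc;

use datafusion_physical_expr::expressions::lit;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PredicateSupports;

let supports = PredicateSupports::all_unsupported(vec![lit(true), lit(false)]);

// A node that absorbs leftovers first copies out the declined predicates...
let leftovers: Vec<Arc<dyn PhysicalExpr>> = supports.collect_unsupported();
assert_eq!(leftovers.len(), 2);

// ...then reports every parent filter as supported to its own parent,
// since it will apply the leftovers itself.
let reported = supports.make_supported();
assert_eq!(reported.len(), 2);
```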
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index f7c4f7477f12..c86a37697a05 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -54,7 +54,7 @@ use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 use crate::filter_pushdown::{
-    filter_pushdown_transparent, FilterDescription, FilterPushdownResult,
+    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
 };
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -743,14 +743,22 @@ impl ExecutionPlan for RepartitionExec {
         )?)))
     }
 
-    fn try_pushdown_filters(
+    fn gather_filters_for_pushdown(
         &self,
-        fd: FilterDescription,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn ExecutionPlan>>> {
-        Ok(filter_pushdown_transparent::<Arc<dyn ExecutionPlan>>(
-            Arc::new(self.clone()),
-            fd,
+    ) -> Result<FilterDescription> {
+        Ok(FilterDescription::new_with_child_count(1)
+            .all_parent_filters_supported(parent_filters))
+    }
+
+    fn handle_child_pushdown_result(
+        &self,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        Ok(FilterPushdownPropagation::transparent(
+            child_pushdown_result,
         ))
     }
 }
diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
index 99fbb713646d..252704f260b8 100644
--- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -186,14 +186,14 @@ COPY (
 ) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet'
 STORED AS PARQUET;
 
-query TT rowsort
-select * from t_pushdown where part == val
+query TT
+select * from t_pushdown where part == val order by part, val;
 ----
 a a
 b b
 
 query TT
-select * from t_pushdown where part != val
+select * from t_pushdown where part != val order by part, val;
 ----
 xyz c