Commit d3057ff

Implement Parquet filter pushdown via new filter pushdown APIs

1 parent: a39c07a

31 files changed: +483 −426 lines
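
The change that recurs throughout these diffs: ParquetSource::with_predicate no longer takes a schema argument. The file schema is instead supplied separately through FileSource::with_schema, which type-erases the source into an Arc<dyn FileSource>. A minimal before/after sketch of the call pattern, assuming `schema: SchemaRef` and `predicate: Arc<dyn PhysicalExpr>` are already in scope (as they are in the examples below):

    // Before this commit: schema and predicate were passed together.
    // let source = Arc::new(
    //     ParquetSource::default().with_predicate(Arc::clone(&schema), predicate),
    // );

    // After this commit: the predicate stands alone; the schema is attached via
    // FileSource::with_schema, which returns an Arc<dyn FileSource>.
    let source: Arc<dyn FileSource> = ParquetSource::default()
        .with_predicate(predicate)
        .with_schema(Arc::clone(&schema));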

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 1 addition & 1 deletion
@@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider {
             ParquetSource::default()
                 // provide the predicate so the DataSourceExec can try and prune
                 // row groups internally
-                .with_predicate(Arc::clone(&schema), predicate)
+                .with_predicate(predicate)
                 // provide the factory to create parquet reader without re-reading metadata
                 .with_parquet_file_reader_factory(Arc::new(reader_factory)),
         );

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 2 deletions
@@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;
 
         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let source =
-            Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
+        let source = Arc::new(ParquetSource::default().with_predicate(predicate));
         let mut file_scan_config_builder =
             FileScanConfigBuilder::new(object_store_url, self.schema(), source)
                 .with_projection(projection.cloned())

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 0 additions & 2 deletions
@@ -54,7 +54,6 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_datasource::write::ObjectWriterBuilder;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
 use async_trait::async_trait;
@@ -174,7 +173,6 @@ impl FileFormat for ArrowFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(ArrowSource::default());
         let config = FileScanConfigBuilder::from(conf)

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -93,7 +93,6 @@ pub(crate) mod test_util {
             .with_projection(projection)
             .with_limit(limit)
             .build(),
-        None,
     )
     .await?;
     Ok(exec)

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 35 deletions
@@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc};
 
 use crate::datasource::{
     create_ordering,
-    file_format::{
-        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
-    },
+    file_format::{file_compression_type::FileCompressionType, FileFormat},
     physical_plan::FileSinkConfig,
 };
 use crate::execution::context::SessionState;
@@ -35,22 +33,19 @@ use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::{ExecutionPlan, Statistics};
 
 use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
 use datafusion_common::{
-    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
-    SchemaExt, ToDFSchema,
+    config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt,
 };
 use datafusion_execution::cache::{
     cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
 };
-use datafusion_physical_expr::{
-    create_physical_expr, LexOrdering, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement};
 
 use async_trait::async_trait;
 use datafusion_catalog::Session;
@@ -941,19 +936,6 @@ impl TableProvider for ListingTable {
             None => {} // no ordering required
         };
 
-        let filters = match conjunction(filters.to_vec()) {
-            Some(expr) => {
-                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
-                let filters = create_physical_expr(
-                    &expr,
-                    &table_df_schema,
-                    state.execution_props(),
-                )?;
-                Some(filters)
-            }
-            None => None,
-        };
-
         let Some(object_store_url) =
             self.table_paths.first().map(ListingTableUrl::object_store)
         else {
@@ -978,7 +960,6 @@ impl TableProvider for ListingTable {
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
                 .build(),
-            filters.as_ref(),
         )
         .await
     }
@@ -1002,18 +983,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }
 
-                // if we can't push it down completely with only the filename-based/path-based
-                // column names, then we should check if we can do parquet predicate pushdown
-                let supports_pushdown = self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }
-
                 Ok(TableProviderFilterPushDown::Inexact)
             })
             .collect()
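
The upshot of this file's changes: ListingTable::scan no longer converts the pushed-down filters into a physical predicate itself, and supports_filters_pushdown now reports Inexact for anything that is not a pure partition-column filter. The predicate instead reaches the Parquet reader through the physical optimizer's FilterPushdown rule (the one exercised by push_down_filter.rs further down). A hedged sketch of that hand-off, where `plan` stands in for an existing Arc<dyn ExecutionPlan> and `config` for the session's ConfigOptions:

    use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;

    // Inexact pushdown leaves a FilterExec above the DataSourceExec; the rule
    // then offers those predicates to the data source, which may absorb them.
    let optimized = FilterPushdown::new().optimize(plan, &config)?;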

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 5 additions & 4 deletions
@@ -54,6 +54,7 @@ mod tests {
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::source::DataSourceExec;
 
+    use datafusion_datasource::file::FileSource;
     use datafusion_datasource::{FileRange, PartitionedFile};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
@@ -139,7 +140,7 @@ mod tests {
         self.round_trip(batches).await.batches
     }
 
-    fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
+    fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
         // set up predicate (this is normally done by a layer higher up)
         let predicate = self
             .predicate
@@ -148,7 +149,7 @@ mod tests {
 
         let mut source = ParquetSource::default();
         if let Some(predicate) = predicate {
-            source = source.with_predicate(Arc::clone(&file_schema), predicate);
+            source = source.with_predicate(predicate);
         }
 
         if self.pushdown_predicate {
@@ -161,14 +162,14 @@ mod tests {
             source = source.with_enable_page_index(true);
         }
 
-        Arc::new(source)
+        source.with_schema(Arc::clone(&file_schema))
     }
 
     fn build_parquet_exec(
         &self,
         file_schema: SchemaRef,
         file_group: FileGroup,
-        source: Arc<ParquetSource>,
+        source: Arc<dyn FileSource>,
     ) -> Arc<DataSourceExec> {
         let base_config = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),

datafusion/core/src/test_util/parquet.rs

Lines changed: 6 additions & 4 deletions
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};
 
+use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
 use object_store::path::Path;
@@ -182,10 +183,11 @@ impl TestParquetFile {
         let physical_filter_expr =
             create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
 
-        let source = Arc::new(ParquetSource::new(parquet_options).with_predicate(
-            Arc::clone(&self.schema),
-            Arc::clone(&physical_filter_expr),
-        ));
+        let source = Arc::new(
+            ParquetSource::new(parquet_options)
+                .with_predicate(Arc::clone(&physical_filter_expr)),
+        )
+        .with_schema(Arc::clone(&self.schema));
         let config = scan_config_builder.with_source(source).build();
         let parquet_exec = DataSourceExec::from_data_source(config);

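Worth noting in the diff above: with_schema has to come last in the chain. It returns Arc<dyn FileSource>, so the ParquetSource-specific builder methods are unreachable after the call. A sketch of the constraint, reusing the bindings from the diff:

    // Compiles: configure the concrete ParquetSource first, then type-erase.
    let source = Arc::new(
        ParquetSource::new(parquet_options)
            .with_predicate(Arc::clone(&physical_filter_expr)),
    )
    .with_schema(Arc::clone(&self.schema));

    // Would not compile: Arc<dyn FileSource> has no with_predicate method.
    // let source = ParquetSource::new(parquet_options)
    //     .with_schema(Arc::clone(&self.schema))
    //     .with_predicate(Arc::clone(&physical_filter_expr));
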
datafusion/core/tests/fuzz_cases/pruning.rs

Lines changed: 1 addition & 1 deletion
@@ -276,7 +276,7 @@ async fn execute_with_predicate(
     ctx: &SessionContext,
 ) -> Vec<String> {
     let parquet_source = if prune_stats {
-        ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone())
+        ParquetSource::default().with_predicate(predicate.clone())
     } else {
         ParquetSource::default()
     };

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 1 addition & 1 deletion
@@ -346,7 +346,7 @@ impl TestFull {
         let source = if let Some(predicate) = predicate {
             let df_schema = DFSchema::try_from(schema.clone())?;
             let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
-            Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate))
+            Arc::new(ParquetSource::default().with_predicate(predicate))
         } else {
             Arc::new(ParquetSource::default())
         };

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 1 addition & 1 deletion
@@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
 
     let source = Arc::new(
         ParquetSource::default()
-            .with_predicate(Arc::clone(&schema), predicate)
+            .with_predicate(predicate)
             .with_enable_page_index(true),
     );
     let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)

datafusion/core/tests/physical_optimizer/push_down_filter.rs

Lines changed: 3 additions & 3 deletions
@@ -47,7 +47,7 @@ use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::filter_pushdown::{
-    FilterPushdownPropagation, PredicateSupports,
+    FilterPushdownPropagation, ChildPushdownSupports,
 };
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -98,7 +98,7 @@ impl FileSource for TestSource {
     }
 
     fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
-        todo!("should not be called")
+        Arc::new(self.clone()) as Arc<dyn FileSource>
     }
 
     fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
@@ -166,7 +166,7 @@ impl FileSource for TestSource {
             statistics: self.statistics.clone(), // should be updated in reality
         });
         Ok(FilterPushdownPropagation {
-            filters: PredicateSupports::all_supported(filters),
+            filters: ChildPushdownSupports::all_exact(filters),
             updated_node: Some(new_node),
         })
     } else {
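
Two renames to note when porting a custom FileSource to the new API: with_schema may no longer be left as a stub, since every source now receives its schema through it, and the pushdown result moves from PredicateSupports::all_supported to ChildPushdownSupports::all_exact, which makes the support level explicit. A hedged sketch of the result construction only (the surrounding method signature is not shown in this hunk, so it is omitted); `filters` and `new_node` are the bindings from the TestSource above:

    // all_exact asserts the source will apply every filter completely, so the
    // optimizer is free to drop the FilterExec that sat above the scan.
    Ok(FilterPushdownPropagation {
        filters: ChildPushdownSupports::all_exact(filters),
        updated_node: Some(new_node),
    })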

datafusion/core/tests/sql/path_partition.rs

Lines changed: 0 additions & 53 deletions
@@ -25,8 +25,6 @@ use std::sync::Arc;
 
 use arrow::datatypes::DataType;
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::{
     datasource::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
@@ -42,8 +40,6 @@ use datafusion_common::stats::Precision;
 use datafusion_common::test_util::batches_to_sort_string;
 use datafusion_common::ScalarValue;
 use datafusion_execution::config::SessionConfig;
-use datafusion_expr::{col, lit, Expr, Operator};
-use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -57,55 +53,6 @@ use object_store::{
 use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload};
 use url::Url;
 
-#[tokio::test]
-async fn parquet_partition_pruning_filter() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let table = create_partitioned_alltypes_parquet_table(
-        &ctx,
-        &[
-            "year=2021/month=09/day=09/file.parquet",
-            "year=2021/month=10/day=09/file.parquet",
-            "year=2021/month=10/day=28/file.parquet",
-        ],
-        &[
-            ("year", DataType::Int32),
-            ("month", DataType::Int32),
-            ("day", DataType::Int32),
-        ],
-        "mirror:///",
-        "alltypes_plain.parquet",
-    )
-    .await;
-
-    // The first three filters can be resolved using only the partition columns.
-    let filters = [
-        Expr::eq(col("year"), lit(2021)),
-        Expr::eq(col("month"), lit(10)),
-        Expr::eq(col("day"), lit(28)),
-        Expr::gt(col("id"), lit(1)),
-    ];
-    let exec = table.scan(&ctx.state(), None, &filters, None).await?;
-    let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    if let Some((_, parquet_config)) =
-        data_source_exec.downcast_to_file_source::<ParquetSource>()
-    {
-        let pred = parquet_config.predicate().unwrap();
-        // Only the last filter should be pushdown to TableScan
-        let expected = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
-            Operator::Gt,
-            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
-        ));
-
-        assert!(pred.as_any().is::<BinaryExpr>());
-        let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
-
-        assert_eq!(pred, expected.as_ref());
-    }
-    Ok(())
-}
-
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
     let ctx = SessionContext::new();

datafusion/datasource-avro/src/file_format.rs

Lines changed: 0 additions & 2 deletions
@@ -37,7 +37,6 @@ use datafusion_datasource::file_compression_type::FileCompressionType;
 use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::source::DataSourceExec;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::ExecutionPlan;
 use datafusion_session::Session;
 
@@ -150,7 +149,6 @@ impl FileFormat for AvroFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let config = FileScanConfigBuilder::from(conf)
             .with_source(self.file_source())

datafusion/datasource-csv/src/file_format.rs

Lines changed: 0 additions & 2 deletions
@@ -50,7 +50,6 @@ use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
@@ -408,7 +407,6 @@ impl FileFormat for CsvFormat {
         &self,
         state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         // Consult configuration options for default values
         let has_header = self

datafusion/datasource-json/src/file_format.rs

Lines changed: 0 additions & 2 deletions
@@ -52,7 +52,6 @@ use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join;
 use datafusion_datasource::write::BatchSerializer;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
 use datafusion_session::Session;
@@ -249,7 +248,6 @@ impl FileFormat for JsonFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(JsonSource::new());
         let conf = FileScanConfigBuilder::from(conf)
