
Commit 34adfe3

Implement Parquet filter pushdown via new filter pushdown APIs

1 parent 2d1551d

File tree: 32 files changed, +417 -391 lines

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 1 addition & 1 deletion

@@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider {
             ParquetSource::default()
                 // provide the predicate so the DataSourceExec can try and prune
                 // row groups internally
-                .with_predicate(Arc::clone(&schema), predicate)
+                .with_predicate(predicate)
                 // provide the factory to create parquet reader without re-reading metadata
                 .with_parquet_file_reader_factory(Arc::new(reader_factory)),
         );
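
This hunk shows the commit's core API change: ParquetSource::with_predicate no longer takes the file schema; the schema is now attached separately through the FileSource::with_schema trait method (visible in the test changes further down). A minimal before/after sketch, assuming a schema: SchemaRef and a predicate: Arc<dyn PhysicalExpr> are already in scope, and using import paths as they appear elsewhere in this diff:

    use std::sync::Arc;

    use arrow::datatypes::SchemaRef;
    use datafusion::datasource::physical_plan::ParquetSource;
    use datafusion_datasource::file::FileSource;
    use datafusion_physical_expr::PhysicalExpr;

    fn build_source(
        schema: SchemaRef,
        predicate: Arc<dyn PhysicalExpr>,
    ) -> Arc<dyn FileSource> {
        // Before this commit (reconstructed):
        //     ParquetSource::default().with_predicate(Arc::clone(&schema), predicate)
        // After: the predicate stands alone; with_schema attaches the schema and
        // erases the concrete type into an Arc<dyn FileSource>.
        ParquetSource::default()
            .with_predicate(predicate)
            .with_schema(schema)
    }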

datafusion-examples/examples/custom_file_format.rs

Lines changed: 1 addition & 5 deletions

@@ -22,7 +22,6 @@ use arrow::{
     datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
 };
 use datafusion::physical_expr::LexRequirement;
-use datafusion::physical_expr::PhysicalExpr;
 use datafusion::{
     catalog::Session,
     common::{GetExt, Statistics},
@@ -112,11 +111,8 @@ impl FileFormat for TSVFileFormat {
         &self,
         state: &dyn Session,
         conf: FileScanConfig,
-        filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        self.csv_file_format
-            .create_physical_plan(state, conf, filters)
-            .await
+        self.csv_file_format.create_physical_plan(state, conf).await
     }

     async fn create_writer_physical_plan(
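
For FileFormat implementors the change is mechanical: the optional pushed-down filter argument disappears from create_physical_plan. A sketch of the method signature before and after, reconstructed from the call sites in this commit:

    // Before (reconstructed): an optional physical filter was threaded through.
    // async fn create_physical_plan(
    //     &self,
    //     state: &dyn Session,
    //     conf: FileScanConfig,
    //     filters: Option<&Arc<dyn PhysicalExpr>>,
    // ) -> Result<Arc<dyn ExecutionPlan>>;

    // After: filters no longer pass through plan creation; they reach the file
    // source later via the FilterPushdown physical optimizer rule.
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>>;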

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 2 deletions

@@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider {
         let files = self.index.get_files(predicate.clone())?;

         let object_store_url = ObjectStoreUrl::parse("file://")?;
-        let source =
-            Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
+        let source = Arc::new(ParquetSource::default().with_predicate(predicate));
         let mut file_scan_config_builder =
             FileScanConfigBuilder::new(object_store_url, self.schema(), source)
                 .with_projection(projection.cloned())

datafusion/core/src/datasource/file_format/arrow.rs

Lines changed: 0 additions & 2 deletions

@@ -54,7 +54,6 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
 use datafusion_datasource::write::ObjectWriterBuilder;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_expr::dml::InsertOp;
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::LexRequirement;

 use async_trait::async_trait;
@@ -174,7 +173,6 @@ impl FileFormat for ArrowFormat {
         &self,
         _state: &dyn Session,
         conf: FileScanConfig,
-        _filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let source = Arc::new(ArrowSource::default());
         let config = FileScanConfigBuilder::from(conf)

datafusion/core/src/datasource/file_format/mod.rs

Lines changed: 0 additions & 1 deletion

@@ -93,7 +93,6 @@ pub(crate) mod test_util {
                 .with_projection(projection)
                 .with_limit(limit)
                 .build(),
-            None,
         )
         .await?;
         Ok(exec)

datafusion/core/src/datasource/listing/table.rs

Lines changed: 4 additions & 35 deletions

@@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc};

 use crate::datasource::{
     create_ordering,
-    file_format::{
-        file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
-    },
+    file_format::{file_compression_type::FileCompressionType, FileFormat},
     physical_plan::FileSinkConfig,
 };
 use crate::execution::context::SessionState;
@@ -35,22 +33,19 @@ use datafusion_common::{config_err, DataFusionError, Result};
 use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
 use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{Expr, TableProviderFilterPushDown};
 use datafusion_expr::{SortExpr, TableType};
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::{ExecutionPlan, Statistics};

 use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
 use datafusion_common::{
-    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
-    SchemaExt, ToDFSchema,
+    config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt,
 };
 use datafusion_execution::cache::{
     cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
 };
-use datafusion_physical_expr::{
-    create_physical_expr, LexOrdering, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement};

 use async_trait::async_trait;
 use datafusion_catalog::Session;
@@ -941,19 +936,6 @@ impl TableProvider for ListingTable {
             None => {} // no ordering required
         };

-        let filters = match conjunction(filters.to_vec()) {
-            Some(expr) => {
-                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
-                let filters = create_physical_expr(
-                    &expr,
-                    &table_df_schema,
-                    state.execution_props(),
-                )?;
-                Some(filters)
-            }
-            None => None,
-        };
-
         let Some(object_store_url) =
             self.table_paths.first().map(ListingTableUrl::object_store)
         else {
@@ -978,7 +960,6 @@ impl TableProvider for ListingTable {
                     .with_output_ordering(output_ordering)
                     .with_table_partition_cols(table_partition_cols)
                     .build(),
-                filters.as_ref(),
             )
             .await
     }
@@ -1002,18 +983,6 @@ impl TableProvider for ListingTable {
                     return Ok(TableProviderFilterPushDown::Exact);
                 }

-                // if we can't push it down completely with only the filename-based/path-based
-                // column names, then we should check if we can do parquet predicate pushdown
-                let supports_pushdown = self.options.format.supports_filters_pushdown(
-                    &self.file_schema,
-                    &self.table_schema,
-                    &[filter],
-                )?;
-
-                if supports_pushdown == FilePushdownSupport::Supported {
-                    return Ok(TableProviderFilterPushDown::Exact);
-                }
-
                 Ok(TableProviderFilterPushDown::Inexact)
             })
             .collect()
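
The net effect for ListingTable: scan no longer converts the remaining logical filters into a physical predicate for the file source, and filters that are not pure partition filters are now reported as Inexact rather than Exact for Parquet. Pushdown instead happens during physical optimization, gated by the existing pushdown_filters option. A hedged sketch of the session setup this implies:

    use datafusion::prelude::{SessionConfig, SessionContext};

    // Sketch: opt in to Parquet filter pushdown; the FilterPushdown physical
    // optimizer rule can then move qualifying FilterExec predicates into the
    // Parquet DataSourceExec during physical planning.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    let ctx = SessionContext::new_with_config(config);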

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 5 additions & 4 deletions

@@ -54,6 +54,7 @@ mod tests {
     use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
     use datafusion_datasource::source::DataSourceExec;

+    use datafusion_datasource::file::FileSource;
     use datafusion_datasource::{FileRange, PartitionedFile};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
@@ -139,7 +140,7 @@ mod tests {
         self.round_trip(batches).await.batches
     }

-    fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
+    fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
         // set up predicate (this is normally done by a layer higher up)
         let predicate = self
             .predicate
@@ -148,7 +149,7 @@ mod tests {

         let mut source = ParquetSource::default();
         if let Some(predicate) = predicate {
-            source = source.with_predicate(Arc::clone(&file_schema), predicate);
+            source = source.with_predicate(predicate);
         }

         if self.pushdown_predicate {
@@ -161,14 +162,14 @@ mod tests {
             source = source.with_enable_page_index(true);
         }

-        Arc::new(source)
+        source.with_schema(Arc::clone(&file_schema))
     }

     fn build_parquet_exec(
         &self,
         file_schema: SchemaRef,
         file_group: FileGroup,
-        source: Arc<ParquetSource>,
+        source: Arc<dyn FileSource>,
     ) -> Arc<DataSourceExec> {
         let base_config = FileScanConfigBuilder::new(
             ObjectStoreUrl::local_filesystem(),
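
Because with_schema returns Arc<dyn FileSource>, the test helper now returns the trait object, and with_schema has to be the last call in the chain, after all ParquetSource-specific builder methods. A short sketch of that ordering constraint, with predicate and file_schema assumed in scope:

    // `source` is still the concrete ParquetSource here, so the Parquet-specific
    // builder methods (predicate, pushdown, page index) remain available...
    let mut source = ParquetSource::default();
    if let Some(predicate) = predicate {
        source = source.with_predicate(predicate);
    }
    // ...and the type erasure happens last: with_schema consumes the chain and
    // yields the Arc<dyn FileSource> that FileScanConfigBuilder::new expects.
    let source: Arc<dyn FileSource> = source.with_schema(Arc::clone(&file_schema));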

datafusion/core/src/test_util/parquet.rs

Lines changed: 6 additions & 4 deletions

@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
 use crate::physical_plan::ExecutionPlan;
 use crate::prelude::{Expr, SessionConfig, SessionContext};

+use datafusion_datasource::file::FileSource;
 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
 use datafusion_datasource::source::DataSourceExec;
 use object_store::path::Path;
@@ -182,10 +183,11 @@ impl TestParquetFile {
         let physical_filter_expr =
             create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;

-        let source = Arc::new(ParquetSource::new(parquet_options).with_predicate(
-            Arc::clone(&self.schema),
-            Arc::clone(&physical_filter_expr),
-        ));
+        let source = Arc::new(
+            ParquetSource::new(parquet_options)
+                .with_predicate(Arc::clone(&physical_filter_expr)),
+        )
+        .with_schema(Arc::clone(&self.schema));
         let config = scan_config_builder.with_source(source).build();
         let parquet_exec = DataSourceExec::from_data_source(config);

datafusion/core/tests/fuzz_cases/pruning.rs

Lines changed: 1 addition & 1 deletion

@@ -276,7 +276,7 @@ async fn execute_with_predicate(
     ctx: &SessionContext,
 ) -> Vec<String> {
     let parquet_source = if prune_stats {
-        ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone())
+        ParquetSource::default().with_predicate(predicate.clone())
     } else {
         ParquetSource::default()
     };

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 1 addition & 1 deletion

@@ -346,7 +346,7 @@ impl TestFull {
         let source = if let Some(predicate) = predicate {
             let df_schema = DFSchema::try_from(schema.clone())?;
             let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
-            Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate))
+            Arc::new(ParquetSource::default().with_predicate(predicate))
         } else {
             Arc::new(ParquetSource::default())
         };

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 39 additions & 6 deletions

@@ -28,6 +28,7 @@ use datafusion::execution::context::SessionState;
 use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::prelude::SessionContext;
 use datafusion_common::stats::Precision;
+use datafusion_common::DFSchema;
 use datafusion_execution::cache::cache_manager::CacheManagerConfig;
 use datafusion_execution::cache::cache_unit::{
     DefaultFileStatisticsCache, DefaultListFilesCache,
@@ -37,6 +38,10 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 use datafusion_expr::{col, lit, Expr};

 use datafusion::datasource::physical_plan::FileScanConfig;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::ExecutionPlan;
 use tempfile::tempdir;

 #[tokio::test]
@@ -47,21 +52,49 @@ async fn check_stats_precision_with_filter_pushdown() {

     let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
     let table = get_listing_table(&table_path, None, &opt).await;
+
     let (_, _, state) = get_cache_runtime_state();
+    let mut options = state.config().options().clone();
+    options.execution.parquet.pushdown_filters = true;
+
     // Scan without filter, stats are exact
     let exec = table.scan(&state, None, &[], None).await.unwrap();
     assert_eq!(
         exec.partition_statistics(None).unwrap().num_rows,
-        Precision::Exact(8)
+        Precision::Exact(8),
+        "Stats without filter should be exact"
     );

-    // Scan with filter pushdown, stats are inexact
-    let filter = Expr::gt(col("id"), lit(1));
+    // This is a filter that cannot be evaluated by the table provider scanning
+    // (it is not a partition filter). Therefore, it will be pushed down to the
+    // source operator after the appropriate optimizer pass.
+    let filter_expr = Expr::gt(col("id"), lit(1));
+    let exec_with_filter = table
+        .scan(&state, None, &[filter_expr.clone()], None)
+        .await
+        .unwrap();
+
+    let ctx = SessionContext::new();
+    let df_schema = DFSchema::try_from(table.schema()).unwrap();
+    let physical_filter = ctx.create_physical_expr(filter_expr, &df_schema).unwrap();

-    let exec = table.scan(&state, None, &[filter], None).await.unwrap();
+    let filtered_exec =
+        Arc::new(FilterExec::try_new(physical_filter, exec_with_filter).unwrap())
+            as Arc<dyn ExecutionPlan>;
+
+    let optimized_exec = FilterPushdown::new()
+        .optimize(filtered_exec, &options)
+        .unwrap();
+
+    assert!(
+        optimized_exec.as_any().is::<DataSourceExec>(),
+        "Sanity check that the pushdown did what we expected"
+    );
+    // Scan with filter pushdown, stats are inexact
     assert_eq!(
-        exec.partition_statistics(None).unwrap().num_rows,
-        Precision::Inexact(8)
+        optimized_exec.partition_statistics(None).unwrap().num_rows,
+        Precision::Inexact(8),
+        "Stats after filter pushdown should be inexact"
     );
 }
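
Distilled from the test above: the rule can be applied directly to a plan without running the whole physical optimizer pipeline. A sketch, assuming plan is a FilterExec over a Parquet DataSourceExec and options has pushdown_filters enabled:

    use std::sync::Arc;

    use datafusion_common::config::ConfigOptions;
    use datafusion_common::Result;
    use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    fn push_filters_down(
        plan: Arc<dyn ExecutionPlan>,
        options: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // When the source absorbs the predicate, the FilterExec disappears and
        // the plan root becomes the DataSourceExec, as the test asserts.
        FilterPushdown::new().optimize(plan, options)
    }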

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 1 addition & 1 deletion

@@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec

     let source = Arc::new(
         ParquetSource::default()
-            .with_predicate(Arc::clone(&schema), predicate)
+            .with_predicate(predicate)
             .with_enable_page_index(true),
     );
     let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)

datafusion/core/tests/sql/path_partition.rs

Lines changed: 0 additions & 53 deletions

@@ -25,8 +25,6 @@ use std::sync::Arc;

 use arrow::datatypes::DataType;
 use datafusion::datasource::listing::ListingTableUrl;
-use datafusion::datasource::physical_plan::ParquetSource;
-use datafusion::datasource::source::DataSourceExec;
 use datafusion::{
     datasource::{
         file_format::{csv::CsvFormat, parquet::ParquetFormat},
@@ -42,8 +40,6 @@ use datafusion_common::stats::Precision;
 use datafusion_common::test_util::batches_to_sort_string;
 use datafusion_common::ScalarValue;
 use datafusion_execution::config::SessionConfig;
-use datafusion_expr::{col, lit, Expr, Operator};
-use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};

 use async_trait::async_trait;
 use bytes::Bytes;
@@ -57,55 +53,6 @@ use object_store::{
 use object_store::{Attributes, MultipartUpload, PutMultipartOpts, PutPayload};
 use url::Url;

-#[tokio::test]
-async fn parquet_partition_pruning_filter() -> Result<()> {
-    let ctx = SessionContext::new();
-
-    let table = create_partitioned_alltypes_parquet_table(
-        &ctx,
-        &[
-            "year=2021/month=09/day=09/file.parquet",
-            "year=2021/month=10/day=09/file.parquet",
-            "year=2021/month=10/day=28/file.parquet",
-        ],
-        &[
-            ("year", DataType::Int32),
-            ("month", DataType::Int32),
-            ("day", DataType::Int32),
-        ],
-        "mirror:///",
-        "alltypes_plain.parquet",
-    )
-    .await;
-
-    // The first three filters can be resolved using only the partition columns.
-    let filters = [
-        Expr::eq(col("year"), lit(2021)),
-        Expr::eq(col("month"), lit(10)),
-        Expr::eq(col("day"), lit(28)),
-        Expr::gt(col("id"), lit(1)),
-    ];
-    let exec = table.scan(&ctx.state(), None, &filters, None).await?;
-    let data_source_exec = exec.as_any().downcast_ref::<DataSourceExec>().unwrap();
-    if let Some((_, parquet_config)) =
-        data_source_exec.downcast_to_file_source::<ParquetSource>()
-    {
-        let pred = parquet_config.predicate().unwrap();
-        // Only the last filter should be pushdown to TableScan
-        let expected = Arc::new(BinaryExpr::new(
-            Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
-            Operator::Gt,
-            Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
-        ));
-
-        assert!(pred.as_any().is::<BinaryExpr>());
-        let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
-
-        assert_eq!(pred, expected.as_ref());
-    }
-    Ok(())
-}
-
 #[tokio::test]
 async fn parquet_distinct_partition_col() -> Result<()> {
     let ctx = SessionContext::new();
