Skip to content

Commit 7924c97

Browse files
committed
Implement Parquet filter pushdown via new filter pushdown APIs
1 parent 9c1395f commit 7924c97

File tree

24 files changed

+371
-223
lines changed

24 files changed

+371
-223
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ impl TableProvider for IndexTableProvider {
495495
ParquetSource::default()
496496
// provide the predicate so the DataSourceExec can try and prune
497497
// row groups internally
498-
.with_predicate(Arc::clone(&schema), predicate)
498+
.with_predicate(predicate)
499499
// provide the factory to create parquet reader without re-reading metadata
500500
.with_parquet_file_reader_factory(Arc::new(reader_factory)),
501501
);

datafusion-examples/examples/parquet_index.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,7 @@ impl TableProvider for IndexTableProvider {
242242
let files = self.index.get_files(predicate.clone())?;
243243

244244
let object_store_url = ObjectStoreUrl::parse("file://")?;
245-
let source =
246-
Arc::new(ParquetSource::default().with_predicate(self.schema(), predicate));
245+
let source = Arc::new(ParquetSource::default().with_predicate(predicate));
247246
let mut file_scan_config_builder =
248247
FileScanConfigBuilder::new(object_store_url, self.schema(), source)
249248
.with_projection(projection.cloned())
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::RecordBatch;
19+
use arrow::datatypes::{DataType, Field, Schema};
20+
use bytes::{BufMut, BytesMut};
21+
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
22+
use datafusion::config::ConfigOptions;
23+
use datafusion::prelude::{ParquetReadOptions, SessionContext};
24+
use datafusion_execution::object_store::ObjectStoreUrl;
25+
use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
26+
use datafusion_physical_optimizer::PhysicalOptimizerRule;
27+
use datafusion_physical_plan::ExecutionPlan;
28+
use object_store::memory::InMemory;
29+
use object_store::path::Path;
30+
use object_store::ObjectStore;
31+
use parquet::arrow::ArrowWriter;
32+
use std::sync::Arc;
33+
34+
async fn create_plan() -> Arc<dyn ExecutionPlan> {
35+
let ctx = SessionContext::new();
36+
let schema = Arc::new(Schema::new(vec![
37+
Field::new("id", DataType::Int32, true),
38+
Field::new("name", DataType::Utf8, true),
39+
Field::new("age", DataType::UInt16, true),
40+
Field::new("salary", DataType::Float64, true),
41+
]));
42+
let batch = RecordBatch::new_empty(schema);
43+
44+
let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
45+
let data = {
46+
let out = BytesMut::new();
47+
let mut writer =
48+
ArrowWriter::try_new(out.writer(), batch.schema(), None).unwrap();
49+
writer.write(&batch).unwrap();
50+
writer.finish().unwrap();
51+
writer.into_inner().unwrap().into_inner().freeze()
52+
};
53+
store
54+
.put(&Path::from("test.parquet"), data.into())
55+
.await
56+
.unwrap();
57+
ctx.register_object_store(
58+
ObjectStoreUrl::parse("memory://").unwrap().as_ref(),
59+
store,
60+
);
61+
62+
ctx.register_parquet("t", "memory://", ParquetReadOptions::default())
63+
.await
64+
.unwrap();
65+
66+
let df = ctx
67+
.sql(
68+
r"
69+
WITH brackets AS (
70+
SELECT age % 10 AS age_bracket
71+
FROM t
72+
GROUP BY age % 10
73+
HAVING COUNT(*) > 10
74+
)
75+
SELECT id, name, age, salary
76+
FROM t
77+
JOIN brackets ON t.age % 10 = brackets.age_bracket
78+
WHERE age > 20 AND t.salary > 1000
79+
ORDER BY t.salary DESC
80+
LIMIT 100
81+
",
82+
)
83+
.await
84+
.unwrap();
85+
86+
df.create_physical_plan().await.unwrap()
87+
}
88+
89+
#[derive(Clone)]
90+
struct BenchmarkPlan {
91+
plan: Arc<dyn ExecutionPlan>,
92+
config: ConfigOptions,
93+
}
94+
95+
impl std::fmt::Display for BenchmarkPlan {
96+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
97+
write!(f, "BenchmarkPlan")
98+
}
99+
}
100+
101+
fn bench_push_down_filter(c: &mut Criterion) {
102+
// Create a relatively complex plan
103+
let plan = tokio::runtime::Runtime::new()
104+
.unwrap()
105+
.block_on(create_plan());
106+
let mut config = ConfigOptions::default();
107+
config.execution.parquet.pushdown_filters = true;
108+
let plan = BenchmarkPlan { plan, config };
109+
110+
c.bench_with_input(
111+
BenchmarkId::new("push_down_filter", plan.clone()),
112+
&plan,
113+
|b, plan| {
114+
b.iter(|| {
115+
let optimizer = PushdownFilter::new();
116+
optimizer
117+
.optimize(Arc::clone(&plan.plan), &plan.config)
118+
.unwrap();
119+
});
120+
},
121+
);
122+
}
123+
124+
criterion_group!(benches, bench_push_down_filter);
125+
criterion_main!(benches);

datafusion/core/src/datasource/listing/table.rs

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ use std::{any::Any, str::FromStr, sync::Arc};
2424

2525
use crate::datasource::{
2626
create_ordering,
27-
file_format::{
28-
file_compression_type::FileCompressionType, FileFormat, FilePushdownSupport,
29-
},
27+
file_format::{file_compression_type::FileCompressionType, FileFormat},
3028
physical_plan::FileSinkConfig,
3129
};
3230
use crate::execution::context::SessionState;
@@ -1002,18 +1000,6 @@ impl TableProvider for ListingTable {
10021000
return Ok(TableProviderFilterPushDown::Exact);
10031001
}
10041002

1005-
// if we can't push it down completely with only the filename-based/path-based
1006-
// column names, then we should check if we can do parquet predicate pushdown
1007-
let supports_pushdown = self.options.format.supports_filters_pushdown(
1008-
&self.file_schema,
1009-
&self.table_schema,
1010-
&[filter],
1011-
)?;
1012-
1013-
if supports_pushdown == FilePushdownSupport::Supported {
1014-
return Ok(TableProviderFilterPushDown::Exact);
1015-
}
1016-
10171003
Ok(TableProviderFilterPushDown::Inexact)
10181004
})
10191005
.collect()

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ mod tests {
5454
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
5555
use datafusion_datasource::source::DataSourceExec;
5656

57+
use datafusion_datasource::file::FileSource;
5758
use datafusion_datasource::{FileRange, PartitionedFile};
5859
use datafusion_datasource_parquet::source::ParquetSource;
5960
use datafusion_datasource_parquet::{
@@ -139,7 +140,7 @@ mod tests {
139140
self.round_trip(batches).await.batches
140141
}
141142

142-
fn build_file_source(&self, file_schema: SchemaRef) -> Arc<ParquetSource> {
143+
fn build_file_source(&self, file_schema: SchemaRef) -> Arc<dyn FileSource> {
143144
// set up predicate (this is normally done by a layer higher up)
144145
let predicate = self
145146
.predicate
@@ -148,7 +149,7 @@ mod tests {
148149

149150
let mut source = ParquetSource::default();
150151
if let Some(predicate) = predicate {
151-
source = source.with_predicate(Arc::clone(&file_schema), predicate);
152+
source = source.with_predicate(predicate);
152153
}
153154

154155
if self.pushdown_predicate {
@@ -161,14 +162,14 @@ mod tests {
161162
source = source.with_enable_page_index(true);
162163
}
163164

164-
Arc::new(source)
165+
source.with_schema(Arc::clone(&file_schema))
165166
}
166167

167168
fn build_parquet_exec(
168169
&self,
169170
file_schema: SchemaRef,
170171
file_group: FileGroup,
171-
source: Arc<ParquetSource>,
172+
source: Arc<dyn FileSource>,
172173
) -> Arc<DataSourceExec> {
173174
let base_config = FileScanConfigBuilder::new(
174175
ObjectStoreUrl::local_filesystem(),

datafusion/core/src/test_util/parquet.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::physical_plan::metrics::MetricsSet;
3737
use crate::physical_plan::ExecutionPlan;
3838
use crate::prelude::{Expr, SessionConfig, SessionContext};
3939

40+
use datafusion_datasource::file::FileSource;
4041
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
4142
use datafusion_datasource::source::DataSourceExec;
4243
use object_store::path::Path;
@@ -182,10 +183,11 @@ impl TestParquetFile {
182183
let physical_filter_expr =
183184
create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
184185

185-
let source = Arc::new(ParquetSource::new(parquet_options).with_predicate(
186-
Arc::clone(&self.schema),
187-
Arc::clone(&physical_filter_expr),
188-
));
186+
let source = Arc::new(
187+
ParquetSource::new(parquet_options)
188+
.with_predicate(Arc::clone(&physical_filter_expr)),
189+
)
190+
.with_schema(Arc::clone(&self.schema));
189191
let config = scan_config_builder.with_source(source).build();
190192
let parquet_exec = DataSourceExec::from_data_source(config);
191193

datafusion/core/tests/fuzz_cases/pruning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ async fn execute_with_predicate(
276276
ctx: &SessionContext,
277277
) -> Vec<String> {
278278
let parquet_source = if prune_stats {
279-
ParquetSource::default().with_predicate(Arc::clone(&schema), predicate.clone())
279+
ParquetSource::default().with_predicate(predicate.clone())
280280
} else {
281281
ParquetSource::default()
282282
};

datafusion/core/tests/parquet/external_access_plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ impl TestFull {
346346
let source = if let Some(predicate) = predicate {
347347
let df_schema = DFSchema::try_from(schema.clone())?;
348348
let predicate = ctx.create_physical_expr(predicate, &df_schema)?;
349-
Arc::new(ParquetSource::default().with_predicate(schema.clone(), predicate))
349+
Arc::new(ParquetSource::default().with_predicate(predicate))
350350
} else {
351351
Arc::new(ParquetSource::default())
352352
};

datafusion/core/tests/parquet/page_pruning.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> DataSourceExec
7777

7878
let source = Arc::new(
7979
ParquetSource::default()
80-
.with_predicate(Arc::clone(&schema), predicate)
80+
.with_predicate(predicate)
8181
.with_enable_page_index(true),
8282
);
8383
let base_config = FileScanConfigBuilder::new(object_store_url, schema, source)

datafusion/core/tests/physical_optimizer/push_down_filter.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl FileSource for TestSource {
9999
}
100100

101101
fn with_schema(&self, _schema: SchemaRef) -> Arc<dyn FileSource> {
102-
todo!("should not be called")
102+
Arc::new(self.clone()) as Arc<dyn FileSource>
103103
}
104104

105105
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {

datafusion/datasource-parquet/src/file_format.rs

Lines changed: 2 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ use datafusion_datasource::write::{
3131
get_writer_schema, ObjectWriterBuilder, SharedBuffer,
3232
};
3333

34-
use datafusion_datasource::file_format::{
35-
FileFormat, FileFormatFactory, FilePushdownSupport,
36-
};
34+
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
3735
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
3836

3937
use arrow::compute::sum;
@@ -54,15 +52,13 @@ use datafusion_datasource::sink::{DataSink, DataSinkExec};
5452
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
5553
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
5654
use datafusion_expr::dml::InsertOp;
57-
use datafusion_expr::Expr;
5855
use datafusion_functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
5956
use datafusion_physical_expr::PhysicalExpr;
6057
use datafusion_physical_expr_common::sort_expr::LexRequirement;
6158
use datafusion_physical_plan::Accumulator;
6259
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
6360
use datafusion_session::Session;
6461

65-
use crate::can_expr_be_pushed_down_with_schemas;
6662
use crate::source::{parse_coerce_int96_string, ParquetSource};
6763
use async_trait::async_trait;
6864
use bytes::Bytes;
@@ -433,7 +429,7 @@ impl FileFormat for ParquetFormat {
433429
let mut source = ParquetSource::new(self.options.clone());
434430

435431
if let Some(predicate) = predicate {
436-
source = source.with_predicate(Arc::clone(&conf.file_schema), predicate);
432+
source = source.with_predicate(predicate);
437433
}
438434
if let Some(metadata_size_hint) = metadata_size_hint {
439435
source = source.with_metadata_size_hint(metadata_size_hint)
@@ -461,27 +457,6 @@ impl FileFormat for ParquetFormat {
461457
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
462458
}
463459

464-
fn supports_filters_pushdown(
465-
&self,
466-
file_schema: &Schema,
467-
_table_schema: &Schema,
468-
filters: &[&Expr],
469-
) -> Result<FilePushdownSupport> {
470-
if !self.options().global.pushdown_filters {
471-
return Ok(FilePushdownSupport::NoSupport);
472-
}
473-
474-
let all_supported = filters
475-
.iter()
476-
.all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema));
477-
478-
Ok(if all_supported {
479-
FilePushdownSupport::Supported
480-
} else {
481-
FilePushdownSupport::NotSupportedForFilter
482-
})
483-
}
484-
485460
fn file_source(&self) -> Arc<dyn FileSource> {
486461
Arc::new(ParquetSource::default())
487462
}

datafusion/datasource-parquet/src/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ pub use metrics::ParquetFileMetrics;
5959
pub use page_filter::PagePruningAccessPlanFilter;
6060
pub use reader::{DefaultParquetFileReaderFactory, ParquetFileReaderFactory};
6161
pub use row_filter::build_row_filter;
62-
pub use row_filter::can_expr_be_pushed_down_with_schemas;
6362
pub use row_group_filter::RowGroupAccessPlanFilter;
6463
use source::ParquetSource;
6564
pub use writer::plan_to_parquet;
@@ -223,8 +222,7 @@ impl ParquetExecBuilder {
223222
} = self;
224223
let mut parquet = ParquetSource::new(table_parquet_options);
225224
if let Some(predicate) = predicate.clone() {
226-
parquet = parquet
227-
.with_predicate(Arc::clone(&file_scan_config.file_schema), predicate);
225+
parquet = parquet.with_predicate(predicate);
228226
}
229227
if let Some(metadata_size_hint) = metadata_size_hint {
230228
parquet = parquet.with_metadata_size_hint(metadata_size_hint)
@@ -244,7 +242,7 @@ impl ParquetExecBuilder {
244242
inner: DataSourceExec::new(Arc::new(base_config.clone())),
245243
base_config,
246244
predicate,
247-
pruning_predicate: parquet.pruning_predicate,
245+
pruning_predicate: None, // for backwards compat since `ParquetExec` is only for backwards compat anyway
248246
schema_adapter_factory: parquet.schema_adapter_factory,
249247
parquet_file_reader_factory: parquet.parquet_file_reader_factory,
250248
table_parquet_options: parquet.table_parquet_options,

datafusion/datasource-parquet/src/opener.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ impl FileOpener for ParquetOpener {
178178

179179
// Build predicates for this specific file
180180
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
181-
&predicate,
181+
predicate.as_ref(),
182182
&physical_file_schema,
183183
&predicate_creation_errors,
184184
);
@@ -390,8 +390,8 @@ pub(crate) fn build_page_pruning_predicate(
390390
))
391391
}
392392

393-
fn build_pruning_predicates(
394-
predicate: &Option<Arc<dyn PhysicalExpr>>,
393+
pub(crate) fn build_pruning_predicates(
394+
predicate: Option<&Arc<dyn PhysicalExpr>>,
395395
file_schema: &SchemaRef,
396396
predicate_creation_errors: &Count,
397397
) -> (

0 commit comments

Comments
 (0)