@@ -29,21 +29,18 @@ use datafusion_datasource::file_meta::FileMeta;
2929use datafusion_datasource:: file_stream:: { FileOpenFuture , FileOpener } ;
3030use datafusion_datasource:: schema_adapter:: SchemaAdapterFactory ;
3131
32- use arrow:: datatypes:: { FieldRef , Schema , SchemaRef , TimeUnit } ;
32+ use arrow:: datatypes:: { FieldRef , SchemaRef , TimeUnit } ;
3333use arrow:: error:: ArrowError ;
34- use datafusion_common:: pruning:: {
35- CompositePruningStatistics , PartitionPruningStatistics , PrunableStatistics ,
36- PruningStatistics ,
37- } ;
38- use datafusion_common:: { exec_err, Result } ;
34+ use datafusion_common:: { exec_err, DataFusionError , Result } ;
3935use datafusion_datasource:: PartitionedFile ;
4036use datafusion_physical_expr:: PhysicalExprSchemaRewriter ;
41- use datafusion_physical_expr_common:: physical_expr:: PhysicalExpr ;
42- use datafusion_physical_optimizer:: pruning:: PruningPredicate ;
37+ use datafusion_physical_expr_common:: physical_expr:: {
38+ is_dynamic_physical_expr, PhysicalExpr ,
39+ } ;
4340use datafusion_physical_plan:: metrics:: { Count , ExecutionPlanMetricsSet , MetricBuilder } ;
41+ use datafusion_pruning:: { build_pruning_predicate, FilePruner , PruningPredicate } ;
4442
4543use futures:: { StreamExt , TryStreamExt } ;
46- use itertools:: Itertools ;
4744use log:: debug;
4845use parquet:: arrow:: arrow_reader:: { ArrowReaderMetadata , ArrowReaderOptions } ;
4946use parquet:: arrow:: async_reader:: AsyncFileReader ;
@@ -134,66 +131,40 @@ impl FileOpener for ParquetOpener {
134131 let enable_page_index = self . enable_page_index ;
135132
136133 Ok ( Box :: pin ( async move {
137- // Prune this file using the file level statistics.
134+ // Prune this file using the file level statistics and partition values .
138135 // Since dynamic filters may have been updated since planning it is possible that we are able
139136 // to prune files now that we couldn't prune at planning time.
140- if let Some ( predicate) = & predicate {
141- // Build a pruning schema that combines the file fields and partition fields.
142- // Partition fileds are always at the end.
143- let pruning_schema = Arc :: new (
144- Schema :: new (
145- logical_file_schema
146- . fields ( )
147- . iter ( )
148- . cloned ( )
149- . chain ( partition_fields. iter ( ) . cloned ( ) )
150- . collect_vec ( ) ,
137+ // It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
138+ // as it would have been done at planning time.
139+ // We'll also check this after every record batch we read,
140+ // and if at some point we are able to prove we can prune the file using just the file level statistics
141+ // we can end the stream early.
142+ let mut file_pruner = predicate
143+ . as_ref ( )
144+ . map ( |p| {
145+ Ok :: < _ , DataFusionError > (
146+ ( is_dynamic_physical_expr ( p) | file. has_statistics ( ) ) . then_some (
147+ FilePruner :: new (
148+ Arc :: clone ( p) ,
149+ & logical_file_schema,
150+ partition_fields. clone ( ) ,
151+ file. clone ( ) ,
152+ predicate_creation_errors. clone ( ) ,
153+ ) ?,
154+ ) ,
151155 )
152- . with_metadata ( logical_file_schema. metadata ( ) . clone ( ) ) ,
153- ) ;
154- let pruning_predicate = build_pruning_predicate (
155- Arc :: clone ( predicate) ,
156- & pruning_schema,
157- & predicate_creation_errors,
158- ) ;
159- if let Some ( pruning_predicate) = pruning_predicate {
160- // The partition column schema is the schema of the table - the schema of the file
161- let mut pruning = Box :: new ( PartitionPruningStatistics :: try_new (
162- vec ! [ file. partition_values. clone( ) ] ,
163- partition_fields. clone ( ) ,
164- ) ?)
165- as Box < dyn PruningStatistics > ;
166- if let Some ( stats) = file. statistics {
167- let stats_pruning = Box :: new ( PrunableStatistics :: new (
168- vec ! [ stats] ,
169- Arc :: clone ( & pruning_schema) ,
170- ) ) ;
171- pruning = Box :: new ( CompositePruningStatistics :: new ( vec ! [
172- pruning,
173- stats_pruning,
174- ] ) ) ;
175- }
176- match pruning_predicate. prune ( pruning. as_ref ( ) ) {
177- Ok ( values) => {
178- assert ! ( values. len( ) == 1 ) ;
179- // We expect a single container -> if all containers are false skip this file
180- if values. into_iter ( ) . all ( |v| !v) {
181- // Return an empty stream
182- file_metrics. files_pruned_statistics . add ( 1 ) ;
183- return Ok ( futures:: stream:: empty ( ) . boxed ( ) ) ;
184- }
185- }
186- // Stats filter array could not be built, so we can't prune
187- Err ( e) => {
188- debug ! (
189- "Ignoring error building pruning predicate for file '{}': {e}" ,
190- file_meta. location( ) ,
191- ) ;
192- predicate_creation_errors. add ( 1 ) ;
193- }
194- }
156+ } )
157+ . transpose ( ) ?
158+ . flatten ( ) ;
159+
160+ if let Some ( file_pruner) = & mut file_pruner {
161+ if file_pruner. should_prune ( ) ? {
162+ // Return an empty stream immediately to skip the work of setting up the actual stream
163+ file_metrics. files_ranges_pruned_statistics . add ( 1 ) ;
164+ return Ok ( futures:: stream:: empty ( ) . boxed ( ) ) ;
195165 }
196166 }
167+
197168 // Don't load the page index yet. Since it is not stored inline in
198169 // the footer, loading the page index if it is not needed will do
199170 // unecessary I/O. We decide later if it is needed to evaluate the
@@ -439,30 +410,6 @@ fn create_initial_plan(
439410 Ok ( ParquetAccessPlan :: new_all ( row_group_count) )
440411}
441412
442- /// Build a pruning predicate from an optional predicate expression.
443- /// If the predicate is None or the predicate cannot be converted to a pruning
444- /// predicate, return None.
445- /// If there is an error creating the pruning predicate it is recorded by incrementing
446- /// the `predicate_creation_errors` counter.
447- pub ( crate ) fn build_pruning_predicate (
448- predicate : Arc < dyn PhysicalExpr > ,
449- file_schema : & SchemaRef ,
450- predicate_creation_errors : & Count ,
451- ) -> Option < Arc < PruningPredicate > > {
452- match PruningPredicate :: try_new ( predicate, Arc :: clone ( file_schema) ) {
453- Ok ( pruning_predicate) => {
454- if !pruning_predicate. always_true ( ) {
455- return Some ( Arc :: new ( pruning_predicate) ) ;
456- }
457- }
458- Err ( e) => {
459- debug ! ( "Could not create pruning predicate for: {e}" ) ;
460- predicate_creation_errors. add ( 1 ) ;
461- }
462- }
463- None
464- }
465-
466413/// Build a page pruning predicate from an optional predicate expression.
467414/// If the predicate is None or the predicate cannot be converted to a page pruning
468415/// predicate, return None.
@@ -554,7 +501,9 @@ mod test {
554501 schema_adapter:: DefaultSchemaAdapterFactory , PartitionedFile ,
555502 } ;
556503 use datafusion_expr:: { col, lit} ;
557- use datafusion_physical_expr:: planner:: logical2physical;
504+ use datafusion_physical_expr:: {
505+ expressions:: DynamicFilterPhysicalExpr , planner:: logical2physical, PhysicalExpr ,
506+ } ;
558507 use datafusion_physical_plan:: metrics:: ExecutionPlanMetricsSet ;
559508 use futures:: { Stream , StreamExt } ;
560509 use object_store:: { memory:: InMemory , path:: Path , ObjectMeta , ObjectStore } ;
@@ -601,6 +550,13 @@ mod test {
601550 data_len
602551 }
603552
553+ fn make_dynamic_expr ( expr : Arc < dyn PhysicalExpr > ) -> Arc < dyn PhysicalExpr > {
554+ Arc :: new ( DynamicFilterPhysicalExpr :: new (
555+ expr. children ( ) . into_iter ( ) . map ( Arc :: clone) . collect ( ) ,
556+ expr,
557+ ) )
558+ }
559+
604560 #[ tokio:: test]
605561 async fn test_prune_on_statistics ( ) {
606562 let store = Arc :: new ( InMemory :: new ( ) ) as Arc < dyn ObjectStore > ;
@@ -691,7 +647,7 @@ mod test {
691647 }
692648
693649 #[ tokio:: test]
694- async fn test_prune_on_partition_statistics ( ) {
650+ async fn test_prune_on_partition_statistics_with_dynamic_expression ( ) {
695651 let store = Arc :: new ( InMemory :: new ( ) ) as Arc < dyn ObjectStore > ;
696652
697653 let batch = record_batch ! ( ( "a" , Int32 , vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) . unwrap ( ) ;
@@ -753,7 +709,9 @@ mod test {
753709
754710 // Filter should match the partition value
755711 let expr = col ( "part" ) . eq ( lit ( 1 ) ) ;
756- let predicate = logical2physical ( & expr, & table_schema) ;
712+ // Mark the expression as dynamic even if it's not to force partition pruning to happen
713+ // Otherwise we assume it already happened at the planning stage and won't re-do the work here
714+ let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
757715 let opener = make_opener ( predicate) ;
758716 let stream = opener
759717 . open ( make_meta ( ) , file. clone ( ) )
@@ -766,7 +724,9 @@ mod test {
766724
767725 // Filter should not match the partition value
768726 let expr = col ( "part" ) . eq ( lit ( 2 ) ) ;
769- let predicate = logical2physical ( & expr, & table_schema) ;
727+ // Mark the expression as dynamic even if it's not to force partition pruning to happen
728+ // Otherwise we assume it already happened at the planning stage and won't re-do the work here
729+ let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
770730 let opener = make_opener ( predicate) ;
771731 let stream = opener. open ( make_meta ( ) , file) . unwrap ( ) . await . unwrap ( ) ;
772732 let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
@@ -1005,4 +965,92 @@ mod test {
1005965 assert_eq ! ( num_batches, 0 ) ;
1006966 assert_eq ! ( num_rows, 0 ) ;
1007967 }
968+
969+ /// Test that if the filter is not a dynamic filter and we have no stats we don't do extra pruning work at the file level.
970+ #[ tokio:: test]
971+ async fn test_opener_pruning_skipped_on_static_filters ( ) {
972+ let store = Arc :: new ( InMemory :: new ( ) ) as Arc < dyn ObjectStore > ;
973+
974+ let batch = record_batch ! ( ( "a" , Int32 , vec![ Some ( 1 ) , Some ( 2 ) , Some ( 3 ) ] ) ) . unwrap ( ) ;
975+ let data_size =
976+ write_parquet ( Arc :: clone ( & store) , "part=1/file.parquet" , batch. clone ( ) ) . await ;
977+
978+ let file_schema = batch. schema ( ) ;
979+ let mut file = PartitionedFile :: new (
980+ "part=1/file.parquet" . to_string ( ) ,
981+ u64:: try_from ( data_size) . unwrap ( ) ,
982+ ) ;
983+ file. partition_values = vec ! [ ScalarValue :: Int32 ( Some ( 1 ) ) ] ;
984+
985+ let table_schema = Arc :: new ( Schema :: new ( vec ! [
986+ Field :: new( "part" , DataType :: Int32 , false ) ,
987+ Field :: new( "a" , DataType :: Int32 , false ) ,
988+ ] ) ) ;
989+
990+ let make_opener = |predicate| {
991+ ParquetOpener {
992+ partition_index : 0 ,
993+ projection : Arc :: new ( [ 0 ] ) ,
994+ batch_size : 1024 ,
995+ limit : None ,
996+ predicate : Some ( predicate) ,
997+ logical_file_schema : file_schema. clone ( ) ,
998+ metadata_size_hint : None ,
999+ metrics : ExecutionPlanMetricsSet :: new ( ) ,
1000+ parquet_file_reader_factory : Arc :: new (
1001+ DefaultParquetFileReaderFactory :: new ( Arc :: clone ( & store) ) ,
1002+ ) ,
1003+ partition_fields : vec ! [ Arc :: new( Field :: new(
1004+ "part" ,
1005+ DataType :: Int32 ,
1006+ false ,
1007+ ) ) ] ,
1008+ pushdown_filters : false , // note that this is false!
1009+ reorder_filters : false ,
1010+ enable_page_index : false ,
1011+ enable_bloom_filter : false ,
1012+ schema_adapter_factory : Arc :: new ( DefaultSchemaAdapterFactory ) ,
1013+ enable_row_group_stats_pruning : true ,
1014+ coerce_int96 : None ,
1015+ }
1016+ } ;
1017+
1018+ let make_meta = || FileMeta {
1019+ object_meta : ObjectMeta {
1020+ location : Path :: from ( "part=1/file.parquet" ) ,
1021+ last_modified : Utc :: now ( ) ,
1022+ size : u64:: try_from ( data_size) . unwrap ( ) ,
1023+ e_tag : None ,
1024+ version : None ,
1025+ } ,
1026+ range : None ,
1027+ extensions : None ,
1028+ metadata_size_hint : None ,
1029+ } ;
1030+
1031+ // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
1032+ let expr = col ( "part" ) . eq ( lit ( 2 ) ) ;
1033+ let predicate = logical2physical ( & expr, & table_schema) ;
1034+ let opener = make_opener ( predicate) ;
1035+ let stream = opener
1036+ . open ( make_meta ( ) , file. clone ( ) )
1037+ . unwrap ( )
1038+ . await
1039+ . unwrap ( ) ;
1040+ let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
1041+ assert_eq ! ( num_batches, 1 ) ;
1042+ assert_eq ! ( num_rows, 3 ) ;
1043+
1044+ // If we make the filter dynamic, it should prune
1045+ let predicate = make_dynamic_expr ( logical2physical ( & expr, & table_schema) ) ;
1046+ let opener = make_opener ( predicate) ;
1047+ let stream = opener
1048+ . open ( make_meta ( ) , file. clone ( ) )
1049+ . unwrap ( )
1050+ . await
1051+ . unwrap ( ) ;
1052+ let ( num_batches, num_rows) = count_batches_and_rows ( stream) . await ;
1053+ assert_eq ! ( num_batches, 0 ) ;
1054+ assert_eq ! ( num_rows, 0 ) ;
1055+ }
10081056}
0 commit comments