@@ -100,6 +100,7 @@ mod tests {
100100 predicate : Option < Expr > ,
101101 pushdown_predicate : bool ,
102102 page_index_predicate : bool ,
103+ bloom_filters : bool ,
103104 }
104105
105106 impl RoundTrip {
@@ -132,6 +133,11 @@ mod tests {
132133 self
133134 }
134135
136+ fn with_bloom_filters ( mut self ) -> Self {
137+ self . bloom_filters = true ;
138+ self
139+ }
140+
135141 /// run the test, returning only the resulting RecordBatches
136142 async fn round_trip_to_batches (
137143 self ,
@@ -156,10 +162,20 @@ mod tests {
156162 source = source
157163 . with_pushdown_filters ( true )
158164 . with_reorder_filters ( true ) ;
165+ } else {
166+ source = source. with_pushdown_filters ( false ) ;
159167 }
160168
161169 if self . page_index_predicate {
162170 source = source. with_enable_page_index ( true ) ;
171+ } else {
172+ source = source. with_enable_page_index ( false ) ;
173+ }
174+
175+ if self . bloom_filters {
176+ source = source. with_bloom_filter_on_read ( true ) ;
177+ } else {
178+ source = source. with_bloom_filter_on_read ( false ) ;
163179 }
164180
165181 source. with_schema ( Arc :: clone ( & file_schema) )
@@ -859,23 +875,30 @@ mod tests {
859875
860876 // Predicate should prune all row groups
861877 let filter = col ( "c1" ) . eq ( lit ( ScalarValue :: Utf8 ( Some ( "aaa" . to_string ( ) ) ) ) ) ;
862- let read = RoundTrip :: new ( )
878+ let rt = RoundTrip :: new ( )
863879 . with_predicate ( filter)
864880 . with_schema ( schema. clone ( ) )
865- . round_trip_to_batches ( vec ! [ batch. clone( ) ] )
866- . await
867- . unwrap ( ) ;
868- assert_eq ! ( read. len( ) , 0 ) ;
881+ . round_trip ( vec ! [ batch. clone( ) ] )
882+ . await ;
883+ // There should be no predicate evaluation errors
884+ let metrics = rt. parquet_exec . metrics ( ) . unwrap ( ) ;
885+ assert_eq ! ( get_value( & metrics, "predicate_evaluation_errors" ) , 0 ) ;
886+ assert_eq ! ( get_value( & metrics, "pushdown_rows_matched" ) , 0 ) ;
887+ assert_eq ! ( rt. batches. unwrap( ) . len( ) , 0 ) ;
869888
870889 // Predicate should prune no row groups
871890 let filter = col ( "c1" ) . eq ( lit ( ScalarValue :: Utf8 ( Some ( "foo" . to_string ( ) ) ) ) ) ;
872- let read = RoundTrip :: new ( )
891+ let rt = RoundTrip :: new ( )
873892 . with_predicate ( filter)
874893 . with_schema ( schema)
875- . round_trip_to_batches ( vec ! [ batch] )
876- . await
877- . unwrap ( ) ;
878- assert_eq ! ( read. len( ) , 1 ) ;
894+ . round_trip ( vec ! [ batch] )
895+ . await ;
896+ // There should be no predicate evaluation errors
897+ let metrics = rt. parquet_exec . metrics ( ) . unwrap ( ) ;
898+ assert_eq ! ( get_value( & metrics, "predicate_evaluation_errors" ) , 0 ) ;
899+ assert_eq ! ( get_value( & metrics, "pushdown_rows_matched" ) , 0 ) ;
900+ let read = rt. batches . unwrap ( ) . iter ( ) . map ( |b| b. num_rows ( ) ) . sum :: < usize > ( ) ;
901+ assert_eq ! ( read, 2 , "Expected 2 rows to match the predicate" ) ;
879902 }
880903
881904 #[ tokio:: test]
@@ -889,25 +912,28 @@ mod tests {
889912
890913 // Predicate should prune all row groups
891914 let filter = col ( "c1" ) . eq ( lit ( ScalarValue :: UInt64 ( Some ( 5 ) ) ) ) ;
892- let read = RoundTrip :: new ( )
915+ let rt = RoundTrip :: new ( )
893916 . with_predicate ( filter)
894917 . with_schema ( schema. clone ( ) )
895- . round_trip_to_batches ( vec ! [ batch. clone( ) ] )
896- . await
897- . unwrap ( ) ;
898- assert_eq ! ( read. len( ) , 0 ) ;
918+ . round_trip ( vec ! [ batch. clone( ) ] )
919+ . await ;
920+ // There should be no predicate evaluation errors
921+ let metrics = rt. parquet_exec . metrics ( ) . unwrap ( ) ;
922+ assert_eq ! ( get_value( & metrics, "predicate_evaluation_errors" ) , 0 ) ;
923+ assert_eq ! ( rt. batches. unwrap( ) . len( ) , 0 ) ;
899924
900- // TODO: this is failing on main, and has been for a long time!
901- // See https://github.com/apache/datafusion/pull/16086#discussion_r2094817127
902- // // Predicate should prune no row groups
903- // let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
904- // let read = RoundTrip::new()
905- // .with_predicate(filter)
906- // .with_schema(schema)
907- // .round_trip_to_batches(vec![batch])
908- // .await
909- // .unwrap();
910- // assert_eq!(read.len(), 1);
925+ // Predicate should prune no row groups
926+ let filter = col ( "c1" ) . eq ( lit ( ScalarValue :: UInt64 ( Some ( 1 ) ) ) ) ;
927+ let rt = RoundTrip :: new ( )
928+ . with_predicate ( filter)
929+ . with_schema ( schema)
930+ . round_trip ( vec ! [ batch] )
931+ . await ;
932+ // There should be no predicate evaluation errors
933+ let metrics = rt. parquet_exec . metrics ( ) . unwrap ( ) ;
934+ assert_eq ! ( get_value( & metrics, "predicate_evaluation_errors" ) , 0 ) ;
935+ let read = rt. batches . unwrap ( ) . iter ( ) . map ( |b| b. num_rows ( ) ) . sum :: < usize > ( ) ;
936+ assert_eq ! ( read, 2 , "Expected 2 rows to match the predicate" ) ;
911937 }
912938
913939 #[ tokio:: test]
@@ -1692,6 +1718,7 @@ mod tests {
16921718 let rt = RoundTrip :: new ( )
16931719 . with_predicate ( filter. clone ( ) )
16941720 . with_pushdown_predicate ( )
1721+ . with_bloom_filters ( )
16951722 . round_trip ( vec ! [ batch1] )
16961723 . await ;
16971724
0 commit comments