diff --git a/datafusion/datasource-parquet/src/file_format.rs b/datafusion/datasource-parquet/src/file_format.rs index a0ca46422786..e1d393caa8f3 100644 --- a/datafusion/datasource-parquet/src/file_format.rs +++ b/datafusion/datasource-parquet/src/file_format.rs @@ -464,16 +464,16 @@ impl FileFormat for ParquetFormat { fn supports_filters_pushdown( &self, file_schema: &Schema, - table_schema: &Schema, + _table_schema: &Schema, filters: &[&Expr], ) -> Result { if !self.options().global.pushdown_filters { return Ok(FilePushdownSupport::NoSupport); } - let all_supported = filters.iter().all(|filter| { - can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema) - }); + let all_supported = filters + .iter() + .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema)); Ok(if all_supported { FilePushdownSupport::Supported diff --git a/datafusion/datasource-parquet/src/row_filter.rs b/datafusion/datasource-parquet/src/row_filter.rs index 2d2993c29a6f..d7bbe30c8943 100644 --- a/datafusion/datasource-parquet/src/row_filter.rs +++ b/datafusion/datasource-parquet/src/row_filter.rs @@ -299,6 +299,7 @@ struct PushdownChecker<'schema> { non_primitive_columns: bool, /// Does the expression reference any columns that are in the table /// schema but not in the file schema? + /// This includes partition columns and projected columns. projected_columns: bool, // Indices into the table schema of the columns required to evaluate the expression required_columns: BTreeSet, @@ -387,13 +388,12 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bo /// Otherwise, true. 
pub fn can_expr_be_pushed_down_with_schemas( expr: &datafusion_expr::Expr, - _file_schema: &Schema, - table_schema: &Schema, + file_schema: &Schema, ) -> bool { let mut can_be_pushed = true; expr.apply(|expr| match expr { datafusion_expr::Expr::Column(column) => { - can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema); + can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema); Ok(if can_be_pushed { TreeNodeRecursion::Jump } else { @@ -649,8 +649,6 @@ mod test { #[test] fn nested_data_structures_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new( "list_col", DataType::Struct(Fields::empty()), @@ -659,49 +657,31 @@ mod test { let expr = col("list_col").is_not_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] - fn projected_columns_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - + fn projected_or_partition_columns_prevent_pushdown() { let file_schema = Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]); let expr = col("nonexistent_column").is_null(); - assert!(!can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] fn basic_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]); let expr = col("string_col").is_null(); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } #[test] fn complex_expr_doesnt_prevent_pushdown() { - let table_schema = get_basic_table_schema(); - let file_schema = Schema::new(vec![ Field::new("string_col", DataType::Utf8, true), 
Field::new("bigint_col", DataType::Int64, true), @@ -711,23 +691,6 @@ mod test { .is_not_null() .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5))))); - assert!(can_expr_be_pushed_down_with_schemas( - &expr, - &file_schema, - &table_schema - )); - } - - fn get_basic_table_schema() -> Schema { - let testdata = datafusion_common::test_util::parquet_test_data(); - let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) - .expect("opening file"); - - let reader = SerializedFileReader::new(file).expect("creating reader"); - - let metadata = reader.metadata(); - - parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) - .expect("parsing schema") + assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,)); } } diff --git a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 758113b70835..a32db2ff0524 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -156,3 +156,70 @@ DROP TABLE t; statement ok DROP TABLE t_pushdown; + +## Test filter pushdown with a predicate that references both a partition column and a file column +statement ok +set datafusion.execution.parquet.pushdown_filters = true; + +## Create table +statement ok +CREATE EXTERNAL TABLE t_pushdown(part text, val text) +STORED AS PARQUET +PARTITIONED BY (part) +LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/'; + +statement ok +COPY ( + SELECT arrow_cast('a', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('b', 'Utf8') AS val +) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet' +STORED AS PARQUET; + +statement ok +COPY ( + SELECT arrow_cast('xyz', 'Utf8') AS val +) TO 
'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet' +STORED AS PARQUET; + +query TT +select * from t_pushdown where part == val +---- +a a +b b + +query TT +select * from t_pushdown where part != val +---- +xyz c + +# If we reference both a file and partition column the predicate cannot be pushed down +query TT +EXPLAIN select * from t_pushdown where part != val +---- +logical_plan +01)Filter: t_pushdown.val != t_pushdown.part +02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part] +physical_plan +01)CoalesceBatchesExec: target_batch_size=8192 +02)--FilterExec: val@0 != part@1 +03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3 +04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1 + +# If we reference only a partition column it gets evaluated during the listing phase +query TT +EXPLAIN select * from t_pushdown where part != 'a'; +---- +logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part != Utf8("a")] +physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet + +# And if we reference only a file column it gets pushed down +query TT +EXPLAIN select * from t_pushdown where val != 'c'; +---- +logical_plan TableScan:
t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")] +physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]