fix query results for predicates referencing partition columns and data columns #15935


Merged · 4 commits · May 5, 2025
8 changes: 4 additions & 4 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -464,16 +464,16 @@ impl FileFormat for ParquetFormat {
     fn supports_filters_pushdown(
         &self,
         file_schema: &Schema,
-        table_schema: &Schema,
Contributor:
I'm a bit confused. We said we'd "keep supports_filters_pushdown so that TableProviders can do Exact pruning of filters, e.g. using partition columns.", and that made sense to me. So, instead, should we pass only the table_schema to these supports_filters_pushdown() APIs at the ListingTable level? Theory and practice conflict in my mind now.

Contributor Author:

> instead, should we pass only the table_schema to these supports_filters_pushdown() APIs at the ListingTable level

We could! But we'd do that in #15769, right?

Contributor:

It doesn't change any API? I mean, can we remove file_schema: &Schema or table_schema: &Schema from these supports_filters_pushdown() APIs? If we can, which one should we remove? In this PR, the fix shows that we rely on file_schema: &Schema; however, what I understood from that conversation is that we should depend on table_schema: &Schema, not file_schema: &Schema. That's what confuses me.

Contributor Author (adriangb), May 3, 2025:

We'll keep only the table_schema. This fix (this PR) becomes irrelevant once we merge the other one. But since that may take longer, I thought it was best to make this PR a simpler fix rather than mix a bug fix with a larger change. It also helps others who have forks copy the change.
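To make the schema distinction in this thread concrete, here is a minimal sketch (not part of the PR; it assumes only the arrow-schema crate these files already use). For a partitioned listing table, the table schema is the file schema plus the partition columns, so a predicate such as `part = val` resolves against the table schema but cannot be evaluated inside any single Parquet file:

```rust
use arrow_schema::{DataType, Field, Schema};

fn main() {
    // Schema of the data physically stored in each Parquet file.
    let file_schema = Schema::new(vec![Field::new("val", DataType::Utf8, true)]);

    // Table schema = file schema + partition columns derived from the path,
    // e.g. .../part=a/file.parquet.
    let table_schema = Schema::new(vec![
        Field::new("val", DataType::Utf8, true),
        Field::new("part", DataType::Utf8, true),
    ]);

    // `part` resolves at the table level...
    assert!(table_schema.field_with_name("part").is_ok());
    // ...but a Parquet row filter sees only the file schema, where it is missing.
    assert!(file_schema.field_with_name("part").is_err());
}
```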

+        _table_schema: &Schema,
         filters: &[&Expr],
     ) -> Result<FilePushdownSupport> {
         if !self.options().global.pushdown_filters {
             return Ok(FilePushdownSupport::NoSupport);
         }
 
-        let all_supported = filters.iter().all(|filter| {
-            can_expr_be_pushed_down_with_schemas(filter, file_schema, table_schema)
-        });
+        let all_supported = filters
+            .iter()
+            .all(|filter| can_expr_be_pushed_down_with_schemas(filter, file_schema));
 
         Ok(if all_supported {
             FilePushdownSupport::Supported
53 changes: 8 additions & 45 deletions datafusion/datasource-parquet/src/row_filter.rs
@@ -299,6 +299,7 @@ struct PushdownChecker<'schema> {
     non_primitive_columns: bool,
     /// Does the expression reference any columns that are in the table
     /// schema but not in the file schema?
+    /// This includes partition columns and projected columns.
     projected_columns: bool,
     // Indices into the table schema of the columns required to evaluate the expression
     required_columns: BTreeSet<usize>,
@@ -387,13 +388,12 @@ fn would_column_prevent_pushdown(column_name: &str, table_schema: &Schema) -> bool
 /// Otherwise, true.
 pub fn can_expr_be_pushed_down_with_schemas(
     expr: &datafusion_expr::Expr,
-    _file_schema: &Schema,
-    table_schema: &Schema,
+    file_schema: &Schema,
 ) -> bool {
     let mut can_be_pushed = true;
     expr.apply(|expr| match expr {
         datafusion_expr::Expr::Column(column) => {
-            can_be_pushed &= !would_column_prevent_pushdown(column.name(), table_schema);
+            can_be_pushed &= !would_column_prevent_pushdown(column.name(), file_schema);
             Ok(if can_be_pushed {
                 TreeNodeRecursion::Jump
             } else {
@@ -649,8 +649,6 @@ mod test {

#[test]
fn nested_data_structures_prevent_pushdown() {
let table_schema = get_basic_table_schema();

let file_schema = Schema::new(vec![Field::new(
"list_col",
DataType::Struct(Fields::empty()),
@@ -659,49 +657,31 @@

let expr = col("list_col").is_not_null();

assert!(!can_expr_be_pushed_down_with_schemas(
&expr,
&file_schema,
&table_schema
));
assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
}

     #[test]
-    fn projected_columns_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
+    fn projected_or_partition_columns_prevent_pushdown() {
Contributor Author:
This test actually tests for this... except that since both schemas were being passed in it hit the bug as well.

         let file_schema =
             Schema::new(vec![Field::new("existing_col", DataType::Int64, true)]);
 
         let expr = col("nonexistent_column").is_null();
 
-        assert!(!can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
+        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }

     #[test]
     fn basic_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
         let file_schema =
             Schema::new(vec![Field::new("string_col", DataType::Utf8, true)]);
 
         let expr = col("string_col").is_null();
 
-        assert!(can_expr_be_pushed_down_with_schemas(
-            &expr,
-            &file_schema,
-            &table_schema
-        ));
+        assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
     }

     #[test]
     fn complex_expr_doesnt_prevent_pushdown() {
-        let table_schema = get_basic_table_schema();
-
         let file_schema = Schema::new(vec![
             Field::new("string_col", DataType::Utf8, true),
             Field::new("bigint_col", DataType::Int64, true),
@@ -711,23 +691,6 @@
.is_not_null()
.or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)))));

assert!(can_expr_be_pushed_down_with_schemas(
&expr,
&file_schema,
&table_schema
));
}

fn get_basic_table_schema() -> Schema {
let testdata = datafusion_common::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");

let reader = SerializedFileReader::new(file).expect("creating reader");

let metadata = reader.metadata();

parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
.expect("parsing schema")
assert!(can_expr_be_pushed_down_with_schemas(&expr, &file_schema,));
}
}
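Taken together, these test changes pin down the new contract: a predicate is pushable only if every column it references exists in the file schema. A small usage sketch, not part of the PR, assuming the col/lit helpers from datafusion_expr and the arrow-schema types the tests already import:

```rust
use arrow_schema::{DataType, Field, Schema};
use datafusion_expr::{col, lit};

fn pushdown_contract_examples() {
    let file_schema = Schema::new(vec![Field::new("val", DataType::Utf8, true)]);

    // `val` is stored in the file, so the predicate may become a Parquet row filter.
    let on_file_column = col("val").eq(lit("a"));
    assert!(can_expr_be_pushed_down_with_schemas(&on_file_column, &file_schema));

    // `part` is a partition column absent from the file schema, so the predicate
    // must stay in the FilterExec above the scan (before this fix it was pushed
    // down incorrectly because the table schema made `part` look resolvable).
    let mixed_predicate = col("part").eq(col("val"));
    assert!(!can_expr_be_pushed_down_with_schemas(&mixed_predicate, &file_schema));
}
```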
67 changes: 67 additions & 0 deletions datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt
@@ -156,3 +156,70 @@ DROP TABLE t;

statement ok
DROP TABLE t_pushdown;

## Test filter pushdown with a predicate that references both a partition column and a file column
statement ok
set datafusion.execution.parquet.pushdown_filters = true;

## Create table
statement ok
CREATE EXTERNAL TABLE t_pushdown(part text, val text)
STORED AS PARQUET
PARTITIONED BY (part)
LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/';

statement ok
COPY (
SELECT arrow_cast('a', 'Utf8') AS val
) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet'
STORED AS PARQUET;

statement ok
COPY (
SELECT arrow_cast('b', 'Utf8') AS val
) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet'
STORED AS PARQUET;

statement ok
COPY (
SELECT arrow_cast('xyz', 'Utf8') AS val
) TO 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet'
STORED AS PARQUET;

query TT
select * from t_pushdown where part == val
----
a a
b b

query TT
select * from t_pushdown where part != val
----
xyz c

# If we reference both a file column and a partition column the predicate cannot be pushed down
query TT
EXPLAIN select * from t_pushdown where part != val
----
logical_plan
01)Filter: t_pushdown.val != t_pushdown.part
02)--TableScan: t_pushdown projection=[val, part], partial_filters=[t_pushdown.val != t_pushdown.part]
physical_plan
01)CoalesceBatchesExec: target_batch_size=8192
02)--FilterExec: val@0 != part@1
03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=3
04)------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != part@1

# If we reference only a partition column it gets evaluated during the listing phase
query TT
EXPLAIN select * from t_pushdown where part != 'a';
----
logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.part != Utf8("a")]
physical_plan DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet

# And if we reference only a file column it gets pushed down
query TT
EXPLAIN select * from t_pushdown where val != 'c';
----
logical_plan TableScan: t_pushdown projection=[val, part], full_filters=[t_pushdown.val != Utf8("c")]
physical_plan DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=a/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=b/file.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_part_test/part=c/file.parquet]]}, projection=[val, part], file_type=parquet, predicate=val@0 != c, pruning_predicate=val_null_count@2 != row_count@3 AND (val_min@0 != c OR c != val_max@1), required_guarantees=[val not in (c)]
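
For anyone reproducing this outside sqllogictest, a hedged sketch of the same setup from Rust (assumes the standard SessionContext / SessionConfig API; verify the option path against your DataFusion version):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Same knob as `set datafusion.execution.parquet.pushdown_filters = true;`.
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    let ctx = SessionContext::new_with_config(config);

    ctx.sql(
        "CREATE EXTERNAL TABLE t_pushdown(part text, val text) \
         STORED AS PARQUET PARTITIONED BY (part) \
         LOCATION 'test_files/scratch/parquet_filter_pushdown/parquet_part_test/'",
    )
    .await?;

    // The mixed predicate must stay in a FilterExec; a predicate on `val`
    // alone should instead appear on the DataSourceExec as `predicate=...`.
    ctx.sql("EXPLAIN SELECT * FROM t_pushdown WHERE part != val")
        .await?
        .show()
        .await?;
    Ok(())
}
```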