Skip to content
Open
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c4ba7b7
Update DML filter extraction and delete tests
kosiew Jan 19, 2026
c2fd0f8
docs: clarify filter extraction includes TableScan and deduplication
kosiew Jan 19, 2026
7ad302d
test: add compound filter deduplication test with pushdown
kosiew Jan 19, 2026
0b71180
refactor: use functional style for filter deduplication
kosiew Jan 19, 2026
b3240bf
refactor: enhance filter extraction logic for various logical plans w…
kosiew Jan 20, 2026
5fcb927
feat: add UPDATE tests , filter pushdown support to CaptureUpdateProv…
kosiew Jan 20, 2026
98be1b5
test: add test for DELETE with mixed filter locations in CaptureDelet…
kosiew Jan 20, 2026
b66edf4
feat: enhance DELETE filter extraction to scope to target table only
kosiew Jan 20, 2026
cfc945d
feat: add validation and stripping of table qualifiers in DML filters
kosiew Jan 20, 2026
599f0b1
fix: correct string interpolation in filter validation assertion
kosiew Jan 20, 2026
2ad307c
Refine DML filter extraction logic
kosiew Jan 20, 2026
f1663f1
Enhance filter handling for UPDATE ... FROM queries
kosiew Jan 20, 2026
234785e
refactor: improve comments and streamline table registration in DML t…
kosiew Jan 20, 2026
96671c7
refactor: - [ ] Replace deduplication's `.collect() → .into_iter()` w…
kosiew Jan 20, 2026
1aeb486
refactor: enhance DML filter extraction logic with detailed documenta…
kosiew Jan 20, 2026
6d7749b
Optimize `predicate_is_on_target` to short-circuit on first mismatch
kosiew Jan 20, 2026
7aca792
Add error context when qualifier stripping fails
kosiew Jan 20, 2026
290abd7
clippy fix
kosiew Jan 20, 2026
bc5bb4c
Add per-filter pushdown coverage
kosiew Jan 20, 2026
84f77cf
cargo fmt
kosiew Jan 20, 2026
cbd3b2e
clippy fix Refactor filter pushdown logic in CaptureDeleteProvider an…
kosiew Jan 20, 2026
34f802c
Use t2-only column in test_update_from_drops_non_target_predicates
kosiew Jan 22, 2026
7ad749e
Enhance DML filter extraction to enforce fail-closed behavior on non-…
kosiew Jan 22, 2026
de794b6
Revert "Enhance DML filter extraction to enforce fail-closed behavior…
kosiew Jan 22, 2026
b34c612
Enhance DML filter extraction to support aliases for target table ref…
kosiew Jan 22, 2026
3ef5060
merge main
kosiew Jan 22, 2026
1f0b095
clippy fix
kosiew Jan 22, 2026
20a454c
Add tests for updates from another table with and without aliases
kosiew Jan 22, 2026
3e0898f
Update tests to reflect unsupported UPDATE ... FROM syntax for TableP…
kosiew Jan 23, 2026
4afb705
Implement error handling for unsupported UPDATE ... FROM syntax
kosiew Jan 23, 2026
e0e198a
Remove unused function expr_has_table_reference from dml_planning.rs
kosiew Jan 23, 2026
1bd888b
Update comments to clarify the unsupported status of UPDATE ... FROM …
kosiew Jan 23, 2026
354cff3
Remove unused import of TableReference in dml_planning.rs
kosiew Jan 23, 2026
4698c5f
Add TODO comments for fixing unsupported UPDATE ... FROM syntax
kosiew Jan 23, 2026
64be88c
clippy fix
kosiew Jan 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 115 additions & 14 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Planner for [`LogicalPlan`] to [`ExecutionPlan`]

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
Expand Down Expand Up @@ -84,7 +84,7 @@ use datafusion_expr::expr::{
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::utils::split_conjunction;
use datafusion_expr::utils::{expr_to_columns, split_conjunction};
use datafusion_expr::{
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType, StringifiedPlan,
Expand Down Expand Up @@ -613,7 +613,7 @@ impl DefaultPhysicalPlanner {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let filters = extract_dml_filters(input)?;
let filters = extract_dml_filters(input, table_name)?;
provider
.table_provider
.delete_from(session_state, filters)
Expand All @@ -639,7 +639,7 @@ impl DefaultPhysicalPlanner {
{
// For UPDATE, the assignments are encoded in the projection of input
// We pass the filters and let the provider handle the projection
let filters = extract_dml_filters(input)?;
let filters = extract_dml_filters(input, table_name)?;
// Extract assignments from the projection in input plan
let assignments = extract_update_assignments(input)?;
provider
Expand Down Expand Up @@ -1907,24 +1907,125 @@ fn get_physical_expr_pair(
}

/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
/// Walks the logical plan tree and collects Filter predicates,
/// splitting AND conjunctions into individual expressions.
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
///
fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
/// Walks the logical plan tree and collects Filter predicates and any filters
/// pushed down into TableScan nodes, splitting AND conjunctions into individual expressions.
///
/// For UPDATE...FROM queries involving multiple tables, this function only extracts predicates
/// that reference the target table. Filters from source table scans are excluded to prevent
/// incorrect filter semantics.
///
/// Column qualifiers are stripped so expressions can be evaluated against the TableProvider's
/// schema. Deduplication is performed because filters may appear in both Filter nodes and
/// TableScan.filters when the optimizer performs partial (Inexact) filter pushdown.
///
/// # Parameters
/// - `input`: The logical plan tree to extract filters from (typically a DELETE or UPDATE plan)
/// - `target`: The target table reference to scope filter extraction (prevents multi-table filter leakage)
///
/// # Returns
/// A vector of unqualified filter expressions that can be passed to the TableProvider for execution.
/// Returns an empty vector if no applicable filters are found.
///
fn extract_dml_filters(
input: &Arc<LogicalPlan>,
target: &TableReference,
) -> Result<Vec<Expr>> {
let mut filters = Vec::new();

input.apply(|node| {
if let LogicalPlan::Filter(filter) = node {
// Split AND predicates into individual expressions
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
match node {
LogicalPlan::Filter(filter) => {
// Split AND predicates into individual expressions
for predicate in split_conjunction(&filter.predicate) {
if predicate_is_on_target(predicate, target)? {
filters.push(predicate.clone());
}
}
}
LogicalPlan::TableScan(TableScan {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This collects TableScan.filters from any scan in the subtree. Works for single-table DELETE/UPDATE, but unsafe for UPDATE … FROM (extra scans). Should we restrict extraction to the DML target scan (match table_name or provider identity) and fail-closed if multiple candidate scans exist?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - b66edf4

table_name,
filters: scan_filters,
..
}) => {
// Only extract filters from the target table scan.
// This prevents incorrect filter extraction in UPDATE...FROM scenarios
// where multiple table scans may have filters.
if table_name.resolved_eq(target) {
for filter in scan_filters {
filters.extend(split_conjunction(filter).into_iter().cloned());
}
}
}
// Plans without filter information
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Explain(_)
| LogicalPlan::Analyze(_)
| LogicalPlan::Distinct(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Dml(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_) => {
// No filters to extract from leaf/meta plans
}
// Plans with inputs (may contain filters in children)
LogicalPlan::Projection(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::Sort(_)
| LogicalPlan::Union(_)
| LogicalPlan::Join(_)
| LogicalPlan::Repartition(_)
| LogicalPlan::Aggregate(_)
| LogicalPlan::Window(_)
| LogicalPlan::Subquery(_) => {
// Filter information may appear in child nodes; continue traversal
// to extract filters from Filter/TableScan nodes deeper in the plan
}
}
Ok(TreeNodeRecursion::Continue)
})?;

// Strip table qualifiers from column references
filters.into_iter().map(strip_column_qualifiers).collect()
// Strip qualifiers and deduplicate. This ensures:
// 1. Only target-table predicates are retained from Filter nodes
// 2. Qualifiers stripped for TableProvider compatibility
// 3. Duplicates removed (from Filter nodes + TableScan.filters)
//
// Deduplication is necessary because filters may appear in both Filter nodes
// and TableScan.filters when the optimizer performs partial (Inexact) pushdown.
let mut seen_filters = HashSet::new();
filters
.into_iter()
.try_fold(Vec::new(), |mut deduped, filter| {
let unqualified = strip_column_qualifiers(filter).map_err(|e| {
e.context(format!(
"Failed to strip column qualifiers for DML filter on table '{target}'"
))
})?;
if seen_filters.insert(unqualified.clone()) {
deduped.push(unqualified);
}
Ok(deduped)
})
}

/// Determine whether a predicate references only columns from the target table.
fn predicate_is_on_target(expr: &Expr, target: &TableReference) -> Result<bool> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two concerns here:

  • alias-qualified columns (x.id from UPDATE t AS x) may not match target if it's the base ref -- see update.slt:91 for the plan shape
  • Dropping join predicates silently can broaden scope. Consider failing closed if any conjunct is rejected.

Copy link
Contributor Author

@kosiew kosiew Jan 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dropping join predicates silently can broaden scope. Consider failing closed if any conjunct is rejected.

Failing close causes these slt tests to fail:

query TT
explain update t1 set b = t2.b, c = t2.a, d = 1 from t2 where t1.a = t2.a and t1.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t1.a AS a, t2.b AS b, CAST(t2.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t1.a = t2.a AND t1.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1)
04)------Cross Join:
05)--------TableScan: t1
06)--------TableScan: t2
physical_plan
01)CooperativeExec
02)--DmlResultExec: rows_affected=0
statement ok
create table t3(a int, b varchar, c double, d int);
# set from multiple tables, DataFusion only supports from one table
query error DataFusion error: Error during planning: Multiple tables in UPDATE SET FROM not yet supported
explain update t1 set b = t2.b, c = t3.a, d = 1 from t2, t3 where t1.a = t2.a and t1.a = t3.a;
# test table alias
query TT
explain update t1 as T set b = t2.b, c = t.a, d = 1 from t2 where t.a = t2.a and t.b > 'foo' and t2.c > 1.0;
----
logical_plan
01)Dml: op=[Update] table=[t1]
02)--Projection: t.a AS a, t2.b AS b, CAST(t.a AS Float64) AS c, CAST(Int64(1) AS Int32) AS d
03)----Filter: t.a = t2.a AND t.b > CAST(Utf8("foo") AS Utf8View) AND t2.c > Float64(1)
04)------Cross Join:
05)--------SubqueryAlias: t
06)----------TableScan: t1
07)--------TableScan: t2
physical_plan
01)CooperativeExec
02)--DmlResultExec: rows_affected=0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alias-qualified columns (x.id from UPDATE t AS x) may not match target if it's the base ref -- see update.slt:91 for the plan shape

addressed.

let mut columns = HashSet::new();
expr_to_columns(expr, &mut columns)?;

// Short-circuit on first mismatch: returns false if any column references a different table
Ok(!columns.iter().any(|column| {
column
.relation
.as_ref()
.is_some_and(|relation| !relation.resolved_eq(target))
}))
}

/// Strip table qualifiers from column references in an expression.
Expand Down
Loading