Skip to content

Fix duplicate field name error in Join::try_new_with_project_input during physical planning #16454

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,24 @@ impl DFSchema {
Ok(dfschema)
}

/// Return the same schema, where all fields have a given qualifier.
pub fn with_field_specific_qualified_schema(
&self,
qualifiers: Vec<Option<TableReference>>,
) -> Result<Self> {
if qualifiers.len() != self.fields().len() {
return _plan_err!(
"{}",
"Number of qualifiers must match number of fields".to_string()
);
}
Ok(DFSchema {
inner: Arc::clone(&self.inner),
field_qualifiers: qualifiers,
functional_dependencies: self.functional_dependencies.clone(),
})
}

/// Check if the schema have some fields with the same name
pub fn check_names(&self) -> Result<()> {
let mut qualified_names = BTreeSet::new();
Expand Down
93 changes: 85 additions & 8 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use datafusion_physical_plan::unnest::ListUnnest;
use datafusion_sql::TableReference;
use sqlparser::ast::NullTreatment;

use async_trait::async_trait;
Expand Down Expand Up @@ -896,8 +897,8 @@ impl DefaultPhysicalPlanner {

// 2 Children
LogicalPlan::Join(Join {
left,
right,
left: original_left,
right: original_right,
on: keys,
filter,
join_type,
Expand All @@ -920,23 +921,25 @@ impl DefaultPhysicalPlanner {
let (left, left_col_keys, left_projected) =
wrap_projection_for_join_if_necessary(
&left_keys,
left.as_ref().clone(),
original_left.as_ref().clone(),
)?;
let (right, right_col_keys, right_projected) =
wrap_projection_for_join_if_necessary(
&right_keys,
right.as_ref().clone(),
original_right.as_ref().clone(),
)?;
let column_on = (left_col_keys, right_col_keys);

let left = Arc::new(left);
let right = Arc::new(right);
let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
let (new_join, requalified) = Join::try_new_with_project_input(
node,
Arc::clone(&left),
Arc::clone(&right),
column_on,
)?);
)?;

let new_join = LogicalPlan::Join(new_join);

// If inputs were projected then create ExecutionPlan for these new
// LogicalPlan nodes.
Expand Down Expand Up @@ -969,8 +972,24 @@ impl DefaultPhysicalPlanner {

// Remove temporary projected columns
if left_projected || right_projected {
let final_join_result =
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
// Re-qualify the join schema only if the inputs were previously requalified in
// `try_new_with_project_input`. This ensures that when building the Projection
// it can correctly resolve field nullability and data types
// by disambiguating fields from the left and right sides of the join.
let qualified_join_schema = if requalified {
Arc::new(qualify_join_schema_sides(
join_schema,
original_left,
original_right,
)?)
} else {
Arc::clone(join_schema)
};
Comment on lines +975 to +987
Copy link
Contributor Author

@LiaCastaneda LiaCastaneda Jun 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale for qualifying the schema is that when building the logical Projection after, it will build the fields out of the expression names in exprlist_to_fields so it will look in new_join.schema() and try to match each expr to a field in the schema, if the expr::Column does not have a qualifier and there are multiple candidates Fields that could correspond to this expr::Column , we will get an ambiguity error, qualifying the schema allows us to prevent this.


let final_join_result = qualified_join_schema
.iter()
.map(Expr::from)
.collect::<Vec<_>>();
let projection = LogicalPlan::Projection(Projection::try_new(
final_join_result,
Arc::new(new_join),
Expand Down Expand Up @@ -1467,6 +1486,64 @@ fn get_null_physical_expr_pair(
Ok((Arc::new(null_value), physical_name))
}

/// Qualifies the fields in a join schema with "left" and "right" qualifiers
/// without mutating the original schema. This function should only be used when
/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
///
/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
/// when converting expressions to fields.
fn qualify_join_schema_sides(
join_schema: &DFSchema,
left: &LogicalPlan,
right: &LogicalPlan,
) -> Result<DFSchema> {
let left_fields = left.schema().fields();
let right_fields = right.schema().fields();
let join_fields = join_schema.fields();

// Validate lengths
if join_fields.len() != left_fields.len() + right_fields.len() {
return internal_err!(
"Join schema field count must match left and right field count."
);
}

// Validate field names match
for (i, (field, expected)) in join_fields
.iter()
.zip(left_fields.iter().chain(right_fields.iter()))
.enumerate()
{
if field.name() != expected.name() {
return internal_err!(
"Field name mismatch at index {}: expected '{}', found '{}'",
i,
expected.name(),
field.name()
);
}
}

// qualify sides
let qualifiers = join_fields
.iter()
.enumerate()
.map(|(i, _)| {
if i < left_fields.len() {
Some(TableReference::Bare {
table: Arc::from("left"),
})
} else {
Some(TableReference::Bare {
table: Arc::from("right"),
})
}
})
.collect();

join_schema.with_field_specific_qualified_schema(qualifiers)
}

fn get_physical_expr_pair(
expr: &Expr,
input_dfschema: &DFSchema,
Expand Down
28 changes: 28 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,34 @@ pub fn build_join_schema(
dfschema.with_functional_dependencies(func_dependencies)
}

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some
/// places (see e.g. DFSchema::check_names).
pub fn requalify_sides_if_needed(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this helper function to the logical plan builder module since now its not used only by the substrait consumer but also by plan.rs.

left: LogicalPlanBuilder,
right: LogicalPlanBuilder,
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
let left_cols = left.schema().columns();
let right_cols = right.schema().columns();
if left_cols.iter().any(|l| {
right_cols.iter().any(|r| {
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
})
}) {
// These names have no connection to the original plan, but they'll make the columns
// (mostly) unique.
Ok((
left.alias(TableReference::bare("left"))?,
right.alias(TableReference::bare("right"))?,
true,
))
} else {
Ok((left, right, false))
}
}

/// Add additional "synthetic" group by expressions based on functional
/// dependencies.
///
Expand Down
5 changes: 3 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ mod statement;
pub mod tree_node;

pub use builder::{
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
LogicalPlanBuilder, LogicalPlanBuilderOptions, LogicalTableSource, UNNAMED_TABLE,
build_join_schema, requalify_sides_if_needed, table_scan, union,
wrap_projection_for_join_if_necessary, LogicalPlanBuilder, LogicalPlanBuilderOptions,
LogicalTableSource, UNNAMED_TABLE,
};
pub use ddl::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
Expand Down
56 changes: 39 additions & 17 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ use crate::utils::{
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
};
use crate::{
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
Operator, Prepare, TableProviderFilterPushDown, TableSource,
WindowFunctionDefinition,
};

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
Expand Down Expand Up @@ -3754,37 +3755,58 @@ impl Join {
})
}

/// Create Join with input which wrapped with projection, this method is used to help create physical join.
/// Create Join with input which wrapped with projection, this method is used in physcial planning only to help
/// create the physical join.
pub fn try_new_with_project_input(
original: &LogicalPlan,
left: Arc<LogicalPlan>,
right: Arc<LogicalPlan>,
column_on: (Vec<Column>, Vec<Column>),
) -> Result<Self> {
) -> Result<(Self, bool)> {
let original_join = match original {
LogicalPlan::Join(join) => join,
_ => return plan_err!("Could not create join with project input"),
};

let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));

let mut requalified = false;

// By definition, the resulting schema of an inner join will have first the left side fields and then the right,
// potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
// TODO: handle left and right joins as well.
if original_join.join_type == JoinType::Inner {
(left_sch, right_sch, requalified) =
requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
}

let on: Vec<(Expr, Expr)> = column_on
.0
.into_iter()
.zip(column_on.1)
.map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
.collect();
let join_schema =
build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;

Ok(Join {
left,
right,
on,
filter: original_join.filter.clone(),
join_type: original_join.join_type,
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equality: original_join.null_equality,
})
let join_schema = build_join_schema(
left_sch.schema(),
right_sch.schema(),
&original_join.join_type,
)?;

Ok((
Join {
left,
right,
on,
filter: original_join.filter.clone(),
join_type: original_join.join_type,
join_constraint: original_join.join_constraint,
schema: Arc::new(join_schema),
null_equality: original_join.null_equality,
},
requalified,
))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};

use datafusion::logical_expr::requalify_sides_if_needed;

use substrait::proto::CrossRel;

pub async fn from_cross_rel(
Expand All @@ -30,6 +32,6 @@ pub async fn from_cross_rel(
let right = LogicalPlanBuilder::from(
consumer.consume_rel(cross.right.as_ref().unwrap()).await?,
);
let (left, right) = requalify_sides_if_needed(left, right)?;
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
left.cross_join(right.build()?)?.build()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::common::{not_impl_err, plan_err, Column, JoinType, NullEquality};
use datafusion::logical_expr::requalify_sides_if_needed;
use datafusion::logical_expr::utils::split_conjunction;
use datafusion::logical_expr::{
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
};

use substrait::proto::{join_rel, JoinRel};

pub async fn from_join_rel(
Expand All @@ -38,7 +39,7 @@ pub async fn from_join_rel(
let right = LogicalPlanBuilder::from(
consumer.consume_rel(join.right.as_ref().unwrap()).await?,
);
let (left, right) = requalify_sides_if_needed(left, right)?;
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;

let join_type = from_substrait_jointype(join.r#type)?;
// The join condition expression needs full input schema and not the output schema from join since we lose columns from
Expand Down
31 changes: 2 additions & 29 deletions datafusion/substrait/src/logical_plan/consumer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ use crate::logical_plan::consumer::SubstraitConsumer;
use datafusion::arrow::datatypes::{DataType, Field, Schema, UnionFields};
use datafusion::common::{
exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema,
DFSchemaRef, TableReference,
DFSchemaRef,
};
use datafusion::logical_expr::expr::Sort;
use datafusion::logical_expr::{Cast, Expr, ExprSchemable, LogicalPlanBuilder};
use datafusion::logical_expr::{Cast, Expr, ExprSchemable};
use std::collections::HashSet;
use std::sync::Arc;
use substrait::proto::sort_field::SortDirection;
Expand All @@ -36,33 +36,6 @@ use substrait::proto::SortField;
// https://github.com/apache/arrow-rs/blob/ee5694078c86c8201549654246900a4232d531a9/arrow-cast/src/cast/mod.rs#L1749).
pub(super) const DEFAULT_TIMEZONE: &str = "UTC";

/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
/// conflict with the columns from the other.
/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
/// Substrait the names don't matter since it only refers to columns by indices, however DataFusion
/// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names).
pub(super) fn requalify_sides_if_needed(
left: LogicalPlanBuilder,
right: LogicalPlanBuilder,
) -> datafusion::common::Result<(LogicalPlanBuilder, LogicalPlanBuilder)> {
let left_cols = left.schema().columns();
let right_cols = right.schema().columns();
if left_cols.iter().any(|l| {
right_cols.iter().any(|r| {
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
})
}) {
// These names have no connection to the original plan, but they'll make the columns
// (mostly) unique.
Ok((
left.alias(TableReference::bare("left"))?,
right.alias(TableReference::bare("right"))?,
))
} else {
Ok((left, right))
}
}

pub(super) fn next_struct_field_name(
column_idx: usize,
dfs_names: &[String],
Expand Down