Skip to content

Commit fb9d758

Browse files
committed
Fix duplicates on Join creation during physical planning
1 parent 4084894 commit fb9d758

File tree

8 files changed

+183
-60
lines changed

8 files changed

+183
-60
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,24 @@ impl DFSchema {
206206
Ok(dfschema)
207207
}
208208

209+
/// Return a schema with the same fields and functional dependencies as `self`,
/// but with the field qualifiers replaced by `qualifiers` (one entry per field).
///
/// # Errors
///
/// Returns an error (built with `_plan_err!`) if `qualifiers.len()` differs from
/// the number of fields in this schema.
210+
pub fn with_field_specific_qualified_schema(
211+
&self,
212+
qualifiers: Vec<Option<TableReference>>,
213+
) -> Result<Self> {
214+
if qualifiers.len() != self.fields().len() {
215+
return _plan_err!(
216+
"{}",
217+
"Number of qualifiers must match number of fields".to_string()
218+
);
219+
}
220+
Ok(DFSchema {
221+
// Share the inner schema through the `Arc` instead of deep-copying it.
inner: Arc::clone(&self.inner),
222+
field_qualifiers: qualifiers,
223+
functional_dependencies: self.functional_dependencies.clone(),
224+
})
225+
}
226+
209227
/// Check if the schema have some fields with the same name
210228
pub fn check_names(&self) -> Result<()> {
211229
let mut qualified_names = BTreeSet::new();

datafusion/core/src/physical_planner.rs

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ use datafusion_physical_plan::execution_plan::InvariantLevel;
9393
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
9494
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
9595
use datafusion_physical_plan::unnest::ListUnnest;
96+
use datafusion_sql::TableReference;
9697
use sqlparser::ast::NullTreatment;
9798

9899
use async_trait::async_trait;
@@ -896,8 +897,8 @@ impl DefaultPhysicalPlanner {
896897

897898
// 2 Children
898899
LogicalPlan::Join(Join {
899-
left,
900-
right,
900+
left: original_left,
901+
right: original_right,
901902
on: keys,
902903
filter,
903904
join_type,
@@ -920,23 +921,25 @@ impl DefaultPhysicalPlanner {
920921
let (left, left_col_keys, left_projected) =
921922
wrap_projection_for_join_if_necessary(
922923
&left_keys,
923-
left.as_ref().clone(),
924+
original_left.as_ref().clone(),
924925
)?;
925926
let (right, right_col_keys, right_projected) =
926927
wrap_projection_for_join_if_necessary(
927928
&right_keys,
928-
right.as_ref().clone(),
929+
original_right.as_ref().clone(),
929930
)?;
930931
let column_on = (left_col_keys, right_col_keys);
931932

932933
let left = Arc::new(left);
933934
let right = Arc::new(right);
934-
let new_join = LogicalPlan::Join(Join::try_new_with_project_input(
935+
let (new_join, requalified) = Join::try_new_with_project_input(
935936
node,
936937
Arc::clone(&left),
937938
Arc::clone(&right),
938939
column_on,
939-
)?);
940+
)?;
941+
942+
let new_join = LogicalPlan::Join(new_join);
940943

941944
// If inputs were projected then create ExecutionPlan for these new
942945
// LogicalPlan nodes.
@@ -969,8 +972,25 @@ impl DefaultPhysicalPlanner {
969972

970973
// Remove temporary projected columns
971974
if left_projected || right_projected {
972-
let final_join_result =
973-
join_schema.iter().map(Expr::from).collect::<Vec<_>>();
975+
// Re-qualify the join schema only if the inputs were previously requalified in
976+
// `try_new_with_project_input`. This ensures that when building the Projection
977+
// it can correctly resolve field nullability and data types
978+
// by disambiguating fields from the left and right sides of the join.
979+
let qualified_join_schema =
980+
if *join_type == JoinType::Inner && requalified {
981+
Arc::new(qualify_join_schema_sides(
982+
join_schema,
983+
original_left,
984+
original_right,
985+
)?)
986+
} else {
987+
Arc::clone(join_schema)
988+
};
989+
990+
let final_join_result = qualified_join_schema
991+
.iter()
992+
.map(Expr::from)
993+
.collect::<Vec<_>>();
974994
let projection = LogicalPlan::Projection(Projection::try_new(
975995
final_join_result,
976996
Arc::new(new_join),
@@ -1467,6 +1487,64 @@ fn get_null_physical_expr_pair(
14671487
Ok((Arc::new(null_value), physical_name))
14681488
}
14691489

1490+
/// Qualifies the fields in a join schema with "left" and "right" qualifiers
1491+
/// without mutating the original schema. This function should only be used when
1492+
/// the join inputs have already been requalified earlier in `try_new_with_project_input`.
1493+
///
1494+
/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution)
1495+
/// when converting expressions to fields.
///
/// The join schema is expected to list the fields of `left` first, followed by the fields of
/// `right`; the first `left` field count entries are qualified as `left`, the rest as `right`.
///
/// # Errors
///
/// Returns an internal error if the join schema does not have exactly as many fields as
/// `left` and `right` combined, or if a join field's name differs from the name of the
/// input field at the same position.
1496+
fn qualify_join_schema_sides(
1497+
join_schema: &DFSchema,
1498+
left: &LogicalPlan,
1499+
right: &LogicalPlan,
1500+
) -> Result<DFSchema> {
1501+
let left_fields = left.schema().fields();
1502+
let right_fields = right.schema().fields();
1503+
let join_fields = join_schema.fields();
1504+
1505+
// The join schema must hold exactly one field per left field plus one per right field.
1506+
if join_fields.len() != left_fields.len() + right_fields.len() {
1507+
return internal_err!(
1508+
"Join schema field count must match left and right field count."
1509+
);
1510+
}
1511+
1512+
// Each join field must carry the same name as the input field at the same position
// (left fields first, then right fields).
1513+
for (i, (field, expected)) in join_fields
1514+
.iter()
1515+
.zip(left_fields.iter().chain(right_fields.iter()))
1516+
.enumerate()
1517+
{
1518+
if field.name() != expected.name() {
1519+
return internal_err!(
1520+
"Field name mismatch at index {}: expected '{}', found '{}'",
1521+
i,
1522+
expected.name(),
1523+
field.name()
1524+
);
1525+
}
1526+
}
1527+
1528+
// Build one qualifier per join field: positions below `left_fields.len()` belong to the
// left side, all remaining positions belong to the right side.
1529+
let qualifiers = join_fields
1530+
.iter()
1531+
.enumerate()
1532+
.map(|(i, _)| {
1533+
if i < left_fields.len() {
1534+
Some(TableReference::Bare {
1535+
table: Arc::from("left"),
1536+
})
1537+
} else {
1538+
Some(TableReference::Bare {
1539+
table: Arc::from("right"),
1540+
})
1541+
}
1542+
})
1543+
.collect();
1544+
1545+
join_schema.with_field_specific_qualified_schema(qualifiers)
1546+
}
1547+
14701548
fn get_physical_expr_pair(
14711549
expr: &Expr,
14721550
input_dfschema: &DFSchema,

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1667,6 +1667,34 @@ pub fn build_join_schema(
16671667
dfschema.with_functional_dependencies(func_dependencies)
16681668
}
16691669

1670+
/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
1671+
/// conflict with the columns from the other.
1672+
/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying
1673+
/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable in some
1674+
/// places (see e.g. `DFSchema::check_names`).
///
/// Two columns conflict when they are equal, or when they share a name and at least one of
/// them has no relation.
///
/// Returns the two builders together with a flag. When a conflict is found, the left builder is
/// aliased as `left`, the right builder is aliased as `right`, and the flag is `true`; otherwise
/// both builders are returned unchanged and the flag is `false`.
///
/// # Errors
///
/// Propagates any error returned while aliasing either side.
1675+
pub fn requalify_sides_if_needed(
1676+
left: LogicalPlanBuilder,
1677+
right: LogicalPlanBuilder,
1678+
) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> {
1679+
let left_cols = left.schema().columns();
1680+
let right_cols = right.schema().columns();
1681+
if left_cols.iter().any(|l| {
1682+
right_cols.iter().any(|r| {
1683+
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
1684+
})
1685+
}) {
1686+
// These names have no connection to the original plan, but they'll make the columns
1687+
// (mostly) unique.
1688+
Ok((
1689+
left.alias(TableReference::bare("left"))?,
1690+
right.alias(TableReference::bare("right"))?,
1691+
true,
1692+
))
1693+
} else {
1694+
Ok((left, right, false))
1695+
}
1696+
}
1697+
16701698
/// Add additional "synthetic" group by expressions based on functional
16711699
/// dependencies.
16721700
///

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ mod statement;
2727
pub mod tree_node;
2828

2929
pub use builder::{
30-
build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary,
31-
LogicalPlanBuilder, LogicalPlanBuilderOptions, LogicalTableSource, UNNAMED_TABLE,
30+
build_join_schema, requalify_sides_if_needed, table_scan, union,
31+
wrap_projection_for_join_if_necessary, LogicalPlanBuilder, LogicalPlanBuilderOptions,
32+
LogicalTableSource, UNNAMED_TABLE,
3233
};
3334
pub use ddl::{
3435
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ use crate::utils::{
4343
grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction,
4444
};
4545
use crate::{
46-
build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute,
47-
Expr, ExprSchemable, LogicalPlanBuilder, Operator, Prepare,
48-
TableProviderFilterPushDown, TableSource, WindowFunctionDefinition,
46+
build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr,
47+
CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder,
48+
Operator, Prepare, TableProviderFilterPushDown, TableSource,
49+
WindowFunctionDefinition,
4950
};
5051

5152
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
@@ -3754,37 +3755,58 @@ impl Join {
37543755
})
37553756
}
37563757

3757-
/// Create Join with input which wrapped with projection, this method is used to help create physical join.
3758+
/// Create a Join whose inputs are wrapped with projections. This method is used in physical planning only to help
3759+
/// create the physical join.
37583760
pub fn try_new_with_project_input(
37593761
original: &LogicalPlan,
37603762
left: Arc<LogicalPlan>,
37613763
right: Arc<LogicalPlan>,
37623764
column_on: (Vec<Column>, Vec<Column>),
3763-
) -> Result<Self> {
3765+
) -> Result<(Self, bool)> {
37643766
let original_join = match original {
37653767
LogicalPlan::Join(join) => join,
37663768
_ => return plan_err!("Could not create join with project input"),
37673769
};
37683770

3771+
let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left));
3772+
let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right));
3773+
3774+
let mut requalified = false;
3775+
3776+
// By definition, the resulting schema of an inner join will have first the left side fields and then the right,
3777+
// potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before.
3778+
// TODO: handle left and right joins as well.
3779+
if original_join.join_type == JoinType::Inner {
3780+
(left_sch, right_sch, requalified) =
3781+
requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?;
3782+
}
3783+
37693784
let on: Vec<(Expr, Expr)> = column_on
37703785
.0
37713786
.into_iter()
37723787
.zip(column_on.1)
37733788
.map(|(l, r)| (Expr::Column(l), Expr::Column(r)))
37743789
.collect();
3775-
let join_schema =
3776-
build_join_schema(left.schema(), right.schema(), &original_join.join_type)?;
37773790

3778-
Ok(Join {
3779-
left,
3780-
right,
3781-
on,
3782-
filter: original_join.filter.clone(),
3783-
join_type: original_join.join_type,
3784-
join_constraint: original_join.join_constraint,
3785-
schema: Arc::new(join_schema),
3786-
null_equality: original_join.null_equality,
3787-
})
3791+
let join_schema = build_join_schema(
3792+
left_sch.schema(),
3793+
right_sch.schema(),
3794+
&original_join.join_type,
3795+
)?;
3796+
3797+
Ok((
3798+
Join {
3799+
left,
3800+
right,
3801+
on,
3802+
filter: original_join.filter.clone(),
3803+
join_type: original_join.join_type,
3804+
join_constraint: original_join.join_constraint,
3805+
schema: Arc::new(join_schema),
3806+
null_equality: original_join.null_equality,
3807+
},
3808+
requalified,
3809+
))
37883810
}
37893811
}
37903812

datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
1918
use crate::logical_plan::consumer::SubstraitConsumer;
2019
use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder};
20+
21+
use datafusion::logical_expr::requalify_sides_if_needed;
22+
2123
use substrait::proto::CrossRel;
2224

2325
pub async fn from_cross_rel(
@@ -30,6 +32,6 @@ pub async fn from_cross_rel(
3032
let right = LogicalPlanBuilder::from(
3133
consumer.consume_rel(cross.right.as_ref().unwrap()).await?,
3234
);
33-
let (left, right) = requalify_sides_if_needed(left, right)?;
35+
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
3436
left.cross_join(right.build()?)?.build()
3537
}

datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,14 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use crate::logical_plan::consumer::utils::requalify_sides_if_needed;
1918
use crate::logical_plan::consumer::SubstraitConsumer;
2019
use datafusion::common::{not_impl_err, plan_err, Column, JoinType, NullEquality};
20+
use datafusion::logical_expr::requalify_sides_if_needed;
2121
use datafusion::logical_expr::utils::split_conjunction;
2222
use datafusion::logical_expr::{
2323
BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
2424
};
25+
2526
use substrait::proto::{join_rel, JoinRel};
2627

2728
pub async fn from_join_rel(
@@ -38,7 +39,7 @@ pub async fn from_join_rel(
3839
let right = LogicalPlanBuilder::from(
3940
consumer.consume_rel(join.right.as_ref().unwrap()).await?,
4041
);
41-
let (left, right) = requalify_sides_if_needed(left, right)?;
42+
let (left, right, _requalified) = requalify_sides_if_needed(left, right)?;
4243

4344
let join_type = from_substrait_jointype(join.r#type)?;
4445
// The join condition expression needs full input schema and not the output schema from join since we lose columns from

datafusion/substrait/src/logical_plan/consumer/utils.rs

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ use crate::logical_plan::consumer::SubstraitConsumer;
1919
use datafusion::arrow::datatypes::{DataType, Field, Schema, UnionFields};
2020
use datafusion::common::{
2121
exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema,
22-
DFSchemaRef, TableReference,
22+
DFSchemaRef,
2323
};
2424
use datafusion::logical_expr::expr::Sort;
25-
use datafusion::logical_expr::{Cast, Expr, ExprSchemable, LogicalPlanBuilder};
25+
use datafusion::logical_expr::{Cast, Expr, ExprSchemable};
2626
use std::collections::HashSet;
2727
use std::sync::Arc;
2828
use substrait::proto::sort_field::SortDirection;
@@ -36,33 +36,6 @@ use substrait::proto::SortField;
3636
// https://github.com/apache/arrow-rs/blob/ee5694078c86c8201549654246900a4232d531a9/arrow-cast/src/cast/mod.rs#L1749).
3737
pub(super) const DEFAULT_TIMEZONE: &str = "UTC";
3838

39-
/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise
40-
/// conflict with the columns from the other.
41-
/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For
42-
/// Substrait the names don't matter since it only refers to columns by indices, however DataFusion
43-
/// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names).
44-
pub(super) fn requalify_sides_if_needed(
45-
left: LogicalPlanBuilder,
46-
right: LogicalPlanBuilder,
47-
) -> datafusion::common::Result<(LogicalPlanBuilder, LogicalPlanBuilder)> {
48-
let left_cols = left.schema().columns();
49-
let right_cols = right.schema().columns();
50-
if left_cols.iter().any(|l| {
51-
right_cols.iter().any(|r| {
52-
l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none()))
53-
})
54-
}) {
55-
// These names have no connection to the original plan, but they'll make the columns
56-
// (mostly) unique.
57-
Ok((
58-
left.alias(TableReference::bare("left"))?,
59-
right.alias(TableReference::bare("right"))?,
60-
))
61-
} else {
62-
Ok((left, right))
63-
}
64-
}
65-
6639
pub(super) fn next_struct_field_name(
6740
column_idx: usize,
6841
dfs_names: &[String],

0 commit comments

Comments
 (0)