diff --git a/datafusion/common/src/dfschema.rs b/datafusion/common/src/dfschema.rs index 804e14bf72fb..3d87d283afb8 100644 --- a/datafusion/common/src/dfschema.rs +++ b/datafusion/common/src/dfschema.rs @@ -206,6 +206,24 @@ impl DFSchema { Ok(dfschema) } + /// Return the same schema with its field qualifiers replaced by `qualifiers` (one entry per field); returns a plan error when the number of qualifiers differs from the number of fields. + pub fn with_field_specific_qualified_schema( + &self, + qualifiers: Vec>, + ) -> Result { + if qualifiers.len() != self.fields().len() { + return _plan_err!( + "{}", + "Number of qualifiers must match number of fields".to_string() + ); + } + Ok(DFSchema { + inner: Arc::clone(&self.inner), + field_qualifiers: qualifiers, + functional_dependencies: self.functional_dependencies.clone(), + }) + } + /// Check if the schema have some fields with the same name pub fn check_names(&self) -> Result<()> { let mut qualified_names = BTreeSet::new(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 14188f6bf0c9..1dab5313d8d0 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -93,6 +93,7 @@ use datafusion_physical_plan::execution_plan::InvariantLevel; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::recursive_query::RecursiveQueryExec; use datafusion_physical_plan::unnest::ListUnnest; +use datafusion_sql::TableReference; use sqlparser::ast::NullTreatment; use async_trait::async_trait; @@ -896,8 +897,8 @@ impl DefaultPhysicalPlanner { // 2 Children LogicalPlan::Join(Join { - left, - right, + left: original_left, + right: original_right, on: keys, filter, join_type, @@ -920,23 +921,25 @@ impl DefaultPhysicalPlanner { let (left, left_col_keys, left_projected) = wrap_projection_for_join_if_necessary( &left_keys, - left.as_ref().clone(), + original_left.as_ref().clone(), )?; let (right, right_col_keys, right_projected) = wrap_projection_for_join_if_necessary( &right_keys, - right.as_ref().clone(), + 
original_right.as_ref().clone(), )?; let column_on = (left_col_keys, right_col_keys); let left = Arc::new(left); let right = Arc::new(right); - let new_join = LogicalPlan::Join(Join::try_new_with_project_input( + let (new_join, requalified) = Join::try_new_with_project_input( node, Arc::clone(&left), Arc::clone(&right), column_on, - )?); + )?; + + let new_join = LogicalPlan::Join(new_join); // If inputs were projected then create ExecutionPlan for these new // LogicalPlan nodes. @@ -969,8 +972,24 @@ impl DefaultPhysicalPlanner { // Remove temporary projected columns if left_projected || right_projected { - let final_join_result = - join_schema.iter().map(Expr::from).collect::>(); + // Re-qualify the join schema only if the inputs were previously requalified in + // `try_new_with_project_input`. This ensures that when building the Projection + // it can correctly resolve field nullability and data types + // by disambiguating fields from the left and right sides of the join. + let qualified_join_schema = if requalified { + Arc::new(qualify_join_schema_sides( + join_schema, + original_left, + original_right, + )?) + } else { + Arc::clone(join_schema) + }; + + let final_join_result = qualified_join_schema + .iter() + .map(Expr::from) + .collect::>(); let projection = LogicalPlan::Projection(Projection::try_new( final_join_result, Arc::new(new_join), @@ -1467,6 +1486,64 @@ fn get_null_physical_expr_pair( Ok((Arc::new(null_value), physical_name)) } +/// Qualifies the fields in a join schema with "left" and "right" qualifiers +/// without mutating the original schema. This function should only be used when +/// the join inputs have already been requalified earlier in `try_new_with_project_input`. +/// +/// The purpose is to avoid ambiguity errors later in planning (e.g., in nullability or data type resolution) +/// when converting expressions to fields. 
+fn qualify_join_schema_sides( + join_schema: &DFSchema, + left: &LogicalPlan, + right: &LogicalPlan, +) -> Result { + let left_fields = left.schema().fields(); + let right_fields = right.schema().fields(); + let join_fields = join_schema.fields(); + + // Validate lengths: the join schema must hold exactly the left fields followed by the right fields + if join_fields.len() != left_fields.len() + right_fields.len() { + return internal_err!( + "Join schema field count must match left and right field count." + ); + } + + // Validate that field names match position by position (left fields first, then right fields) + for (i, (field, expected)) in join_fields + .iter() + .zip(left_fields.iter().chain(right_fields.iter())) + .enumerate() + { + if field.name() != expected.name() { + return internal_err!( + "Field name mismatch at index {}: expected '{}', found '{}'", + i, + expected.name(), + field.name() + ); + } + } + + // Qualify sides: the first `left_fields.len()` fields get the bare qualifier "left", all remaining fields get "right" + let qualifiers = join_fields + .iter() + .enumerate() + .map(|(i, _)| { + if i < left_fields.len() { + Some(TableReference::Bare { + table: Arc::from("left"), + }) + } else { + Some(TableReference::Bare { + table: Arc::from("right"), + }) + } + }) + .collect(); + + join_schema.with_field_specific_qualified_schema(qualifiers) +} + fn get_physical_expr_pair( expr: &Expr, input_dfschema: &DFSchema, diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index 93dd6c2b89fc..c8e9e69d9636 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -1667,6 +1667,34 @@ pub fn build_join_schema( dfschema.with_functional_dependencies(func_dependencies) } +/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise +/// conflict with the columns from the other. +/// This is especially useful for queries that come as Substrait, since Substrait doesn't currently allow specifying +/// aliases, neither for columns nor for tables. DataFusion requires columns to be uniquely identifiable, in some +/// places (see e.g. DFSchema::check_names). The returned `bool` is `true` when both sides were aliased (as "left" and "right") and `false` when they are returned unchanged. 
+pub fn requalify_sides_if_needed( + left: LogicalPlanBuilder, + right: LogicalPlanBuilder, +) -> Result<(LogicalPlanBuilder, LogicalPlanBuilder, bool)> { + let left_cols = left.schema().columns(); + let right_cols = right.schema().columns(); + if left_cols.iter().any(|l| { + right_cols.iter().any(|r| { + l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none())) + }) + }) { + // These names have no connection to the original plan, but they'll make the columns + // (mostly) unique. + Ok(( + left.alias(TableReference::bare("left"))?, + right.alias(TableReference::bare("right"))?, + true, + )) + } else { + Ok((left, right, false)) + } +} + /// Add additional "synthetic" group by expressions based on functional /// dependencies. /// diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index a55f4d97b212..444fb50cc5b5 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -27,8 +27,9 @@ mod statement; pub mod tree_node; pub use builder::{ - build_join_schema, table_scan, union, wrap_projection_for_join_if_necessary, - LogicalPlanBuilder, LogicalPlanBuilderOptions, LogicalTableSource, UNNAMED_TABLE, + build_join_schema, requalify_sides_if_needed, table_scan, union, + wrap_projection_for_join_if_necessary, LogicalPlanBuilder, LogicalPlanBuilderOptions, + LogicalTableSource, UNNAMED_TABLE, }; pub use ddl::{ CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 876c14f1000f..84f8414fdde5 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -43,9 +43,10 @@ use crate::utils::{ grouping_set_expr_count, grouping_set_to_exprlist, split_conjunction, }; use crate::{ - build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Execute, - Expr, ExprSchemable, LogicalPlanBuilder, Operator, 
Prepare, - TableProviderFilterPushDown, TableSource, WindowFunctionDefinition, + build_join_schema, expr_vec_fmt, requalify_sides_if_needed, BinaryExpr, + CreateMemoryTable, CreateView, Execute, Expr, ExprSchemable, LogicalPlanBuilder, + Operator, Prepare, TableProviderFilterPushDown, TableSource, + WindowFunctionDefinition, }; use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; @@ -3754,37 +3755,58 @@ impl Join { }) } - /// Create Join with input which wrapped with projection, this method is used to help create physical join. + /// Create a Join whose inputs are wrapped with projections; this method is used in physical planning only to help + /// create the physical join. The returned `bool` is `true` when the join schema was built from sides requalified by `requalify_sides_if_needed` (attempted for inner joins only). pub fn try_new_with_project_input( original: &LogicalPlan, left: Arc, right: Arc, column_on: (Vec, Vec), - ) -> Result { + ) -> Result<(Self, bool)> { let original_join = match original { LogicalPlan::Join(join) => join, _ => return plan_err!("Could not create join with project input"), }; + let mut left_sch = LogicalPlanBuilder::from(Arc::clone(&left)); + let mut right_sch = LogicalPlanBuilder::from(Arc::clone(&right)); + + let mut requalified = false; + + // By definition, the resulting schema of an inner join will have first the left side fields and then the right, + // potentially having duplicate field names. Note this will only qualify fields if they have not been qualified before. + // TODO: handle left and right joins as well. 
+ if original_join.join_type == JoinType::Inner { + (left_sch, right_sch, requalified) = + requalify_sides_if_needed(left_sch.clone(), right_sch.clone())?; + } + let on: Vec<(Expr, Expr)> = column_on .0 .into_iter() .zip(column_on.1) .map(|(l, r)| (Expr::Column(l), Expr::Column(r))) .collect(); - let join_schema = - build_join_schema(left.schema(), right.schema(), &original_join.join_type)?; - Ok(Join { - left, - right, - on, - filter: original_join.filter.clone(), - join_type: original_join.join_type, - join_constraint: original_join.join_constraint, - schema: Arc::new(join_schema), - null_equality: original_join.null_equality, - }) + let join_schema = build_join_schema( + left_sch.schema(), + right_sch.schema(), + &original_join.join_type, + )?; + + Ok(( + Join { + left, + right, + on, + filter: original_join.filter.clone(), + join_type: original_join.join_type, + join_constraint: original_join.join_constraint, + schema: Arc::new(join_schema), + null_equality: original_join.null_equality, + }, + requalified, + )) } } diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs index a91366e47742..25c66a8e2297 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/cross_rel.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. 
-use crate::logical_plan::consumer::utils::requalify_sides_if_needed; use crate::logical_plan::consumer::SubstraitConsumer; use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder}; + +use datafusion::logical_expr::requalify_sides_if_needed; + use substrait::proto::CrossRel; pub async fn from_cross_rel( @@ -30,6 +32,6 @@ pub async fn from_cross_rel( let right = LogicalPlanBuilder::from( consumer.consume_rel(cross.right.as_ref().unwrap()).await?, ); - let (left, right) = requalify_sides_if_needed(left, right)?; + let (left, right, _requalified) = requalify_sides_if_needed(left, right)?; left.cross_join(right.build()?)?.build() } diff --git a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs index 0cf920dd6260..ade8a4e77e65 100644 --- a/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs +++ b/datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. 
-use crate::logical_plan::consumer::utils::requalify_sides_if_needed; use crate::logical_plan::consumer::SubstraitConsumer; use datafusion::common::{not_impl_err, plan_err, Column, JoinType, NullEquality}; +use datafusion::logical_expr::requalify_sides_if_needed; use datafusion::logical_expr::utils::split_conjunction; use datafusion::logical_expr::{ BinaryExpr, Expr, LogicalPlan, LogicalPlanBuilder, Operator, }; + use substrait::proto::{join_rel, JoinRel}; pub async fn from_join_rel( @@ -38,7 +39,7 @@ pub async fn from_join_rel( let right = LogicalPlanBuilder::from( consumer.consume_rel(join.right.as_ref().unwrap()).await?, ); - let (left, right) = requalify_sides_if_needed(left, right)?; + let (left, right, _requalified) = requalify_sides_if_needed(left, right)?; let join_type = from_substrait_jointype(join.r#type)?; // The join condition expression needs full input schema and not the output schema from join since we lose columns from diff --git a/datafusion/substrait/src/logical_plan/consumer/utils.rs b/datafusion/substrait/src/logical_plan/consumer/utils.rs index 396c5e673f85..d88b6cb29963 100644 --- a/datafusion/substrait/src/logical_plan/consumer/utils.rs +++ b/datafusion/substrait/src/logical_plan/consumer/utils.rs @@ -19,10 +19,10 @@ use crate::logical_plan::consumer::SubstraitConsumer; use datafusion::arrow::datatypes::{DataType, Field, Schema, UnionFields}; use datafusion::common::{ exec_err, not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, - DFSchemaRef, TableReference, + DFSchemaRef, }; use datafusion::logical_expr::expr::Sort; -use datafusion::logical_expr::{Cast, Expr, ExprSchemable, LogicalPlanBuilder}; +use datafusion::logical_expr::{Cast, Expr, ExprSchemable}; use std::collections::HashSet; use std::sync::Arc; use substrait::proto::sort_field::SortDirection; @@ -36,33 +36,6 @@ use substrait::proto::SortField; // https://github.com/apache/arrow-rs/blob/ee5694078c86c8201549654246900a4232d531a9/arrow-cast/src/cast/mod.rs#L1749). 
pub(super) const DEFAULT_TIMEZONE: &str = "UTC"; -/// (Re)qualify the sides of a join if needed, i.e. if the columns from one side would otherwise -/// conflict with the columns from the other. -/// Substrait doesn't currently allow specifying aliases, neither for columns nor for tables. For -/// Substrait the names don't matter since it only refers to columns by indices, however DataFusion -/// requires columns to be uniquely identifiable, in some places (see e.g. DFSchema::check_names). -pub(super) fn requalify_sides_if_needed( - left: LogicalPlanBuilder, - right: LogicalPlanBuilder, -) -> datafusion::common::Result<(LogicalPlanBuilder, LogicalPlanBuilder)> { - let left_cols = left.schema().columns(); - let right_cols = right.schema().columns(); - if left_cols.iter().any(|l| { - right_cols.iter().any(|r| { - l == r || (l.name == r.name && (l.relation.is_none() || r.relation.is_none())) - }) - }) { - // These names have no connection to the original plan, but they'll make the columns - // (mostly) unique. - Ok(( - left.alias(TableReference::bare("left"))?, - right.alias(TableReference::bare("right"))?, - )) - } else { - Ok((left, right)) - } -} - pub(super) fn next_struct_field_name( column_idx: usize, dfs_names: &[String],