From 119ec5b2ea86b73afaeabcb1d52136029326cac7 Mon Sep 17 00:00:00 2001 From: Bruce Robbins Date: Tue, 18 Apr 2023 13:09:41 +0900 Subject: [PATCH] [SPARK-43113][SQL] Evaluate stream-side variables when generating code for a bound condition ### What changes were proposed in this pull request? In `JoinCodegenSupport#getJoinCondition`, evaluate any referenced stream-side variables before using them in the generated code. This patch doesn't evaluate the passed stream-side variables directly, but instead evaluates a copy (`streamVars2`). This is because `SortMergeJoin#codegenFullOuter` will want to evaluate the stream-side vars within a different scope than the condition check, so we mustn't delete the initialization code from the original `ExprCode` instances. ### Why are the changes needed? When a bound condition of a full outer join references the same stream-side column more than once, wholestage codegen generates bad code. For example, the following query fails with a compilation error: ``` create or replace temp view v1 as select * from values (1, 1), (2, 2), (3, 1) as v1(key, value); create or replace temp view v2 as select * from values (1, 22, 22), (3, -1, -1), (7, null, null) as v2(a, b, c); select * from v1 full outer join v2 on key = a and value > b and value > c; ``` The error is: ``` org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 277, Column 9: Redefinition of local variable "smj_isNull_7" ``` The same error occurs with code generated from ShuffleHashJoinExec: ``` select /*+ SHUFFLE_HASH(v2) */ * from v1 full outer join v2 on key = a and value > b and value > c; ``` In this case, the error is: ``` org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 174, Column 5: Redefinition of local variable "shj_value_1" ``` Neither `SortMergeJoin#codegenFullOuter` nor `ShuffledHashJoinExec#doProduce` evaluate the stream-side variables before calling `consumeFullOuterJoinRow#getJoinCondition`. As a result, `getJoinCondition` generates definition/initialization code for each referenced stream-side variable at the point of use. If a stream-side variable is used more than once in the bound condition, the definition/initialization code is generated more than once, resulting in the "Redefinition of local variable" error. In the end, the query succeeds, since Spark disables wholestage codegen and tries again. (In the case other join-type/strategy pairs, either the implementations don't call `JoinCodegenSupport#getJoinCondition`, or the stream-side variables are pre-evaluated before the call is made, so no error happens in those cases). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests. Closes #40766 from bersprockets/full_join_codegen_issue. Authored-by: Bruce Robbins Signed-off-by: Hyukjin Kwon --- .../execution/joins/JoinCodegenSupport.scala | 8 +++-- .../org/apache/spark/sql/JoinSuite.scala | 35 +++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala index 75f0a359a793b..ae91615da0f4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/JoinCodegenSupport.scala @@ -42,13 +42,15 @@ trait JoinCodegenSupport extends CodegenSupport with BaseJoinExec { buildRow: Option[String] = None): (String, String, Seq[ExprCode]) = { val buildSideRow = buildRow.getOrElse(ctx.freshName("buildRow")) val buildVars = genOneSideJoinVars(ctx, buildSideRow, buildPlan, setDefaultValue = false) + val streamVars2 = streamVars.map(_.copy()) val checkCondition = if (condition.isDefined) { val expr = condition.get - // evaluate the variables from build side that used by condition - val eval = evaluateRequiredVariables(buildPlan.output, buildVars, expr.references) + // evaluate the variables that are used by the condition + val eval = evaluateRequiredVariables(streamPlan.output ++ buildPlan.output, + streamVars2 ++ buildVars, expr.references) // filter the output via condition - ctx.currentVars = streamVars ++ buildVars + ctx.currentVars = streamVars2 ++ buildVars val ev = BindReferences.bindReference(expr, streamPlan.output ++ buildPlan.output).genCode(ctx) val skipRow = s"${ev.isNull} || !${ev.value}" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 6dd34d41cf6c1..0e0ca54256075 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -1455,4 +1455,39 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan checkAnswer(result1, result2) } } + + def dupStreamSideColTest(hint: String, check: SparkPlan => Unit): Unit = { + val query = + s"""select /*+ ${hint}(r) */ * + |from testData2 l + |full outer join testData3 r + |on l.a = r.a + |and l.b < (r.b + 1) + |and l.b < (r.a + 1)""".stripMargin + val df = sql(query) + val plan = df.queryExecution.executedPlan + check(plan) + val expected = Row(1, 1, null, null) :: + Row(1, 2, null, null) :: + Row(null, null, 1, null) :: + Row(2, 1, 2, 2) :: + Row(2, 2, 2, 2) :: + Row(3, 1, null, null) :: + Row(3, 2, null, null) :: Nil + checkAnswer(df, expected) + } + + test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") { + def check(plan: SparkPlan): Unit = { + assert(collect(plan) { case _: SortMergeJoinExec => true }.size === 1) + } + dupStreamSideColTest("MERGE", check) + } + + test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SHJ)") { + def check(plan: SparkPlan): Unit = { + assert(collect(plan) { case _: ShuffledHashJoinExec => true }.size === 1) + } + dupStreamSideColTest("SHUFFLE_HASH", check) + } }