Skip to content

Commit 3757091

Browse files
AngersZhuuuucloud-fan
authored andcommitted
[SPARK-54319][SQL] BHJ LeftAnti update numOutputRows wrong when codegen is disabled
### What changes were proposed in this pull request? BHJ LeftAnti update numOutputRows missing case for hashed = EmptyHashedRelation <img width="1754" height="1148" alt="image" src="https://github.com/user-attachments/assets/a71e4546-578e-4e4d-9434-9287074ebe75" /> ### Why are the changes needed? Fix missing sql metrics for BHJ ### Does this PR introduce _any_ user-facing change? Yes, BHJ LeftAnti will update numOutputRows when hashed = EmptyHashedRelation ### How was this patch tested? Existed UT ### Was this patch authored or co-authored using generative AI tooling? No Closes #53014 from AngersZhuuuu/SPARK-54319. Authored-by: Angerszhuuuu <[email protected]> Signed-off-by: Wenchen Fan <[email protected]>
1 parent 96e0d4b commit 3757091

File tree

2 files changed

+21
-7
lines changed

2 files changed

+21
-7
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,10 @@ case class BroadcastHashJoinExec(
129129
val hashed = broadcastRelation.value.asReadOnlyCopy()
130130
TaskContext.get().taskMetrics().incPeakExecutionMemory(hashed.estimatedSize)
131131
if (hashed == EmptyHashedRelation) {
132-
streamedIter
132+
streamedIter.map { row =>
133+
numOutputRows += 1
134+
row
135+
}
133136
} else if (hashed == HashedRelationWithAllNullKeys) {
134137
Iterator.empty
135138
} else {

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -915,16 +915,27 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
915915
withTable("t1", "t2") {
916916
spark.range(4).write.saveAsTable("t1")
917917
spark.range(2).write.saveAsTable("t2")
918-
val df = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2)")
919-
df.collect()
920-
val plan = df.queryExecution.executedPlan
918+
val df1 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2)")
919+
df1.collect()
920+
val plan1 = df1.queryExecution.executedPlan
921921

922-
val joins = plan.collect {
922+
val joins1 = plan1.collect {
923923
case s: BroadcastHashJoinExec => s
924924
}
925925

926-
assert(joins.size === 1)
927-
testMetricsInSparkPlanOperator(joins.head, Map("numOutputRows" -> 2))
926+
assert(joins1.size === 1)
927+
testMetricsInSparkPlanOperator(joins1.head, Map("numOutputRows" -> 2))
928+
929+
val df2 = sql("SELECT * FROM t1 WHERE id NOT IN (SELECT id FROM t2 WHERE 1 = 2)")
930+
df2.collect()
931+
val plan2 = df2.queryExecution.executedPlan
932+
933+
val joins2 = plan2.collect {
934+
case s: BroadcastHashJoinExec => s
935+
}
936+
937+
assert(joins2.size === 1)
938+
testMetricsInSparkPlanOperator(joins2.head, Map("numOutputRows" -> 4))
928939
}
929940
}
930941
}

0 commit comments

Comments
 (0)