Skip to content

Commit

Permalink
[SPARK-43191][CORE] Replace reflection w/ direct calling for Hadoop CallerContext
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?

Replace reflection w/ direct calling for `org.apache.hadoop.ipc.CallerContext`

### Why are the changes needed?

`org.apache.hadoop.ipc.CallerContext` was added in [HDFS-9184](https://issues.apache.org/jira/browse/HDFS-9184) (Hadoop 2.8.0/3.0.0). Previously, Spark used reflection to invoke it for compatibility with Hadoop 2.7. Since SPARK-42452 removed support for Hadoop 2, we can call it directly instead of using reflection.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass GA.

Closes apache#40850 from pan3793/SPARK-43191.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
  • Loading branch information
pan3793 authored and sunchao committed Apr 20, 2023
1 parent 78407a7 commit 34d1c22
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 34 deletions.
36 changes: 6 additions & 30 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3297,21 +3297,8 @@ private[spark] object Utils extends Logging {
}

private[util] object CallerContext extends Logging {
val callerContextSupported: Boolean = {
SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && {
try {
Utils.classForName("org.apache.hadoop.ipc.CallerContext")
Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
true
} catch {
case _: ClassNotFoundException =>
false
case NonFatal(e) =>
logWarning("Fail to load the CallerContext class", e)
false
}
}
}
val callerContextEnabled: Boolean =
SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false)
}

/**
Expand Down Expand Up @@ -3371,22 +3358,11 @@ private[spark] class CallerContext(

/**
* Set up the caller context [[context]] by invoking Hadoop CallerContext API of
* [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
* [[org.apache.hadoop.ipc.CallerContext]].
*/
def setCurrentContext(): Unit = {
if (CallerContext.callerContextSupported) {
try {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
val builder: Class[AnyRef] =
Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
val hdfsContext = builder.getMethod("build").invoke(builderInst)
callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
} catch {
case NonFatal(e) =>
logWarning("Fail to set Spark caller context", e)
}
}
def setCurrentContext(): Unit = if (CallerContext.callerContextEnabled) {
val hdfsContext = new org.apache.hadoop.ipc.CallerContext.Builder(context).build()
org.apache.hadoop.ipc.CallerContext.setCurrent(hdfsContext)
}
}

Expand Down
6 changes: 2 additions & 4 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -957,10 +957,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
test("Set Spark CallerContext") {
val context = "test"
new CallerContext(context).setCurrentContext()
if (CallerContext.callerContextSupported) {
val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
assert(s"SPARK_$context" ===
callerContext.getMethod("getCurrent").invoke(null).toString)
if (CallerContext.callerContextEnabled) {
assert(s"SPARK_$context" === org.apache.hadoop.ipc.CallerContext.getCurrent.toString)
}
}

Expand Down

0 comments on commit 34d1c22

Please sign in to comment.