diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 166f93be05bed..d7c45bbd96521 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3297,21 +3297,8 @@ private[spark] object Utils extends Logging {
 }
 
 private[util] object CallerContext extends Logging {
-  val callerContextSupported: Boolean = {
-    SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false) && {
-      try {
-        Utils.classForName("org.apache.hadoop.ipc.CallerContext")
-        Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
-        true
-      } catch {
-        case _: ClassNotFoundException =>
-          false
-        case NonFatal(e) =>
-          logWarning("Fail to load the CallerContext class", e)
-          false
-      }
-    }
-  }
+  val callerContextEnabled: Boolean =
+    SparkHadoopUtil.get.conf.getBoolean("hadoop.caller.context.enabled", false)
 }
 
 /**
@@ -3371,22 +3358,11 @@ private[spark] class CallerContext(
 
   /**
    * Set up the caller context [[context]] by invoking Hadoop CallerContext API of
-   * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8.
+   * [[org.apache.hadoop.ipc.CallerContext]].
    */
-  def setCurrentContext(): Unit = {
-    if (CallerContext.callerContextSupported) {
-      try {
-        val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
-        val builder: Class[AnyRef] =
-          Utils.classForName("org.apache.hadoop.ipc.CallerContext$Builder")
-        val builderInst = builder.getConstructor(classOf[String]).newInstance(context)
-        val hdfsContext = builder.getMethod("build").invoke(builderInst)
-        callerContext.getMethod("setCurrent", callerContext).invoke(null, hdfsContext)
-      } catch {
-        case NonFatal(e) =>
-          logWarning("Fail to set Spark caller context", e)
-      }
-    }
+  def setCurrentContext(): Unit = if (CallerContext.callerContextEnabled) {
+    val hdfsContext = new org.apache.hadoop.ipc.CallerContext.Builder(context).build()
+    org.apache.hadoop.ipc.CallerContext.setCurrent(hdfsContext)
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 56fb5bf6c6cfe..ed4f8dbdb640b 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -957,10 +957,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties {
   test("Set Spark CallerContext") {
     val context = "test"
     new CallerContext(context).setCurrentContext()
-    if (CallerContext.callerContextSupported) {
-      val callerContext = Utils.classForName("org.apache.hadoop.ipc.CallerContext")
-      assert(s"SPARK_$context" ===
-        callerContext.getMethod("getCurrent").invoke(null).toString)
+    if (CallerContext.callerContextEnabled) {
+      assert(s"SPARK_$context" === org.apache.hadoop.ipc.CallerContext.getCurrent.toString)
     }
   }
 