From 75e137c827af3e72ce5b02c8852e37636e35bbd0 Mon Sep 17 00:00:00 2001 From: idzikovsky Date: Tue, 30 Jun 2026 18:40:49 +0300 Subject: [PATCH] [LIVY-1041] Initial changes to provide Scala 2.13 compatibility --- .../main/scala/org/apache/livy/repl/Session.scala | 2 +- .../scala/org/apache/livy/scalaapi/package.scala | 2 +- .../scala/org/apache/livy/server/LivyServer.scala | 2 +- .../livy/server/interactive/InteractiveSession.scala | 6 +++--- .../interactive/InteractiveSessionServlet.scala | 2 +- .../livy/server/recovery/ZooKeeperManager.scala | 2 +- .../org/apache/livy/utils/SparkKubernetesApp.scala | 12 ++++++------ .../scala/org/apache/livy/utils/SparkYarnApp.scala | 9 ++++----- 8 files changed, 18 insertions(+), 19 deletions(-) diff --git a/repl/src/main/scala/org/apache/livy/repl/Session.scala b/repl/src/main/scala/org/apache/livy/repl/Session.scala index 262c811c7..c1267bc45 100644 --- a/repl/src/main/scala/org/apache/livy/repl/Session.scala +++ b/repl/src/main/scala/org/apache/livy/repl/Session.scala @@ -137,7 +137,7 @@ class Session( entries }(interpreterExecutor) - future.onFailure { case _ => changeState(SessionState.Error()) }(interpreterExecutor) + future.failed.foreach { _ => changeState(SessionState.Error()) }(interpreterExecutor) future } diff --git a/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala index a08c7002a..a4914b056 100644 --- a/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala +++ b/scala-api/src/main/scala/org/apache/livy/scalaapi/package.scala @@ -42,7 +42,7 @@ package object scalaapi { private[livy] def getJavaFutureResult[T](jFuture: JFuture[T], atMost: Duration = Duration.Undefined): T = { try { - if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS) + if (!atMost.isFinite) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS) } catch { case executionException: ExecutionException => throw executionException.getCause } diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index 7b6f323e0..73f259d5b 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -229,7 +229,7 @@ class LivyServer extends Logging { override def contextInitialized(sce: ServletContextEvent): Unit = { try { val context = sce.getServletContext() - context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT) + context.setInitParameter(org.scalatra.EnvironmentKey, livyConf.get(ENVIRONMENT)) val interactiveServlet = new InteractiveSessionServlet( interactiveSessionManager, sessionStore, livyConf, accessManager) diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 0667b718c..38fa2045f 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -479,15 +479,15 @@ class InteractiveSession( client.get.getServerUri.get() }(sessionManageExecutors) - uriFuture.onSuccess { case url => + uriFuture.foreach { url => rscDriverUri = Option(url) sessionSaveLock.synchronized { sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata) } }(sessionManageExecutors) - uriFuture.onFailure { - case e => warn("Fail to get rsc uri", e) + uriFuture.failed.foreach { e => + warn("Fail to get rsc uri", e) }(sessionManageExecutors) // Send a dummy job that will return once the client is ready to be used, and set the diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala index 310504e75..d6ba9e9df 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala @@ -89,7 +89,7 @@ class InteractiveSessionServlet( val from = math.max(0, lines.length - size) val until = from + size - lines.view(from, until) + lines.slice(from, until) } .getOrElse(Nil) } else { diff --git a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala index ff483e5df..9ad86bdc8 100644 --- a/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala +++ b/server/src/main/scala/org/apache/livy/server/recovery/ZooKeeperManager.scala @@ -169,7 +169,7 @@ class ZooKeeperManager( if (curatorClient.checkExists().forPath(key) == null) { Seq.empty[String] } else { - curatorClient.getChildren.forPath(key).asScala + curatorClient.getChildren.forPath(key).asScala.toSeq } } diff --git a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala index 22ab6b8f8..9c813edc8 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkKubernetesApp.scala @@ -278,7 +278,7 @@ class SparkKubernetesApp private[utils] ( "Please check Livy log and KUBERNETES log to know the details." error(s"Failed monitoring the app $appTag: $msg") - kubernetesDiagnostics = ArrayBuffer(msg) + kubernetesDiagnostics = IndexedSeq(msg) failToMonitor() } } @@ -360,12 +360,12 @@ class SparkKubernetesApp private[utils] ( kubernetesAppMonitorFailedTimes += 1 if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) { error(s"Monitoring of the app $appTag was interrupted.", e) - kubernetesDiagnostics = ArrayBuffer(e.getMessage) + kubernetesDiagnostics = IndexedSeq(e.getMessage) failToMonitor() } case NonFatal(e) => error(s"Error while refreshing Kubernetes state", e) - kubernetesDiagnostics = ArrayBuffer(e.getMessage) + kubernetesDiagnostics = IndexedSeq(e.getMessage) changeState(SparkApp.State.FAILED) } finally { if (!isRunning) { @@ -636,7 +636,7 @@ private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: Se private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = { import scala.collection.JavaConverters._ - def printMap(map: Map[_, _]): String = map.map { + def printMap[K, V](map: Map[K, V]): String = map.map { case (key, value) => s"$key=$value" }.mkString(", ") @@ -690,7 +690,7 @@ private[utils] object KubernetesExtensions { .withLabels(labels.asJava) .withLabel(appTagLabel) .withLabel(appIdLabel) - .list.getItems.asScala.map(new KubernetesApplication(_)) + .list.getItems.asScala.map(new KubernetesApplication(_)).toSeq } def killApplication(app: KubernetesApplication): Boolean = { @@ -705,7 +705,7 @@ private[utils] object KubernetesExtensions { ): KubernetesAppReport = { val pods = client.pods.inNamespace(app.getApplicationNamespace) .withLabels(Map(appTagLabel -> app.getApplicationTag).asJava) - .list.getItems.asScala + .list.getItems.asScala.toSeq val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER) val executors = pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR) diff --git a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala index bd7d29fa5..797a09451 100644 --- a/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala +++ b/server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala @@ -18,7 +18,6 @@ package org.apache.livy.utils import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps @@ -139,8 +138,8 @@ class SparkYarnApp private[utils] ( private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String] override def log(): IndexedSeq[String] = - ("stdout: " +: process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String])) ++ - ("\nstderr: " +: process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String])) ++ + ("stdout: " +: process.map(_.inputLines).getOrElse(IndexedSeq.empty[String])) ++ + ("\nstderr: " +: process.map(_.errorLines).getOrElse(IndexedSeq.empty[String])) ++ ("\nYARN Diagnostics: " +: yarnDiagnostics) override def kill(): Unit = synchronized { @@ -344,11 +343,11 @@ class SparkYarnApp private[utils] ( debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}") } catch { case _: InterruptedException => - yarnDiagnostics = ArrayBuffer("Session stopped by user.") + yarnDiagnostics = IndexedSeq("Session stopped by user.") changeState(SparkApp.State.KILLED) case NonFatal(e) => error(s"Error whiling refreshing YARN state", e) - yarnDiagnostics = ArrayBuffer(e.getMessage) + yarnDiagnostics = IndexedSeq(e.getMessage) changeState(SparkApp.State.FAILED) } }