Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion repl/src/main/scala/org/apache/livy/repl/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class Session(
entries
}(interpreterExecutor)

future.onFailure { case _ => changeState(SessionState.Error()) }(interpreterExecutor)
future.failed.foreach { _ => changeState(SessionState.Error()) }(interpreterExecutor)
future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ package object scalaapi {
private[livy] def getJavaFutureResult[T](jFuture: JFuture[T],
atMost: Duration = Duration.Undefined): T = {
try {
if (!atMost.isFinite()) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS)
if (!atMost.isFinite) jFuture.get else jFuture.get(atMost.toMillis, TimeUnit.MILLISECONDS)
} catch {
case executionException: ExecutionException => throw executionException.getCause
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class LivyServer extends Logging {
override def contextInitialized(sce: ServletContextEvent): Unit = {
try {
val context = sce.getServletContext()
context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)
context.setInitParameter(org.scalatra.EnvironmentKey, livyConf.get(ENVIRONMENT))

val interactiveServlet = new InteractiveSessionServlet(
interactiveSessionManager, sessionStore, livyConf, accessManager)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,15 +479,15 @@ class InteractiveSession(
client.get.getServerUri.get()
}(sessionManageExecutors)

uriFuture.onSuccess { case url =>
uriFuture.foreach { url =>
rscDriverUri = Option(url)
sessionSaveLock.synchronized {
sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
}
}(sessionManageExecutors)

uriFuture.onFailure {
case e => warn("Fail to get rsc uri", e)
uriFuture.failed.foreach { e =>
warn("Fail to get rsc uri", e)
}(sessionManageExecutors)

// Send a dummy job that will return once the client is ready to be used, and set the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class InteractiveSessionServlet(
val from = math.max(0, lines.length - size)
val until = from + size

lines.view(from, until)
lines.slice(from, until)
}
.getOrElse(Nil)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ZooKeeperManager(
if (curatorClient.checkExists().forPath(key) == null) {
Seq.empty[String]
} else {
curatorClient.getChildren.forPath(key).asScala
curatorClient.getChildren.forPath(key).asScala.toSeq
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ class SparkKubernetesApp private[utils] (
"Please check Livy log and KUBERNETES log to know the details."

error(s"Failed monitoring the app $appTag: $msg")
kubernetesDiagnostics = ArrayBuffer(msg)
kubernetesDiagnostics = IndexedSeq(msg)
failToMonitor()
}
}
Expand Down Expand Up @@ -360,12 +360,12 @@ class SparkKubernetesApp private[utils] (
kubernetesAppMonitorFailedTimes += 1
if (kubernetesAppMonitorFailedTimes > appLookupMaxFailedTimes) {
error(s"Monitoring of the app $appTag was interrupted.", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
kubernetesDiagnostics = IndexedSeq(e.getMessage)
failToMonitor()
}
case NonFatal(e) =>
error(s"Error while refreshing Kubernetes state", e)
kubernetesDiagnostics = ArrayBuffer(e.getMessage)
kubernetesDiagnostics = IndexedSeq(e.getMessage)
changeState(SparkApp.State.FAILED)
} finally {
if (!isRunning) {
Expand Down Expand Up @@ -636,7 +636,7 @@ private[utils] case class KubernetesAppReport(driver: Option[Pod], executors: Se

private def buildSparkPodDiagnosticsPrettyString(pod: Pod): String = {
import scala.collection.JavaConverters._
def printMap(map: Map[_, _]): String = map.map {
def printMap[K, V](map: Map[K, V]): String = map.map {
case (key, value) => s"$key=$value"
}.mkString(", ")

Expand Down Expand Up @@ -690,7 +690,7 @@ private[utils] object KubernetesExtensions {
.withLabels(labels.asJava)
.withLabel(appTagLabel)
.withLabel(appIdLabel)
.list.getItems.asScala.map(new KubernetesApplication(_))
.list.getItems.asScala.map(new KubernetesApplication(_)).toSeq
}

def killApplication(app: KubernetesApplication): Boolean = {
Expand All @@ -705,7 +705,7 @@ private[utils] object KubernetesExtensions {
): KubernetesAppReport = {
val pods = client.pods.inNamespace(app.getApplicationNamespace)
.withLabels(Map(appTagLabel -> app.getApplicationTag).asJava)
.list.getItems.asScala
.list.getItems.asScala.toSeq
val driver = pods.find(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_DRIVER)
val executors =
pods.filter(_.getMetadata.getLabels.get(SPARK_ROLE_LABEL) == SPARK_ROLE_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package org.apache.livy.utils

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps
Expand Down Expand Up @@ -139,8 +138,8 @@ class SparkYarnApp private[utils] (
private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]

override def log(): IndexedSeq[String] =
("stdout: " +: process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String])) ++
("\nstderr: " +: process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String])) ++
("stdout: " +: process.map(_.inputLines).getOrElse(IndexedSeq.empty[String])) ++
("\nstderr: " +: process.map(_.errorLines).getOrElse(IndexedSeq.empty[String])) ++
("\nYARN Diagnostics: " +: yarnDiagnostics)

override def kill(): Unit = synchronized {
Expand Down Expand Up @@ -344,11 +343,11 @@ class SparkYarnApp private[utils] (
debug(s"$appId $state ${yarnDiagnostics.mkString(" ")}")
} catch {
case _: InterruptedException =>
yarnDiagnostics = ArrayBuffer("Session stopped by user.")
yarnDiagnostics = IndexedSeq("Session stopped by user.")
changeState(SparkApp.State.KILLED)
case NonFatal(e) =>
error(s"Error whiling refreshing YARN state", e)
yarnDiagnostics = ArrayBuffer(e.getMessage)
yarnDiagnostics = IndexedSeq(e.getMessage)
changeState(SparkApp.State.FAILED)
}
}
Expand Down