Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conf/livy.conf.template
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
# What spark deploy mode Livy sessions should use.
# livy.spark.deploy-mode =

# What default YARN queue Livy sessions should use if not specified in the client request.
# By default, it is null and falls back to the global Hadoop cluster default queue.
# livy.spark.yarn.queue = default

# Configure Livy server http request and response header size.
# livy.server.request-header.size = 131072
# livy.server.response-header.size = 131072
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object LivyConf {

val TEST_MODE = ClientConf.TEST_MODE

val SPARK_YARN_QUEUE = Entry("livy.spark.yarn.queue", null)
Comment thread
nileshrathi345 marked this conversation as resolved.
val SPARK_HOME = Entry("livy.server.spark-home", null)
val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)
Expand Down Expand Up @@ -485,6 +486,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
/** Return the spark master Livy sessions should use. */
def sparkMaster(): String = get(LIVY_SPARK_MASTER)

/** Return the value of spark yarn queue. */
def getYarnQueue(): Option[String] = Option(get(SPARK_YARN_QUEUE))

/** Return the path to the spark-submit executable. */
def sparkSubmit(): String = {
sparkHome().map { _ + File.separator + "bin" + File.separator + "spark-submit" }.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ case class BatchRecoveryMetadata(
appTag: String,
owner: String,
proxyUser: Option[String],
queue: Option[String],
Comment thread
nileshrathi345 marked this conversation as resolved.
Outdated
version: Int = 1)
extends RecoveryMetadata

Expand Down Expand Up @@ -83,7 +84,7 @@ object BatchSession extends Logging {
request.executorMemory.foreach(builder.executorMemory)
request.executorCores.foreach(builder.executorCores)
request.numExecutors.foreach(builder.numExecutors)
request.queue.foreach(builder.queue)
request.queue.orElse(livyConf.getYarnQueue()).foreach(builder.queue)
request.name.foreach(builder.name)

sessionStore.save(BatchSession.RECOVERY_SESSION_TYPE, s.recoveryMetadata)
Expand Down Expand Up @@ -120,6 +121,7 @@ object BatchSession extends Logging {
owner,
impersonatedUser,
sessionStore,
request.queue.orElse(livyConf.getYarnQueue()),
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}

Expand All @@ -137,6 +139,7 @@ object BatchSession extends Logging {
m.owner,
m.proxyUser,
sessionStore,
m.queue,
mockApp.map { m => (_: BatchSession) => m }.getOrElse { s =>
SparkApp.create(m.appTag, m.appId, None, livyConf, Option(s))
})
Expand All @@ -152,6 +155,7 @@ class BatchSession(
owner: String,
override val proxyUser: Option[String],
sessionStore: SessionStore,
val queue: Option[String],
sparkApp: BatchSession => SparkApp)
extends Session(id, name, owner, livyConf) with SparkAppListener {
import BatchSession._
Expand Down Expand Up @@ -204,5 +208,5 @@ class BatchSession(
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

override def recoveryMetadata: RecoveryMetadata =
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser)
BatchRecoveryMetadata(id, name, appId, appTag, owner, proxyUser, queue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ case class BatchSessionView(
state: String,
appId: Option[String],
appInfo: AppInfo,
queue: Option[String],
Comment thread
nileshrathi345 marked this conversation as resolved.
Outdated
log: Seq[String])

class BatchSessionServlet(
Expand Down Expand Up @@ -76,7 +77,7 @@ class BatchSessionServlet(
Nil
}
BatchSessionView(session.id, session.name, session.owner, session.proxyUser,
session.state.toString, session.appId, session.appInfo, logs)
session.state.toString, session.appId, session.appInfo, session.queue, logs)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object InteractiveSession extends Logging {
SparkLauncher.EXECUTOR_MEMORY -> request.executorMemory.map(_.toString),
"spark.executor.instances" -> request.numExecutors.map(_.toString),
"spark.app.name" -> request.name.map(_.toString),
"spark.yarn.queue" -> request.queue
"spark.yarn.queue" -> request.queue.orElse(livyConf.getYarnQueue())
)

userOpts.foreach { case (key, opt) =>
Expand Down Expand Up @@ -152,7 +152,7 @@ object InteractiveSession extends Logging {
request.jars,
request.numExecutors,
request.pyFiles,
request.queue,
request.queue.orElse(livyConf.getYarnQueue()),
mockApp)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,71 @@ class BatchSessionSpec
}) should be (true)
}

it("should inherit the default YARN queue from LivyConf when request queue is empty") {
val req = new CreateBatchRequest()
req.file = script.toString
req.queue = None // Explicitly empty
req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

// Set default queue in LivyConf configuration
val conf = new LivyConf()
.set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue")

val accessManager = new AccessManager(conf)
val mockApp = mock[SparkApp]

val batch = BatchSession.create(
id = 10,
name = None,
request = req,
livyConf = conf,
accessManager = accessManager,
owner = null,
proxyUser = None,
sessionStore = sessionStore,
mockApp = Some(mockApp)
)

// Verify that the batch session structure captured the fallback queue configuration
batch.queue shouldBe Some("livy-default-batch-queue")
}

it("should prioritize user-specified request queue over LivyConf configuration") {
val req = new CreateBatchRequest()
req.file = script.toString
req.queue = Some("user-custom-batch-queue") // Explicitly requested by user
req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

val conf = new LivyConf()
.set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir"))
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-batch-queue")

val accessManager = new AccessManager(conf)
val mockApp = mock[SparkApp]

val batch = BatchSession.create(
id = 20,
name = None,
request = req,
livyConf = conf,
accessManager = accessManager,
owner = null,
proxyUser = None,
sessionStore = sessionStore,
mockApp = Some(mockApp)
)

// Verify user context takes absolute priority over fallback definition
batch.queue shouldBe Some("user-custom-batch-queue")
}

def testRecoverSession(name: Option[String]): Unit = {
val conf = new LivyConf()
val req = new CreateBatchRequest()
val name = Some("Test Batch Session")
val mockApp = mock[SparkApp]
val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None)
val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None, None)
val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))

batch.state shouldBe (SessionState.Recovering)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,71 @@ class InteractiveSessionSpec extends FunSpec
s.logLines().mkString should include("RSCDriver URI is unknown")
}
}

describe("InteractiveSession") {
it("should inherit the default YARN queue from LivyConf when request queue is empty") {
// Create a clean instance of LivyConf for this isolated check
val testLivyConf = new LivyConf()
.set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue")

val req = new CreateInteractiveRequest()
req.kind = Spark
req.queue = None // Explicitly empty
req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "")

// Create a mock RSCClient to avoid spawning a real background spark-submit process
val mockClient = Some(mock[RSCClient])

val s = InteractiveSession.create(
id = 101,
name = None,
owner = "systest",
proxyUser = None,
livyConf = testLivyConf,
accessManager = accessManager,
request = req,
sessionStore = mock[SessionStore],
ttl = None,
idleTimeout = None,
mockApp = None,
mockClient = mockClient
)

// Verify that the internal session state correctly holds the fallback queue string
s.queue shouldBe Some("livy-default-queue")
}

it("should prioritize user-specified request queue over LivyConf global configuration") {
val testLivyConf = new LivyConf()
.set(LivyConf.REPL_JARS, "dummy.jar")
.set(LivyConf.SPARK_YARN_QUEUE, "livy-default-queue")

val req = new CreateInteractiveRequest()
req.kind = Spark
req.queue = Some("user-custom-queue") // Explicitly provided by request
req.conf = Map(RSCConf.Entry.LIVY_JARS.key() -> "")

val mockClient = Some(mock[RSCClient])

val s = InteractiveSession.create(
id = 102,
name = None,
owner = "systest",
proxyUser = None,
livyConf = testLivyConf,
accessManager = accessManager,
request = req,
sessionStore = mock[SessionStore],
ttl = None,
idleTimeout = None,
mockApp = None,
mockClient = mockClient
)

// Verify that user context takes absolute priority over fallback definitions
s.queue shouldBe Some("user-custom-queue")
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
implicit def executor: ExecutionContext = ExecutionContext.global

def makeMetadata(id: Int, appTag: String): BatchRecoveryMetadata = {
BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None)
BatchRecoveryMetadata(id, Some(s"test-session-$id"), None, appTag, null, None, None)
}

def mockSession(id: Int): BatchSession = {
Expand Down
Loading