[SPARK-41210][K8S] Port executor failure tracker from Spark on YARN to K8s

### What changes were proposed in this pull request?

Fail the Spark application when the number of executor failures reaches a configured threshold.

### Why are the changes needed?

Sometimes executors cannot launch successfully because of a wrong configuration, but on K8s the driver does not know that and just keeps requesting new executors.

This PR ports the window-based executor failure tracking mechanism [1] to K8s (it only takes effect when `spark.kubernetes.allocation.pods.allocator` is set to 'direct'), to reduce the functionality gap between YARN and K8s.

Note that YARN mode also supports a host-based executor allocation failure tracking and application termination mechanism [2]. This PR does not port that functionality to Kubernetes, since it is a fairly large, independent feature and relies on some YARN capabilities that may not have K8s equivalents.

[1] [SPARK-6735](https://issues.apache.org/jira/browse/SPARK-6735)
[2] [SPARK-17675](https://issues.apache.org/jira/browse/SPARK-17675)
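
For illustration, here is a minimal sketch of the sliding-window counting idea (the class and names here are simplified and hypothetical; the actual implementation is the new `ExecutorFailureTracker` class shown in the diff below):

```scala
import scala.collection.mutable

// Simplified sketch: failures older than the validity interval stop counting
// towards the threshold; a non-positive interval means failures never expire.
class SlidingFailureWindow(validityIntervalMs: Long) {
  private val failureTimestamps = mutable.Queue[Long]()

  def registerFailure(nowMs: Long): Unit = failureTimestamps.enqueue(nowMs)

  def numRecentFailures(nowMs: Long): Int = {
    while (validityIntervalMs > 0 &&
        failureTimestamps.nonEmpty &&
        failureTimestamps.head < nowMs - validityIntervalMs) {
      failureTimestamps.dequeue()
    }
    failureTimestamps.size
  }
}
```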

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces two new configurations:

- `spark.executor.maxNumFailures`
- `spark.executor.failuresValidityInterval`

which take effect on YARN, or on Kubernetes when `spark.kubernetes.allocation.pods.allocator` is set to 'direct'.
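
For example, a hypothetical configuration enabling this behavior on Kubernetes could look like the following (the threshold and interval values are placeholders):

```scala
import org.apache.spark.SparkConf

// Hypothetical values, for illustration only.
val conf = new SparkConf()
  // Required for the failure tracker to take effect on Kubernetes.
  .set("spark.kubernetes.allocation.pods.allocator", "direct")
  // Fail the application once more than 10 executor failures are observed...
  .set("spark.executor.maxNumFailures", "10")
  // ...within a 5-minute sliding window.
  .set("spark.executor.failuresValidityInterval", "5m")
```

If these options are not set, the tracker's defaults apply: the failure threshold defaults to twice the number of executors (with a minimum of 3), and failures never expire.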

### How was this patch tested?

A new unit test was added, and the change was manually tested on an internal K8s cluster.

Closes apache#40774 from pan3793/SPARK-41210.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
pan3793 authored and yaooqinn committed Apr 18, 2023
1 parent cbe94a1 commit 40872e9
Showing 14 changed files with 224 additions and 109 deletions.
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -709,7 +709,11 @@ private[spark] object SparkConf extends Logging {
AlternateConfig("spark.yarn.access.namenodes", "2.2"),
AlternateConfig("spark.yarn.access.hadoopFileSystems", "3.0")),
"spark.kafka.consumer.cache.capacity" -> Seq(
AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0"))
AlternateConfig("spark.sql.kafkaConsumerCache.capacity", "3.0")),
MAX_EXECUTOR_FAILURES.key -> Seq(
AlternateConfig("spark.yarn.max.executor.failures", "3.5")),
EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key -> Seq(
AlternateConfig("spark.yarn.executor.failuresValidityInterval", "3.5"))
)

/**
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Streaming.STREAMING_DYN_ALLOCATION_MAX_EXECUTORS
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* ExecutorFailureTracker is responsible for tracking executor failures both for each host
* separately and for all hosts altogether.
*/
private[spark] class ExecutorFailureTracker(
sparkConf: SparkConf,
val clock: Clock = new SystemClock) extends Logging {

private val executorFailuresValidityInterval =
sparkConf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS).getOrElse(-1L)

// Queue to store the timestamp of failed executors for each host
private val failedExecutorsTimeStampsPerHost = mutable.Map[String, mutable.Queue[Long]]()

private val failedExecutorsTimeStamps = new mutable.Queue[Long]()

private def updateAndCountFailures(failedExecutorsWithTimeStamps: mutable.Queue[Long]): Int = {
val endTime = clock.getTimeMillis()
while (executorFailuresValidityInterval > 0 &&
failedExecutorsWithTimeStamps.nonEmpty &&
failedExecutorsWithTimeStamps.head < endTime - executorFailuresValidityInterval) {
failedExecutorsWithTimeStamps.dequeue()
}
failedExecutorsWithTimeStamps.size
}

def numFailedExecutors: Int = synchronized {
updateAndCountFailures(failedExecutorsTimeStamps)
}

def registerFailureOnHost(hostname: String): Unit = synchronized {
val timeMillis = clock.getTimeMillis()
failedExecutorsTimeStamps.enqueue(timeMillis)
val failedExecutorsOnHost =
failedExecutorsTimeStampsPerHost.getOrElse(hostname, {
val failureOnHost = mutable.Queue[Long]()
failedExecutorsTimeStampsPerHost.put(hostname, failureOnHost)
failureOnHost
})
failedExecutorsOnHost.enqueue(timeMillis)
}

def registerExecutorFailure(): Unit = synchronized {
val timeMillis = clock.getTimeMillis()
failedExecutorsTimeStamps.enqueue(timeMillis)
}

def numFailuresOnHost(hostname: String): Int = {
failedExecutorsTimeStampsPerHost.get(hostname).map { failedExecutorsOnHost =>
updateAndCountFailures(failedExecutorsOnHost)
}.getOrElse(0)
}
}

object ExecutorFailureTracker {

// Default to twice the number of executors (twice the maximum number of executors if dynamic
// allocation is enabled), with a minimum of 3.
def maxNumExecutorFailures(sparkConf: SparkConf): Int = {
val effectiveNumExecutors =
if (Utils.isStreamingDynamicAllocationEnabled(sparkConf)) {
sparkConf.get(STREAMING_DYN_ALLOCATION_MAX_EXECUTORS)
} else if (Utils.isDynamicAllocationEnabled(sparkConf)) {
sparkConf.get(DYN_ALLOCATION_MAX_EXECUTORS)
} else {
sparkConf.get(EXECUTOR_INSTANCES).getOrElse(0)
}
// By default, effectiveNumExecutors is Int.MaxValue if dynamic allocation is enabled. We need
// to avoid integer overflow here.
val defaultMaxNumExecutorFailures = math.max(3,
if (effectiveNumExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * effectiveNumExecutors)

sparkConf.get(MAX_EXECUTOR_FAILURES).getOrElse(defaultMaxNumExecutorFailures)
}
}
18 changes: 18 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -921,6 +921,24 @@ package object config {
.booleanConf
.createWithDefault(false)

private[spark] val MAX_EXECUTOR_FAILURES =
ConfigBuilder("spark.executor.maxNumFailures")
.doc("Spark exits if the number of failed executors exceeds this threshold. " +
"This configuration only takes effect on YARN, or Kubernetes when " +
"`spark.kubernetes.allocation.pods.allocator` is set to 'direct'.")
.version("3.5.0")
.intConf
.createOptional

private[spark] val EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS =
ConfigBuilder("spark.executor.failuresValidityInterval")
.doc("Interval after which Executor failures will be considered independent and not " +
"accumulate towards the attempt count. This configuration only takes effect on YARN, " +
"or Kubernetes when `spark.kubernetes.allocation.pods.allocator` is set to 'direct'.")
.version("3.5.0")
.timeConf(TimeUnit.MILLISECONDS)
.createOptional

private[spark] val UNREGISTER_OUTPUT_ON_HOST_ON_FETCH_FAILURE =
ConfigBuilder("spark.files.fetchFailure.unRegisterOutputOnHost")
.doc("Whether to un-register all the outputs on the host in condition that we receive " +
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -31,6 +31,9 @@ private[spark] object SparkExitCode {
/** Exception appears when the computer cannot find the specified path. */
val ERROR_PATH_NOT_FOUND = 3

/** Exit because the number of executor failures exceeded the threshold. */
val EXCEED_MAX_EXECUTOR_FAILURES = 11

/** The default uncaught exception handler was reached. */
val UNCAUGHT_EXCEPTION = 50

@@ -14,15 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.yarn

package org.apache.spark.deploy

import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.util.ManualClock

class FailureTrackerSuite extends SparkFunSuite with Matchers {
class ExecutorFailureTrackerSuite extends SparkFunSuite with Matchers {

override def beforeAll(): Unit = {
super.beforeAll()
@@ -33,7 +35,7 @@ class FailureTrackerSuite extends SparkFunSuite with Matchers {
sparkConf.set(config.EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS, 100L)

val clock = new ManualClock()
val failureTracker = new FailureTracker(sparkConf, clock)
val failureTracker = new ExecutorFailureTracker(sparkConf, clock)

clock.setTime(0)
failureTracker.registerFailureOnHost("host1")
@@ -70,7 +72,7 @@
val sparkConf = new SparkConf()

val clock = new ManualClock()
val failureTracker = new FailureTracker(sparkConf, clock)
val failureTracker = new ExecutorFailureTracker(sparkConf, clock)

clock.setTime(0)
failureTracker.registerFailureOnHost("host1")
@@ -28,15 +28,17 @@ import io.fabric8.kubernetes.api.model.{HasMetadata, PersistentVolumeClaim, Pod,
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.ExecutorFailureTracker
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.KubernetesConf
import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_MAX_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.SchedulerBackendUtils.DEFAULT_NUMBER_EXECUTORS
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.util.SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES

class ExecutorPodsAllocator(
conf: SparkConf,
@@ -71,6 +73,8 @@

protected val maxPendingPods = conf.get(KUBERNETES_MAX_PENDING_PODS)

protected val maxNumExecutorFailures = ExecutorFailureTracker.maxNumExecutorFailures(conf)

protected val podCreationTimeout = math.max(
podAllocationDelay * 5,
conf.get(KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT))
@@ -117,6 +121,12 @@
// if they happen to come up before the deletion takes effect.
@volatile protected var deletedExecutorIds = Set.empty[Long]

@volatile private var failedExecutorIds = Set.empty[Long]

protected val failureTracker = new ExecutorFailureTracker(conf, clock)

protected[spark] def getNumExecutorsFailed: Int = failureTracker.numFailedExecutors

def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
appId = applicationId
driverPod.foreach { pod =>
@@ -130,8 +140,12 @@
.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
}
}
snapshotsStore.addSubscriber(podAllocationDelay) {
onNewSnapshots(applicationId, schedulerBackend, _)
snapshotsStore.addSubscriber(podAllocationDelay) { executorPodsSnapshot =>
onNewSnapshots(applicationId, schedulerBackend, executorPodsSnapshot)
if (failureTracker.numFailedExecutors > maxNumExecutorFailures) {
logError(s"Max number of executor failures ($maxNumExecutorFailures) reached")
stopApplication(EXCEED_MAX_EXECUTOR_FAILURES)
}
}
}

@@ -148,6 +162,10 @@

def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

private[k8s] def stopApplication(exitCode: Int): Unit = {
sys.exit(exitCode)
}

protected def onNewSnapshots(
applicationId: String,
schedulerBackend: KubernetesClusterSchedulerBackend,
@@ -254,6 +272,18 @@
case _ => false
}

val currentFailedExecutorIds = podsForRpId.filter {
case (_, PodFailed(_)) => true
case _ => false
}.keySet

val newFailedExecutorIds = currentFailedExecutorIds -- failedExecutorIds
if (newFailedExecutorIds.nonEmpty) {
logWarning(s"${newFailedExecutorIds.size} new failed executors.")
newFailedExecutorIds.foreach { _ => failureTracker.registerExecutorFailure() }
}
failedExecutorIds = failedExecutorIds ++ currentFailedExecutorIds

val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter {
case (_, PodPending(_)) => true
case _ => false
@@ -39,10 +39,10 @@ import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSp
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.internal.config.{DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, EXECUTOR_INSTANCES}
import org.apache.spark.internal.config._
import org.apache.spark.resource._
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
import org.apache.spark.util.ManualClock
import org.apache.spark.util.{ManualClock, SparkExitCode}

class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {

@@ -147,6 +147,46 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
}

test("SPARK-41210: Window based executor failure tracking mechanism") {
var _exitCode = -1
val _conf = conf.clone
.set(MAX_EXECUTOR_FAILURES.key, "2")
.set(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS.key, "2s")
podsAllocatorUnderTest = new ExecutorPodsAllocator(_conf, secMgr,
executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) {
override private[spark] def stopApplication(exitCode: Int): Unit = {
_exitCode = exitCode
}
}
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3))
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 0)

waitForExecutorPodsClock.advance(1000)
snapshotsStore.updatePod(failedExecutorWithoutDeletion(1))
snapshotsStore.updatePod(failedExecutorWithoutDeletion(2))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 2)
assert(_exitCode === -1)

waitForExecutorPodsClock.advance(1000)
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 2)
assert(_exitCode === -1)

waitForExecutorPodsClock.advance(2000)
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 0)
assert(_exitCode === -1)

waitForExecutorPodsClock.advance(1000)
snapshotsStore.updatePod(failedExecutorWithoutDeletion(3))
snapshotsStore.updatePod(failedExecutorWithoutDeletion(4))
snapshotsStore.updatePod(failedExecutorWithoutDeletion(5))
snapshotsStore.notifySubscribers()
assert(podsAllocatorUnderTest.getNumExecutorsFailed === 3)
assert(_exitCode === SparkExitCode.EXCEED_MAX_EXECUTOR_FAILURES)
}

test("SPARK-36052: test splitSlots") {
val seq1 = Seq("a")
assert(ExecutorPodsAllocator.splitSlots(seq1, 0) === Seq(("a", 0)))
@@ -23,6 +23,7 @@ import org.scalatest.BeforeAndAfter

import org.apache.spark._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.config._

class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {

@@ -42,6 +43,9 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
MockitoAnnotations.openMocks(this).close()
when(sc.conf).thenReturn(sparkConf)
when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None)
when(sc.conf.get(EXECUTOR_INSTANCES)).thenReturn(None)
when(sc.conf.get(MAX_EXECUTOR_FAILURES)).thenReturn(None)
when(sc.conf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS)).thenReturn(None)
when(sc.env).thenReturn(env)
}

@@ -51,7 +55,7 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
classOf[ExecutorPodsAllocator].getName)
validConfigs.foreach { c =>
val manager = new KubernetesClusterManager()
when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c)
when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c)
manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
}
}