Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve http graceful termination #4957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion common/scala/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,17 @@ kamon {
}
}
}

akka {
# required to allow longer shutdown-termination-limit below
coordinated-shutdown.default-phase-timeout = 61 s
}
whisk {
http {
shutdown-unready-delay = 15 seconds # /ready will return 500 for this amount of time before connection draining starts
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems it would wait for 15 seconds, and start the termination phase.
But if there are still client connections connected, are they getting connection closed error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only allows an unready stage, which only affects the added /ready endpoint to return 503 (no other endpoints affected). This does not affect how akka http handles existing client connections during shutdown, so if akka allows gracefully terminating in flight requests, then no connection closed errors, but no new connections allowed. The purpose here is to allow external systems (e.g. kubernetes or reverse proxies) to probe the service and stop routing to this instance for some time before akka shutdown initiates.

shutdown-termination-limit = 61 seconds # hard limit on how long (after unready delay) requests have to complete
}


shared-packages-execute-only = false
metrics {
# Enable/disable Prometheus support. If enabled then metrics would be exposed at `/metrics` endpoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.openwhisk.http

import akka.actor.ActorSystem
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging
import akka.http.scaladsl.{Http, HttpConnectionContext}
import akka.http.scaladsl.model.{HttpRequest, _}
Expand All @@ -29,16 +30,26 @@ import kamon.metric.MeasurementUnit
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common._

import akka.pattern.after
import org.apache.openwhisk.common.TransactionId.systemPrefix
import pureconfig._
import pureconfig.generic.auto._
import scala.collection.immutable.Seq
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.Future
import scala.util.{Failure, Success}

case class BasicHttpServiceConfig(shutdownUnreadyDelay: FiniteDuration, shutdownTerminationLimit: FiniteDuration)

/**
* This trait extends the Akka Directives and Actor with logging and transaction counting
* facilities common to all OpenWhisk REST services.
*/
trait BasicHttpService extends Directives {
implicit val logging: Logging

//start with ready true
protected var readyState = true

val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING"

Expand Down Expand Up @@ -156,24 +167,51 @@ trait BasicHttpService extends Directives {
}

object BasicHttpService {
implicit val tid = TransactionId(systemPrefix + "http_service")

/**
* Starts an HTTP(S) route handler on given port and registers a shutdown hook.
*/
def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")(
implicit actorSystem: ActorSystem,
materializer: ActorMaterializer): Unit = {
val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext)
val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext)
addShutdownHook(httpBinding)
def startHttpService(service: BasicHttpService,
port: Int,
httpsConfig: Option[HttpsConfig] = None,
interface: String = "0.0.0.0")(implicit actorSystem: ActorSystem,
materializer: ActorMaterializer,
logging: Logging): Unit = {
val connectionContext = httpsConfig.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext)
val httpBinding = Http().bindAndHandle(service.route, interface, port, connectionContext = connectionContext)
logging.info(this, "starting http service...")
addShutdownHook(service, httpBinding)
}

def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem,
materializer: ActorMaterializer): Unit = {
def addShutdownHook(service: BasicHttpService,
binding: Future[Http.ServerBinding],
httpServiceConfig: BasicHttpServiceConfig =
loadConfigOrThrow[BasicHttpServiceConfig]("whisk.http"))(implicit actorSystem: ActorSystem,
materializer: ActorMaterializer,
logging: Logging): Unit = {
implicit val executionContext = actorSystem.dispatcher
sys.addShutdownHook {
Await.result(binding.map(_.unbind()), 30.seconds)
Await.result(actorSystem.whenTerminated, 30.seconds)

CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "http_unready") { () =>
logging.info(this, "shutdown unready...")
//return 503 status at /ready endpoint for some time before actual termination begins
service.readyState = false
after(httpServiceConfig.shutdownUnreadyDelay, actorSystem.scheduler) {
logging.info(this, "shutdown unready complete...")
Future.successful(Done)
}
}
CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind, "http_termination") { () =>
logging.info(this, "shutdown terminating...")
binding
.flatMap(_.terminate(hardDeadline = httpServiceConfig.shutdownTerminationLimit))
.andThen {
case Success(_) => logging.info(this, "shutdown termination complete...")
case Failure(t) => logging.info(this, s"shutdown termination failed... ${t}")
}
.map { _ =>
Done
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@
package org.apache.openwhisk.http

import akka.event.Logging
import org.apache.openwhisk.common.{MetricsRoute, TransactionId}
import akka.http.scaladsl.model.StatusCodes
import org.apache.openwhisk.common.{Logging, MetricsRoute, TransactionId}

/**
* This trait extends the BasicHttpService with a standard "ping" endpoint which
* responds to health queries, intended for monitoring.
*/
trait BasicRasService extends BasicHttpService {
class BasicRasService(implicit val logging: Logging) extends BasicHttpService {

override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute()
override def routes(implicit transid: TransactionId) = ping ~ ready ~ MetricsRoute()

override def loglevelForRoute(route: String): Logging.LogLevel = {
if (route == "/ping" || route == "/metrics") {
Expand All @@ -39,4 +40,14 @@ trait BasicRasService extends BasicHttpService {
val ping = path("ping") {
get { complete("pong") }
}
val ready = path("ready") {
get {
if (readyState) {
complete("ok")
} else {
logging.warn(this, "not ready...")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this just be a debug? I think ready route returning 503 should be self explanatory

complete(StatusCodes.ServiceUnavailable, "notready")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class Controller(val instance: ControllerInstanceId,
implicit val whiskConfig: WhiskConfig,
implicit val actorSystem: ActorSystem,
implicit val materializer: ActorMaterializer,
implicit val logging: Logging)
override implicit val logging: Logging)
extends BasicRasService {

TransactionId.controller.mark(
Expand Down Expand Up @@ -286,9 +286,10 @@ object Controller {
val httpsConfig =
if (Controller.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None

BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)(
BasicHttpService.startHttpService(controller, port, httpsConfig, interface)(
actorSystem,
controller.materializer)
controller.materializer,
logger)

case Failure(t) =>
abort(s"Invalid runtimes manifest: $t")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object Main {
ConfigMXBean.register()
Kamon.init()
val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port
BasicHttpService.startHttpService(new BasicRasService {}.route, port, None)
BasicHttpService.startHttpService(new BasicRasService {}, port, None)
val (start, finish) = new CacheInvalidator(system.settings.config).start()
start
.map(_ => log.info(this, s"Started the server at http://localhost:$port"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,10 @@ object Invoker {
if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None

val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker)
BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)(
BasicHttpService.startHttpService(invokerServer, port, httpsConfig)(
actorSystem,
ActorMaterializer.create(actorSystem))
ActorMaterializer.create(actorSystem),
logger)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import akka.stream.scaladsl.{Sink, Source}
import ch.megard.akka.http.cors.scaladsl.CorsDirectives._
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.SystemUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.common.{AkkaLogging, Logging, TransactionId}
import org.apache.openwhisk.core.ExecManifestSupport
import org.apache.openwhisk.http.BasicHttpService
import pureconfig._
Expand Down Expand Up @@ -75,7 +75,7 @@ class PlaygroundLauncher(host: String,
private val wsk = new Wsk(host, controllerPort, authKey)

def run(): ServiceContainer = {
BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)(actorSystem, materializer)
BasicHttpService.startHttpService(PlaygroundService, pgPort, None, interface)(actorSystem, materializer, logging)
ServiceContainer(pgPort, pgUrl, "Playground")
}

Expand Down Expand Up @@ -119,6 +119,7 @@ class PlaygroundLauncher(host: String,
}

object PlaygroundService extends BasicHttpService {
override implicit val logging = new AkkaLogging(actorSystem.log)
override def routes(implicit transid: TransactionId): Route =
path(PathEnd | Slash | pg) { redirect(s"/$pg/ui/index.html", StatusCodes.Found) } ~
cors() {
Expand Down