diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 22a9b575843..ba712b9de3c 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -97,8 +97,17 @@ kamon { } } } - +akka { + # required to allow longer shutdown-termination-limit below + coordinated-shutdown.default-phase-timeout = 61 s +} whisk { + http { + shutdown-unready-delay = 15 seconds # /ready will return 503 for this amount of time before connection draining starts + shutdown-termination-limit = 61 seconds # hard limit on how long (after unready delay) requests have to complete + } + + shared-packages-execute-only = false metrics { # Enable/disable Prometheus support. If enabled then metrics would be exposed at `/metrics` endpoint diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala index 27f1be9588a..feeeb828526 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicHttpService.scala @@ -17,7 +17,8 @@ package org.apache.openwhisk.http -import akka.actor.ActorSystem +import akka.Done +import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging import akka.http.scaladsl.{Http, HttpConnectionContext} import akka.http.scaladsl.model.{HttpRequest, _} @@ -29,16 +30,26 @@ import kamon.metric.MeasurementUnit import spray.json._ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common._ - +import akka.pattern.after +import org.apache.openwhisk.common.TransactionId.systemPrefix +import pureconfig._ +import pureconfig.generic.auto._ import scala.collection.immutable.Seq -import scala.concurrent.duration.DurationInt -import scala.concurrent.{Await, Future} +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import 
scala.concurrent.Future +import scala.util.{Failure, Success} + +case class BasicHttpServiceConfig(shutdownUnreadyDelay: FiniteDuration, shutdownTerminationLimit: FiniteDuration) /** * This trait extends the Akka Directives and Actor with logging and transaction counting * facilities common to all OpenWhisk REST services. */ trait BasicHttpService extends Directives { + implicit val logging: Logging + + //start with ready true + protected var readyState = true val OW_EXTRA_LOGGING_HEADER = "X-OW-EXTRA-LOGGING" @@ -156,24 +167,51 @@ trait BasicHttpService extends Directives { } object BasicHttpService { + implicit val tid = TransactionId(systemPrefix + "http_service") /** * Starts an HTTP(S) route handler on given port and registers a shutdown hook. */ - def startHttpService(route: Route, port: Int, config: Option[HttpsConfig] = None, interface: String = "0.0.0.0")( - implicit actorSystem: ActorSystem, - materializer: ActorMaterializer): Unit = { - val connectionContext = config.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) - val httpBinding = Http().bindAndHandle(route, interface, port, connectionContext = connectionContext) - addShutdownHook(httpBinding) + def startHttpService(service: BasicHttpService, + port: Int, + httpsConfig: Option[HttpsConfig] = None, + interface: String = "0.0.0.0")(implicit actorSystem: ActorSystem, + materializer: ActorMaterializer, + logging: Logging): Unit = { + val connectionContext = httpsConfig.map(Https.connectionContext(_)).getOrElse(HttpConnectionContext) + val httpBinding = Http().bindAndHandle(service.route, interface, port, connectionContext = connectionContext) + logging.info(this, "starting http service...") + addShutdownHook(service, httpBinding) } - def addShutdownHook(binding: Future[Http.ServerBinding])(implicit actorSystem: ActorSystem, - materializer: ActorMaterializer): Unit = { + def addShutdownHook(service: BasicHttpService, + binding: Future[Http.ServerBinding], + httpServiceConfig: 
BasicHttpServiceConfig = + loadConfigOrThrow[BasicHttpServiceConfig]("whisk.http"))(implicit actorSystem: ActorSystem, + materializer: ActorMaterializer, + logging: Logging): Unit = { implicit val executionContext = actorSystem.dispatcher - sys.addShutdownHook { - Await.result(binding.map(_.unbind()), 30.seconds) - Await.result(actorSystem.whenTerminated, 30.seconds) + + CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "http_unready") { () => + logging.info(this, "shutdown unready...") + //return 503 status at /ready endpoint for some time before actual termination begins + service.readyState = false + after(httpServiceConfig.shutdownUnreadyDelay, actorSystem.scheduler) { + logging.info(this, "shutdown unready complete...") + Future.successful(Done) + } + } + CoordinatedShutdown(actorSystem).addTask(CoordinatedShutdown.PhaseServiceUnbind, "http_termination") { () => + logging.info(this, "shutdown terminating...") + binding + .flatMap(_.terminate(hardDeadline = httpServiceConfig.shutdownTerminationLimit)) + .andThen { + case Success(_) => logging.info(this, "shutdown termination complete...") + case Failure(t) => logging.info(this, s"shutdown termination failed... 
${t}") + } + .map { _ => + Done + } } } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala index cea06f431ad..5bb6f3c2569 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala @@ -18,15 +18,16 @@ package org.apache.openwhisk.http import akka.event.Logging -import org.apache.openwhisk.common.{MetricsRoute, TransactionId} +import akka.http.scaladsl.model.StatusCodes +import org.apache.openwhisk.common.{Logging, MetricsRoute, TransactionId} /** * This trait extends the BasicHttpService with a standard "ping" endpoint which * responds to health queries, intended for monitoring. */ -trait BasicRasService extends BasicHttpService { +class BasicRasService(implicit val logging: Logging) extends BasicHttpService { - override def routes(implicit transid: TransactionId) = ping ~ MetricsRoute() + override def routes(implicit transid: TransactionId) = ping ~ ready ~ MetricsRoute() override def loglevelForRoute(route: String): Logging.LogLevel = { if (route == "/ping" || route == "/metrics") { @@ -39,4 +40,14 @@ trait BasicRasService extends BasicHttpService { val ping = path("ping") { get { complete("pong") } } + val ready = path("ready") { + get { + if (readyState) { + complete("ok") + } else { + logging.warn(this, "not ready...") + complete(StatusCodes.ServiceUnavailable, "notready") + } + } + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 935219685ed..cb2b0c17bc4 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -77,7 +77,7 @@ class Controller(val instance: 
ControllerInstanceId, implicit val whiskConfig: WhiskConfig, implicit val actorSystem: ActorSystem, implicit val materializer: ActorMaterializer, - implicit val logging: Logging) + override implicit val logging: Logging) extends BasicRasService { TransactionId.controller.mark( @@ -286,9 +286,10 @@ object Controller { val httpsConfig = if (Controller.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.controller.https")) else None - BasicHttpService.startHttpService(controller.route, port, httpsConfig, interface)( + BasicHttpService.startHttpService(controller, port, httpsConfig, interface)( actorSystem, - controller.materializer) + controller.materializer, + logger) case Failure(t) => abort(s"Invalid runtimes manifest: $t") diff --git a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala index 62d58a1bee8..02d6dc695df 100644 --- a/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala +++ b/core/cosmosdb/cache-invalidator/src/main/scala/org/apache/openwhisk/core/database/cosmosdb/cache/Main.scala @@ -34,7 +34,7 @@ object Main { ConfigMXBean.register() Kamon.init() val port = CacheInvalidatorConfig(system.settings.config).invalidatorConfig.port - BasicHttpService.startHttpService(new BasicRasService {}.route, port, None) + BasicHttpService.startHttpService(new BasicRasService {}, port, None) val (start, finish) = new CacheInvalidator(system.settings.config).start() start .map(_ => log.info(this, s"Started the server at http://localhost:$port")) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 9901f951a37..87f792a1c27 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ 
b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -192,9 +192,10 @@ object Invoker { if (Invoker.protocol == "https") Some(loadConfigOrThrow[HttpsConfig]("whisk.invoker.https")) else None val invokerServer = SpiLoader.get[InvokerServerProvider].instance(invoker) - BasicHttpService.startHttpService(invokerServer.route, port, httpsConfig)( + BasicHttpService.startHttpService(invokerServer, port, httpsConfig)( actorSystem, - ActorMaterializer.create(actorSystem)) + ActorMaterializer.create(actorSystem), + logger) } } diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala index bac8cc91557..e718b5d650a 100644 --- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala +++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/PlaygroundLauncher.scala @@ -28,7 +28,7 @@ import akka.stream.scaladsl.{Sink, Source} import ch.megard.akka.http.cors.scaladsl.CorsDirectives._ import org.apache.commons.io.IOUtils import org.apache.commons.lang3.SystemUtils -import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.common.{AkkaLogging, Logging, TransactionId} import org.apache.openwhisk.core.ExecManifestSupport import org.apache.openwhisk.http.BasicHttpService import pureconfig._ @@ -75,7 +75,7 @@ class PlaygroundLauncher(host: String, private val wsk = new Wsk(host, controllerPort, authKey) def run(): ServiceContainer = { - BasicHttpService.startHttpService(PlaygroundService.route, pgPort, None, interface)(actorSystem, materializer) + BasicHttpService.startHttpService(PlaygroundService, pgPort, None, interface)(actorSystem, materializer, logging) ServiceContainer(pgPort, pgUrl, "Playground") } @@ -119,6 +119,7 @@ class PlaygroundLauncher(host: String, } object PlaygroundService extends BasicHttpService { + override implicit val logging = new 
AkkaLogging(actorSystem.log) override def routes(implicit transid: TransactionId): Route = path(PathEnd | Slash | pg) { redirect(s"/$pg/ui/index.html", StatusCodes.Found) } ~ cors() {