diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 555ad881044..d16aec01d89 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -107,6 +107,8 @@ controller: authentication: spi: "{{ controller_authentication_spi | default('') }}" loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}" + username: "{{ controller_username | default('controller.user') }}" + password: "{{ controller_password | default('controller.pass') }}" entitlement: spi: "{{ controller_entitlement_spi | default('') }}" protocol: "{{ controller_protocol | default('https') }}" @@ -209,6 +211,8 @@ invoker: {% endif %}" extraEnv: "{{ invoker_extraEnv | default({}) }}" protocol: "{{ invoker_protocol | default('https') }}" + username: "{{ invoker_username | default('invoker.user') }}" + password: "{{ invoker_password | default('invoker.pass') }}" ssl: cn: "openwhisk-invokers" keyPrefix: "{{ __invoker_ssl_keyPrefix }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 64724c41d54..e79adeab631 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -203,6 +203,9 @@ "CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}" "CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}" + "CONFIG_whisk_controller_username": "{{ controller.username }}" + "CONFIG_whisk_controller_password": "{{ controller.password }}" + "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}" "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" "LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}" diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index ea4ce48114b..f893f7bb4f7 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -279,6 +279,8 @@ "CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}" "CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}" "CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}" + "CONFIG_whisk_invoker_username": "{{ invoker.username }}" + "CONFIG_whisk_invoker_password": "{{ invoker.password }}" - name: extend invoker dns env set_fact: diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala index ba15f5f2342..b5c4f5e8dfa 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala @@ -291,4 +291,9 @@ object ConfigKeys { val azBlob = "whisk.azure-blob" val whiskClusterName = "whisk.cluster.name" + + val whiskControllerUsername = "whisk.controller.username" + val whiskControllerPassword = "whisk.controller.password" + val whiskInvokerUsername = "whisk.invoker.username" + val whiskInvokerPassword = "whisk.invoker.password" } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala index 60522586842..f05dbfc2dfc 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala @@ -457,3 +457,30 @@ object StatusData extends DefaultJsonProtocol { implicit val serdes = jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data") } + +case class RuntimeMessage(runtime: String) extends Message { + override def serialize = RuntimeMessage.serdes.write(this).compactPrint +} + +object RuntimeMessage extends DefaultJsonProtocol { + def parse(msg: String) = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime") +} + +case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message { + override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint +} + +object PrewarmContainerData extends DefaultJsonProtocol { + implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number") +} + +case class PrewarmContainerDataList(items: List[PrewarmContainerData]) + +object PrewarmContainerDataProtocol extends DefaultJsonProtocol { + implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number") + implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] { + def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]]) + def write(f: PrewarmContainerDataList) = ??? + } +} diff --git a/core/controller/src/main/resources/application.conf b/core/controller/src/main/resources/application.conf index 8358ced2109..7d3ef190d43 100644 --- a/core/controller/src/main/resources/application.conf +++ b/core/controller/src/main/resources/application.conf @@ -117,4 +117,8 @@ whisk{ file-system : true dir-path : "/swagger-ui/" } + controller { + username: "controller.user" + password: "controller.pass" + } } diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala index 935219685ed..d5f4146315c 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala @@ -21,8 +21,9 @@ import akka.Done import akka.actor.{ActorSystem, CoordinatedShutdown} import akka.event.Logging.InfoLevel import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.model.{StatusCodes, Uri} import akka.http.scaladsl.model.StatusCodes._ -import akka.http.scaladsl.model.Uri +import akka.http.scaladsl.model.headers.BasicHttpCredentials import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import kamon.Kamon @@ -32,7 +33,7 @@ import spray.json.DefaultJsonProtocol._ import spray.json._ import org.apache.openwhisk.common.Https.HttpsConfig import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId} -import org.apache.openwhisk.core.WhiskConfig +import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig} import org.apache.openwhisk.core.connector.MessagingProvider import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation} @@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId, (pathEndOrSingleSlash & get) { complete(info) } - } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth + } ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime } // initialize datastores @@ -176,6 +177,59 @@ class Controller(val instance: ControllerInstanceId, LogLimit.config, runtimes, List(apiV1.basepath())) + + private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername) + private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword) + + /** + * config runtime + */ + private val configRuntime = { + implicit val executionContext = actorSystem.dispatcher + (path("config" / "runtime") & post) { + extractCredentials { + case Some(BasicHttpCredentials(username, password)) => + if (username == controllerUsername && password == controllerPassword) { + entity(as[String]) { runtime => + val execManifest = ExecManifest.initialize(whiskConfig, Some(runtime)) + if (execManifest.isFailure) { + logging.info(this, s"received invalid runtimes manifest") + complete(StatusCodes.BadRequest) + } else { + parameter('limit.?) { limit => + limit match { + case Some(targetValue) => + val pattern = """\d+:\d""" + if (targetValue.matches(pattern)) { + val invokerArray = targetValue.split(":") + val beginIndex = invokerArray(0).toInt + val finishIndex = invokerArray(1).toInt + if (finishIndex < beginIndex) { + complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex") + } else { + val targetInvokers = (beginIndex to finishIndex).toList + loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers)) + logging.info(this, "config runtime request is already sent to target invokers") + complete(StatusCodes.Accepted) + } + } else { + complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]") + } + case None => + loadBalancer.sendRuntimeToInvokers(runtime, None) + logging.info(this, "config runtime request is already sent to all managed invokers") + complete(StatusCodes.Accepted) + } + } + } + } + } else { + complete(StatusCodes.Unauthorized, "username or password is wrong") + } + case _ => complete(StatusCodes.Unauthorized) + } + } + } } /** diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala index 20225127c3c..967682e7cdd 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala @@ -60,6 +60,14 @@ trait LoadBalancer { def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)( implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] + /** + * send runtime to invokers + * + * @param runtime + * @param targetInvokers + */ + def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {} + /** * Returns a message indicating the health of the containers and/or container pool in general. * diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index f06cb04279b..6831426dde3 100644 --- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader import scala.annotation.tailrec import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import scala.util.{Failure, Success} /** * A loadbalancer that schedules workload based on a hashing-algorithm. @@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer( } } + /** send runtime to invokers*/ + override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = { + val runtimeMessage = RuntimeMessage(runtime) + schedulingState.managedInvokers.filter { manageInvoker => + targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance) + } foreach { invokerHealth => + val topic = s"invoker${invokerHealth.id.toInt}" + messageProducer.send(topic, runtimeMessage).andThen { + case Success(_) => + logging.info(this, s"Successfully posted runtime to topic $topic") + case Failure(_) => + logging.error(this, s"Failed posted runtime to topic $topic") + } + } + } + override val invokerPool = invokerPoolFactory.createInvokerPool( actorSystem, diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index be683338593..737b7294a9b 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -171,6 +171,8 @@ whisk { } invoker { + username: "invoker.user" + password: "invoker.pass" protocol: http } runtime.delete.timeout = "30 seconds" diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala index 724cd5971ed..5cb2e400a08 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala @@ -19,18 +19,22 @@ package org.apache.openwhisk.core.containerpool import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} -import org.apache.openwhisk.core.connector.MessageFeed +import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData} import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import scala.annotation.tailrec import scala.collection.immutable +import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ import scala.util.{Random, Try} case class ColdStartKey(kind: String, memory: ByteSize) +case class PreWarmConfigList(list: List[PrewarmingConfig]) +object PrewarmQuery + case object EmitMetrics case object AdjustPrewarmedContainer @@ -68,6 +72,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, var busyPool = immutable.Map.empty[ActorRef, ContainerData] var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData] var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)] + var latestPrewarmConfig = prewarmConfig // If all memory slots are occupied and if there is currently no container to be removed, than the actions will be // buffered here to keep order of computation. // Otherwise actions with small memory-limits could block actions with large memory limits. @@ -297,6 +302,34 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, case RescheduleJob => freePool = freePool - sender() busyPool = busyPool - sender() + case prewarmConfigList: PreWarmConfigList => + logging.info(this, "update prewarm configuration request is send to invoker") + val passedPrewarmConfig = prewarmConfigList.list + var newPrewarmConfig: List[PrewarmingConfig] = List.empty + latestPrewarmConfig foreach { config => + newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig + .find(passedConfig => + passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit) + .getOrElse(config) + } + latestPrewarmConfig = newPrewarmConfig + // Delete prewarmedPool firstly + prewarmedPool foreach { element => + val actor = element._1 + actor ! Remove + prewarmedPool = prewarmedPool - actor + } + latestPrewarmConfig foreach { config => + logging.info( + this, + s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")( + TransactionId.invokerWarmup) + (1 to config.initialCount).foreach { _ => + prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl)) + } + } + case PrewarmQuery => + sender() ! getPrewarmContainer() case EmitMetrics => emitMetrics() @@ -327,7 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = { if (scheduled) { //on scheduled time, remove expired prewarms - ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p => + ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p => prewarmedPool = prewarmedPool - p p ! Remove } @@ -340,7 +373,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, } //fill in missing prewarms (replaces any deletes) ContainerPool - .increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool) + .increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool) .foreach { c => val config = c._1 val currentCount = c._2._1 @@ -380,7 +413,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, /** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */ def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = { - prewarmConfig + latestPrewarmConfig .filter { config => kind == config.exec.kind && memoryLimit == config.memoryLimit } @@ -421,7 +454,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, //get the appropriate ttl from prewarm configs val ttl = - prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl)) + latestPrewarmConfig + .find(pc => pc.memoryLimit == memory && pc.exec.kind == kind) + .flatMap(_.reactive.map(_.ttl)) prewarmContainer(action.exec, memory, ttl) (ref, data) } @@ -434,6 +469,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, busyPool = busyPool - toDelete } + /** + * get the prewarm container + * @return + */ + def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = { + val containerDataList = prewarmedPool.values.toList + + var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]() + containerDataList.foreach { prewarmData => + val isInclude = resultList.filter { resultData => + prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory + }.size > 0 + + if (isInclude) { + var resultData = resultList.filter { resultData => + prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory + }.head + resultData.number += 1 + } else { + resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1) + } + } + resultList + } + /** * Calculate if there is enough free memory within a given pool. * diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala new file mode 100644 index 00000000000..5e744620ca5 --- /dev/null +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/DefaultInvokerServer.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.core.invoker + +import akka.actor.ActorSystem +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.BasicHttpCredentials +import akka.http.scaladsl.server.Route +import org.apache.openwhisk.common.{Logging, TransactionId} +import org.apache.openwhisk.core.ConfigKeys +import org.apache.openwhisk.http.BasicRasService +import org.apache.openwhisk.http.ErrorResponse.terminate +import pureconfig._ +import spray.json.PrettyPrinter + +import scala.concurrent.ExecutionContext + +/** + * Implements web server to handle certain REST API calls. + */ +class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext, + val actorSystem: ActorSystem, + val logger: Logging) + extends BasicRasService { + + val invokerUsername = loadConfigOrThrow[String](ConfigKeys.whiskInvokerUsername) + val invokerPassword = loadConfigOrThrow[String](ConfigKeys.whiskInvokerPassword) + + override def routes(implicit transid: TransactionId): Route = { + super.routes ~ extractCredentials { + case Some(BasicHttpCredentials(username, password)) + if username == invokerUsername && password == invokerPassword => + (path("getRuntime") & get) { + invoker.getRuntime() + } + case _ => + implicit val jsonPrettyResponsePrinter = PrettyPrinter + terminate(StatusCodes.Unauthorized) + } + } +} + +object DefaultInvokerServer extends InvokerServerProvider { + override def instance( + invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = + new DefaultInvokerServer(invoker) +} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala index 1b0c8bf797e..07ecdf4c15b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/Invoker.scala @@ -19,6 +19,7 @@ package org.apache.openwhisk.core.invoker import akka.Done import akka.actor.{ActorSystem, CoordinatedShutdown} +import akka.http.scaladsl.server.Route import akka.stream.ActorMaterializer import com.typesafe.config.ConfigValueFactory import kamon.Kamon @@ -217,7 +218,9 @@ trait InvokerProvider extends Spi { } // this trait can be used to add common implementation -trait InvokerCore {} +trait InvokerCore { + def getRuntime(): Route +} /** * An Spi for providing RestAPI implementation for invoker. @@ -227,9 +230,3 @@ trait InvokerServerProvider extends Spi { def instance( invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService } - -object DefaultInvokerServer extends InvokerServerProvider { - override def instance( - invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService = - new BasicRasService {} -} diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index 6aa088444e9..9f3688ccf14 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -23,7 +23,12 @@ import java.time.Instant import akka.Done import akka.actor.{ActorRefFactory, ActorSystem, CoordinatedShutdown, Props} import akka.event.Logging.InfoLevel +import akka.http.scaladsl.server.Route +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.server.Directives._ import akka.stream.ActorMaterializer +import akka.pattern.ask +import akka.util.Timeout import org.apache.openwhisk.common._ import org.apache.openwhisk.common.tracing.WhiskTracerProvider import org.apache.openwhisk.core.ack.{MessagingActiveAck, UserEventSender} @@ -40,6 +45,7 @@ import pureconfig._ import pureconfig.generic.auto._ import spray.json._ +import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -221,38 +227,58 @@ class InvokerReactive( /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { - Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) + Future( + ActivationMessage + .parse(new String(bytes, StandardCharsets.UTF_8)) + .orElse(RuntimeMessage.parse(new String(bytes, StandardCharsets.UTF_8)))) .flatMap(Future.fromTry) - .flatMap { msg => - // The message has been parsed correctly, thus the following code needs to *always* produce at least an - // active-ack. - - implicit val transid: TransactionId = msg.transid - - //set trace context to continue tracing - WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) - - if (!namespaceBlacklist.isBlacklisted(msg.user)) { - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - handleActivationMessage(msg) - } else { - // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol - // Due to the protective nature of the blacklist, a database entry is not written. + .flatMap { + case msg: ActivationMessage => + // The message has been parsed correctly, thus the following code needs to *always* produce at least an + // active-ack. + + implicit val transid: TransactionId = msg.transid + + //set trace context to continue tracing + WhiskTracerProvider.tracer.setTraceContext(transid, msg.traceContext) + + if (!namespaceBlacklist.isBlacklisted(msg.user)) { + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) + handleActivationMessage(msg) + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + activationFeed ! MessageFeed.Processed + + val activation = + generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) + ack( + msg.transid, + activation, + false, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(transid, activation, instance)) + + logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") + Future.successful(()) + } + case msg: RuntimeMessage => + val execManifest = ExecManifest.initialize(config, Some(msg.runtime)) + if (execManifest.isFailure) { + logging.error(this, s"Received invalid runtimes manifest:${execManifest.failed.get}") + Future.failed(new Exception(s"Received invalid runtimes manifest")) + } else { + val prewarmingConfigs: List[PrewarmingConfig] = execManifest.get.stemcells.flatMap { + case (mf, cells) => + cells.map { cell => + PrewarmingConfig(cell.initialCount, new CodeExecAsString(mf, "", None), cell.memory, cell.reactive) + } + }.toList + pool ! PreWarmConfigList(prewarmingConfigs) + } activationFeed ! MessageFeed.Processed - - val activation = - generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack( - msg.transid, - activation, - false, - msg.rootControllerIndex, - msg.user.namespace.uuid, - CombinedCompletionAndResultMessage(transid, activation, instance)) - - logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") Future.successful(()) - } } .recoverWith { case t => @@ -299,4 +325,13 @@ class InvokerReactive( } }) + override def getRuntime(): Route = { + complete { + pool + .ask(PrewarmQuery)(Timeout(5.seconds)) + .mapTo[ListBuffer[PrewarmContainerData]] + .map(f => f.toList.toJson) + } + } + } diff --git a/core/standalone/src/main/resources/standalone.conf b/core/standalone/src/main/resources/standalone.conf index 0817680007a..299bb5ea352 100644 --- a/core/standalone/src/main/resources/standalone.conf +++ b/core/standalone/src/main/resources/standalone.conf @@ -57,6 +57,8 @@ whisk { } controller { + username = "controller.user" + password = "controller.pass" protocol = http # Bound only to localhost by default for better security diff --git a/docs/operation.md b/docs/operation.md new file mode 100644 index 00000000000..8882caf17c4 --- /dev/null +++ b/docs/operation.md @@ -0,0 +1,33 @@ + + +# Runtime configuration +## Change runtime to all managed invokers via controller. e.g. +``` +curl -u ${username}:${password} -X POST http://${controllerAddress}:${controllerPort}/config/runtime -d '{...}' +``` +Note: you can add `?limit` to specify target invokers, e.g. specify invoker0 and invoker1 +``` +curl -u ${username}:${password} -X POST http://${controllerAddress}:${controllerPort}/config/runtime?limit=0:1 -d '{...}' +``` +## Get prewarm container info on assigned invoker +``` +curl -u ${username}:${password} -X GET 'http://${invokerAddress}:${invokerPort}/getRuntime' +[{"kind":"nodejs:10","memory":256,"number":2}, {"kind":"nodejs:6","memory":256,"number":1}] +``` diff --git a/tests/build.gradle b/tests/build.gradle index 2d822346abd..01a5a3ab28e 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -53,6 +53,7 @@ def systemIncludes = [ "org/apache/openwhisk/core/apigw/actions/test/**", "org/apache/openwhisk/core/database/test/*CacheConcurrencyTests*", "org/apache/openwhisk/core/controller/test/*ControllerApiTests*", + "org/apache/openwhisk/operation/**", "apigw/healthtests/**", "ha/**", "services/**", @@ -73,6 +74,7 @@ ext.testSets = [ "org/apache/openwhisk/core/limits/**", "org/apache/openwhisk/core/scheduler/**", "org/apache/openwhisk/common/etcd/**", + "org/apache/openwhisk/operation/**", "**/*CacheConcurrencyTests*", "**/*ControllerApiTests*", "org/apache/openwhisk/testEntities/**", diff --git a/tests/src/test/resources/application.conf.j2 b/tests/src/test/resources/application.conf.j2 index cdae2bddaa7..bd8b1cccbe5 100644 --- a/tests/src/test/resources/application.conf.j2 +++ b/tests/src/test/resources/application.conf.j2 @@ -64,6 +64,8 @@ whisk { } controller { + username = "{{ controller.username }}" + password = "{{ controller.password }}" protocol = {{ controller.protocol }} https { keystore-flavor = "{{ controller.ssl.storeFlavor }}" @@ -73,6 +75,8 @@ whisk { } } invoker { + username = "{{ invoker.username }}" + password = "{{ invoker.password }}" protocol = {{ invoker.protocol }} https { keystore-flavor = "{{ invoker.ssl.storeFlavor }}" diff --git a/tests/src/test/scala/common/WhiskProperties.java b/tests/src/test/scala/common/WhiskProperties.java index 05d8b322638..05ce08b889b 100644 --- a/tests/src/test/scala/common/WhiskProperties.java +++ b/tests/src/test/scala/common/WhiskProperties.java @@ -262,6 +262,10 @@ public static String getBaseControllerHost() { return getControllerHosts().split(",")[0]; } + public static String getBaseInvokerAddress(){ + return getInvokerHosts()[0] + ":" + whiskProperties.getProperty("invoker.hosts.basePort"); + } + public static String getBaseDBHost() { return getDBHosts().split(",")[0]; } diff --git a/tests/src/test/scala/org/apache/openwhisk/operation/RuntimeConfigurationTests.scala b/tests/src/test/scala/org/apache/openwhisk/operation/RuntimeConfigurationTests.scala new file mode 100644 index 00000000000..56f6745dffc --- /dev/null +++ b/tests/src/test/scala/org/apache/openwhisk/operation/RuntimeConfigurationTests.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.openwhisk.operation + +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.headers.{Authorization, BasicHttpCredentials} +import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpMethods, HttpRequest, StatusCodes} +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import common._ +import common.rest.HttpConnection +import org.apache.openwhisk.core.connector.PrewarmContainerDataList +import org.apache.openwhisk.core.connector.PrewarmContainerDataProtocol._ +import org.junit.runner.RunWith +import org.scalatest.Matchers +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.junit.JUnitRunner +import pureconfig.loadConfigOrThrow +import spray.json._ +import system.rest.RestUtil + +import scala.concurrent.duration._ +import scala.util.Random + +@RunWith(classOf[JUnitRunner]) +class RuntimeConfigurationTests + extends TestHelpers + with RestUtil + with Matchers + with ScalaFutures + with WskActorSystem + with StreamLogging { + + implicit val materializer = ActorMaterializer() + + val kind = "nodejs:10" + val memory = 128 + var initialCount = new Random().nextInt(3) + 1 + + def getRuntimes = { + s""" { + "runtimes": { + "nodejs": [{ + "kind": "${kind}", + "default": true, + "image": { + "prefix": "openwhisk", + "name": "action-nodejs-v10", + "tag": "nightly" + }, + "deprecated": false, + "attached": { + "attachmentName": "codefile", + "attachmentType": "text/plain" + }, + "stemCells": [{ + "initialCount": ${initialCount}, + "memory": "${memory} MB" + }] + }] + } + }""" + } + + val invokerProtocol = loadConfigOrThrow[String]("whisk.invoker.protocol") + val invokerAddress = WhiskProperties.getBaseInvokerAddress + + val controllerProtocol = loadConfigOrThrow[String]("whisk.controller.protocol") + val controllerAddress = WhiskProperties.getBaseControllerAddress + val whiskControllerUsername = loadConfigOrThrow[String]("whisk.controller.username") + val whiskControllerPassword = loadConfigOrThrow[String]("whisk.controller.password") + val whiskInvokerUsername = loadConfigOrThrow[String]("whisk.invoker.username") + val whiskInvokerPassword = loadConfigOrThrow[String]("whisk.invoker.password") + val controllerAuthHeader = Authorization(BasicHttpCredentials(whiskControllerUsername, whiskControllerPassword)) + val invokerAuthHeader = Authorization(BasicHttpCredentials(whiskInvokerUsername, whiskInvokerPassword)) + + val getRuntimeUrl = s"${invokerProtocol}://${invokerAddress}/getRuntime" + val invokerChangeRuntimeUrl = s"${invokerProtocol}://${invokerAddress}/config/runtime" + val controllerChangeRuntimeUrl = + s"${controllerProtocol}://${controllerAddress}/config/runtime" + + it should "change all managed invokers's prewarm config" in { + //Change runtime config + Http() + .singleRequest( + HttpRequest( + method = HttpMethods.POST, + uri = s"${controllerChangeRuntimeUrl}", + headers = List(controllerAuthHeader), + entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, getRuntimes)), + connectionContext = HttpConnection.getContext(controllerProtocol)) + .map { response => + response.status shouldBe StatusCodes.Accepted + } + + // Make sure previous http post call successfully + Thread.sleep(5.seconds.toMillis) + + //Cal the prewarm container number whether right + Http() + .singleRequest( + HttpRequest(method = HttpMethods.GET, uri = s"${getRuntimeUrl}", headers = List(invokerAuthHeader)), + connectionContext = HttpConnection.getContext(invokerProtocol)) + .map { response => + response.status shouldBe StatusCodes.OK + val prewarmContainerDataList = + Unmarshal(response).to[String].futureValue.parseJson.convertTo[PrewarmContainerDataList] + val nodejs10ContainerData = prewarmContainerDataList.items.filter { prewarmContainerData => + prewarmContainerData.kind == kind && prewarmContainerData.memory == memory + } + nodejs10ContainerData.head.number shouldBe initialCount + } + } +}