Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add administrative interface to invoker and controller to reconfigure runtimes #4790

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ansible/group_vars/all
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ controller:
authentication:
spi: "{{ controller_authentication_spi | default('') }}"
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
username: "{{ controller_username | default('controller.user') }}"
password: "{{ controller_password | default('controller.pass') }}"
entitlement:
spi: "{{ controller_entitlement_spi | default('') }}"
protocol: "{{ controller_protocol | default('https') }}"
Expand Down Expand Up @@ -209,6 +211,8 @@ invoker:
{% endif %}"
extraEnv: "{{ invoker_extraEnv | default({}) }}"
protocol: "{{ invoker_protocol | default('https') }}"
username: "{{ invoker_username | default('invoker.user') }}"
password: "{{ invoker_password | default('invoker.pass') }}"
ssl:
cn: "openwhisk-invokers"
keyPrefix: "{{ __invoker_ssl_keyPrefix }}"
Expand Down
3 changes: 3 additions & 0 deletions ansible/roles/controller/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,9 @@
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"

"CONFIG_whisk_controller_username": "{{ controller.username }}"
"CONFIG_whisk_controller_password": "{{ controller.password }}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If generate controller/invoker credentials to container's /conf/, the Standalone Tests run failed due to lack /conf/ under the build machine

On the other hand, couchdb credentials is passed via environment variable, so controller/invoker credentials can be passed via environment variable as well.

"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Expand Down
2 changes: 2 additions & 0 deletions ansible/roles/invoker/tasks/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"

- name: extend invoker dns env
set_fact:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,9 @@ object ConfigKeys {
val azBlob = "whisk.azure-blob"

val whiskClusterName = "whisk.cluster.name"

val whiskControllerUsername = "whisk.controller.username"
val whiskControllerPassword = "whisk.controller.password"
val whiskInvokerUsername = "whisk.invoker.username"
val whiskInvokerPassword = "whisk.invoker.password"
}
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,30 @@ object StatusData extends DefaultJsonProtocol {
implicit val serdes =
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
}

case class RuntimeMessage(runtime: String) extends Message {
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
}

object RuntimeMessage extends DefaultJsonProtocol {
def parse(msg: String) = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
}

case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
}

object PrewarmContainerData extends DefaultJsonProtocol {
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
}

case class PrewarmContainerDataList(items: List[PrewarmContainerData])

object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
def write(f: PrewarmContainerDataList) = ???
}
}
4 changes: 4 additions & 0 deletions core/controller/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ whisk{
file-system : true
dir-path : "/swagger-ui/"
}
controller {
username: "controller.user"
password: "controller.pass"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import akka.event.Logging.InfoLevel
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.http.scaladsl.model.StatusCodes._
import akka.http.scaladsl.model.Uri
import akka.http.scaladsl.model.headers.BasicHttpCredentials
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import kamon.Kamon
Expand All @@ -32,7 +33,7 @@ import spray.json.DefaultJsonProtocol._
import spray.json._
import org.apache.openwhisk.common.Https.HttpsConfig
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
import org.apache.openwhisk.core.connector.MessagingProvider
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
Expand Down Expand Up @@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
(pathEndOrSingleSlash & get) {
complete(info)
}
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
}

// initialize datastores
Expand Down Expand Up @@ -176,6 +177,59 @@ class Controller(val instance: ControllerInstanceId,
LogLimit.config,
runtimes,
List(apiV1.basepath()))

private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)

/**
* config runtime
*/
private val configRuntime = {
implicit val executionContext = actorSystem.dispatcher
(path("config" / "runtime") & post) {
extractCredentials {
case Some(BasicHttpCredentials(username, password)) =>
if (username == controllerUsername && password == controllerPassword) {
entity(as[String]) { runtime =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, entity(as[Runtimes]) may be better, but if we apply this, need to change Runtimes to support convert json to entity Runtimes, and on the other hand, runtime.json's format doesn't match Runtimes entity, need to change a lot if use entity(as[Runtimes]) to receive the data.

Fortunately,we can reuse openwhisk itself's initialize method, just convert the runtime string to Runtimes.

So here, i think pass runtime string would be ok.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can do as json object instead of string to tighten this a bit more. I agree not to deserialize into a Runtimes instance.

val execManifest = ExecManifest.initialize(whiskConfig, Some(runtime))
if (execManifest.isFailure) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala nit: you can use a case match here instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just follow other codes using execManifest.isFailure to judge whether success or fail

logging.info(this, s"received invalid runtimes manifest")
complete(StatusCodes.BadRequest)
} else {
parameter('limit.?) { limit =>
Copy link
Contributor Author

@ningyougang ningyougang Jan 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support passed limit invokers, e.g. ?limit=0:1 ( sent to invoker0, invoker1 only)
And the config runtime request can be sent to some limited invokers which included in managed invokers as well.

limit match {
case Some(targetValue) =>
val pattern = """\d+:\d"""
if (targetValue.matches(pattern)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scala nit: you can rewrite this either if/else and nested clauses using case matching on regex.

Copy link
Contributor Author

@ningyougang ningyougang Feb 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm.. can you show a example? my codes like below

if (targetValue.matches(pattern)) {
  val invokerArray = targetValue.split(":")
  val beginIndex = invokerArray(0).toInt
  val finishIndex = invokerArray(1).toInt
  if (finishIndex < beginIndex) {
    logging.info(this, "finishIndex can't be less than beginIndex")
    complete(StatusCodes.BadRequest)
  } else {
    val targetInvokers = (beginIndex to finishIndex).toList
    loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
    logging.info(this, "config runtime request is already sent to target invokers")
    complete(StatusCodes.BadRequest)
  }
} else {
  logging.info(this, "limit value can't match [beginIndex:finishIndex]")
  complete(StatusCodes.BadRequest)
}

val invokerArray = targetValue.split(":")
val beginIndex = invokerArray(0).toInt
val finishIndex = invokerArray(1).toInt
if (finishIndex < beginIndex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to support 0:0 case as well.
Ansible is using the same way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, already support 0:0 case.

complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
} else {
val targetInvokers = (beginIndex to finishIndex).toList
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a later validation to check that the invoker indexing is in range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, validate it in ShardingContainerPoolLoadbalacer.scala

  /** send runtime to invokers*/
  override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
    val runtimeMessage = RuntimeMessage(runtime)
    schedulingState.managedInvokers.filter { manageInvoker =>
      targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
    } foreach { invokerHealth =>
      val topic = s"invoker${invokerHealth.id.toInt}"
      messageProducer.send(topic, runtimeMessage).andThen {
        case Success(_) =>
          logging.info(this, s"Successfully posted runtime to topic $topic")
        case Failure(_) =>
          logging.error(this, s"Failed posted runtime to topic $topic")
      }
    }
  }

logging.info(this, "config runtime request is already sent to target invokers")
complete(StatusCodes.Accepted)
}
} else {
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
}
case None =>
loadBalancer.sendRuntimeToInvokers(runtime, None)
logging.info(this, "config runtime request is already sent to all managed invokers")
complete(StatusCodes.Accepted)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to complete(StatusCodes.Accepted), because it is a async operation.

}
}
}
}
} else {
complete(StatusCodes.Unauthorized, "username or password is wrong")
}
case _ => complete(StatusCodes.Unauthorized)
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ trait LoadBalancer {
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]

/**
* send runtime to invokers
*
* @param runtime
* @param targetInvokers
*/
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}

/**
* Returns a message indicating the health of the containers and/or container pool in general.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success}

/**
* A loadbalancer that schedules workload based on a hashing-algorithm.
Expand Down Expand Up @@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
}
}

/** send runtime to invokers*/
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
val runtimeMessage = RuntimeMessage(runtime)
schedulingState.managedInvokers.filter { manageInvoker =>
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
} foreach { invokerHealth =>
val topic = s"invoker${invokerHealth.id.toInt}"
messageProducer.send(topic, runtimeMessage).andThen {
case Success(_) =>
logging.info(this, s"Successfully posted runtime to topic $topic")
case Failure(_) =>
logging.error(this, s"Failed posted runtime to topic $topic")
}
}
}

override val invokerPool =
invokerPoolFactory.createInvokerPool(
actorSystem,
Expand Down
2 changes: 2 additions & 0 deletions core/invoker/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ whisk {
}

invoker {
username: "invoker.user"
password: "invoker.pass"
protocol: http
}
runtime.delete.timeout = "30 seconds"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ package org.apache.openwhisk.core.containerpool

import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
import org.apache.openwhisk.core.connector.MessageFeed
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._

import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import scala.util.{Random, Try}

case class ColdStartKey(kind: String, memory: ByteSize)

case class PreWarmConfigList(list: List[PrewarmingConfig])
object PrewarmQuery

case object EmitMetrics

case object AdjustPrewarmedContainer
Expand Down Expand Up @@ -68,6 +72,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
var latestPrewarmConfig = prewarmConfig
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
// buffered here to keep order of computation.
// Otherwise actions with small memory-limits could block actions with large memory limits.
Expand Down Expand Up @@ -297,6 +302,34 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
case RescheduleJob =>
freePool = freePool - sender()
busyPool = busyPool - sender()
case prewarmConfigList: PreWarmConfigList =>
logging.info(this, "update prewarm configuration request is send to invoker")
val passedPrewarmConfig = prewarmConfigList.list
var newPrewarmConfig: List[PrewarmingConfig] = List.empty
latestPrewarmConfig foreach { config =>
newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig
.find(passedConfig =>
passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit)
.getOrElse(config)
}
latestPrewarmConfig = newPrewarmConfig
Copy link
Contributor Author

@ningyougang ningyougang Jun 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Support just change specify runtime config as well, e.g.
Let's assume runtimes.json include nodejs:12, python:2, swift4.1, we can change nodejs:12 runtime only via just pass nodejs:12 runtime info, for other runtimes, just use previous runtime info directly.

// Delete prewarmedPool firstly
prewarmedPool foreach { element =>
val actor = element._1
actor ! Remove
prewarmedPool = prewarmedPool - actor
}
latestPrewarmConfig foreach { config =>
logging.info(
this,
s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")(
TransactionId.invokerWarmup)
(1 to config.initialCount).foreach { _ =>
prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
}
}
case PrewarmQuery =>
sender() ! getPrewarmContainer()
case EmitMetrics =>
emitMetrics()

Expand Down Expand Up @@ -327,7 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
if (scheduled) {
//on scheduled time, remove expired prewarms
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p =>
prewarmedPool = prewarmedPool - p
p ! Remove
}
Expand All @@ -340,7 +373,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
}
//fill in missing prewarms (replaces any deletes)
ContainerPool
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
.increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool)
.foreach { c =>
val config = c._1
val currentCount = c._2._1
Expand Down Expand Up @@ -380,7 +413,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

/** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
prewarmConfig
latestPrewarmConfig
.filter { config =>
kind == config.exec.kind && memoryLimit == config.memoryLimit
}
Expand Down Expand Up @@ -421,7 +454,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,

//get the appropriate ttl from prewarm configs
val ttl =
prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
latestPrewarmConfig
.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind)
.flatMap(_.reactive.map(_.ttl))
prewarmContainer(action.exec, memory, ttl)
(ref, data)
}
Expand All @@ -434,6 +469,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
busyPool = busyPool - toDelete
}

/**
* get the prewarm container
* @return
*/
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
val containerDataList = prewarmedPool.values.toList

var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
containerDataList.foreach { prewarmData =>
val isInclude = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.size > 0

if (isInclude) {
var resultData = resultList.filter { resultData =>
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
}.head
resultData.number += 1
} else {
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
}
}
resultList
}

/**
* Calculate if there is enough free memory within a given pool.
*
Expand Down
Loading