Skip to content

Commit edefa8a

Browse files
committed
Config runtime
1 parent d8cf172 commit edefa8a

File tree

20 files changed

+498
-44
lines changed

20 files changed

+498
-44
lines changed

ansible/group_vars/all

+4
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,8 @@ controller:
107107
authentication:
108108
spi: "{{ controller_authentication_spi | default('') }}"
109109
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
110+
username: "{{ controller_username | default('controller.user') }}"
111+
password: "{{ controller_password | default('controller.pass') }}"
110112
entitlement:
111113
spi: "{{ controller_entitlement_spi | default('') }}"
112114
protocol: "{{ controller_protocol | default('https') }}"
@@ -209,6 +211,8 @@ invoker:
209211
{% endif %}"
210212
extraEnv: "{{ invoker_extraEnv | default({}) }}"
211213
protocol: "{{ invoker_protocol | default('https') }}"
214+
username: "{{ invoker_username | default('invoker.user') }}"
215+
password: "{{ invoker_password | default('invoker.pass') }}"
212216
ssl:
213217
cn: "openwhisk-invokers"
214218
keyPrefix: "{{ __invoker_ssl_keyPrefix }}"

ansible/roles/controller/tasks/deploy.yml

+3
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"

ansible/roles/invoker/tasks/deploy.yml

+2
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,8 @@
279279
"CONFIG_whisk_invoker_https_keystoreFlavor": "{{ invoker.ssl.storeFlavor }}"
280280
"CONFIG_whisk_invoker_https_clientAuth": "{{ invoker.ssl.clientAuth }}"
281281
"CONFIG_whisk_containerPool_prewarmExpirationCheckInterval": "{{ container_pool_prewarm_expirationCheckInterval | default('1 minute') }}"
282+
"CONFIG_whisk_invoker_username": "{{ invoker.username }}"
283+
"CONFIG_whisk_invoker_password": "{{ invoker.password }}"
282284

283285
- name: extend invoker dns env
284286
set_fact:

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

+5
Original file line numberDiff line numberDiff line change
@@ -291,4 +291,9 @@ object ConfigKeys {
291291
val azBlob = "whisk.azure-blob"
292292

293293
val whiskClusterName = "whisk.cluster.name"
294+
295+
val whiskControllerUsername = "whisk.controller.username"
296+
val whiskControllerPassword = "whisk.controller.password"
297+
val whiskInvokerUsername = "whisk.invoker.username"
298+
val whiskInvokerPassword = "whisk.invoker.password"
294299
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

+27
Original file line numberDiff line numberDiff line change
@@ -457,3 +457,30 @@ object StatusData extends DefaultJsonProtocol {
457457
implicit val serdes =
458458
jsonFormat(StatusData.apply _, "invocationNamespace", "fqn", "waitingActivation", "status", "data")
459459
}
460+
461+
case class RuntimeMessage(runtime: String) extends Message {
462+
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
463+
}
464+
465+
object RuntimeMessage extends DefaultJsonProtocol {
466+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
467+
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
468+
}
469+
470+
case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
471+
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
472+
}
473+
474+
object PrewarmContainerData extends DefaultJsonProtocol {
475+
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
476+
}
477+
478+
case class PrewarmContainerDataList(items: List[PrewarmContainerData])
479+
480+
object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
481+
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
482+
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
483+
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
484+
def write(f: PrewarmContainerDataList) = ???
485+
}
486+
}

core/controller/src/main/resources/application.conf

+4
Original file line numberDiff line numberDiff line change
@@ -117,4 +117,8 @@ whisk{
117117
file-system : true
118118
dir-path : "/swagger-ui/"
119119
}
120+
controller {
121+
username: "controller.user"
122+
password: "controller.pass"
123+
}
120124
}

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

+57-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24+
import akka.http.scaladsl.model.{StatusCodes, Uri}
2425
import akka.http.scaladsl.model.StatusCodes._
25-
import akka.http.scaladsl.model.Uri
26+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2627
import akka.http.scaladsl.server.Route
2728
import akka.stream.ActorMaterializer
2829
import kamon.Kamon
@@ -32,7 +33,7 @@ import spray.json.DefaultJsonProtocol._
3233
import spray.json._
3334
import org.apache.openwhisk.common.Https.HttpsConfig
3435
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
36+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3637
import org.apache.openwhisk.core.connector.MessagingProvider
3738
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3839
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -97,7 +98,7 @@ class Controller(val instance: ControllerInstanceId,
9798
(pathEndOrSingleSlash & get) {
9899
complete(info)
99100
}
100-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
101+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
101102
}
102103

103104
// initialize datastores
@@ -176,6 +177,59 @@ class Controller(val instance: ControllerInstanceId,
176177
LogLimit.config,
177178
runtimes,
178179
List(apiV1.basepath()))
180+
181+
private val controllerUsername = loadConfigOrThrow[String](ConfigKeys.whiskControllerUsername)
182+
private val controllerPassword = loadConfigOrThrow[String](ConfigKeys.whiskControllerPassword)
183+
184+
/**
185+
* config runtime
186+
*/
187+
private val configRuntime = {
188+
implicit val executionContext = actorSystem.dispatcher
189+
(path("config" / "runtime") & post) {
190+
extractCredentials {
191+
case Some(BasicHttpCredentials(username, password)) =>
192+
if (username == controllerUsername && password == controllerPassword) {
193+
entity(as[String]) { runtime =>
194+
val execManifest = ExecManifest.initialize(whiskConfig, Some(runtime))
195+
if (execManifest.isFailure) {
196+
logging.info(this, s"received invalid runtimes manifest")
197+
complete(StatusCodes.BadRequest)
198+
} else {
199+
parameter('limit.?) { limit =>
200+
limit match {
201+
case Some(targetValue) =>
202+
val pattern = """\d+:\d"""
203+
if (targetValue.matches(pattern)) {
204+
val invokerArray = targetValue.split(":")
205+
val beginIndex = invokerArray(0).toInt
206+
val finishIndex = invokerArray(1).toInt
207+
if (finishIndex < beginIndex) {
208+
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
209+
} else {
210+
val targetInvokers = (beginIndex to finishIndex).toList
211+
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
212+
logging.info(this, "config runtime request is already sent to target invokers")
213+
complete(StatusCodes.Accepted)
214+
}
215+
} else {
216+
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
217+
}
218+
case None =>
219+
loadBalancer.sendRuntimeToInvokers(runtime, None)
220+
logging.info(this, "config runtime request is already sent to all managed invokers")
221+
complete(StatusCodes.Accepted)
222+
}
223+
}
224+
}
225+
}
226+
} else {
227+
complete(StatusCodes.Unauthorized, "username or password is wrong")
228+
}
229+
case _ => complete(StatusCodes.Unauthorized)
230+
}
231+
}
232+
}
179233
}
180234

181235
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

+8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send runtime to invokers
65+
*
66+
* @param runtime
67+
* @param targetInvokers
68+
*/
69+
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

+17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send runtime to invokers*/
321+
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
322+
val runtimeMessage = RuntimeMessage(runtime)
323+
schedulingState.managedInvokers.filter { manageInvoker =>
324+
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
325+
} foreach { invokerHealth =>
326+
val topic = s"invoker${invokerHealth.id.toInt}"
327+
messageProducer.send(topic, runtimeMessage).andThen {
328+
case Success(_) =>
329+
logging.info(this, s"Successfully posted runtime to topic $topic")
330+
case Failure(_) =>
331+
logging.error(this, s"Failed posted runtime to topic $topic")
332+
}
333+
}
334+
}
335+
319336
override val invokerPool =
320337
invokerPoolFactory.createInvokerPool(
321338
actorSystem,

core/invoker/src/main/resources/application.conf

+2
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ whisk {
171171
}
172172

173173
invoker {
174+
username: "invoker.user"
175+
password: "invoker.pass"
174176
protocol: http
175177
}
176178
runtime.delete.timeout = "30 seconds"

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

+65-5
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,22 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.MessageFeed
22+
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
2626

2727
import scala.annotation.tailrec
2828
import scala.collection.immutable
29+
import scala.collection.mutable.ListBuffer
2930
import scala.concurrent.duration._
3031
import scala.util.{Random, Try}
3132

3233
case class ColdStartKey(kind: String, memory: ByteSize)
3334

35+
case class PreWarmConfigList(list: List[PrewarmingConfig])
36+
object PrewarmQuery
37+
3438
case object EmitMetrics
3539

3640
case object AdjustPrewarmedContainer
@@ -68,6 +72,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
6872
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
6973
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
7074
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
75+
var latestPrewarmConfig = prewarmConfig
7176
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7277
// buffered here to keep order of computation.
7378
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -297,6 +302,34 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
297302
case RescheduleJob =>
298303
freePool = freePool - sender()
299304
busyPool = busyPool - sender()
305+
case prewarmConfigList: PreWarmConfigList =>
306+
logging.info(this, "update prewarm configuration request is send to invoker")
307+
val passedPrewarmConfig = prewarmConfigList.list
308+
var newPrewarmConfig: List[PrewarmingConfig] = List.empty
309+
latestPrewarmConfig foreach { config =>
310+
newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig
311+
.find(passedConfig =>
312+
passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit)
313+
.getOrElse(config)
314+
}
315+
latestPrewarmConfig = newPrewarmConfig
316+
// Delete prewarmedPool firstly
317+
prewarmedPool foreach { element =>
318+
val actor = element._1
319+
actor ! Remove
320+
prewarmedPool = prewarmedPool - actor
321+
}
322+
latestPrewarmConfig foreach { config =>
323+
logging.info(
324+
this,
325+
s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")(
326+
TransactionId.invokerWarmup)
327+
(1 to config.initialCount).foreach { _ =>
328+
prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
329+
}
330+
}
331+
case PrewarmQuery =>
332+
sender() ! getPrewarmContainer()
300333
case EmitMetrics =>
301334
emitMetrics()
302335

@@ -327,7 +360,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
327360
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
328361
if (scheduled) {
329362
//on scheduled time, remove expired prewarms
330-
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
363+
ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p =>
331364
prewarmedPool = prewarmedPool - p
332365
p ! Remove
333366
}
@@ -340,7 +373,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
340373
}
341374
//fill in missing prewarms (replaces any deletes)
342375
ContainerPool
343-
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
376+
.increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool)
344377
.foreach { c =>
345378
val config = c._1
346379
val currentCount = c._2._1
@@ -380,7 +413,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
380413

381414
/** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
382415
def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
383-
prewarmConfig
416+
latestPrewarmConfig
384417
.filter { config =>
385418
kind == config.exec.kind && memoryLimit == config.memoryLimit
386419
}
@@ -421,7 +454,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
421454

422455
//get the appropriate ttl from prewarm configs
423456
val ttl =
424-
prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
457+
latestPrewarmConfig
458+
.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind)
459+
.flatMap(_.reactive.map(_.ttl))
425460
prewarmContainer(action.exec, memory, ttl)
426461
(ref, data)
427462
}
@@ -434,6 +469,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
434469
busyPool = busyPool - toDelete
435470
}
436471

472+
/**
473+
* get the prewarm container
474+
* @return
475+
*/
476+
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
477+
val containerDataList = prewarmedPool.values.toList
478+
479+
var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
480+
containerDataList.foreach { prewarmData =>
481+
val isInclude = resultList.filter { resultData =>
482+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
483+
}.size > 0
484+
485+
if (isInclude) {
486+
var resultData = resultList.filter { resultData =>
487+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
488+
}.head
489+
resultData.number += 1
490+
} else {
491+
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
492+
}
493+
}
494+
resultList
495+
}
496+
437497
/**
438498
* Calculate if there is enough free memory within a given pool.
439499
*

0 commit comments

Comments
 (0)