Skip to content

Commit 214f8e4

Browse files
committed
Config runtime
1 parent 66a9417 commit 214f8e4

File tree

19 files changed

+567
-93
lines changed

19 files changed

+567
-93
lines changed

ansible/group_vars/all

+2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ controller:
105105
authentication:
106106
spi: "{{ controller_authentication_spi | default('') }}"
107107
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
108+
username: "{{ controller_username | default('controller.user') }}"
109+
password: "{{ controller_password | default('controller.pass') }}"
108110
entitlement:
109111
spi: "{{ controller_entitlement_spi | default('') }}"
110112
protocol: "{{ controller_protocol | default('https') }}"

ansible/roles/controller/tasks/deploy.yml

+3
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case class ControllerCredentials(username: String, password: String)

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

+2
Original file line numberDiff line numberDiff line change
@@ -275,4 +275,6 @@ object ConfigKeys {
275275
val parameterStorage = "whisk.parameter-storage"
276276

277277
val azBlob = "whisk.azure-blob"
278+
279+
val controllerCredentials = "whisk.credentials.controller"
278280
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

+27
Original file line numberDiff line numberDiff line change
@@ -426,3 +426,30 @@ object EventMessage extends DefaultJsonProtocol {
426426

427427
def parse(msg: String) = Try(format.read(msg.parseJson))
428428
}
429+
430+
case class RuntimeMessage(runtime: String) extends Message {
431+
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
432+
}
433+
434+
object RuntimeMessage extends DefaultJsonProtocol {
435+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
436+
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
437+
}
438+
439+
case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
440+
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
441+
}
442+
443+
object PrewarmContainerData extends DefaultJsonProtocol {
444+
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
445+
}
446+
447+
case class PrewarmContainerDataList(items: List[PrewarmContainerData])
448+
449+
object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
450+
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
451+
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
452+
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
453+
def write(f: PrewarmContainerDataList) = ???
454+
}
455+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

+14
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,20 @@ protected[core] object ExecManifest {
5858
mf
5959
}
6060

61+
/**
62+
* Reads runtimes manifest from runtime string
63+
*
64+
* @param runtime
65+
* @return the manifest if initialized successfully, or an failure
66+
*/
67+
protected[core] def initialize(runtime: String): Try[Runtimes] = {
68+
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
69+
val mf = Try(runtime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
70+
var manifest: Option[Runtimes] = None
71+
mf.foreach(m => manifest = Some(m))
72+
mf
73+
}
74+
6175
/**
6276
* Gets existing runtime manifests.
6377
*

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

+64-4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24+
import akka.http.scaladsl.model.{StatusCodes, Uri}
2425
import akka.http.scaladsl.model.StatusCodes._
25-
import akka.http.scaladsl.model.Uri
26+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2627
import akka.http.scaladsl.server.Route
2728
import akka.stream.ActorMaterializer
2829
import kamon.Kamon
@@ -31,8 +32,15 @@ import pureconfig.generic.auto._
3132
import spray.json.DefaultJsonProtocol._
3233
import spray.json._
3334
import org.apache.openwhisk.common.Https.HttpsConfig
34-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
35-
import org.apache.openwhisk.core.WhiskConfig
35+
import org.apache.openwhisk.common.{
36+
AkkaLogging,
37+
ConfigMXBean,
38+
ControllerCredentials,
39+
Logging,
40+
LoggingMarkers,
41+
TransactionId
42+
}
43+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3644
import org.apache.openwhisk.core.connector.MessagingProvider
3745
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3846
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -97,7 +105,7 @@ class Controller(val instance: ControllerInstanceId,
97105
(pathEndOrSingleSlash & get) {
98106
complete(info)
99107
}
100-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
108+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
101109
}
102110

103111
// initialize datastores
@@ -176,6 +184,58 @@ class Controller(val instance: ControllerInstanceId,
176184
LogLimit.config,
177185
runtimes,
178186
List(apiV1.basepath()))
187+
188+
private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)
189+
190+
/**
191+
* config runtime
192+
*/
193+
private val configRuntime = {
194+
implicit val executionContext = actorSystem.dispatcher
195+
(path("config" / "runtime") & post) {
196+
extractCredentials {
197+
case Some(BasicHttpCredentials(username, password)) =>
198+
if (username == controllerCredentials.username && password == controllerCredentials.password) {
199+
entity(as[String]) { runtime =>
200+
val execManifest = ExecManifest.initialize(runtime)
201+
if (execManifest.isFailure) {
202+
logging.info(this, s"received invalid runtimes manifest")
203+
complete(StatusCodes.BadRequest)
204+
} else {
205+
parameter('limit.?) { limit =>
206+
limit match {
207+
case Some(targetValue) =>
208+
val pattern = """\d+:\d"""
209+
if (targetValue.matches(pattern)) {
210+
val invokerArray = targetValue.split(":")
211+
val beginIndex = invokerArray(0).toInt
212+
val finishIndex = invokerArray(1).toInt
213+
if (finishIndex < beginIndex) {
214+
complete(StatusCodes.BadRequest, "finishIndex can't be less than beginIndex")
215+
} else {
216+
val targetInvokers = (beginIndex to finishIndex).toList
217+
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
218+
logging.info(this, "config runtime request is already sent to target invokers")
219+
complete(StatusCodes.Accepted)
220+
}
221+
} else {
222+
complete(StatusCodes.BadRequest, "limit value can't match [beginIndex:finishIndex]")
223+
}
224+
case None =>
225+
loadBalancer.sendRuntimeToInvokers(runtime, None)
226+
logging.info(this, "config runtime request is already sent to all managed invokers")
227+
complete(StatusCodes.Accepted)
228+
}
229+
}
230+
}
231+
}
232+
} else {
233+
complete(StatusCodes.Unauthorized, "username or password is wrong")
234+
}
235+
case _ => complete(StatusCodes.Unauthorized)
236+
}
237+
}
238+
}
179239
}
180240

181241
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

+8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send runtime to invokers
65+
*
66+
* @param runtime
67+
* @param targetInvokers
68+
*/
69+
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

+17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send runtime to invokers*/
321+
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
322+
val runtimeMessage = RuntimeMessage(runtime)
323+
schedulingState.managedInvokers.filter { manageInvoker =>
324+
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
325+
} foreach { invokerHealth =>
326+
val topic = s"invoker${invokerHealth.id.toInt}"
327+
messageProducer.send(topic, runtimeMessage).andThen {
328+
case Success(_) =>
329+
logging.info(this, s"Successfully posted runtime to topic $topic")
330+
case Failure(_) =>
331+
logging.error(this, s"Failed posted runtime to topic $topic")
332+
}
333+
}
334+
}
335+
319336
override val invokerPool =
320337
invokerPoolFactory.createInvokerPool(
321338
actorSystem,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

+65-5
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package org.apache.openwhisk.core.containerpool
1919

2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId}
22-
import org.apache.openwhisk.core.connector.MessageFeed
22+
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
2323
import org.apache.openwhisk.core.entity.ExecManifest.ReactivePrewarmingConfig
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
2626

2727
import scala.annotation.tailrec
2828
import scala.collection.immutable
29+
import scala.collection.mutable.ListBuffer
2930
import scala.concurrent.duration._
3031
import scala.util.{Random, Try}
3132

@@ -37,6 +38,9 @@ case class ColdStartKey(kind: String, memory: ByteSize)
3738

3839
case class WorkerData(data: ContainerData, state: WorkerState)
3940

41+
case class PreWarmConfigList(list: List[PrewarmingConfig])
42+
object PrewarmQuery
43+
4044
case object EmitMetrics
4145

4246
case object AdjustPrewarmedContainer
@@ -74,6 +78,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
7478
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
7579
var prewarmedPool = immutable.Map.empty[ActorRef, PreWarmedData]
7680
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
81+
var latestPrewarmConfig = prewarmConfig
7782
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7883
// buffered here to keep order of computation.
7984
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -305,6 +310,34 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
305310
case RescheduleJob =>
306311
freePool = freePool - sender()
307312
busyPool = busyPool - sender()
313+
case prewarmConfigList: PreWarmConfigList =>
314+
logging.info(this, "update prewarm configuration request is send to invoker")
315+
val passedPrewarmConfig = prewarmConfigList.list
316+
var newPrewarmConfig: List[PrewarmingConfig] = List.empty
317+
latestPrewarmConfig foreach { config =>
318+
newPrewarmConfig = newPrewarmConfig :+ passedPrewarmConfig
319+
.find(passedConfig =>
320+
passedConfig.exec.kind == config.exec.kind && passedConfig.memoryLimit == config.memoryLimit)
321+
.getOrElse(config)
322+
}
323+
latestPrewarmConfig = newPrewarmConfig
324+
// Delete prewarmedPool firstly
325+
prewarmedPool foreach { element =>
326+
val actor = element._1
327+
actor ! Remove
328+
prewarmedPool = prewarmedPool - actor
329+
}
330+
latestPrewarmConfig foreach { config =>
331+
logging.info(
332+
this,
333+
s"add pre-warming ${config.initialCount} ${config.exec.kind} ${config.memoryLimit.toString}")(
334+
TransactionId.invokerWarmup)
335+
(1 to config.initialCount).foreach { _ =>
336+
prewarmContainer(config.exec, config.memoryLimit, config.reactive.map(_.ttl))
337+
}
338+
}
339+
case PrewarmQuery =>
340+
sender() ! getPrewarmContainer()
308341
case EmitMetrics =>
309342
emitMetrics()
310343

@@ -335,7 +368,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
335368
def adjustPrewarmedContainer(init: Boolean, scheduled: Boolean): Unit = {
336369
if (scheduled) {
337370
//on scheduled time, remove expired prewarms
338-
ContainerPool.removeExpired(poolConfig, prewarmConfig, prewarmedPool).foreach { p =>
371+
ContainerPool.removeExpired(poolConfig, latestPrewarmConfig, prewarmedPool).foreach { p =>
339372
prewarmedPool = prewarmedPool - p
340373
p ! Remove
341374
}
@@ -348,7 +381,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
348381
}
349382
//fill in missing prewarms (replaces any deletes)
350383
ContainerPool
351-
.increasePrewarms(init, scheduled, coldStartCount, prewarmConfig, prewarmedPool, prewarmStartingPool)
384+
.increasePrewarms(init, scheduled, coldStartCount, latestPrewarmConfig, prewarmedPool, prewarmStartingPool)
352385
.foreach { c =>
353386
val config = c._1
354387
val currentCount = c._2._1
@@ -382,7 +415,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
382415

383416
/** this is only for cold start statistics of prewarm configs, e.g. not blackbox or other configs. */
384417
def incrementColdStartCount(kind: String, memoryLimit: ByteSize): Unit = {
385-
prewarmConfig
418+
latestPrewarmConfig
386419
.filter { config =>
387420
kind == config.exec.kind && memoryLimit == config.memoryLimit
388421
}
@@ -423,7 +456,9 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
423456

424457
//get the appropriate ttl from prewarm configs
425458
val ttl =
426-
prewarmConfig.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind).flatMap(_.reactive.map(_.ttl))
459+
latestPrewarmConfig
460+
.find(pc => pc.memoryLimit == memory && pc.exec.kind == kind)
461+
.flatMap(_.reactive.map(_.ttl))
427462
prewarmContainer(action.exec, memory, ttl)
428463
(ref, data)
429464
}
@@ -436,6 +471,31 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
436471
busyPool = busyPool - toDelete
437472
}
438473

474+
/**
475+
* get the prewarm container
476+
* @return
477+
*/
478+
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
479+
val containerDataList = prewarmedPool.values.toList
480+
481+
var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
482+
containerDataList.foreach { prewarmData =>
483+
val isInclude = resultList.filter { resultData =>
484+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
485+
}.size > 0
486+
487+
if (isInclude) {
488+
var resultData = resultList.filter { resultData =>
489+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
490+
}.head
491+
resultData.number += 1
492+
} else {
493+
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
494+
}
495+
}
496+
resultList
497+
}
498+
439499
/**
440500
* Calculate if there is enough free memory within a given pool.
441501
*

0 commit comments

Comments
 (0)