Skip to content

Commit 12201bb

Browse files
committed
Config runtime
Sometimes, admin may want to reinitalize the runtime config depend on the real requirements, e.g. increase some prewarm containers
1 parent 394e9f6 commit 12201bb

File tree

19 files changed

+563
-90
lines changed

19 files changed

+563
-90
lines changed

ansible/group_vars/all

+2
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ controller:
105105
authentication:
106106
spi: "{{ controller_authentication_spi | default('') }}"
107107
loglevel: "{{ controller_loglevel | default(whisk_loglevel) | default('INFO') }}"
108+
username: "{{ controller_username | default('controller.user') }}"
109+
password: "{{ controller_password | default('controller.pass') }}"
108110
entitlement:
109111
spi: "{{ controller_entitlement_spi | default('') }}"
110112
protocol: "{{ controller_protocol | default('https') }}"

ansible/roles/controller/tasks/deploy.yml

+3
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,9 @@
203203
"CONFIG_whisk_db_activationsFilterDdoc": "{{ db_whisk_activations_filter_ddoc | default() }}"
204204
"CONFIG_whisk_userEvents_enabled": "{{ user_events | default(false) | lower }}"
205205

206+
"CONFIG_whisk_credentials_controller_username": "{{ controller.username }}"
207+
"CONFIG_whisk_credentials_controller_password": "{{ controller.password }}"
208+
206209
"LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}"
207210
"LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}"
208211
"LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.common
19+
20+
case class ControllerCredentials(username: String, password: String)

common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala

+2
Original file line numberDiff line numberDiff line change
@@ -266,4 +266,6 @@ object ConfigKeys {
266266
val swaggerUi = "whisk.swagger-ui"
267267

268268
val apacheClientConfig = "whisk.apache-client"
269+
270+
val controllerCredentials = "whisk.credentials.controller"
269271
}

common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala

+27
Original file line numberDiff line numberDiff line change
@@ -429,3 +429,30 @@ object EventMessage extends DefaultJsonProtocol {
429429

430430
def parse(msg: String) = Try(format.read(msg.parseJson))
431431
}
432+
433+
case class RuntimeMessage(runtime: String) extends Message {
434+
override def serialize = RuntimeMessage.serdes.write(this).compactPrint
435+
}
436+
437+
object RuntimeMessage extends DefaultJsonProtocol {
438+
def parse(msg: String) = Try(serdes.read(msg.parseJson))
439+
implicit val serdes = jsonFormat(RuntimeMessage.apply _, "runtime")
440+
}
441+
442+
case class PrewarmContainerData(kind: String, memory: Long, var number: Int) extends Message {
443+
override def serialize: String = PrewarmContainerData.serdes.write(this).compactPrint
444+
}
445+
446+
object PrewarmContainerData extends DefaultJsonProtocol {
447+
implicit val serdes = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
448+
}
449+
450+
case class PrewarmContainerDataList(items: List[PrewarmContainerData])
451+
452+
object PrewarmContainerDataProtocol extends DefaultJsonProtocol {
453+
implicit val prewarmContainerDataFormat = jsonFormat(PrewarmContainerData.apply _, "kind", "memory", "number")
454+
implicit object prewarmContainerDataListJsonFormat extends RootJsonFormat[PrewarmContainerDataList] {
455+
def read(value: JsValue) = PrewarmContainerDataList(value.convertTo[List[PrewarmContainerData]])
456+
def write(f: PrewarmContainerDataList) = ???
457+
}
458+
}

common/scala/src/main/scala/org/apache/openwhisk/core/entity/ExecManifest.scala

+14
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ protected[core] object ExecManifest {
5555
mf
5656
}
5757

58+
/**
59+
* Reads runtimes manifest from runtime string
60+
*
61+
* @param runtime
62+
* @return the manifest if initialized successfully, or an failure
63+
*/
64+
protected[core] def initialize(runtime: String): Try[Runtimes] = {
65+
val rmc = loadConfigOrThrow[RuntimeManifestConfig](ConfigKeys.runtimes)
66+
val mf = Try(runtime.parseJson.asJsObject).flatMap(runtimes(_, rmc))
67+
var manifest: Option[Runtimes] = None
68+
mf.foreach(m => manifest = Some(m))
69+
mf
70+
}
71+
5872
/**
5973
* Gets existing runtime manifests.
6074
*

core/controller/src/main/scala/org/apache/openwhisk/core/controller/Controller.scala

+67-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ import akka.Done
2121
import akka.actor.{ActorSystem, CoordinatedShutdown}
2222
import akka.event.Logging.InfoLevel
2323
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
24-
import akka.http.scaladsl.model.Uri
24+
import akka.http.scaladsl.model.{StatusCodes, Uri}
25+
import akka.http.scaladsl.model.headers.BasicHttpCredentials
2526
import akka.http.scaladsl.server.Route
2627
import akka.stream.ActorMaterializer
2728
import kamon.Kamon
@@ -30,8 +31,15 @@ import pureconfig.generic.auto._
3031
import spray.json.DefaultJsonProtocol._
3132
import spray.json._
3233
import org.apache.openwhisk.common.Https.HttpsConfig
33-
import org.apache.openwhisk.common.{AkkaLogging, ConfigMXBean, Logging, LoggingMarkers, TransactionId}
34-
import org.apache.openwhisk.core.WhiskConfig
34+
import org.apache.openwhisk.common.{
35+
AkkaLogging,
36+
ConfigMXBean,
37+
ControllerCredentials,
38+
Logging,
39+
LoggingMarkers,
40+
TransactionId
41+
}
42+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
3543
import org.apache.openwhisk.core.connector.MessagingProvider
3644
import org.apache.openwhisk.core.containerpool.logging.LogStoreProvider
3745
import org.apache.openwhisk.core.database.{ActivationStoreProvider, CacheChangeNotification, RemoteCacheInvalidation}
@@ -96,7 +104,7 @@ class Controller(val instance: ControllerInstanceId,
96104
(pathEndOrSingleSlash & get) {
97105
complete(info)
98106
}
99-
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth
107+
} ~ apiV1.routes ~ swagger.swaggerRoutes ~ internalInvokerHealth ~ configRuntime
100108
}
101109

102110
// initialize datastores
@@ -163,6 +171,61 @@ class Controller(val instance: ControllerInstanceId,
163171
LogLimit.config,
164172
runtimes,
165173
List(apiV1.basepath()))
174+
175+
private val controllerCredentials = loadConfigOrThrow[ControllerCredentials](ConfigKeys.controllerCredentials)
176+
177+
/**
178+
* config runtime
179+
*/
180+
private val configRuntime = {
181+
implicit val executionContext = actorSystem.dispatcher
182+
(path("config" / "runtime") & post) {
183+
extractCredentials {
184+
case Some(BasicHttpCredentials(username, password)) =>
185+
if (username == controllerCredentials.username && password == controllerCredentials.password) {
186+
entity(as[String]) { runtime =>
187+
val execManifest = ExecManifest.initialize(runtime)
188+
if (execManifest.isFailure) {
189+
logging.info(this, s"received invalid runtimes manifest")
190+
complete(StatusCodes.BadRequest)
191+
} else {
192+
parameter('limit.?) { limit =>
193+
limit match {
194+
case Some(targetValue) =>
195+
val pattern = """\d+:\d"""
196+
if (targetValue.matches(pattern)) {
197+
val invokerArray = targetValue.split(":")
198+
val beginIndex = invokerArray(0).toInt
199+
val finishIndex = invokerArray(1).toInt
200+
if (finishIndex < beginIndex) {
201+
logging.info(this, "finishIndex can't be less than beginIndex")
202+
complete(StatusCodes.BadRequest)
203+
} else {
204+
val targetInvokers = (beginIndex to finishIndex).toList
205+
loadBalancer.sendRuntimeToInvokers(runtime, Some(targetInvokers))
206+
logging.info(this, "config runtime request is already sent to target invokers")
207+
complete(StatusCodes.BadRequest)
208+
}
209+
} else {
210+
logging.info(this, "limit value can't match [beginIndex:finishIndex]")
211+
complete(StatusCodes.BadRequest)
212+
}
213+
case None =>
214+
loadBalancer.sendRuntimeToInvokers(runtime, None)
215+
logging.info(this, "config runtime request is already sent to all managed invokers")
216+
complete(StatusCodes.Accepted)
217+
}
218+
}
219+
}
220+
}
221+
} else {
222+
logging.info(this, s"username or password is wrong")
223+
complete(StatusCodes.Unauthorized)
224+
}
225+
case _ => complete(StatusCodes.Unauthorized)
226+
}
227+
}
228+
}
166229
}
167230

168231
/**

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/LoadBalancer.scala

+8
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,14 @@ trait LoadBalancer {
6060
def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
6161
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]]
6262

63+
/**
64+
* send runtime to invokers
65+
*
66+
* @param runtime
67+
* @param targetInvokers
68+
*/
69+
def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {}
70+
6371
/**
6472
* Returns a message indicating the health of the containers and/or container pool in general.
6573
*

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

+17
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import org.apache.openwhisk.spi.SpiLoader
4343
import scala.annotation.tailrec
4444
import scala.concurrent.Future
4545
import scala.concurrent.duration.FiniteDuration
46+
import scala.util.{Failure, Success}
4647

4748
/**
4849
* A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -316,6 +317,22 @@ class ShardingContainerPoolBalancer(
316317
}
317318
}
318319

320+
/** send runtime to invokers*/
321+
override def sendRuntimeToInvokers(runtime: String, targetInvokers: Option[List[Int]]): Unit = {
322+
val runtimeMessage = RuntimeMessage(runtime)
323+
schedulingState.managedInvokers.filter { manageInvoker =>
324+
targetInvokers.getOrElse(schedulingState.managedInvokers.map(_.id.instance)).contains(manageInvoker.id.instance)
325+
} foreach { invokerHealth =>
326+
val topic = s"invoker${invokerHealth.id.toInt}"
327+
messageProducer.send(topic, runtimeMessage).andThen {
328+
case Success(_) =>
329+
logging.info(this, s"Successfully posted runtime to topic $topic")
330+
case Failure(_) =>
331+
logging.error(this, s"Failed posted runtime to topic $topic")
332+
}
333+
}
334+
}
335+
319336
override val invokerPool =
320337
invokerPoolFactory.createInvokerPool(
321338
actorSystem,

core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerPool.scala

+57-2
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ package org.apache.openwhisk.core.containerpool
2020
import akka.actor.{Actor, ActorRef, ActorRefFactory, Props}
2121
import org.apache.openwhisk.common.MetricEmitter
2222
import org.apache.openwhisk.common.{AkkaLogging, LoggingMarkers, TransactionId}
23-
import org.apache.openwhisk.core.connector.MessageFeed
23+
import org.apache.openwhisk.core.connector.{MessageFeed, PrewarmContainerData}
2424
import org.apache.openwhisk.core.entity._
2525
import org.apache.openwhisk.core.entity.size._
26+
2627
import scala.annotation.tailrec
2728
import scala.collection.immutable
29+
import scala.collection.mutable.ListBuffer
2830
import scala.concurrent.duration._
2931
import scala.util.Try
3032

@@ -34,6 +36,9 @@ case object Free extends WorkerState
3436

3537
case class WorkerData(data: ContainerData, state: WorkerState)
3638

39+
case class PreWarmConfigList(list: List[PrewarmingConfig])
40+
object PrewarmQuery
41+
3742
case object EmitMetrics
3843

3944
/**
@@ -70,6 +75,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
7075
var busyPool = immutable.Map.empty[ActorRef, ContainerData]
7176
var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData]
7277
var prewarmStartingPool = immutable.Map.empty[ActorRef, (String, ByteSize)]
78+
var latestPrewarmConfig = prewarmConfig
7379
// If all memory slots are occupied and if there is currently no container to be removed, than the actions will be
7480
// buffered here to keep order of computation.
7581
// Otherwise actions with small memory-limits could block actions with large memory limits.
@@ -279,6 +285,28 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
279285
case RescheduleJob =>
280286
freePool = freePool - sender()
281287
busyPool = busyPool - sender()
288+
case prewarmConfigList: PreWarmConfigList =>
289+
latestPrewarmConfig = prewarmConfigList.list
290+
prewarmConfigList.list foreach { config =>
291+
// Delete matched prewarm container from prewarmedPool firstly
292+
val kind = config.exec.kind
293+
val memory = config.memoryLimit
294+
prewarmedPool.filter {
295+
case (_, PreWarmedData(_, `kind`, `memory`, _)) => true
296+
case _ => false
297+
} foreach { element =>
298+
val actor = element._1
299+
actor ! Remove
300+
prewarmedPool = prewarmedPool - actor
301+
}
302+
logging.info(this, s"add pre-warming ${config.count} ${config.exec.kind} ${config.memoryLimit.toString}")(
303+
TransactionId.invokerWarmup)
304+
(1 to config.count).foreach { _ =>
305+
prewarmContainer(config.exec, config.memoryLimit)
306+
}
307+
}
308+
case PrewarmQuery =>
309+
sender() ! getPrewarmContainer()
282310
case EmitMetrics =>
283311
emitMetrics()
284312
}
@@ -304,7 +332,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
304332

305333
/** Install prewarm containers up to the configured requirements for each kind/memory combination. */
306334
def backfillPrewarms(init: Boolean) = {
307-
prewarmConfig.foreach { config =>
335+
latestPrewarmConfig.foreach { config =>
308336
val kind = config.exec.kind
309337
val memory = config.memoryLimit
310338
val currentCount = prewarmedPool.count {
@@ -375,6 +403,33 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef,
375403
busyPool = busyPool - toDelete
376404
}
377405

406+
/**
407+
* get the prewarm container
408+
* @return
409+
*/
410+
def getPrewarmContainer(): ListBuffer[PrewarmContainerData] = {
411+
val containerDataList = prewarmedPool.values.toList.map { data =>
412+
data.asInstanceOf[PreWarmedData]
413+
}
414+
415+
var resultList: ListBuffer[PrewarmContainerData] = new ListBuffer[PrewarmContainerData]()
416+
containerDataList.foreach { prewarmData =>
417+
val isInclude = resultList.filter { resultData =>
418+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
419+
}.size > 0
420+
421+
if (isInclude) {
422+
var resultData = resultList.filter { resultData =>
423+
prewarmData.kind == resultData.kind && prewarmData.memoryLimit.toMB == resultData.memory
424+
}.head
425+
resultData.number += 1
426+
} else {
427+
resultList += PrewarmContainerData(prewarmData.kind, prewarmData.memoryLimit.toMB, 1)
428+
}
429+
}
430+
resultList
431+
}
432+
378433
/**
379434
* Calculate if there is enough free memory within a given pool.
380435
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.invoker
19+
20+
import akka.actor.ActorSystem
21+
import akka.http.scaladsl.server.Route
22+
import org.apache.openwhisk.common.{Logging, TransactionId}
23+
24+
import org.apache.openwhisk.http.BasicRasService
25+
26+
import scala.concurrent.ExecutionContext
27+
28+
/**
29+
* Implements web server to handle certain REST API calls.
30+
*/
31+
class DefaultInvokerServer(val invoker: InvokerCore)(implicit val ec: ExecutionContext,
32+
val actorSystem: ActorSystem,
33+
val logger: Logging)
34+
extends BasicRasService {
35+
36+
override def routes(implicit transid: TransactionId): Route = {
37+
super.routes ~ {
38+
(path("getRuntime") & get) {
39+
invoker.getRuntime()
40+
}
41+
}
42+
}
43+
}
44+
45+
object DefaultInvokerServer extends InvokerServerProvider {
46+
override def instance(
47+
invoker: InvokerCore)(implicit ec: ExecutionContext, actorSystem: ActorSystem, logger: Logging): BasicRasService =
48+
new DefaultInvokerServer(invoker)
49+
}

0 commit comments

Comments
 (0)