Skip to content

Commit 573efc4

Browse files
committed
OW-5097: Implement LoadBalancer Strategy based on annotation
- implemented MuxLoadBalancer to dynamically select load balancing strategy based on annotation of the action
1 parent a201e02 commit 573efc4

File tree

8 files changed

+357
-5
lines changed

8 files changed

+357
-5
lines changed

core/controller/src/main/resources/reference.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ whisk {
2020
use-cluster-bootstrap: false
2121
}
2222
loadbalancer {
23+
strategy {
24+
default = ""
25+
custom = {}
26+
}
2327
managed-fraction: 90%
2428
blackbox-fraction: 10%
2529
# factor to increase the timeout for forced active acks
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package org.apache.openwhisk.core.loadBalancer
2+
3+
import akka.actor.{ActorRef, ActorSystem, Props}
4+
import akka.stream.ActorMaterializer
5+
import org.apache.openwhisk.common.{Logging, TransactionId}
6+
import org.apache.openwhisk.core.{ConfigKeys, WhiskConfig}
7+
import org.apache.openwhisk.core.WhiskConfig._
8+
import org.apache.openwhisk.core.connector.{ActivationMessage, MessagingProvider}
9+
import org.apache.openwhisk.core.entity._
10+
import org.apache.openwhisk.spi.SpiLoader
11+
import pureconfig.loadConfigOrThrow
12+
import spray.json._
13+
import pureconfig._
14+
import pureconfig.generic.auto._
15+
16+
import scala.concurrent.Future
17+
18+
class MuxBalancer(config: WhiskConfig,
19+
feedFactory: FeedFactory,
20+
controllerInstance: ControllerInstanceId,
21+
implicit val messagingProvider: MessagingProvider = SpiLoader.get[MessagingProvider],
22+
override val lbConfig: ShardingContainerPoolBalancerConfig =
23+
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(
24+
implicit actorSystem: ActorSystem,
25+
logging: Logging,
26+
materializer: ActorMaterializer)
27+
extends CommonLoadBalancer(config, feedFactory, controllerInstance) {
28+
29+
private val defaultLoadBalancer =
30+
getClass[LoadBalancerProvider](lbConfig.strategy.default).instance(config, controllerInstance)
31+
private val customLoadBalancerMap: Map[String, LoadBalancer] =
32+
lbConfig.strategy.custom.foldLeft(Map.empty[String, LoadBalancer]) {
33+
case (result, (name, strategyConfig)) =>
34+
result + (name -> getClass[LoadBalancerProvider](strategyConfig.className).instance(config, controllerInstance))
35+
}
36+
37+
/**
38+
* Instantiates an object of the given type.
39+
*
40+
* Similar to SpiLoader.get, with the difference that the constructed class does not need to be declared as Spi.
41+
* Thus there could be multiple classes implementing same interface constructed at the same time
42+
*
43+
* @param name the name of the class
44+
* @tparam A expected type to return
45+
* @return instance of the class
46+
*/
47+
private def getClass[A](name: String): A = {
48+
val clazz = Class.forName(name + "$")
49+
val classInst = clazz.getField("MODULE$").get(clazz).asInstanceOf[A]
50+
classInst
51+
}
52+
53+
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
54+
override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = {
55+
// Currently do nothing
56+
}
57+
override protected val invokerPool: ActorRef = actorSystem.actorOf(Props.empty)
58+
59+
/**
60+
* Publish a message to the loadbalancer
61+
*
62+
* Select the LoadBalancer based on the annotation, if available, otherwise use the default one
63+
**/
64+
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
65+
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
66+
action.annotations.get("activationStrategy") match {
67+
case None =>
68+
defaultLoadBalancer.publish(action, msg)
69+
case Some(JsString(value)) => {
70+
if (customLoadBalancerMap.contains(value)) {
71+
customLoadBalancerMap(value).publish(action, msg)
72+
} else {
73+
defaultLoadBalancer.publish(action, msg)
74+
}
75+
}
76+
case Some(_) => defaultLoadBalancer.publish(action, msg)
77+
}
78+
}
79+
}
80+
81+
object MuxBalancer extends LoadBalancerProvider {
82+
83+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
84+
implicit actorSystem: ActorSystem,
85+
logging: Logging,
86+
materializer: ActorMaterializer): LoadBalancer = {
87+
88+
new MuxBalancer(whiskConfig, createFeedFactory(whiskConfig, instance), instance)
89+
}
90+
91+
def requiredProperties =
92+
ExecManifest.requiredProperties ++
93+
wskApiHost
94+
}

core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,11 +598,28 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
598598
* @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
599599
* @param timeoutAddon extra time to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + timeoutAddon)
600600
*/
601-
case class ShardingContainerPoolBalancerConfig(managedFraction: Double,
601+
case class ShardingContainerPoolBalancerConfig(strategy: ActivationStrategy,
602+
managedFraction: Double,
602603
blackboxFraction: Double,
603604
timeoutFactor: Int,
604605
timeoutAddon: FiniteDuration)
605606

607+
/**
608+
* Configuration for the annotation-based load balancer multiplexer
609+
*
610+
* @param default the default strategy to be used if nothing is configured for the given annotation
611+
* @param custom the Map of the strategy name to strategy configuration
612+
*/
613+
case class ActivationStrategy(default: String,
614+
custom: Map[String, StrategyConfig])
615+
616+
/**
617+
* Configuration for the strategy
618+
*
619+
* @param className indicates the class which will handle this strategy name
620+
*/
621+
case class StrategyConfig(className: String)
622+
606623
/**
607624
* State kept for each activation slot until completion.
608625
*

core/standalone/src/main/resources/standalone.conf

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ whisk {
3636
spi {
3737
ArtifactStoreProvider = "org.apache.openwhisk.core.database.memory.MemoryArtifactStoreProvider"
3838
MessagingProvider = "org.apache.openwhisk.connector.lean.LeanMessagingProvider"
39-
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
39+
LoadBalancerProvider = "org.apache.openwhisk.core.loadBalancer.MuxBalancer"
4040
# Use cli based log store for all setups as its more stable to use
4141
# and does not require root user access
4242
LogStoreProvider = "org.apache.openwhisk.core.containerpool.docker.DockerCliLogStoreProvider"
@@ -56,6 +56,12 @@ whisk {
5656
limits-actions-invokes-concurrent = 30
5757
}
5858

59+
loadbalancer {
60+
strategy {
61+
default = "org.apache.openwhisk.core.loadBalancer.LeanBalancer"
62+
}
63+
}
64+
5965
controller {
6066
protocol = http
6167

core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.openwhisk.common.{Logging, TransactionId}
2828
import org.apache.openwhisk.core.WhiskConfig
2929
import org.apache.openwhisk.core.WhiskConfig.kafkaHosts
3030
import org.apache.openwhisk.core.entity.ControllerInstanceId
31-
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
31+
import org.apache.openwhisk.core.loadBalancer.{LoadBalancer, LoadBalancerProvider, MuxBalancer}
3232
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}
3333

3434
import scala.concurrent.{ExecutionContext, Future}
@@ -113,12 +113,12 @@ class KafkaLauncher(docker: StandaloneDockerClient,
113113
}
114114

115115
object KafkaAwareLeanBalancer extends LoadBalancerProvider {
116-
override def requiredProperties: Map[String, String] = LeanBalancer.requiredProperties ++ kafkaHosts
116+
override def requiredProperties: Map[String, String] = MuxBalancer.requiredProperties ++ kafkaHosts
117117

118118
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
119119
implicit actorSystem: ActorSystem,
120120
logging: Logging,
121-
materializer: ActorMaterializer): LoadBalancer = LeanBalancer.instance(whiskConfig, instance)
121+
materializer: ActorMaterializer): LoadBalancer = MuxBalancer.instance(whiskConfig, instance)
122122
}
123123

124124
object KafkaLauncher {
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.loadBalancer.test
19+
20+
import akka.actor.ActorSystem
21+
import akka.stream.ActorMaterializer
22+
import org.apache.openwhisk.common.{Logging, TransactionId}
23+
import org.apache.openwhisk.core.WhiskConfig
24+
import org.apache.openwhisk.core.connector.ActivationMessage
25+
import org.apache.openwhisk.core.entity.{
26+
ActivationId,
27+
ControllerInstanceId,
28+
ExecManifest,
29+
ExecutableWhiskActionMetaData,
30+
UUID,
31+
WhiskActivation
32+
}
33+
import org.apache.openwhisk.core.loadBalancer.{InvokerHealth, LoadBalancer, LoadBalancerProvider}
34+
import org.apache.openwhisk.core.WhiskConfig._
35+
36+
import scala.concurrent.Future
37+
38+
class MockLoadBalancer(prefix: String) extends LoadBalancer {
39+
override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = Future.successful(IndexedSeq.empty[InvokerHealth])
40+
override def clusterSize: Int = 1
41+
override def totalActiveActivations: Future[Int] = Future.successful(1)
42+
override def activeActivationsFor(namespace: UUID): Future[Int] =
43+
Future.successful(0)
44+
override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
45+
implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
46+
Future.successful(Future.successful(Left(ActivationId(prefix + "-mockLoadBalancerId0"))))
47+
}
48+
}
49+
50+
object MockLoadBalancerCustom extends LoadBalancerProvider {
51+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
52+
implicit actorSystem: ActorSystem,
53+
logging: Logging,
54+
materializer: ActorMaterializer): LoadBalancer = {
55+
56+
new MockLoadBalancer("custom")
57+
}
58+
59+
def requiredProperties =
60+
ExecManifest.requiredProperties ++
61+
wskApiHost
62+
}
63+
64+
object MockLoadBalancerDefault extends LoadBalancerProvider {
65+
override def instance(whiskConfig: WhiskConfig, instance: ControllerInstanceId)(
66+
implicit actorSystem: ActorSystem,
67+
logging: Logging,
68+
materializer: ActorMaterializer): LoadBalancer = {
69+
70+
new MockLoadBalancer("default")
71+
}
72+
73+
def requiredProperties =
74+
ExecManifest.requiredProperties ++
75+
wskApiHost
76+
}

0 commit comments

Comments
 (0)