Skip to content

Commit ed58b23

Browse files
[New Scheduler] Add a centralized watcher for etcd data (#5069)
* Add a centralized watcher for etcd data * Fix class name format * Fix compilation error * Enable test for WatcherService
1 parent 4a13303 commit ed58b23

File tree

3 files changed

+456
-0
lines changed

3 files changed

+456
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.openwhisk.core.service
19+
20+
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
21+
import com.ibm.etcd.api.Event.EventType
22+
import com.ibm.etcd.client.kv.WatchUpdate
23+
import org.apache.openwhisk.common.Logging
24+
import org.apache.openwhisk.core.etcd.EtcdClient
25+
import org.apache.openwhisk.core.etcd.EtcdType._
26+
import scala.collection.JavaConverters._
27+
import scala.collection.concurrent.TrieMap
28+
29+
// messages received by this actor
30+
case class WatchEndpoint(key: String,
31+
value: String,
32+
isPrefix: Boolean,
33+
name: String,
34+
listenEvents: Set[EtcdEvent] = Set.empty)
35+
case class UnwatchEndpoint(watchKey: String, isPrefix: Boolean, watchName: String, needFeedback: Boolean = false)
36+
37+
// the watchKey is the string user want to watch, it can be a prefix, the key is a record's key in Etcd
38+
// so if `isPrefix = true`, the `watchKey != key`, else the `watchKey == key`
39+
sealed abstract class WatchEndpointOperation(val watchKey: String,
40+
val key: String,
41+
val value: String,
42+
val isPrefix: Boolean)
43+
case class WatchEndpointRemoved(override val watchKey: String,
44+
override val key: String,
45+
override val value: String,
46+
override val isPrefix: Boolean)
47+
extends WatchEndpointOperation(watchKey, key, value, isPrefix)
48+
case class WatchEndpointInserted(override val watchKey: String,
49+
override val key: String,
50+
override val value: String,
51+
override val isPrefix: Boolean)
52+
extends WatchEndpointOperation(watchKey, key, value, isPrefix)
53+
case class WatcherClosed(key: String, isPrefix: Boolean)
54+
55+
sealed trait EtcdEvent
56+
case object PutEvent extends EtcdEvent
57+
case object DeleteEvent extends EtcdEvent
58+
59+
// there may be several watchers for a same watcher key, so add a watcherName to distinguish them
60+
case class WatcherKey(watchKey: String, watchName: String)
61+
62+
class WatcherService(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem) extends Actor {
63+
64+
implicit val ec = context.dispatcher
65+
66+
private[service] val putWatchers = TrieMap[WatcherKey, ActorRef]()
67+
private[service] val deleteWatchers = TrieMap[WatcherKey, ActorRef]()
68+
private[service] val prefixPutWatchers = TrieMap[WatcherKey, ActorRef]()
69+
private[service] val prefixDeleteWatchers = TrieMap[WatcherKey, ActorRef]()
70+
71+
private val watcher = etcdClient.watchAllKeys { res: WatchUpdate =>
72+
res.getEvents.asScala.foreach { event =>
73+
event.getType match {
74+
case EventType.DELETE =>
75+
val key = ByteStringToString(event.getPrevKv.getKey)
76+
val value = ByteStringToString(event.getPrevKv.getValue)
77+
val watchEvent = WatchEndpointRemoved(key, key, value, false)
78+
deleteWatchers
79+
.foreach { watcher =>
80+
if (watcher._1.watchKey == key) {
81+
watcher._2 ! watchEvent
82+
}
83+
}
84+
prefixDeleteWatchers
85+
.foreach { watcher =>
86+
if (key.startsWith(watcher._1.watchKey)) {
87+
watcher._2 ! WatchEndpointRemoved(watcher._1.watchKey, key, value, true)
88+
}
89+
}
90+
case EventType.PUT =>
91+
val key = ByteStringToString(event.getKv.getKey)
92+
val value = ByteStringToString(event.getKv.getValue)
93+
val watchEvent = WatchEndpointInserted(key, key, value, false)
94+
putWatchers
95+
.foreach { watcher =>
96+
if (watcher._1.watchKey == key) {
97+
watcher._2 ! watchEvent
98+
}
99+
}
100+
prefixPutWatchers
101+
.foreach { watcher =>
102+
if (key.startsWith(watcher._1.watchKey)) {
103+
watcher._2 ! WatchEndpointInserted(watcher._1.watchKey, key, value, true)
104+
}
105+
}
106+
case msg =>
107+
logging.debug(this, s"watch event received: $msg.")
108+
}
109+
}
110+
111+
}
112+
113+
override def receive: Receive = {
114+
case request: WatchEndpoint =>
115+
logging.info(this, s"watch endpoint: $request")
116+
val watcherKey = WatcherKey(request.key, request.name)
117+
if (request.listenEvents.contains(PutEvent))
118+
if (request.isPrefix)
119+
prefixPutWatchers.update(watcherKey, sender())
120+
else
121+
putWatchers.update(watcherKey, sender())
122+
123+
if (request.listenEvents.contains(DeleteEvent))
124+
if (request.isPrefix)
125+
prefixDeleteWatchers.update(watcherKey, sender())
126+
else
127+
deleteWatchers.update(watcherKey, sender())
128+
129+
case request: UnwatchEndpoint =>
130+
val watcherKey = WatcherKey(request.watchKey, request.watchName)
131+
if (request.isPrefix) {
132+
prefixPutWatchers.remove(watcherKey)
133+
prefixDeleteWatchers.remove(watcherKey)
134+
} else {
135+
putWatchers.remove(watcherKey)
136+
deleteWatchers.remove(watcherKey)
137+
}
138+
139+
// always send WatcherClosed back to sender if it need a feedback
140+
if (request.needFeedback)
141+
sender ! WatcherClosed(request.watchKey, request.isPrefix)
142+
}
143+
}
144+
145+
object WatcherService {
146+
def props(etcdClient: EtcdClient)(implicit logging: Logging, actorSystem: ActorSystem): Props = {
147+
Props(new WatcherService(etcdClient))
148+
}
149+
}

tests/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ ext.testSets = [
9090
"includes" : [
9191
"org/apache/openwhisk/common/etcd/**",
9292
"org/apache/openwhisk/core/scheduler/**",
93+
"org/apache/openwhisk/core/service/**",
9394
]
9495
],
9596
"REQUIRE_MULTI_RUNTIME" : [

0 commit comments

Comments
 (0)