Skip to content

Commit 8d9151f

Browse files
authored
Add event stream batching (#62)
* Add support for batching events at stream level. * Add example with both batching and multitask operators. * Add batching timeout. * Add HTTP and slack combined batching examples.
1 parent 833dd62 commit 8d9151f

File tree

5 files changed

+313
-11
lines changed

5 files changed

+313
-11
lines changed

examples/batching/batching.py

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
21+
# Send message to Slack sink using a stream which batches events.
22+
23+
# Command line arguments and validation:
24+
if len(sys.argv) < 4:
25+
print(f'usage: {sys.argv[0]} <slack_channel> <slack_webhook> <run_mode>')
26+
sys.exit(1)
27+
slack_channel = sys.argv[1]
28+
slack_webhook = sys.argv[2]
29+
run_mode = sys.argv[3]
30+
if run_mode not in ['local', 'mixed', 'operator']:
31+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
32+
33+
# Initialize ray either on the cluster or locally otherwise.
34+
if run_mode == 'operator':
35+
ray.init(address='auto')
36+
else:
37+
ray.init()
38+
39+
# Start rayvens in operator mode.
40+
rayvens.init(mode=run_mode)
41+
42+
# Create stream.
43+
stream = rayvens.Stream('slack', batch_size=2)
44+
45+
46+
# Operator task:
47+
@ray.remote
48+
def batching_operator(incoming_events):
49+
print(incoming_events)
50+
return " ".join(incoming_events)
51+
52+
53+
# Event sink config.
54+
sink_config = dict(kind='slack-sink',
55+
route='/toslack',
56+
channel=slack_channel,
57+
webhook_url=slack_webhook)
58+
59+
# Add sink to stream.
60+
sink = stream.add_sink(sink_config)
61+
62+
# Add multi-task operator to stream.
63+
stream.add_operator(batching_operator)
64+
65+
# Sends messages to all sinks attached to this stream.
66+
stream << "Hello"
67+
stream << "World"
68+
stream << "Hello"
69+
stream << "Mars"
70+
stream << "Hello"
71+
stream << "Jupiter"
72+
73+
# Disconnect any sources or sinks attached to the stream 2 seconds after
74+
# the stream is idle (i.e. no events were propagated by the stream).
75+
stream.disconnect_all(after_idle_for=2)

examples/batching/batching_source.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
import json
21+
22+
# Send message to Slack sink using a stream which batches events received
23+
# from an HTTP source.
24+
25+
# Command line arguments and validation:
26+
if len(sys.argv) < 4:
27+
print(f'usage: {sys.argv[0]} <slack_channel> <slack_webhook> <run_mode>')
28+
sys.exit(1)
29+
slack_channel = sys.argv[1]
30+
slack_webhook = sys.argv[2]
31+
run_mode = sys.argv[3]
32+
if run_mode not in ['local', 'mixed', 'operator']:
33+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
34+
35+
# Initialize ray either on the cluster or locally otherwise.
36+
if run_mode == 'operator':
37+
ray.init(address='auto')
38+
else:
39+
ray.init()
40+
41+
# Start rayvens in operator mode.
42+
rayvens.init(mode=run_mode)
43+
44+
# Create stream.
45+
stream = rayvens.Stream('slack', batch_size=2)
46+
47+
48+
# Operator task:
49+
@ray.remote
50+
def batching_operator(incoming_events):
51+
quotes = []
52+
for event in incoming_events:
53+
payload = json.loads(event) # parse event string to json
54+
quotes.append(
55+
str(payload['quoteResponse']['result'][0]['regularMarketPrice']))
56+
return " ".join(quotes)
57+
58+
59+
# Event sink config.
60+
sink_config = dict(kind='slack-sink',
61+
route='/toslack',
62+
channel=slack_channel,
63+
webhook_url=slack_webhook)
64+
65+
# Add sink to stream.
66+
sink = stream.add_sink(sink_config)
67+
68+
# Add multi-task operator to stream.
69+
stream.add_operator(batching_operator)
70+
71+
# Event source config.
72+
source_config = dict(
73+
kind='http-source',
74+
name='source-1',
75+
url='https://query1.finance.yahoo.com/v7/finance/quote?symbols=AAPL',
76+
route='/from-http',
77+
period=3000)
78+
79+
# Attach source to stream.
80+
source = stream.add_source(source_config)
81+
82+
# Disconnect any sources or sinks attached to the stream 2 seconds after
83+
# the stream is idle (i.e. no events were propagated by the stream).
84+
stream.disconnect_all(after=10)
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import ray
18+
import rayvens
19+
import sys
20+
21+
# Send message to Slack sink using a stream which batches events and a
22+
# multi-task operator.
23+
24+
# Command line arguments and validation:
25+
if len(sys.argv) < 4:
26+
print(f'usage: {sys.argv[0]} <slack_channel> <slack_webhook> <run_mode>')
27+
sys.exit(1)
28+
slack_channel = sys.argv[1]
29+
slack_webhook = sys.argv[2]
30+
run_mode = sys.argv[3]
31+
if run_mode not in ['local', 'mixed', 'operator']:
32+
raise RuntimeError(f'Invalid run mode provided: {run_mode}')
33+
34+
# Initialize ray either on the cluster or locally otherwise.
35+
if run_mode == 'operator':
36+
ray.init(address='auto')
37+
else:
38+
ray.init()
39+
40+
# Start rayvens in operator mode.
41+
rayvens.init(mode=run_mode)
42+
43+
# Create stream.
44+
stream = rayvens.Stream('slack', batch_size=6)
45+
46+
47+
# Operator sub-task:
48+
@ray.remote
49+
def sub_task(context, sub_event):
50+
context.publish(sub_event)
51+
52+
53+
# Operator task:
54+
@ray.remote
55+
def batching_multi_task_operator(context, incoming_events):
56+
for sub_event in incoming_events:
57+
sub_task.remote(context, sub_event)
58+
59+
60+
# Event sink config.
61+
sink_config = dict(kind='slack-sink',
62+
route='/toslack',
63+
channel=slack_channel,
64+
webhook_url=slack_webhook)
65+
66+
# Add sink to stream.
67+
sink = stream.add_sink(sink_config)
68+
69+
# Add multi-task operator to stream.
70+
stream.add_multitask_operator(batching_multi_task_operator)
71+
72+
# Sends messages to all sinks attached to this stream.
73+
stream << "Hello"
74+
stream << "World"
75+
stream << "Hello"
76+
stream << "Mars"
77+
stream << "Hello"
78+
stream << "Jupiter"
79+
80+
# Disconnect any sources or sinks attached to the stream 2 seconds after
81+
# the stream is idle (i.e. no events were propagated by the stream).
82+
stream.disconnect_all(after_idle_for=2)

rayvens/api.py

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from rayvens.core.ray_serve import start as start_operator_ray_serve
2626
from rayvens.core.name import name_source, name_sink
2727
from rayvens.core.verify import verify_do
28+
from queue import Queue
2829

2930

3031
class Stream:
@@ -33,13 +34,14 @@ def __init__(self,
3334
actor_options=None,
3435
operator=None,
3536
source_config=None,
36-
sink_config=None):
37+
sink_config=None,
38+
batch_size=None):
3739
if _global_camel is None:
3840
raise RuntimeError(
3941
"Rayvens has not been started. Start with 'rayvens.init()'.")
4042
self.name = name
4143
self.actor = StreamActor.options(actor_options).remote(
42-
name, operator=operator)
44+
name, operator=operator, batch_size=batch_size)
4345
if sink_config is not None:
4446
self.add_sink(sink_config)
4547
if source_config is not None:
@@ -53,11 +55,12 @@ def append(self, data):
5355
self.actor.append.remote(data)
5456
return self
5557

56-
def add_operator(self, operator):
57-
return ray.get(self.actor.add_operator.remote(operator))
58+
def add_operator(self, operator, batch_size=None):
59+
return ray.get(self.actor.add_operator.remote(operator, batch_size))
5860

59-
def add_multi_operator(self, operator):
60-
return ray.get(self.actor.add_multi_operator.remote(operator))
61+
def add_multitask_operator(self, operator, batch_size=None):
62+
return ray.get(
63+
self.actor.add_multitask_operator.remote(operator, batch_size))
6164

6265
def add_source(self, source_config):
6366
return ray.get(self.actor.add_source.remote(self, source_config))
@@ -116,13 +119,14 @@ def _idle_time(self):
116119

117120

118121
class StreamContext:
119-
def __init__(self):
122+
def __init__(self, stream_batch_size):
120123
self.sink_restrictions = {}
121124
self.subscribers = {}
122125
self.latest_sent_event_timestamp = None
123126
self.event_counter = 0
124127
self.limit_subscribers = False
125128
self.is_multi_operator = False
129+
self.stream_batch_size = stream_batch_size
126130

127131
def publish(self, data):
128132
if data is not None:
@@ -148,12 +152,15 @@ def _accepts_data_type(self, data, type_restrictions):
148152

149153
@ray.remote(num_cpus=0)
150154
class StreamActor:
151-
def __init__(self, name, operator=None):
155+
def __init__(self, name, operator=None, batch_size=None):
152156
self.name = name
153157
self._operator = operator
154158
self._sources = {}
155159
self._sinks = {}
156-
self.context = StreamContext()
160+
self.context = StreamContext(batch_size)
161+
self.queue = None
162+
if self.context.stream_batch_size is not None:
163+
self.queue = Queue(maxsize=self.context.stream_batch_size)
157164

158165
def send_to(self, subscriber, name=None):
159166
if self.context.limit_subscribers:
@@ -168,17 +175,26 @@ def send_to(self, subscriber, name=None):
168175
def append(self, data):
169176
if data is None:
170177
return
178+
if self.queue is not None:
179+
self.queue.put_nowait(data)
180+
if not self.queue.full():
181+
return
182+
data = list(self.queue.queue)
183+
with self.queue.mutex:
184+
self.queue.queue.clear()
171185
if self._operator is not None:
172186
data = _eval(self.context, self._operator, data)
173187
self.context.publish(data)
174188
self.context.latest_sent_event_timestamp = time.time()
175189
self.context.event_counter += 1
176190

177-
def add_operator(self, operator):
191+
def add_operator(self, operator, batch_size):
178192
self._operator = operator
193+
self.context.batch_size = batch_size
179194

180-
def add_multitask_operator(self, operator):
195+
def add_multitask_operator(self, operator, batch_size):
181196
self._operator = operator
197+
self.context.batch_size = batch_size
182198
self.context.is_multi_operator = True
183199

184200
def add_source(self, stream, source_config):
@@ -229,13 +245,29 @@ def disconnect_sink(self, sink_name):
229245
def disconnect_all(self, stream_drain_timeout):
230246
for source_name in dict(self._sources):
231247
self.disconnect_source(source_name)
248+
if self.queue is not None:
249+
self.flush_batch()
232250
time.sleep(stream_drain_timeout)
233251
for sink_name in dict(self._sinks):
234252
self.disconnect_sink(sink_name)
235253

236254
def event_count(self):
237255
return self.context.event_counter
238256

257+
def flush_batch(self):
258+
data = None
259+
if self.queue is not None:
260+
data = list(self.queue.queue)
261+
with self.queue.mutex:
262+
self.queue.queue.clear()
263+
if data is None:
264+
return
265+
if self._operator is not None:
266+
data = _eval(self.context, self._operator, data)
267+
self.context.publish(data)
268+
self.context.latest_sent_event_timestamp = time.time()
269+
self.context.event_counter += 1
270+
239271
def _meta(self, action, *args, **kwargs):
240272
return verify_do(self, _global_camel, action, *args, **kwargs)
241273

0 commit comments

Comments
 (0)