Commit 00ac46e

Gracefully end source-related threads (#53)
* Gracefully end source-related threads.
* Add timer for waiting for requests before joining thread.
1 parent 429cf98 commit 00ac46e
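
The shutdown pattern this commit introduces is worth spelling out: each worker thread checks a stop_flag Event in its loop and sets a run_flag Event once its run() method finishes, while the owner signals stop_flag, waits a bounded amount of time (about 10 seconds) for run_flag, and joins the thread only if the loop actually ended. Below is a minimal, self-contained sketch of that pattern; the names PollerThread, poll_once and stop_gracefully are illustrative and do not appear in this commit.

import threading
import time


class PollerThread(threading.Thread):
    # Illustrative worker that polls until asked to stop (not rayvens code).
    def __init__(self, poll_once):
        threading.Thread.__init__(self)
        self.stop_flag = threading.Event()  # set by the owner to request shutdown
        self.run_flag = threading.Event()   # set by the worker once run() has finished
        self.poll_once = poll_once

    def run(self):
        while not self.stop_flag.is_set():
            self.poll_once()  # one bounded unit of work, e.g. an HTTP GET with a timeout
        self.run_flag.set()   # the loop ended cleanly


def stop_gracefully(thread, grace_period=10):
    # Mirrors the disconnect logic: signal, wait briefly, then join if possible.
    thread.stop_flag.set()
    countdown = grace_period
    while not thread.run_flag.is_set():
        time.sleep(1)
        if countdown == 0:
            break
        countdown -= 1
    if thread.run_flag.is_set():
        thread.join()


worker = PollerThread(lambda: time.sleep(0.1))
worker.daemon = True
worker.start()
stop_gracefully(worker)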

6 files changed: +111 −39

rayvens/core/common.py

+76 −33

@@ -125,22 +125,37 @@ def append(self, data):
         pass
 
 
-# Method for sending events to the queue.
-def send_to(handle, server_address, route):
-    def append():
-        while True:
+class QueueReadThread(threading.Thread):
+    def __init__(self, handle, server_address, route):
+        threading.Thread.__init__(self)
+        self.stop_flag = threading.Event()
+        self.run_flag = threading.Event()
+        self.handle = handle
+        self.server_address = server_address
+        self.route = route
+
+    def run(self):
+        while not self.stop_flag.is_set():
             try:
-                response = requests.get(f'{server_address}{route}',
+                response = requests.get(f'{self.server_address}{self.route}',
                                         timeout=(5, None))
                 if response.status_code != 200:
                     time.sleep(1)
                     continue
                 response.encoding = 'latin-1'
-                handle.append.remote(response.text)
+                self.handle.append.remote(response.text)
             except requests.exceptions.ConnectionError:
                 time.sleep(1)
 
-    threading.Thread(target=append).start()
+        self.run_flag.set()
+
+
+# Method for sending events to the queue.
+def send_to(handle, server_address, route):
+    queue_reading_thread = QueueReadThread(handle, server_address, route)
+    queue_reading_thread.daemon = True
+    queue_reading_thread.start()
+    return queue_reading_thread
 
 
 # Method for sending events to the sink.
@@ -222,30 +237,63 @@ def close_consumer(self):
         self.kafka_consumer.close()
 
 
+class ConsumerEnablingThread(threading.Thread):
+    def __init__(self, handle, consumers):
+        threading.Thread.__init__(self)
+        self.stop_flag = threading.Event()
+        self.run_flag = threading.Event()
+        self.handle = handle
+        self.consumers = consumers
+
+    def run(self):
+        subscribers, operator = ray.get(self.handle._fetch_processors.remote())
+        try:
+            consumer_references = [
+                consumer.enable_consumer.remote(subscribers, operator)
+                for consumer in self.consumers
+            ]
+            ray.get(consumer_references)
+        except KeyboardInterrupt:
+            for consumer in self.consumers:
+                consumer.disable_consumer.remote()
+        finally:
+            for consumer in self.consumers:
+                consumer.close_consumer.remote()
+
+        self.run_flag.set()
+
+
+class ConsumerThread(threading.Thread):
+    def __init__(self, handle, consumer):
+        threading.Thread.__init__(self)
+        self.stop_flag = threading.Event()
+        self.run_flag = threading.Event()
+        self.handle = handle
+        self.consumer = consumer
+
+    def run(self):
+        while not self.stop_flag.is_set():
+            msg = self.consumer.poll()
+            if msg.error():
+                print(f'consumer error: {msg.error()}')
+            else:
+                self.handle.append.remote(msg.value().decode('utf-8'))
+
+        self.run_flag.set()
+
+
 def kafka_send_to(kafka_transport_topic, kafka_transport_partitions, handle):
+    # If source uses a scaling transport start multiple Kafka Consumers.
     if kafka_transport_partitions > 1:
         consumers = [
             KafkaConsumer.remote(kafka_transport_topic, handle)
             for _ in range(kafka_transport_partitions)
         ]
 
-        def append():
-            subscribers, operator = ray.get(handle._fetch_processors.remote())
-            try:
-                consumer_references = [
-                    consumer.enable_consumer.remote(subscribers, operator)
-                    for consumer in consumers
-                ]
-                ray.get(consumer_references)
-            except KeyboardInterrupt:
-                for consumer in consumers:
-                    consumer.disable_consumer.remote()
-            finally:
-                for consumer in consumers:
-                    consumer.close_consumer.remote()
-
-        threading.Thread(target=append).start()
-        return
+        consumer_enabling_thread = ConsumerEnablingThread(handle, consumers)
+        consumer_enabling_thread.daemon = True
+        consumer_enabling_thread.start()
+        return consumer_enabling_thread
 
     # Use kafka consumer thread to push from camel source to rayvens stream
     consumer = Consumer({
@@ -256,15 +304,10 @@ def append():
 
     consumer.subscribe([kafka_transport_topic])
 
-    def append():
-        while True:
-            msg = consumer.poll()
-            if msg.error():
-                print(f'consumer error: {msg.error()}')
-            else:
-                handle.append.remote(msg.value().decode('utf-8'))
-
-    threading.Thread(target=append).start()
+    consumer_thread = ConsumerThread(handle, consumer)
+    consumer_thread.daemon = True
+    consumer_thread.start()
+    return consumer_thread
 
 
 def kafka_recv_from(integration_name, kafka_transport_topic, handle):

rayvens/core/integration.py

+23

@@ -14,6 +14,7 @@
 # limitations under the License.
 #
 
+import time
 from rayvens.core.common import brokers
 from rayvens.core.utils import random_port, create_partitioned_topic
 from rayvens.core.name import name_integration
@@ -58,6 +59,7 @@ def __init__(self, stream_name, source_sink_name, config):
         self.service_name = None
         self.server_address = None
         self.environment_preparators = []
+        self.thread = None
 
     def invoke_local_run(self, mode, integration_content):
         self.port = random_port(mode.check_port)
@@ -87,6 +89,27 @@ def invoke_run(self, mode, integration_content):
     # created using the kamel operator and `kamel run` or an integration
     # created using `kamel local run`.
     def disconnect(self, mode):
+        # Set stop_flag to true.
+        if self.thread is not None:
+            # Signal the end of ongoing loops.
+            self.thread.stop_flag.set()
+
+            # Wait for threads to perform/finish the current requests.
+            # Small 10 s timer to wait for request completion.
+            countdown = 10
+            while not self.thread.run_flag.is_set():
+                time.sleep(1)
+                if countdown == 0:
+                    break
+                countdown -= 1
+
+            # Join back successful threads otherwise threads will be killed
+            # when the application ends.
+            if self.thread.run_flag.is_set():
+                self.thread.join()
+                print("Successfully terminated additional"
+                      f" {self.integration_name} integration thread.")
+
         if self.invocation.uses_operator():
             # If kamel is running the cluster then use kamel delete to
             # terminate the integration. First we terminate any services
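
The countdown loop in disconnect polls run_flag once per second for roughly 10 seconds. An equivalent bounded wait can be written with threading.Event.wait, which blocks until the event is set or the timeout expires; this is only an alternative sketch (assuming an object exposing the same stop_flag/run_flag Events), not what the commit does:

def disconnect_thread(thread, grace_period=10):
    # `thread` is any object exposing the same stop_flag / run_flag Events.
    thread.stop_flag.set()  # ask the worker loop to stop
    if thread.run_flag.wait(timeout=grace_period):  # True once run() signalled completion
        thread.join()  # safe to join: the loop has exited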

rayvens/core/kafka.py

+3 −2

@@ -36,8 +36,9 @@ def add_source(self, stream, config, source_name):
             f'kafka:{integration.kafka_transport_topic}?brokers={brokers()}')
         integration.prepare_environment(self.mode)
         integration.invoke_local_run(self.mode, spec)
-        kafka_send_to(integration.kafka_transport_topic,
-                      integration.kafka_transport_partitions, stream.actor)
+        integration.thread = kafka_send_to(
+            integration.kafka_transport_topic,
+            integration.kafka_transport_partitions, stream.actor)
         return integration
 
     def add_sink(self, stream, config, sink_name):

rayvens/core/local.py

+3 −1

@@ -36,7 +36,9 @@ def add_source(self, stream, config, source_name):
                                            inverted=True)
         integration.prepare_environment(self.mode)
         integration.invoke_local_run(self.mode, spec)
-        send_to(stream.actor, self.mode.server_address(integration), route)
+        integration.thread = send_to(stream.actor,
+                                     self.mode.server_address(integration),
+                                     route)
         if not await_start(self.mode, integration):
             raise RuntimeError('Could not start source')
         return integration

rayvens/core/operator.py

+3 −1

@@ -50,7 +50,9 @@ def add_source(self, stream, source, source_name):
         integration.invoke_run(self.mode, integration_content)
 
         # Set up source for the HTTP connector case.
-        send_to(stream.actor, self.mode.server_address(integration), route)
+        integration.thread = send_to(stream.actor,
+                                     self.mode.server_address(integration),
+                                     route)
 
         if not await_start(self.mode, integration):
             raise RuntimeError('Could not start source')

rayvens/core/operator_kafka.py

+3 −2

@@ -47,8 +47,9 @@ def add_source(self, stream, source, source_name):
         integration.invoke_run(self.mode, integration_content)
 
         # Set up source for the HTTP connector case.
-        kafka_send_to(integration.kafka_transport_topic,
-                      integration.kafka_transport_partitions, stream.actor)
+        integration.thread = kafka_send_to(
+            integration.kafka_transport_topic,
+            integration.kafka_transport_partitions, stream.actor)
 
         if not await_start(self.mode, integration):
             raise RuntimeError('Could not start source')
