Skip to content

Commit 7494398

Browse files
committed
Use harness to manage kamel termination (#56)
1 parent fe9fb86 commit 7494398

File tree

2 files changed

+35
-19
lines changed

2 files changed

+35
-19
lines changed

rayvens/core/harness.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
#
2+
# Copyright IBM Corporation 2021
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
#
16+
17+
import os
18+
import psutil
19+
import signal
20+
import subprocess
21+
import sys
22+
23+
# start child process
24+
process = subprocess.Popen(sys.argv[1:], start_new_session=True)
25+
26+
# wait for parent process
27+
psutil.wait_procs([psutil.Process().parent()])
28+
29+
# kill child process
30+
os.killpg(os.getpgid(process.pid), signal.SIGKILL)

rayvens/core/impl.py

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
# limitations under the License.
1515
#
1616

17-
import atexit
1817
import os
1918
import ray
2019
import signal
@@ -26,20 +25,12 @@
2625
import random
2726
from rayvens.core.validation import Validation
2827
import rayvens.core.catalog as catalog
29-
30-
integrations = []
31-
32-
33-
def killall():
34-
global integrations
35-
for integration in integrations:
36-
integration.cancel()
28+
import sys
3729

3830

3931
# instantiate camel actor manager and setup exit hook
4032
def start(prefix, mode):
4133
camel = Camel(mode)
42-
atexit.register(camel.killall)
4334
return camel
4435

4536

@@ -92,10 +83,6 @@ def await_start(self, integration_name):
9283
def await_start_all(self, stream):
9384
return True
9485

95-
def killall(self):
96-
for stream in self.streams:
97-
stream._exec.remote(killall)
98-
9986

10087
# the low-level implementation-specific stuff
10188

@@ -126,15 +113,14 @@ def __init__(self, name, spec):
126113
with open(filename, 'w') as f:
127114
yaml.dump(spec, f)
128115
self.port = random_port()
116+
harness = os.path.join(os.path.dirname(__file__), 'harness.py')
129117
queue = os.path.join(os.path.dirname(__file__), 'Queue.java')
130118
command = [
131-
'kamel', 'local', 'run', queue, '--property',
132-
f'quarkus.http.port={self.port}', filename
119+
sys.executable, harness, 'kamel', 'local', 'run', queue,
120+
'--property', f'quarkus.http.port={self.port}', filename
133121
]
134122
process = subprocess.Popen(command, start_new_session=True)
135123
self.pid = process.pid
136-
global integrations
137-
integrations.append(self)
138124

139125
def send_to(self, stream):
140126
def append():
@@ -158,6 +144,6 @@ def recv_from(self, stream):
158144

159145
def cancel(self):
160146
try:
161-
os.killpg(os.getpgid(self.pid), signal.SIGTERM)
147+
os.kill(self.pid, signal.SIGTERM)
162148
except ProcessLookupError:
163149
pass

0 commit comments

Comments
 (0)