Skip to content

Commit ee89330

Browse files
committed
prepare API forkill_workers()
1 parent 177575b commit ee89330

File tree

2 files changed

+33
-0
lines changed

2 files changed

+33
-0
lines changed

capsul/database/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ def workers_command(self, engine_id):
166166
]
167167
return workers_command
168168

169+
def kill_worker_command(self, engine_id, worker_id):
170+
raise NotImplementedError
171+
169172
def new_execution(
170173
self, executable, engine_id, execution_context, workflow, start_time
171174
):
@@ -497,6 +500,12 @@ def worker_ended(self, engine_id, worker_id):
497500
"""
498501
raise NotImplementedError
499502

503+
def get_workers(self, engine_id):
504+
"""
505+
returns the workers IDs list
506+
"""
507+
raise NotImplementedError
508+
500509
def persistent(self, engine_id):
501510
"""
502511
Return whether an engine is persistent or not.

capsul/engine/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,8 @@ def start_workers(self):
184184
if start_count:
185185
for i in range(start_count):
186186
workers_command = self.database.workers_command(self.engine_id)
187+
# TODO: we should keep track of the running worker in order
188+
# to be able to contact / kill him later.
187189
try:
188190
subprocess.run(
189191
workers_command,
@@ -199,6 +201,28 @@ def quote(x):
199201
f'Command failed: {" ".join(quote(i) for i in workers_command)}'
200202
) from e
201203

204+
def kill_workers(self, worker_ids=None):
205+
if worker_ids is None:
206+
# kill all workers
207+
worker_ids = self.database_get_workers(self.engine_id)
208+
for worker_id in worker_ids:
209+
cmd = self.database.kill_worker_command(self.engine_id, worker_id)
210+
try:
211+
subprocess.run(
212+
cmd,
213+
capture_output=False,
214+
check=True,
215+
)
216+
except Exception as e:
217+
218+
def quote(x):
219+
return f"'{x}'"
220+
221+
raise RuntimeError(
222+
f'Command failed: {" ".join(quote(i) for i in cmd)}'
223+
) from e
224+
self.database.worker_ended(self.engine_id, worker_id)
225+
202226
def __exit__(self, exception_type, exception_value, exception_traceback):
203227
# exiting the engine disposes it from the database: executions will
204228
# be deleted from it, and later inspection will not be possible.

0 commit comments

Comments
 (0)