Skip to content

Commit 27e224e

Browse files
AlexTatemr-c
authored andcommitted
Propagation of the WorkflowKillSwitch exception stops once it reaches an executor. The workflow_eval_lock release had to be moved to the finally block in MultithreadedJobExecutor.run_jobs(). Otherwise, TaskQueue threads running MultithreadedJobExecutor._runner() will never join() because _runner() waits indefinitely for the workflow_eval_lock in its own finally block.
1 parent b000b33 commit 27e224e

File tree

1 file changed

+12
-3
lines changed

1 file changed

+12
-3
lines changed

cwltool/executors.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from .context import RuntimeContext, getdefault
3030
from .cuda import cuda_version_and_device_count
3131
from .cwlprov.provenance_profile import ProvenanceProfile
32-
from .errors import WorkflowException
32+
from .errors import WorkflowException, WorkflowKillSwitch
3333
from .job import JobBase
3434
from .loghandler import _logger
3535
from .mutation import MutationManager
@@ -260,6 +260,11 @@ def run_jobs(
260260
WorkflowException,
261261
): # pylint: disable=try-except-raise
262262
raise
263+
except WorkflowKillSwitch as err:
264+
_logger.error(
265+
f"Workflow kill switch activated by [job {err.job_id}] "
266+
f"because on-error={runtime_context.on_error}"
267+
)
263268
except Exception as err:
264269
logger.exception("Got workflow error")
265270
raise WorkflowException(str(err)) from err
@@ -332,6 +337,11 @@ def _runner(
332337
except WorkflowException as err:
333338
_logger.exception(f"Got workflow error: {err}")
334339
self.exceptions.append(err)
340+
except WorkflowKillSwitch as err:
341+
_logger.error(
342+
f"Workflow kill switch activated by [job {err.job_id}] "
343+
f"because on-error={runtime_context.on_error}"
344+
)
335345
except Exception as err: # pylint: disable=broad-except
336346
_logger.exception(f"Got workflow error: {err}")
337347
self.exceptions.append(WorkflowException(str(err)))
@@ -466,9 +476,8 @@ def run_jobs(
466476
while self.taskqueue.in_flight > 0:
467477
self.wait_for_next_completion(runtime_context)
468478
self.run_job(None, runtime_context)
469-
470-
runtime_context.workflow_eval_lock.release()
471479
finally:
480+
runtime_context.workflow_eval_lock.release()
472481
self.taskqueue.drain()
473482
self.taskqueue.join()
474483

0 commit comments

Comments
 (0)