Skip to content

Commit 61fca1c

Browse files
authored
Clean up cluster process reaping (#6840)
1 parent 7768f6c commit 61fca1c

File tree

1 file changed

+12
-14
lines changed

1 file changed

+12
-14
lines changed

distributed/utils_test.py

Lines changed: 12 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -568,14 +568,14 @@ def security():
568568
return tls_only_security()
569569

570570

571-
def _kill_join(proc, timeout):
    """Kill ``proc``, wait for it to exit, and release its resources.

    Parameters
    ----------
    proc
        Process-like object exposing ``kill``, ``join``, ``is_alive`` and
        ``close``.
    timeout
        Value forwarded to ``proc.join``; also reported in the error message.

    Raises
    ------
    multiprocessing.TimeoutError
        If ``proc`` is still alive after the join returns. In that case
        ``proc.close()`` is not called.
    """
    proc.kill()
    proc.join(timeout)

    # Happy path first: the process is gone, so it is safe to close it.
    if not proc.is_alive():
        proc.close()
        return

    raise multiprocessing.TimeoutError(
        f"Process {proc} did not shut down within {timeout}s"
    )
571+
def _kill_join_processes(processes):
    """Kill all of ``processes``, then join and close each one in turn.

    Every process is killed before any of them is joined, because a join may
    hang or cause issues while other processes are still running.

    No timeout is passed to ``join``; the overall pytest timeout is relied
    upon instead.

    Parameters
    ----------
    processes
        Iterable of process-like objects exposing ``kill``, ``join`` and
        ``close``. It is iterated twice.
    """
    # Phase 1: send the kill to everything up front.
    for child in processes:
        child.kill()

    # Phase 2: reap each child and free its resources.
    for child in processes:
        child.join()
        child.close()
579579

580580

581581
def _close_queue(q):
@@ -590,15 +590,13 @@ def cluster(
590590
nanny=False,
591591
worker_kwargs=None,
592592
active_rpc_timeout=10,
593-
shutdown_timeout=20,
594593
scheduler_kwargs=None,
595594
config=None,
596595
):
597596
worker_kwargs = worker_kwargs or {}
598597
scheduler_kwargs = scheduler_kwargs or {}
599598
config = config or {}
600599

601-
ws = weakref.WeakSet()
602600
enable_proctitle_on_children()
603601

604602
with check_process_leak(check=True), check_instances(), config_for_cluster_tests():
@@ -608,6 +606,8 @@ def cluster(
608606
_run_worker = run_worker
609607

610608
with contextlib.ExitStack() as stack:
609+
processes = []
610+
stack.callback(_kill_join_processes, processes)
611611
# The scheduler queue will receive the scheduler's address
612612
scheduler_q = get_mp_context().Queue()
613613
stack.callback(_close_queue, scheduler_q)
@@ -620,9 +620,8 @@ def cluster(
620620
kwargs=scheduler_kwargs,
621621
daemon=True,
622622
)
623-
ws.add(scheduler)
624623
scheduler.start()
625-
stack.callback(_kill_join, scheduler, shutdown_timeout)
624+
processes.append(scheduler)
626625

627626
# Launch workers
628627
workers_by_pid = {}
@@ -642,9 +641,8 @@ def cluster(
642641
args=(q, scheduler_q, config),
643642
kwargs=kwargs,
644643
)
645-
ws.add(proc)
646644
proc.start()
647-
stack.callback(_kill_join, proc, shutdown_timeout)
645+
processes.append(proc)
648646
workers_by_pid[proc.pid] = {"proc": proc}
649647

650648
saddr_or_exception = scheduler_q.get()

0 commit comments

Comments
 (0)