Skip to content

Commit 91114f7

Browse files
committed
call exit callback even if AsyncProcess is reaped elsewhere
1 parent d88c1d2 commit 91114f7

File tree

3 files changed

+80
-14
lines changed

3 files changed

+80
-14
lines changed

distributed/process.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,11 @@ def _start():
232232
def _watch_process(cls, selfref, process, state, q):
233233
r = repr(selfref())
234234
process.join()
235-
exitcode = process.exitcode
236-
assert exitcode is not None
237-
logger.debug("[%s] process %r exited with code %r", r, state.pid, exitcode)
235+
exitcode = original_exit_code = process.exitcode
236+
if exitcode is None:
237+
# The child process is already reaped
238+
# (may happen if waitpid() is called elsewhere).
239+
exitcode = 255
238240
state.is_alive = False
239241
state.exitcode = exitcode
240242
# Make sure the process is removed from the global list
@@ -247,6 +249,16 @@ def _watch_process(cls, selfref, process, state, q):
247249
finally:
248250
self = None # lose reference
249251

252+
# logging may fail - defer calls to after the callback is added
253+
if original_exit_code is None:
254+
logger.warning(
255+
"[%s] process %r exit status was already read will report exitcode 255",
256+
r,
257+
state.pid,
258+
)
259+
else:
260+
logger.debug("[%s] process %r exited with code %r", r, state.pid, exitcode)
261+
250262
def start(self):
251263
"""
252264
Start the child process.

distributed/tests/test_asyncprocess.py

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import concurrent.futures
45
import gc
6+
import multiprocessing
57
import os
68
import signal
9+
import socket
710
import sys
811
import threading
912
import weakref
@@ -12,11 +15,9 @@
1215

1316
import psutil
1417
import pytest
15-
from tornado import gen
1618
from tornado.ioloop import IOLoop
17-
from tornado.locks import Event
1819

19-
from distributed.compatibility import LINUX, MACOS, WINDOWS
20+
from distributed.compatibility import LINUX, MACOS, WINDOWS, to_thread
2021
from distributed.metrics import time
2122
from distributed.process import AsyncProcess
2223
from distributed.utils import get_mp_context
@@ -238,13 +239,9 @@ async def test_close():
238239
async def test_exit_callback():
239240
to_child = get_mp_context().Queue()
240241
from_child = get_mp_context().Queue()
241-
evt = Event()
242+
evt = asyncio.Event()
242243

243-
# FIXME: this breaks if changed to async def...
244-
@gen.coroutine
245244
def on_stop(_proc):
246-
assert _proc is proc
247-
yield gen.moment
248245
evt.set()
249246

250247
# Normal process exit
@@ -275,10 +272,69 @@ def on_stop(_proc):
275272
assert not evt.is_set()
276273

277274
await proc.terminate()
278-
await evt.wait(timedelta(seconds=5))
275+
await asyncio.wait_for(evt.wait(), 5)
279276
assert evt.is_set()
280277

281278

279+
def _run_and_close_tornado(async_fn, /, *args, **kwargs):
    """Run ``async_fn(*args, **kwargs)`` via ``asyncio.run`` and close the tornado loop.

    The tornado ``IOLoop`` that is current while the coroutine runs is
    captured and closed with ``all_fds=True`` once ``asyncio.run`` returns or
    raises.

    Parameters
    ----------
    async_fn
        Coroutine function to execute (positional-only).
    *args, **kwargs
        Forwarded unchanged to ``async_fn``.

    Returns
    -------
    Whatever ``async_fn(*args, **kwargs)`` returns.
    """
    tornado_loop = None

    async def inner_fn():
        nonlocal tornado_loop
        tornado_loop = IOLoop.current()
        return await async_fn(*args, **kwargs)

    try:
        return asyncio.run(inner_fn())
    finally:
        # ``asyncio.run`` can fail before ``inner_fn`` ever executes (for
        # example when it is invoked from an already running event loop).  In
        # that case there is no tornado loop to close, and attempting to do so
        # would replace the real error with an AttributeError on ``None``.
        if tornado_loop is not None:
            tornado_loop.close(all_fds=True)
291+
292+
293+
def _write_byte_wait_closed(sock):
    """Send one NUL byte on *sock*, wait in ``recv(1)``, then close *sock*.

    The socket is closed on the way out even if sending or receiving raises.
    """
    try:
        # Signal the peer, then block until ``recv`` returns.
        sock.send(b"\x00")
        sock.recv(1)
    finally:
        sock.close()
297+
298+
299+
async def _check_process_reaped_elsewhere():
    """Return the exit code of an AsyncProcess whose child was reaped via psutil.

    A child running ``_write_byte_wait_closed`` is started, its first byte is
    awaited, and the child is then terminated and waited on through
    ``psutil`` in a worker thread before the AsyncProcess is joined.
    """
    running_loop = asyncio.get_running_loop()

    def psutil_terminate(pid):
        # Terminate the child and collect its exit status outside of
        # multiprocessing.
        victim = psutil.Process(pid)
        victim.terminate()
        victim.wait()

    a, b = socket.socketpair()
    with a:
        with b:
            async_proc = AsyncProcess(
                target=_write_byte_wait_closed,
                args=(b,),
                loop=IOLoop.current(),
            )
            await async_proc.start()

        # Wait until the child has sent its byte before reaping it.
        a.setblocking(False)
        first_byte = await running_loop.sock_recv(a, 1)
        assert first_byte == b"\00"
        await to_thread(psutil_terminate, async_proc.pid)
        await async_proc.join()
        return async_proc.exitcode
320+
321+
322+
def test_process_reaped_elsewhere(cleanup):
    """A child whose exit status was already read elsewhere reports exit code 255."""
    # this needs to be run in a process pool because reaping a process
    # outside multiprocessing causes it to remain in
    # multiprocessing.active_children() forever - which blocks cleanup for
    # 40 seconds
    spawn_context = multiprocessing.get_context("spawn")
    with concurrent.futures.ProcessPoolExecutor(
        max_workers=1, mp_context=spawn_context
    ) as pool:
        future = pool.submit(_run_and_close_tornado, _check_process_reaped_elsewhere)
        reported_exitcode = future.result()
        assert reported_exitcode == 255
336+
337+
282338
@gen_test()
283339
async def test_child_main_thread():
284340
"""

setup.cfg

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ filterwarnings =
7979
ignore:Scheduler already contains a plugin with name nonidempotentplugin. overwriting:UserWarning
8080
ignore:Increasing number of chunks by factor of 20:dask.array.core.PerformanceWarning
8181
ignore::distributed.versions.VersionMismatchWarning
82-
ignore:(?s)Exception in thread AsyncProcess Dask Worker process \(from Nanny\) watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning
83-
ignore:(?s)Exception in thread AsyncProcess SpawnProcess-\d+ watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning
8482
ignore:(?s)Exception in thread.*old_ssh.*channel\.send\(b"\\x03"\).*Socket is closed:pytest.PytestUnhandledThreadExceptionWarning
8583
ignore:(?s)Exception in thread.*paramiko\.ssh_exception\.NoValidConnectionsError:pytest.PytestUnhandledThreadExceptionWarning
8684
ignore:(?s)Exception ignored in. <Finalize object, dead>.*sem_unlink.*FileNotFoundError:pytest.PytestUnraisableExceptionWarning

0 commit comments

Comments
 (0)