diff --git a/distributed/process.py b/distributed/process.py index c73c9bfe17b..0f0a6a50720 100644 --- a/distributed/process.py +++ b/distributed/process.py @@ -232,9 +232,11 @@ def _start(): def _watch_process(cls, selfref, process, state, q): r = repr(selfref()) process.join() - exitcode = process.exitcode - assert exitcode is not None - logger.debug("[%s] process %r exited with code %r", r, state.pid, exitcode) + exitcode = original_exit_code = process.exitcode + if exitcode is None: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + exitcode = 255 state.is_alive = False state.exitcode = exitcode # Make sure the process is removed from the global list @@ -247,6 +249,16 @@ def _watch_process(cls, selfref, process, state, q): finally: self = None # lose reference + # logging may fail - defer calls to after the callback is added + if original_exit_code is None: + logger.warning( + "[%s] process %r exit status was already read will report exitcode 255", + r, + state.pid, + ) + else: + logger.debug("[%s] process %r exited with code %r", r, state.pid, exitcode) + def start(self): """ Start the child process. diff --git a/distributed/tests/test_asyncprocess.py b/distributed/tests/test_asyncprocess.py index 65aa0e303cc..fb4906e4c55 100644 --- a/distributed/tests/test_asyncprocess.py +++ b/distributed/tests/test_asyncprocess.py @@ -7,14 +7,11 @@ import sys import threading import weakref -from datetime import timedelta from time import sleep import psutil import pytest -from tornado import gen from tornado.ioloop import IOLoop -from tornado.locks import Event from distributed.compatibility import LINUX, MACOS, WINDOWS from distributed.metrics import time @@ -238,13 +235,9 @@ async def test_close(): async def test_exit_callback(): to_child = get_mp_context().Queue() from_child = get_mp_context().Queue() - evt = Event() + evt = asyncio.Event() - # FIXME: this breaks if changed to async def... - @gen.coroutine def on_stop(_proc): - assert _proc is proc - yield gen.moment evt.set() # Normal process exit @@ -259,7 +252,7 @@ def on_stop(_proc): assert not evt.is_set() to_child.put(None) - await evt.wait(timedelta(seconds=5)) + await asyncio.wait_for(evt.wait(), 5) assert evt.is_set() assert not proc.is_alive() @@ -275,7 +268,7 @@ def on_stop(_proc): assert not evt.is_set() await proc.terminate() - await evt.wait(timedelta(seconds=5)) + await asyncio.wait_for(evt.wait(), 5) assert evt.is_set() diff --git a/setup.cfg b/setup.cfg index bc76731dde2..08e404fbddc 100644 --- a/setup.cfg +++ b/setup.cfg @@ -79,8 +79,6 @@ filterwarnings = ignore:Scheduler already contains a plugin with name nonidempotentplugin. overwriting:UserWarning ignore:Increasing number of chunks by factor of 20:dask.array.core.PerformanceWarning ignore::distributed.versions.VersionMismatchWarning - ignore:(?s)Exception in thread AsyncProcess Dask Worker process \(from Nanny\) watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning - ignore:(?s)Exception in thread AsyncProcess SpawnProcess-\d+ watch process join.*assert exitcode is not None:pytest.PytestUnhandledThreadExceptionWarning ignore:(?s)Exception in thread.*old_ssh.*channel\.send\(b"\\x03"\).*Socket is closed:pytest.PytestUnhandledThreadExceptionWarning ignore:(?s)Exception in thread.*paramiko\.ssh_exception\.NoValidConnectionsError:pytest.PytestUnhandledThreadExceptionWarning ignore:(?s)Exception ignored in. .*sem_unlink.*FileNotFoundError:pytest.PytestUnraisableExceptionWarning