
Race condition connecting to vs shutting down subprocesses in tests using cluster()  #6828

Closed
@gjoseph92

Description

The utils_test.cluster() contextmanager creates a lightweight cluster using subprocesses. It's used in a number of tests directly, as well as via the client, a, b, etc. fixtures.

In a finally block, the contextmanager tries to open an RPC to every subprocess that is still alive and uses that RPC to call close on the server.
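Condensed, that cleanup has roughly this shape (a paraphrase for orientation only, not verbatim from utils_test; close_alive_workers is a made-up name and the import paths are my assumption, while the real snippets are quoted further down):

from contextlib import suppress

from distributed.comm import CommClosedError
from distributed.core import rpc


async def close_alive_workers(workers_by_pid, **rpc_kwargs):
    # Snapshot which worker subprocesses still look alive...
    alive_workers = [
        w["address"] for w in workers_by_pid.values() if w["proc"].is_alive()
    ]
    # ...then connect to each one over a comm and ask it to shut itself down.
    for addr in alive_workers:
        async with rpc(addr, **rpc_kwargs) as w:
            with suppress(EnvironmentError, CommClosedError):
                await w.terminate(reply=False)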

However, a few tests have a pattern where they call terminate on one of the processes themselves right before exiting:

def test_submit_after_failed_worker_sync(loop):
    with cluster() as (s, [a, b]):
        with Client(s["address"], loop=loop) as c:
            L = c.map(inc, range(10))
            wait(L)
            a["proc"]().terminate()
            total = c.submit(sum, L)
            assert total.result() == sum(map(inc, range(10)))

Subprocess.terminate just sends SIGTERM; it doesn't block until the process has actually shut down. So what can happen:

  1. The test calls terminate on worker A, but the process is still running (a standalone illustration of this window follows the list).
  2. The test finishes and returns control to the contextmanager, which looks at which workers are still alive:

         alive_workers = [
             w["address"]
             for w in workers_by_pid.values()
             if w["proc"].is_alive()
         ]

     Worker A is still alive, so it's in the list.
  3. Worker A actually shuts down.
  4. Calling the terminate RPC times out trying to connect to the now-dead worker A:

         async with rpc(addr, **rpc_kwargs) as w:
             # If the worker was killed hard (e.g. sigterm) during test runtime,
             # we do not know at this point and may not be able to connect
             with suppress(EnvironmentError, CommClosedError):
                 # Do not request a reply since comms will be closed by the
                 # worker before a reply can be made and we will always trigger
                 # the timeout
                 await w.terminate(reply=False)

     The comment notes this possibility. In theory the suppress(CommClosedError) makes this fine, but only as long as the RPC's internal timeout is shorter than the timeout on the wait_for:

         await asyncio.wait_for(do_disconnect(), timeout=timeout)

     Otherwise an asyncio.TimeoutError is raised. That would no longer be the case after "Only set 5s connect timeout in gen_cluster tests" (#6822).
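The underlying timing hazard is easy to reproduce with plain multiprocessing, independent of anything distributed-specific (the example below is only an illustration, not code from utils_test):

import multiprocessing
import time


def sleep_forever():
    time.sleep(3600)


if __name__ == "__main__":
    proc = multiprocessing.Process(target=sleep_forever)
    proc.start()

    proc.terminate()        # only delivers SIGTERM (on POSIX); returns immediately
    print(proc.is_alive())  # may still print True: the process hasn't exited yet
    # Any cleanup that snapshots "alive" processes in this window races with the
    # actual shutdown, which is exactly what happens in step 2 above.

    proc.join()             # blocks until the process has really exited
    print(proc.is_alive())  # now reliably False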

I propose that we entirely remove the "RPC to the server and call close on it" logic.

Because we're already adding a callback to the ExitStack to terminate and join every subprocess:

stack.callback(_terminate_join, scheduler)

So the RPC approach is:

  1. Belt-and-suspenders (we have another mechanism to shut down the subprocesses)
  2. Superfluous (the clean close may start via RPC before the SIGTERM, but there are no handlers registered for SIGTERM by default, so the SIGTERM will then forcibly terminate the subprocess in the middle of its close)
  3. Way more brittle (connecting to a subprocess while we're terminating it)
  4. Rather pointless (the RPC doesn't block until the server is actually shut down, so it has no benefit compared to sending a signal)

In general when working with subprocesses, using signals and join to shut them down seems way simpler and more reliable than RPCs.
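For concreteness, the signal-and-join pattern amounts to something like this (a sketch of what a helper like _terminate_join could look like; the body is my assumption, not the actual utils_test implementation):

def _terminate_join(proc, timeout=30):
    # Hypothetical sketch, not the real utils_test helper: no comms, no connect
    # timeouts, nothing to race against.
    proc.terminate()            # ask the OS to kill the subprocess (SIGTERM on POSIX)
    proc.join(timeout=timeout)  # wait until it has actually exited
    # join() returning with a dead process is the only reliable "it's really gone"
    # signal; an RPC call returns as soon as the message is sent, not when the
    # server has actually shut down.
    assert not proc.is_alive()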
