From 263bb3645d2bd532237ffa9d4156b303ac773fc6 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 2 Aug 2022 20:44:08 -0600 Subject: [PATCH 01/12] Apply 5s timeout, etc. to fixtures selectively --- distributed/utils_test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 23f8817ab10..44015dbea72 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -672,7 +672,7 @@ def cluster( ws = weakref.WeakSet() enable_proctitle_on_children() - with check_process_leak(check=True), check_instances(), _reconfigure(): + with check_process_leak(check=True), check_instances(), set_default_test_config(): if nanny: _run_worker = run_nanny else: @@ -835,6 +835,7 @@ async def async_fn_outer(async_fn, /, *args, **kwargs): def _(func): @functools.wraps(func) + @set_default_test_config() @clean(**clean_kwargs) def test_func(*args, **kwargs): if not iscoroutinefunction(func): @@ -1038,6 +1039,7 @@ def _(func): raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) + @set_default_test_config() @clean(**clean_kwargs) def test_func(*outer_args, **kwargs): async def async_fn(): @@ -1880,7 +1882,7 @@ def check_instances(): @contextmanager -def _reconfigure(): +def set_default_test_config(): reset_config() with dask.config.set( @@ -1905,8 +1907,7 @@ def clean(threads=True, instances=True, processes=True): with check_thread_leak() if threads else nullcontext(): with check_process_leak(check=processes): with check_instances() if instances else nullcontext(): - with _reconfigure(): - yield + yield @pytest.fixture From d5c8a3f22ae214b7c74bd9c5113bfe92459bca2d Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 2 Aug 2022 21:28:03 -0600 Subject: [PATCH 02/12] Only set timeout in `gen_cluster` This is basically option 3 in https://github.com/dask/distributed/issues/6731#issuecomment-1203391993. 
I can't think of a justification why this timeout should be set globally. All the other things in there are necessary to make things run more reasonably in tests. The timeout is the opposite; there's nothing about CI that should make us think connections will be faster. --- distributed/utils_test.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 44015dbea72..fe0d82fd2b2 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -152,7 +152,7 @@ def loop_in_thread(cleanup): loop_started = concurrent.futures.Future() with concurrent.futures.ThreadPoolExecutor( 1, thread_name_prefix="test IOLoop" - ) as tpe: + ) as tpe, set_default_test_config(): async def run(): io_loop = IOLoop.current() @@ -1039,7 +1039,7 @@ def _(func): raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) - @set_default_test_config() + @set_default_test_config(**{"distributed.comm.timeouts.connect": "5s"}) @clean(**clean_kwargs) def test_func(*outer_args, **kwargs): async def async_fn(): @@ -1882,16 +1882,16 @@ def check_instances(): @contextmanager -def set_default_test_config(): +def set_default_test_config(**extra_config): reset_config() with dask.config.set( { "local_directory": tempfile.gettempdir(), - "distributed.comm.timeouts.connect": "5s", "distributed.admin.tick.interval": "500 ms", "distributed.worker.profile.enabled": False, - } + }, + **extra_config, ): # Restore default logging levels # XXX use pytest hooks/fixtures instead? 
From e59b33110616ee2ac7a4b7684a44b5ef81f91b92 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 2 Aug 2022 21:32:01 -0600 Subject: [PATCH 03/12] rename --- distributed/utils_test.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fe0d82fd2b2..3441d53e7d2 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -152,7 +152,7 @@ def loop_in_thread(cleanup): loop_started = concurrent.futures.Future() with concurrent.futures.ThreadPoolExecutor( 1, thread_name_prefix="test IOLoop" - ) as tpe, set_default_test_config(): + ) as tpe, default_test_config(): async def run(): io_loop = IOLoop.current() @@ -672,7 +672,7 @@ def cluster( ws = weakref.WeakSet() enable_proctitle_on_children() - with check_process_leak(check=True), check_instances(), set_default_test_config(): + with check_process_leak(check=True), check_instances(), default_test_config(): if nanny: _run_worker = run_nanny else: @@ -835,7 +835,7 @@ async def async_fn_outer(async_fn, /, *args, **kwargs): def _(func): @functools.wraps(func) - @set_default_test_config() + @default_test_config() @clean(**clean_kwargs) def test_func(*args, **kwargs): if not iscoroutinefunction(func): @@ -1039,7 +1039,7 @@ def _(func): raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) - @set_default_test_config(**{"distributed.comm.timeouts.connect": "5s"}) + @default_test_config(**{"distributed.comm.timeouts.connect": "5s"}) @clean(**clean_kwargs) def test_func(*outer_args, **kwargs): async def async_fn(): @@ -1882,7 +1882,7 @@ def check_instances(): @contextmanager -def set_default_test_config(**extra_config): +def default_test_config(**extra_config): reset_config() with dask.config.set( From 9b1bed1f46c2abab020b037646c0eadaf9ce1a29 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Tue, 2 Aug 2022 23:17:10 -0600 Subject: [PATCH 04/12] fix `test_worker_doesnt_await_task_completion` When 
no timeout was given to `restart`, it used 4x `Client.timeout`, which is set to `distributed.comm.timeouts.connect` :facepalm:. So what used to be a 20s timeout became a 2min timeout. And that timeout is passed down into `Worker.close`, so it gives the ThreadPoolExecutor a longer time to wait. --- distributed/tests/test_failed_workers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index fc7b202a961..a928edbd0fa 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -167,9 +167,9 @@ def test_worker_doesnt_await_task_completion(loop): future = c.submit(sleep, 100) sleep(0.1) start = time() - c.restart() + c.restart(timeout="5s", wait_for_workers=False) stop = time() - assert stop - start < 20 + assert stop - start < 10 @gen_cluster(Worker=Nanny, timeout=60) From 9794b50b0f55e4cc0736f139016d105902a2a25c Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 3 Aug 2022 10:28:11 -0600 Subject: [PATCH 05/12] REVERT add debugging logs --- distributed/config.py | 2 +- distributed/utils_test.py | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/distributed/config.py b/distributed/config.py index 1c0c72fbaf5..60cb51e92f1 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -80,7 +80,7 @@ def _initialize_logging_old_style(config): } """ loggers = { # default values - "distributed": "info", + "distributed": "debug", "distributed.client": "warning", "bokeh": "error", "tornado": "critical", diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 3441d53e7d2..0863ea87e37 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -787,7 +787,9 @@ async def disconnect(addr, timeout=3, rpc_kwargs=None): rpc_kwargs = rpc_kwargs or {} async def do_disconnect(): + logger.info(f"Disconnecting {addr}") async with rpc(addr, **rpc_kwargs) as w: + 
logger.info(f"Disconnecting {addr} - RPC connected") # If the worker was killed hard (e.g. sigterm) during test runtime, # we do not know at this point and may not be able to connect with suppress(EnvironmentError, CommClosedError): @@ -795,11 +797,13 @@ async def do_disconnect(): # worker before a reply can be made and we will always trigger # the timeout await w.terminate(reply=False) + logger.info(f"Disconnecting {addr} - sent terminate") await asyncio.wait_for(do_disconnect(), timeout=timeout) async def disconnect_all(addresses, timeout=3, rpc_kwargs=None): + logger.info(f"Disconnecting {addresses}") await asyncio.gather(*(disconnect(addr, timeout, rpc_kwargs) for addr in addresses)) From 6b37a0ae7babf8125fc60193e1f16c65b1e59791 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Wed, 3 Aug 2022 12:29:39 -0600 Subject: [PATCH 06/12] test_submit_after_failed_worker_sync fail for logs --- distributed/tests/test_failed_workers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index a928edbd0fa..5249b0bd230 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -42,6 +42,7 @@ def test_submit_after_failed_worker_sync(loop): a["proc"]().terminate() total = c.submit(sum, L) assert total.result() == sum(map(inc, range(10))) + assert False @pytest.mark.slow() From 3c3dab35ce34ced0ed130ed5699708836734c102 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 00:55:37 -0600 Subject: [PATCH 07/12] rename to `config_for_cluster_tests` --- distributed/utils_test.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 0863ea87e37..8c7ddd2779e 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -152,7 +152,7 @@ def loop_in_thread(cleanup): loop_started = concurrent.futures.Future() with concurrent.futures.ThreadPoolExecutor( 1, 
thread_name_prefix="test IOLoop" - ) as tpe, default_test_config(): + ) as tpe, config_for_cluster_tests(): async def run(): io_loop = IOLoop.current() @@ -672,7 +672,7 @@ def cluster( ws = weakref.WeakSet() enable_proctitle_on_children() - with check_process_leak(check=True), check_instances(), default_test_config(): + with check_process_leak(check=True), check_instances(), config_for_cluster_tests(): if nanny: _run_worker = run_nanny else: @@ -839,7 +839,7 @@ async def async_fn_outer(async_fn, /, *args, **kwargs): def _(func): @functools.wraps(func) - @default_test_config() + @config_for_cluster_tests() @clean(**clean_kwargs) def test_func(*args, **kwargs): if not iscoroutinefunction(func): @@ -1043,7 +1043,7 @@ def _(func): raise RuntimeError("gen_cluster only works for coroutine functions.") @functools.wraps(func) - @default_test_config(**{"distributed.comm.timeouts.connect": "5s"}) + @config_for_cluster_tests(**{"distributed.comm.timeouts.connect": "5s"}) @clean(**clean_kwargs) def test_func(*outer_args, **kwargs): async def async_fn(): @@ -1886,7 +1886,8 @@ def check_instances(): @contextmanager -def default_test_config(**extra_config): +def config_for_cluster_tests(**extra_config): + "Set recommended config values for tests that create or interact with clusters." 
reset_config() with dask.config.set( From 57b3827c51e49cf5d06ebb8c7fbe768a3576a744 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 00:58:20 -0600 Subject: [PATCH 08/12] log level back to info so tests pass --- distributed/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/config.py b/distributed/config.py index 60cb51e92f1..1c0c72fbaf5 100644 --- a/distributed/config.py +++ b/distributed/config.py @@ -80,7 +80,7 @@ def _initialize_logging_old_style(config): } """ loggers = { # default values - "distributed": "debug", + "distributed": "info", "distributed.client": "warning", "bokeh": "error", "tornado": "critical", From 8943f051b1a94d18e0f56bf7670f602876190d14 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 01:00:51 -0600 Subject: [PATCH 09/12] un-fail test --- distributed/tests/test_failed_workers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 5249b0bd230..a928edbd0fa 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -42,7 +42,6 @@ def test_submit_after_failed_worker_sync(loop): a["proc"]().terminate() total = c.submit(sum, L) assert total.result() == sum(map(inc, range(10))) - assert False @pytest.mark.slow() From 1db1d9f3742f6ecd1fb7afc07040f0d607a53a31 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 01:04:27 -0600 Subject: [PATCH 10/12] debug logs for test_failed_workers cluster tests --- distributed/tests/test_failed_workers.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index a928edbd0fa..1441ea72f78 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging import os import random from contextlib import suppress 
@@ -36,6 +37,7 @@ @pytest.mark.slow() def test_submit_after_failed_worker_sync(loop): with cluster() as (s, [a, b]): + logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: L = c.map(inc, range(10)) wait(L) @@ -74,6 +76,7 @@ async def test_submit_after_failed_worker(c, s, a, b): @pytest.mark.slow def test_gather_after_failed_worker(loop): with cluster() as (s, [a, b]): + logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: L = c.map(inc, range(10)) wait(L) @@ -144,6 +147,7 @@ async def test_restart_cleared(c, s, a, b): def test_restart_sync(loop): with cluster(nanny=True) as (s, [a, b]): + logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: x = c.submit(div, 1, 2) x.result() @@ -163,6 +167,7 @@ def test_restart_sync(loop): def test_worker_doesnt_await_task_completion(loop): with cluster(nanny=True, nworkers=1) as (s, [w]): + logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: future = c.submit(sleep, 100) sleep(0.1) From e131cae4617ec4e74c0908146a4ed1842a1f0a06 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 10:39:34 -0600 Subject: [PATCH 11/12] Revert "debug logs for test_failed_workers cluster tests" This reverts commit 1db1d9f3742f6ecd1fb7afc07040f0d607a53a31. 
--- distributed/tests/test_failed_workers.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 1441ea72f78..a928edbd0fa 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -import logging import os import random from contextlib import suppress @@ -37,7 +36,6 @@ @pytest.mark.slow() def test_submit_after_failed_worker_sync(loop): with cluster() as (s, [a, b]): - logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: L = c.map(inc, range(10)) wait(L) @@ -76,7 +74,6 @@ async def test_submit_after_failed_worker(c, s, a, b): @pytest.mark.slow def test_gather_after_failed_worker(loop): with cluster() as (s, [a, b]): - logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: L = c.map(inc, range(10)) wait(L) @@ -147,7 +144,6 @@ async def test_restart_cleared(c, s, a, b): def test_restart_sync(loop): with cluster(nanny=True) as (s, [a, b]): - logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: x = c.submit(div, 1, 2) x.result() @@ -167,7 +163,6 @@ def test_restart_sync(loop): def test_worker_doesnt_await_task_completion(loop): with cluster(nanny=True, nworkers=1) as (s, [w]): - logging.getLogger("distributed").setLevel("DEBUG") with Client(s["address"], loop=loop) as c: future = c.submit(sleep, 100) sleep(0.1) From 5303f90a179e89b0da72535c59ce32024f5f3833 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 4 Aug 2022 10:41:16 -0600 Subject: [PATCH 12/12] Remove debug code --- distributed/utils_test.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index 8c7ddd2779e..e519ef18f78 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -787,9 +787,7 @@ async def 
disconnect(addr, timeout=3, rpc_kwargs=None): rpc_kwargs = rpc_kwargs or {} async def do_disconnect(): - logger.info(f"Disconnecting {addr}") async with rpc(addr, **rpc_kwargs) as w: - logger.info(f"Disconnecting {addr} - RPC connected") # If the worker was killed hard (e.g. sigterm) during test runtime, # we do not know at this point and may not be able to connect with suppress(EnvironmentError, CommClosedError): @@ -797,13 +795,11 @@ async def do_disconnect(): # worker before a reply can be made and we will always trigger # the timeout await w.terminate(reply=False) - logger.info(f"Disconnecting {addr} - sent terminate") await asyncio.wait_for(do_disconnect(), timeout=timeout) async def disconnect_all(addresses, timeout=3, rpc_kwargs=None): - logger.info(f"Disconnecting {addresses}") await asyncio.gather(*(disconnect(addr, timeout, rpc_kwargs) for addr in addresses))