From 7373576d2608a61fd3b8cdf1c191b924ea7049f8 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 31 Mar 2022 11:43:30 +0100 Subject: [PATCH 1/6] Regression: test_weakref_cache --- distributed/tests/test_spill.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 6ab595933e5..12d8f59b953 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -1,5 +1,6 @@ from __future__ import annotations +import gc import logging import os import uuid @@ -337,6 +338,9 @@ def test_weakref_cache(tmpdir, cls, expect_cached, size): # the same id as a deleted one id_x = x.id del x + # Surprisingly, even on CPython this is needed to ensure that the object is garbage + # collected, even if there are no obvious circular references going on + gc.collect() if size < 100: buf["y"] From 5cc5586ca62a885817d59550e6ca0fffc49edcf6 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 4 Apr 2022 11:53:00 +0100 Subject: [PATCH 2/6] update comments --- distributed/deploy/tests/test_adaptive.py | 2 +- distributed/deploy/tests/test_local.py | 10 +++----- .../diagnostics/tests/test_progress.py | 10 +++----- distributed/protocol/tests/test_pickle.py | 2 +- distributed/tests/test_asyncprocess.py | 2 +- distributed/tests/test_client.py | 18 ++++++------- distributed/tests/test_diskutils.py | 6 ++--- distributed/tests/test_failed_workers.py | 4 +-- distributed/tests/test_nanny.py | 2 +- distributed/tests/test_spill.py | 4 +-- distributed/tests/test_steal.py | 2 +- distributed/tests/test_utils.py | 25 +++++++++---------- 12 files changed, 37 insertions(+), 50 deletions(-) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index b21a19ad6f0..8286867ea74 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -151,7 +151,7 @@ async def test_min_max(): assert len(adapt.log) == 2 and all(d["status"] == "up" for _, d in adapt.log) del futures - gc.collect() + gc.collect() # Needed because of distributed.profile start = time() while len(cluster.scheduler.workers) != 1: diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 84b90b843be..5957299f52e 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -648,14 +648,10 @@ def test_adapt(loop): cluster.adapt(minimum=1, maximum=2, interval="10ms") assert cluster._adaptive.minimum == 1 - gc.collect() - # the old Adaptive class sticks around, not sure why - # start = time() - # while ref(): - # sleep(0.01) - # gc.collect() - # assert time() < start + 5 + # Even in absence of circular references, this is needed because of + # distributed.profile + gc.collect() start = time() while len(cluster.scheduler.workers) != 1: diff --git a/distributed/diagnostics/tests/test_progress.py b/distributed/diagnostics/tests/test_progress.py index 5e42836dde9..775dd22f316 100644 --- a/distributed/diagnostics/tests/test_progress.py +++ b/distributed/diagnostics/tests/test_progress.py @@ -1,4 +1,5 @@ import asyncio +import gc import pytest @@ -122,9 +123,8 @@ async def test_AllProgress(c, s, a, b): keys = {x.key, y.key, z.key} del x, y, z - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile while any(k in s.who_has for k in keys): await asyncio.sleep(0.01) @@ -141,9 +141,8 @@ async def test_AllProgress(c, s, a, b): tkey = t.key del xx, yy, zz, t - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile while tkey in s.tasks: await asyncio.sleep(0.01) @@ -157,9 +156,8 @@ def f(x): for i in range(4): future = c.submit(f, i) - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile await asyncio.sleep(1) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index 104a19a79a4..f6744f0b479 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -181,7 +181,7 @@ def funcs(): assert func3(1) == func(1) del func, func2, func3 - gc.collect() + gc.collect() # Needed because of distributed.profile assert wr() is None assert wr2() is None assert wr3() is None diff --git a/distributed/tests/test_asyncprocess.py b/distributed/tests/test_asyncprocess.py index 9eb8ba07dbe..a23db1ca4e1 100644 --- a/distributed/tests/test_asyncprocess.py +++ b/distributed/tests/test_asyncprocess.py @@ -107,7 +107,7 @@ async def test_simple(): assert dt <= 0.6 del proc - gc.collect() + gc.collect() # Needed because of distributed.profile start = time() while wr1() is not None and time() < start + 1: # Perhaps the GIL switched before _watch_process() exit, diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index c8b594c8467..ea3949d051f 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -286,7 +286,6 @@ async def test_compute_retries_annotations(c, s, a, b): y = delayed(varying(yargs))() x, y = c.compute([x, y], optimize_graph=False) - gc.collect() assert await x == 30 with pytest.raises(ZeroDivisionError, match="five"): @@ -682,9 +681,7 @@ def test_no_future_references(c): futures = c.map(inc, range(10)) ws.update(futures) del futures - import gc - - gc.collect() + gc.collect() # Needed because of distributed.profile start = time() while list(ws): sleep(0.01) @@ -820,9 +817,7 @@ async def test_recompute_released_key(c, s, a, b): result1 = await x xkey = x.key del x - import gc - - gc.collect() + gc.collect() # Needed because of distributed.profile await asyncio.sleep(0) assert c.refcount[xkey] == 0 @@ -1231,9 +1226,8 @@ async def test_scatter_hash_2(c, s, a, b): @gen_cluster(client=True) async def test_get_releases_data(c, s, a, b): await c.gather(c.get({"x": (inc, 1)}, ["x"], sync=False)) - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile while c.refcount["x"]: await asyncio.sleep(0.01) @@ -3569,9 +3563,8 @@ async def test_Client_clears_references_after_restart(c, s, a, b): key = x.key del x - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile await asyncio.sleep(0) assert key not in c.refcount @@ -3810,7 +3803,10 @@ def test_open_close_many_workers(loop, worker, count, repeat): proc = psutil.Process() with cluster(nworkers=0, active_rpc_timeout=2) as (s, _): + # Even in absence of circular references, this is needed because of + # distributed.profile gc.collect() + before = proc.num_fds() done = Semaphore(0) running = weakref.WeakKeyDictionary() diff --git a/distributed/tests/test_diskutils.py b/distributed/tests/test_diskutils.py index 1aaf22dba27..7edf9db337b 100644 --- a/distributed/tests/test_diskutils.py +++ b/distributed/tests/test_diskutils.py @@ -52,7 +52,7 @@ def test_workdir_simple(tmpdir): a.release() assert_contents(["bb", "bb.dirlock"]) del b - gc.collect() + gc.collect() # Needed because of distributed.profile assert_contents([]) # Generated temporary name with a prefix @@ -87,7 +87,7 @@ def test_two_workspaces_in_same_directory(tmpdir): del ws del b - gc.collect() + gc.collect() # Needed because of distributed.profile assert_contents(["aa", "aa.dirlock"], trials=5) del a gc.collect() @@ -184,7 +184,7 @@ def test_locking_disabled(tmpdir): a.release() assert_contents(["bb"]) del b - gc.collect() + gc.collect() # Needed because of distributed.profile assert_contents([]) lock_file.assert_not_called() diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 0555ab20232..a89143b4f23 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -1,4 +1,5 @@ import asyncio +import gc import os import random from contextlib import suppress @@ -273,9 +274,8 @@ async def test_forgotten_futures_dont_clean_up_new_futures(c, s, a, b): await c.restart() y = c.submit(inc, 1) del x - import gc - gc.collect() + gc.collect() # Needed because of distributed.profile await asyncio.sleep(0.1) await y diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 934e52c4fe7..1006d80d712 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -208,7 +208,7 @@ async def test_num_fds(s): w = await Nanny(s.address) await w.close() del w - gc.collect() + gc.collect() # Needed because of distributed.profile before = proc.num_fds() diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 12d8f59b953..aea7fa3a37e 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -338,9 +338,7 @@ def test_weakref_cache(tmpdir, cls, expect_cached, size): # the same id as a deleted one id_x = x.id del x - # Surprisingly, even on CPython this is needed to ensure that the object is garbage - # collected, even if there are no obvious circular references going on - gc.collect() + gc.collect() # Needed because of distributed.profile if size < 100: buf["y"] diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e16269d92f8..99abdaf71a0 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -946,7 +946,7 @@ class Foo: assert not s.who_has assert not any(s.has_what.values()) - gc.collect() + gc.collect() # Needed because of distributed.profile assert not list(ws) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 931db6e090c..2620e92c09e 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -460,22 +460,21 @@ async def test_loop_runner_gen(): @gen_test() -async def test_all_exceptions_logging(): - async def throws(): - raise Exception("foo1234") - - with captured_logger("") as sio: - try: - await All([throws() for _ in range(5)], quiet_exceptions=Exception) - except Exception: - pass +async def test_all_quiet_exceptions(): + class CustomError(Exception): + pass - import gc + async def throws(msg): + raise CustomError(msg) - gc.collect() - await asyncio.sleep(0.1) + with captured_logger("") as sio: + with pytest.raises(CustomError): + await All([throws("foo") for _ in range(5)]) + with pytest.raises(CustomError): + await All([throws("bar") for _ in range(5)], quiet_exceptions=CustomError) - assert "foo1234" not in sio.getvalue() + assert "bar" not in sio.getvalue() + assert "foo" in sio.getvalue() def test_warn_on_duration(): From ecbf727132cabc67efd3696d9a3d001eb19e5016 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 5 Apr 2022 12:30:19 +0100 Subject: [PATCH 3/6] Prevent profiler from tampering with refcount --- distributed/deploy/tests/test_adaptive.py | 2 -- distributed/deploy/tests/test_local.py | 7 ------- distributed/diagnostics/tests/test_progress.py | 7 ------- distributed/profile.py | 18 ++++++++++++++++++ distributed/protocol/tests/test_pickle.py | 4 ++-- distributed/tests/test_asyncprocess.py | 3 ++- distributed/tests/test_client.py | 13 ++++--------- distributed/tests/test_diskutils.py | 11 ++++++++--- distributed/tests/test_failed_workers.py | 5 ++--- distributed/tests/test_nanny.py | 2 ++ distributed/tests/test_spill.py | 7 ++++--- distributed/tests/test_steal.py | 4 ++-- 12 files changed, 44 insertions(+), 39 deletions(-) diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 8286867ea74..840b7721800 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -1,5 +1,4 @@ import asyncio -import gc import math from time import sleep @@ -151,7 +150,6 @@ async def test_min_max(): assert len(adapt.log) == 2 and all(d["status"] == "up" for _, d in adapt.log) del futures - gc.collect() # Needed because of distributed.profile start = time() while len(cluster.scheduler.workers) != 1: diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py index 5957299f52e..d6ef2298c8e 100644 --- a/distributed/deploy/tests/test_local.py +++ b/distributed/deploy/tests/test_local.py @@ -1,9 +1,7 @@ import asyncio -import gc import subprocess import sys import unittest -import weakref from threading import Lock from time import sleep from urllib.parse import urlparse @@ -644,15 +642,10 @@ def test_adapt(loop): cluster.adapt(minimum=0, maximum=2, interval="10ms") assert cluster._adaptive.minimum == 0 assert cluster._adaptive.maximum == 2 - ref = weakref.ref(cluster._adaptive) cluster.adapt(minimum=1, maximum=2, interval="10ms") assert cluster._adaptive.minimum == 1 - # Even in absence of circular references, this is needed because of - # distributed.profile - gc.collect() - start = time() while len(cluster.scheduler.workers) != 1: sleep(0.01) diff --git a/distributed/diagnostics/tests/test_progress.py b/distributed/diagnostics/tests/test_progress.py index 775dd22f316..872b1e4c7f6 100644 --- a/distributed/diagnostics/tests/test_progress.py +++ b/distributed/diagnostics/tests/test_progress.py @@ -1,5 +1,4 @@ import asyncio -import gc import pytest @@ -124,8 +123,6 @@ async def test_AllProgress(c, s, a, b): keys = {x.key, y.key, z.key} del x, y, z - gc.collect() # Needed because of distributed.profile - while any(k in s.who_has for k in keys): await asyncio.sleep(0.01) @@ -142,8 +139,6 @@ async def test_AllProgress(c, s, a, b): tkey = t.key del xx, yy, zz, t - gc.collect() # Needed because of distributed.profile - while tkey in s.tasks: await asyncio.sleep(0.01) @@ -157,8 +152,6 @@ def f(x): for i in range(4): future = c.submit(f, i) - gc.collect() # Needed because of distributed.profile - await asyncio.sleep(1) await wait([future]) diff --git a/distributed/profile.py b/distributed/profile.py index 22a2fc80cff..46bf3244ff1 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -275,14 +275,28 @@ def traverse(state, start, stop, height): } +_watch_running: set[int] = set() + + +def wait_profiler() -> None: + """Wait until a moment when no instances of watch() are sampling the frames. + You must call this function whenever you would otherwise expect an object to be + immediately released after it's descoped. + """ + while _watch_running: + sleep(0.0001) + + def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: False): interval = parse_timedelta(interval) cycle = parse_timedelta(cycle) recent = create() last = time() + watch_id = threading.get_ident() while not stop(): + _watch_running.add(watch_id) if time() > last + cycle: log.append((time(), recent)) recent = create() @@ -293,6 +307,10 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: return process(frame, None, recent, omit=omit) + del frame + + _watch_running.remove(watch_id) + sleep(interval) diff --git a/distributed/protocol/tests/test_pickle.py b/distributed/protocol/tests/test_pickle.py index f6744f0b479..7fb486857c6 100644 --- a/distributed/protocol/tests/test_pickle.py +++ b/distributed/protocol/tests/test_pickle.py @@ -1,4 +1,3 @@ -import gc import pickle import weakref from functools import partial @@ -6,6 +5,7 @@ import pytest +from distributed.profile import wait_profiler from distributed.protocol import deserialize, serialize from distributed.protocol.pickle import HIGHEST_PROTOCOL, dumps, loads @@ -181,7 +181,7 @@ def funcs(): assert func3(1) == func(1) del func, func2, func3 - gc.collect() # Needed because of distributed.profile + wait_profiler() assert wr() is None assert wr2() is None assert wr3() is None diff --git a/distributed/tests/test_asyncprocess.py b/distributed/tests/test_asyncprocess.py index a23db1ca4e1..eb6d1510f60 100644 --- a/distributed/tests/test_asyncprocess.py +++ b/distributed/tests/test_asyncprocess.py @@ -107,13 +107,14 @@ async def test_simple(): assert dt <= 0.6 del proc - gc.collect() # Needed because of distributed.profile + start = time() while wr1() is not None and time() < start + 1: # Perhaps the GIL switched before _watch_process() exit, # help it a little sleep(0.001) gc.collect() + if wr1() is not None: # Help diagnosing from types import FrameType diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ea3949d051f..ba67182da02 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -69,6 +69,7 @@ from distributed.core import Server, Status from distributed.metrics import time from distributed.objects import HasWhat, WhoHas +from distributed.profile import wait_profiler from distributed.scheduler import ( COMPILED, CollectTaskMetaDataPlugin, @@ -681,7 +682,7 @@ def test_no_future_references(c): futures = c.map(inc, range(10)) ws.update(futures) del futures - gc.collect() # Needed because of distributed.profile + wait_profiler() start = time() while list(ws): sleep(0.01) @@ -817,7 +818,7 @@ async def test_recompute_released_key(c, s, a, b): result1 = await x xkey = x.key del x - gc.collect() # Needed because of distributed.profile + wait_profiler() await asyncio.sleep(0) assert c.refcount[xkey] == 0 @@ -1226,9 +1227,6 @@ async def test_scatter_hash_2(c, s, a, b): @gen_cluster(client=True) async def test_get_releases_data(c, s, a, b): await c.gather(c.get({"x": (inc, 1)}, ["x"], sync=False)) - - gc.collect() # Needed because of distributed.profile - while c.refcount["x"]: await asyncio.sleep(0.01) @@ -3563,8 +3561,7 @@ async def test_Client_clears_references_after_restart(c, s, a, b): key = x.key del x - - gc.collect() # Needed because of distributed.profile + wait_profiler() await asyncio.sleep(0) assert key not in c.refcount @@ -3803,8 +3800,6 @@ def test_open_close_many_workers(loop, worker, count, repeat): proc = psutil.Process() with cluster(nworkers=0, active_rpc_timeout=2) as (s, _): - # Even in absence of circular references, this is needed because of - # distributed.profile gc.collect() before = proc.num_fds() diff --git a/distributed/tests/test_diskutils.py b/distributed/tests/test_diskutils.py index 7edf9db337b..50a1f1b865a 100644 --- a/distributed/tests/test_diskutils.py +++ b/distributed/tests/test_diskutils.py @@ -15,6 +15,7 @@ from distributed.compatibility import WINDOWS from distributed.diskutils import WorkSpace from distributed.metrics import time +from distributed.profile import wait_profiler from distributed.utils import mp_context from distributed.utils_test import captured_logger @@ -52,7 +53,8 @@ def test_workdir_simple(tmpdir): a.release() assert_contents(["bb", "bb.dirlock"]) del b - gc.collect() # Needed because of distributed.profile + wait_profiler() + gc.collect() assert_contents([]) # Generated temporary name with a prefix @@ -87,9 +89,11 @@ def test_two_workspaces_in_same_directory(tmpdir): del ws del b - gc.collect() # Needed because of distributed.profile + wait_profiler() + gc.collect() assert_contents(["aa", "aa.dirlock"], trials=5) del a + wait_profiler() gc.collect() assert_contents([], trials=5) @@ -184,7 +188,8 @@ def test_locking_disabled(tmpdir): a.release() assert_contents(["bb"]) del b - gc.collect() # Needed because of distributed.profile + wait_profiler() + gc.collect() assert_contents([]) lock_file.assert_not_called() diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index a89143b4f23..6886b5e1140 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -1,5 +1,4 @@ import asyncio -import gc import os import random from contextlib import suppress @@ -15,6 +14,7 @@ from distributed.comm import CommClosedError from distributed.compatibility import MACOS from distributed.metrics import time +from distributed.profile import wait_profiler from distributed.scheduler import COMPILED from distributed.utils import CancelledError, sync from distributed.utils_test import ( @@ -274,8 +274,7 @@ async def test_forgotten_futures_dont_clean_up_new_futures(c, s, a, b): await c.restart() y = c.submit(inc, 1) del x - - gc.collect() # Needed because of distributed.profile + wait_profiler() await asyncio.sleep(0.1) await y diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 1006d80d712..088c9d247ae 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -24,6 +24,7 @@ from distributed.core import CommClosedError, Status from distributed.diagnostics import SchedulerPlugin from distributed.metrics import time +from distributed.profile import wait_profiler from distributed.protocol.pickle import dumps from distributed.utils import TimeoutError, parse_ports from distributed.utils_test import captured_logger, gen_cluster, gen_test @@ -208,6 +209,7 @@ async def test_num_fds(s): w = await Nanny(s.address) await w.close() del w + wait_profiler() gc.collect() # Needed because of distributed.profile before = proc.num_fds() diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index aea7fa3a37e..5116757e4e8 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -1,6 +1,5 @@ from __future__ import annotations -import gc import logging import os import uuid @@ -10,6 +9,7 @@ from dask.sizeof import sizeof from distributed.compatibility import WINDOWS +from distributed.profile import wait_profiler from distributed.protocol import serialize_bytelist from distributed.spill import SpillBuffer, has_zict_210, has_zict_220 from distributed.utils_test import captured_logger @@ -310,6 +310,7 @@ class SupportsWeakRef(NoWeakRef): __slots__ = ("__weakref__",) +@pytest.mark.parametrize("TEMPRUN", range(100)) @pytest.mark.parametrize( "cls,expect_cached", [ @@ -318,7 +319,7 @@ class SupportsWeakRef(NoWeakRef): ], ) @pytest.mark.parametrize("size", [60, 110]) -def test_weakref_cache(tmpdir, cls, expect_cached, size): +def test_weakref_cache(tmpdir, cls, expect_cached, size, TEMPRUN): buf = SpillBuffer(str(tmpdir), target=100) # Run this test twice: @@ -338,7 +339,7 @@ def test_weakref_cache(tmpdir, cls, expect_cached, size): # the same id as a deleted one id_x = x.id del x - gc.collect() # Needed because of distributed.profile + wait_profiler() if size < 100: buf["y"] diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 99abdaf71a0..6034528377d 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1,6 +1,5 @@ import asyncio import contextlib -import gc import itertools import logging import random @@ -18,6 +17,7 @@ from distributed.config import config from distributed.core import Status from distributed.metrics import time +from distributed.profile import wait_profiler from distributed.scheduler import key_split from distributed.system import MEMORY_LIMIT from distributed.utils_test import ( @@ -946,7 +946,7 @@ class Foo: assert not s.who_has assert not any(s.has_what.values()) - gc.collect() # Needed because of distributed.profile + wait_profiler() assert not list(ws) From a36dbe968ea0acd45cb87f3d2b92b872e6e1602a Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 5 Apr 2022 12:39:24 +0100 Subject: [PATCH 4/6] polish --- distributed/profile.py | 2 -- distributed/tests/test_client.py | 13 +++++-------- distributed/tests/test_nanny.py | 2 +- distributed/tests/test_spill.py | 1 + 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/distributed/profile.py b/distributed/profile.py index 46bf3244ff1..b9568385780 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -308,9 +308,7 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: process(frame, None, recent, omit=omit) del frame - _watch_running.remove(watch_id) - sleep(interval) diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index ba67182da02..92d13939179 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -676,17 +676,15 @@ def test_get_sync(c): def test_no_future_references(c): - from weakref import WeakSet - - ws = WeakSet() + """Test that there are neither global references to Future objects nor circular + references that need to be collected by gc + """ + ws = weakref.WeakSet() futures = c.map(inc, range(10)) ws.update(futures) del futures wait_profiler() - start = time() - while list(ws): - sleep(0.01) - assert time() < start + 30 + assert not list(ws) def test_get_sync_optimize_graph_passes_through(c): @@ -3801,7 +3799,6 @@ def test_open_close_many_workers(loop, worker, count, repeat): with cluster(nworkers=0, active_rpc_timeout=2) as (s, _): gc.collect() - before = proc.num_fds() done = Semaphore(0) running = weakref.WeakKeyDictionary() diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index 088c9d247ae..33cffc8d72a 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -210,7 +210,7 @@ async def test_num_fds(s): await w.close() del w wait_profiler() - gc.collect() # Needed because of distributed.profile + gc.collect() before = proc.num_fds() diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 5116757e4e8..60e205be2b8 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -310,6 +310,7 @@ class SupportsWeakRef(NoWeakRef): __slots__ = ("__weakref__",) +# FIXME REMOVE BEFORE MERGING @pytest.mark.parametrize("TEMPRUN", range(100)) @pytest.mark.parametrize( "cls,expect_cached", From 8a5418ae477635415da45657b416e0ebd271e718 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Tue, 5 Apr 2022 14:57:31 +0100 Subject: [PATCH 5/6] fix deadlock --- distributed/profile.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/distributed/profile.py b/distributed/profile.py index b9568385780..5314c3aebee 100644 --- a/distributed/profile.py +++ b/distributed/profile.py @@ -297,18 +297,20 @@ def _watch(thread_id, log, interval="20ms", cycle="2s", omit=None, stop=lambda: while not stop(): _watch_running.add(watch_id) - if time() > last + cycle: - log.append((time(), recent)) - recent = create() - last = time() try: - frame = sys._current_frames()[thread_id] - except KeyError: - return - - process(frame, None, recent, omit=omit) - del frame - _watch_running.remove(watch_id) + if time() > last + cycle: + log.append((time(), recent)) + recent = create() + last = time() + try: + frame = sys._current_frames()[thread_id] + except KeyError: + return + + process(frame, None, recent, omit=omit) + del frame + finally: + _watch_running.remove(watch_id) sleep(interval) From 177def732ad8b4f94d5730549f67616bb4d4bc57 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Wed, 6 Apr 2022 00:27:51 +0100 Subject: [PATCH 6/6] remove stress test --- distributed/tests/test_spill.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/distributed/tests/test_spill.py b/distributed/tests/test_spill.py index 60e205be2b8..4bd08875303 100644 --- a/distributed/tests/test_spill.py +++ b/distributed/tests/test_spill.py @@ -310,8 +310,6 @@ class SupportsWeakRef(NoWeakRef): __slots__ = ("__weakref__",) -# FIXME REMOVE BEFORE MERGING -@pytest.mark.parametrize("TEMPRUN", range(100)) @pytest.mark.parametrize( "cls,expect_cached", [ @@ -320,7 +318,7 @@ class SupportsWeakRef(NoWeakRef): ], ) @pytest.mark.parametrize("size", [60, 110]) -def test_weakref_cache(tmpdir, cls, expect_cached, size, TEMPRUN): +def test_weakref_cache(tmpdir, cls, expect_cached, size): buf = SpillBuffer(str(tmpdir), target=100) # Run this test twice: