Skip to content

Add config option to disable profiling and disable it in many tests per default #6490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jun 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import uuid
import warnings
import weakref
from collections import defaultdict
from collections import defaultdict, deque
from collections.abc import Container
from contextlib import suppress
from enum import Enum
Expand Down Expand Up @@ -195,18 +195,21 @@ def __init__(
self.loop = self.io_loop

if not hasattr(self.io_loop, "profile"):
ref = weakref.ref(self.io_loop)

def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

self.io_loop.profile = profile.watch(
omit=("profile.py", "selectors.py"),
interval=dask.config.get("distributed.worker.profile.interval"),
cycle=dask.config.get("distributed.worker.profile.cycle"),
stop=stop,
)
if dask.config.get("distributed.worker.profile.enabled"):
ref = weakref.ref(self.io_loop)

def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

self.io_loop.profile = profile.watch(
omit=("profile.py", "selectors.py"),
interval=dask.config.get("distributed.worker.profile.interval"),
cycle=dask.config.get("distributed.worker.profile.cycle"),
stop=stop,
)
else:
self.io_loop.profile = deque()

# Statistics counters for various events
with suppress(ImportError):
Expand Down
17 changes: 16 additions & 1 deletion distributed/dashboard/components/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NumeralTickFormatter,
Range1d,
Select,
Title,
)
from bokeh.palettes import Spectral9
from bokeh.plotting import figure
Expand Down Expand Up @@ -193,7 +194,6 @@ def __init__(self, server, doc=None, **kwargs):
data = profile.plot_data(self.state, profile_interval)
self.states = data.pop("states")
self.profile_plot, self.source = profile.plot_figure(data, **kwargs)

changing = [False] # avoid repeated changes from within callback

@without_property_validation
Expand Down Expand Up @@ -270,6 +270,14 @@ def select_cb(attr, old, new):
**kwargs,
)

self.subtitle = Title(text=" ", text_font_style="italic")
self.profile_plot.add_layout(self.subtitle, "above")
if not dask.config.get("distributed.worker.profile.enabled"):
self.subtitle.text = "Profiling is disabled."
self.select.disabled = True
self.reset_button.disabled = True
self.update_button.disabled = True

@without_property_validation
@log_errors
def update(self, state, metadata=None):
Expand Down Expand Up @@ -388,6 +396,13 @@ def ts_change(attr, old, new):
**kwargs,
)

self.subtitle = Title(text=" ", text_font_style="italic")
self.profile_plot.add_layout(self.subtitle, "above")
if not dask.config.get("distributed.worker.profile.enabled"):
self.subtitle.text = "Profiling is disabled."
self.reset_button.disabled = True
self.update_button.disabled = True

@without_property_validation
@log_errors
def update(self, state):
Expand Down
50 changes: 43 additions & 7 deletions distributed/dashboard/tests/test_components.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio

import pytest

pytest.importorskip("bokeh")
Expand All @@ -21,7 +19,11 @@ def test_basic(Component):
assert isinstance(c.root, Model)


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={"distributed.worker.profile.enabled": True},
)
async def test_profile_plot(c, s, a, b):
p = ProfilePlot()
assert not p.source.data["left"]
Expand All @@ -30,20 +32,54 @@ async def test_profile_plot(c, s, a, b):
p.update(a.profile_recent)


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_time_plot(c, s, a, b):
from bokeh.io import curdoc

sp = ProfileTimePlot(s, doc=curdoc())
assert "disabled" not in sp.subtitle.text
sp.trigger_update()

ap = ProfileTimePlot(a, doc=curdoc())
assert "disabled" not in sp.subtitle.text
ap.trigger_update()

assert not len(sp.source.data["left"])
assert not len(ap.source.data["left"])
assert len(sp.source.data["left"]) == 0
assert len(ap.source.data["left"]) == 0

await c.gather(c.map(slowinc, range(10), delay=0.05))

ap.trigger_update()
sp.trigger_update()
await asyncio.sleep(0.05)


@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_time_plot_disabled(c, s, a, b):
from bokeh.io import curdoc

sp = ProfileTimePlot(s, doc=curdoc())
assert "disabled" in sp.subtitle.text
sp.trigger_update()

ap = ProfileTimePlot(a, doc=curdoc())
assert "disabled" in sp.subtitle.text
ap.trigger_update()

assert len(sp.source.data["left"]) == 0
assert len(ap.source.data["left"]) == 0
20 changes: 20 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,14 @@ async def test_TaskGroupGraph_arrows(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_server(c, s, a, b):
ptp = ProfileServer(s)
assert "disabled" not in ptp.subtitle.text
start = time()
await asyncio.sleep(0.100)
while len(ptp.ts_source.data["time"]) < 2:
Expand All @@ -802,6 +804,24 @@ async def test_profile_server(c, s, a, b):
assert time() < start + 2


@pytest.mark.slow
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.interval": "5ms",
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server_disabled(c, s, a, b):
ptp = ProfileServer(s)
assert "disabled" in ptp.subtitle.text
start = time()
await asyncio.sleep(0.1)
ptp.trigger_update()
assert len(ptp.ts_source.data["time"]) == 0


@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_root_redirect(c, s, a, b):
http_client = AsyncHTTPClient()
Expand Down
4 changes: 4 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ properties:
This data gets collected into statistical profiling information,
which is then periodically bundled together and sent along to the scheduler.
properties:
enabled:
type: boolean
description: |
Whether or not to enable profiling
interval:
type: string
description: |
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ distributed:
restart: False # Do we ressurrect the worker after the lifetime deadline?

profile:
enabled: True # Whether or not to enable profiling
interval: 10ms # Time between statistical profiling queries
cycle: 1000ms # Time between starting new profile
low-level: False # Whether or not to include low-level functions
Expand Down
5 changes: 4 additions & 1 deletion distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,10 @@ def check(dask_worker):
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)],
config={"distributed.worker.profile.interval": "1ms"},
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
},
)
async def test_actors_in_profile(c, s, a):
class Sleeper:
Expand Down
65 changes: 62 additions & 3 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5422,7 +5422,13 @@ async def test_call_stack_collections_all(c, s, a, b):


@pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5)
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await wait(futures)
Expand All @@ -5444,7 +5450,33 @@ async def test_profile(c, s, a, b):
assert not result["count"]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_disabled(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await wait(futures)

x = await c.profile(start=time() + 10, stop=time() + 20)
assert x["count"] == 0

x = await c.profile(start=0, stop=time())
assert x["count"] == 0

y = await c.profile(start=time() - 0.300, stop=time())
assert 0 == y["count"] == x["count"]


@gen_cluster(
client=True,
config={
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_keys(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.05, workers=a.address)
y = c.map(slowdec, range(10), delay=0.05, workers=a.address)
Expand Down Expand Up @@ -6169,7 +6201,13 @@ async def test_futures_of_sorted(c, s, a, b):


@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "10ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server(c, s, a, b):
for i in range(5):
try:
Expand All @@ -6193,6 +6231,27 @@ async def test_profile_server(c, s, a, b):
break


@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server_disabled(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.01, workers=a.address, pure=False)
await wait(x)
await asyncio.gather(
c.run(slowinc, 1, delay=0.5), c.run_on_scheduler(slowdec, 1, delay=0.5)
)

p = await c.profile(server=True) # All worker servers
assert "slowinc" not in str(p)

p = await c.profile(scheduler=True) # Scheduler
assert "slowdec" not in str(p)


@gen_cluster(client=True)
async def test_await_future(c, s, a, b):
future = c.submit(inc, 1)
Expand Down
18 changes: 16 additions & 2 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,13 @@ async def test_profile_metadata(c, s, a, b):
assert not meta["counts"][-1][1]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_metadata_timeout(c, s, a, b):
start = time() - 1

Expand All @@ -1321,7 +1327,13 @@ def raise_timeout(*args, **kwargs):
assert not meta["counts"][-1][1]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_metadata_keys(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.05)
y = c.map(slowdec, range(10), delay=0.05)
Expand All @@ -1337,6 +1349,7 @@ async def test_profile_metadata_keys(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
"distributed.worker.profile.cycle": "100ms",
},
Expand All @@ -1353,6 +1366,7 @@ async def test_statistical_profiling(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
"distributed.worker.profile.cycle": "100ms",
},
Expand Down
Loading