Skip to content

Commit

Permalink
Add config option to disable profiling and disable it in many tests p…
Browse files Browse the repository at this point in the history
…er default (#6490)
  • Loading branch information
hendrikmakait authored Jun 2, 2022
1 parent 69b798d commit 6d85a85
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 36 deletions.
29 changes: 16 additions & 13 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import uuid
import warnings
import weakref
from collections import defaultdict
from collections import defaultdict, deque
from collections.abc import Container
from contextlib import suppress
from enum import Enum
Expand Down Expand Up @@ -195,18 +195,21 @@ def __init__(
self.loop = self.io_loop

if not hasattr(self.io_loop, "profile"):
ref = weakref.ref(self.io_loop)

def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

self.io_loop.profile = profile.watch(
omit=("profile.py", "selectors.py"),
interval=dask.config.get("distributed.worker.profile.interval"),
cycle=dask.config.get("distributed.worker.profile.cycle"),
stop=stop,
)
if dask.config.get("distributed.worker.profile.enabled"):
ref = weakref.ref(self.io_loop)

def stop() -> bool:
loop = ref()
return loop is None or loop.asyncio_loop.is_closed()

self.io_loop.profile = profile.watch(
omit=("profile.py", "selectors.py"),
interval=dask.config.get("distributed.worker.profile.interval"),
cycle=dask.config.get("distributed.worker.profile.cycle"),
stop=stop,
)
else:
self.io_loop.profile = deque()

# Statistics counters for various events
with suppress(ImportError):
Expand Down
17 changes: 16 additions & 1 deletion distributed/dashboard/components/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
NumeralTickFormatter,
Range1d,
Select,
Title,
)
from bokeh.palettes import Spectral9
from bokeh.plotting import figure
Expand Down Expand Up @@ -193,7 +194,6 @@ def __init__(self, server, doc=None, **kwargs):
data = profile.plot_data(self.state, profile_interval)
self.states = data.pop("states")
self.profile_plot, self.source = profile.plot_figure(data, **kwargs)

changing = [False] # avoid repeated changes from within callback

@without_property_validation
Expand Down Expand Up @@ -270,6 +270,14 @@ def select_cb(attr, old, new):
**kwargs,
)

self.subtitle = Title(text=" ", text_font_style="italic")
self.profile_plot.add_layout(self.subtitle, "above")
if not dask.config.get("distributed.worker.profile.enabled"):
self.subtitle.text = "Profiling is disabled."
self.select.disabled = True
self.reset_button.disabled = True
self.update_button.disabled = True

@without_property_validation
@log_errors
def update(self, state, metadata=None):
Expand Down Expand Up @@ -388,6 +396,13 @@ def ts_change(attr, old, new):
**kwargs,
)

self.subtitle = Title(text=" ", text_font_style="italic")
self.profile_plot.add_layout(self.subtitle, "above")
if not dask.config.get("distributed.worker.profile.enabled"):
self.subtitle.text = "Profiling is disabled."
self.reset_button.disabled = True
self.update_button.disabled = True

@without_property_validation
@log_errors
def update(self, state):
Expand Down
50 changes: 43 additions & 7 deletions distributed/dashboard/tests/test_components.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import asyncio

import pytest

pytest.importorskip("bokeh")
Expand All @@ -21,7 +19,11 @@ def test_basic(Component):
assert isinstance(c.root, Model)


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={"distributed.worker.profile.enabled": True},
)
async def test_profile_plot(c, s, a, b):
p = ProfilePlot()
assert not p.source.data["left"]
Expand All @@ -30,20 +32,54 @@ async def test_profile_plot(c, s, a, b):
p.update(a.profile_recent)


@gen_cluster(client=True, clean_kwargs={"threads": False})
@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_time_plot(c, s, a, b):
from bokeh.io import curdoc

sp = ProfileTimePlot(s, doc=curdoc())
assert "disabled" not in sp.subtitle.text
sp.trigger_update()

ap = ProfileTimePlot(a, doc=curdoc())
assert "disabled" not in sp.subtitle.text
ap.trigger_update()

assert not len(sp.source.data["left"])
assert not len(ap.source.data["left"])
assert len(sp.source.data["left"]) == 0
assert len(ap.source.data["left"]) == 0

await c.gather(c.map(slowinc, range(10), delay=0.05))

ap.trigger_update()
sp.trigger_update()
await asyncio.sleep(0.05)


@gen_cluster(
client=True,
clean_kwargs={"threads": False},
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_time_plot_disabled(c, s, a, b):
from bokeh.io import curdoc

sp = ProfileTimePlot(s, doc=curdoc())
assert "disabled" in sp.subtitle.text
sp.trigger_update()

ap = ProfileTimePlot(a, doc=curdoc())
assert "disabled" in sp.subtitle.text
ap.trigger_update()

assert len(sp.source.data["left"]) == 0
assert len(ap.source.data["left"]) == 0
20 changes: 20 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,14 @@ async def test_TaskGroupGraph_arrows(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "10ms",
"distributed.worker.profile.cycle": "50ms",
},
)
async def test_profile_server(c, s, a, b):
ptp = ProfileServer(s)
assert "disabled" not in ptp.subtitle.text
start = time()
await asyncio.sleep(0.100)
while len(ptp.ts_source.data["time"]) < 2:
Expand All @@ -802,6 +804,24 @@ async def test_profile_server(c, s, a, b):
assert time() < start + 2


@pytest.mark.slow
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.interval": "5ms",
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server_disabled(c, s, a, b):
ptp = ProfileServer(s)
assert "disabled" in ptp.subtitle.text
start = time()
await asyncio.sleep(0.1)
ptp.trigger_update()
assert len(ptp.ts_source.data["time"]) == 0


@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_root_redirect(c, s, a, b):
http_client = AsyncHTTPClient()
Expand Down
4 changes: 4 additions & 0 deletions distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,10 @@ properties:
This data gets collected into statistical profiling information,
which is then periodically bundled together and sent along to the scheduler.
properties:
enabled:
type: boolean
description: |
Whether or not to enable profiling
interval:
type: string
description: |
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ distributed:
restart: False # Do we ressurrect the worker after the lifetime deadline?

profile:
enabled: True # Whether or not to enable profiling
interval: 10ms # Time between statistical profiling queries
cycle: 1000ms # Time between starting new profile
low-level: False # Whether or not to include low-level functions
Expand Down
5 changes: 4 additions & 1 deletion distributed/tests/test_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,10 @@ def check(dask_worker):
@gen_cluster(
client=True,
nthreads=[("127.0.0.1", 1)],
config={"distributed.worker.profile.interval": "1ms"},
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
},
)
async def test_actors_in_profile(c, s, a):
class Sleeper:
Expand Down
65 changes: 62 additions & 3 deletions distributed/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5422,7 +5422,13 @@ async def test_call_stack_collections_all(c, s, a, b):


@pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5)
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await wait(futures)
Expand All @@ -5444,7 +5450,33 @@ async def test_profile(c, s, a, b):
assert not result["count"]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_disabled(c, s, a, b):
futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
await wait(futures)

x = await c.profile(start=time() + 10, stop=time() + 20)
assert x["count"] == 0

x = await c.profile(start=0, stop=time())
assert x["count"] == 0

y = await c.profile(start=time() - 0.300, stop=time())
assert 0 == y["count"] == x["count"]


@gen_cluster(
client=True,
config={
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_keys(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.05, workers=a.address)
y = c.map(slowdec, range(10), delay=0.05, workers=a.address)
Expand Down Expand Up @@ -6169,7 +6201,13 @@ async def test_futures_of_sorted(c, s, a, b):


@pytest.mark.flaky(reruns=10, reruns_delay=5)
@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "10ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server(c, s, a, b):
for i in range(5):
try:
Expand All @@ -6193,6 +6231,27 @@ async def test_profile_server(c, s, a, b):
break


@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": False,
"distributed.worker.profile.cycle": "10ms",
},
)
async def test_profile_server_disabled(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.01, workers=a.address, pure=False)
await wait(x)
await asyncio.gather(
c.run(slowinc, 1, delay=0.5), c.run_on_scheduler(slowdec, 1, delay=0.5)
)

p = await c.profile(server=True) # All worker servers
assert "slowinc" not in str(p)

p = await c.profile(scheduler=True) # Scheduler
assert "slowdec" not in str(p)


@gen_cluster(client=True)
async def test_await_future(c, s, a, b):
future = c.submit(inc, 1)
Expand Down
18 changes: 16 additions & 2 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,13 @@ async def test_profile_metadata(c, s, a, b):
assert not meta["counts"][-1][1]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_metadata_timeout(c, s, a, b):
start = time() - 1

Expand All @@ -1321,7 +1327,13 @@ def raise_timeout(*args, **kwargs):
assert not meta["counts"][-1][1]


@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"})
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.cycle": "100ms",
},
)
async def test_profile_metadata_keys(c, s, a, b):
x = c.map(slowinc, range(10), delay=0.05)
y = c.map(slowdec, range(10), delay=0.05)
Expand All @@ -1337,6 +1349,7 @@ async def test_profile_metadata_keys(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
"distributed.worker.profile.cycle": "100ms",
},
Expand All @@ -1353,6 +1366,7 @@ async def test_statistical_profiling(c, s, a, b):
@gen_cluster(
client=True,
config={
"distributed.worker.profile.enabled": True,
"distributed.worker.profile.interval": "1ms",
"distributed.worker.profile.cycle": "100ms",
},
Expand Down
Loading

0 comments on commit 6d85a85

Please sign in to comment.