diff --git a/distributed/core.py b/distributed/core.py index 2a2a12521ff..f18b8f0b56e 100644 --- a/distributed/core.py +++ b/distributed/core.py @@ -10,7 +10,7 @@ import uuid import warnings import weakref -from collections import defaultdict +from collections import defaultdict, deque from collections.abc import Container from contextlib import suppress from enum import Enum @@ -195,18 +195,21 @@ def __init__( self.loop = self.io_loop if not hasattr(self.io_loop, "profile"): - ref = weakref.ref(self.io_loop) - - def stop() -> bool: - loop = ref() - return loop is None or loop.asyncio_loop.is_closed() - - self.io_loop.profile = profile.watch( - omit=("profile.py", "selectors.py"), - interval=dask.config.get("distributed.worker.profile.interval"), - cycle=dask.config.get("distributed.worker.profile.cycle"), - stop=stop, - ) + if dask.config.get("distributed.worker.profile.enabled"): + ref = weakref.ref(self.io_loop) + + def stop() -> bool: + loop = ref() + return loop is None or loop.asyncio_loop.is_closed() + + self.io_loop.profile = profile.watch( + omit=("profile.py", "selectors.py"), + interval=dask.config.get("distributed.worker.profile.interval"), + cycle=dask.config.get("distributed.worker.profile.cycle"), + stop=stop, + ) + else: + self.io_loop.profile = deque() # Statistics counters for various events with suppress(ImportError): diff --git a/distributed/dashboard/components/shared.py b/distributed/dashboard/components/shared.py index f07f8b1f07f..28f9110a773 100644 --- a/distributed/dashboard/components/shared.py +++ b/distributed/dashboard/components/shared.py @@ -14,6 +14,7 @@ NumeralTickFormatter, Range1d, Select, + Title, ) from bokeh.palettes import Spectral9 from bokeh.plotting import figure @@ -193,7 +194,6 @@ def __init__(self, server, doc=None, **kwargs): data = profile.plot_data(self.state, profile_interval) self.states = data.pop("states") self.profile_plot, self.source = profile.plot_figure(data, **kwargs) - changing = [False] # avoid repeated changes from within callback @without_property_validation @@ -270,6 +270,14 @@ def select_cb(attr, old, new): **kwargs, ) + self.subtitle = Title(text=" ", text_font_style="italic") + self.profile_plot.add_layout(self.subtitle, "above") + if not dask.config.get("distributed.worker.profile.enabled"): + self.subtitle.text = "Profiling is disabled." + self.select.disabled = True + self.reset_button.disabled = True + self.update_button.disabled = True + @without_property_validation @log_errors def update(self, state, metadata=None): @@ -388,6 +396,13 @@ def ts_change(attr, old, new): **kwargs, ) + self.subtitle = Title(text=" ", text_font_style="italic") + self.profile_plot.add_layout(self.subtitle, "above") + if not dask.config.get("distributed.worker.profile.enabled"): + self.subtitle.text = "Profiling is disabled." + self.reset_button.disabled = True + self.update_button.disabled = True + @without_property_validation @log_errors def update(self, state): diff --git a/distributed/dashboard/tests/test_components.py b/distributed/dashboard/tests/test_components.py index f268c279b32..fb4487831e0 100644 --- a/distributed/dashboard/tests/test_components.py +++ b/distributed/dashboard/tests/test_components.py @@ -1,5 +1,3 @@ -import asyncio - import pytest pytest.importorskip("bokeh") @@ -21,7 +19,11 @@ def test_basic(Component): assert isinstance(c.root, Model) -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={"distributed.worker.profile.enabled": True}, +) async def test_profile_plot(c, s, a, b): p = ProfilePlot() assert not p.source.data["left"] @@ -30,20 +32,54 @@ async def test_profile_plot(c, s, a, b): p.update(a.profile_recent) -@gen_cluster(client=True, clean_kwargs={"threads": False}) +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.interval": "10ms", + "distributed.worker.profile.cycle": "50ms", + }, +) async def test_profile_time_plot(c, s, a, b): from bokeh.io import curdoc sp = ProfileTimePlot(s, doc=curdoc()) + assert "disabled" not in sp.subtitle.text sp.trigger_update() ap = ProfileTimePlot(a, doc=curdoc()) + assert "disabled" not in sp.subtitle.text ap.trigger_update() - assert not len(sp.source.data["left"]) - assert not len(ap.source.data["left"]) + assert len(sp.source.data["left"]) == 0 + assert len(ap.source.data["left"]) == 0 await c.gather(c.map(slowinc, range(10), delay=0.05)) + ap.trigger_update() sp.trigger_update() - await asyncio.sleep(0.05) + + +@gen_cluster( + client=True, + clean_kwargs={"threads": False}, + config={ + "distributed.worker.profile.enabled": False, + "distributed.worker.profile.interval": "10ms", + "distributed.worker.profile.cycle": "50ms", + }, +) +async def test_profile_time_plot_disabled(c, s, a, b): + from bokeh.io import curdoc + + sp = ProfileTimePlot(s, doc=curdoc()) + assert "disabled" in sp.subtitle.text + sp.trigger_update() + + ap = ProfileTimePlot(a, doc=curdoc()) + assert "disabled" in sp.subtitle.text + ap.trigger_update() + + assert len(sp.source.data["left"]) == 0 + assert len(ap.source.data["left"]) == 0 diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 88db7db032d..5a8219c15ad 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -788,12 +788,14 @@ async def test_TaskGroupGraph_arrows(c, s, a, b): @gen_cluster( client=True, config={ + "distributed.worker.profile.enabled": True, "distributed.worker.profile.interval": "10ms", "distributed.worker.profile.cycle": "50ms", }, ) async def test_profile_server(c, s, a, b): ptp = ProfileServer(s) + assert "disabled" not in ptp.subtitle.text start = time() await asyncio.sleep(0.100) while len(ptp.ts_source.data["time"]) < 2: @@ -802,6 +804,24 @@ async def test_profile_server(c, s, a, b): assert time() < start + 2 +@pytest.mark.slow +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": False, + "distributed.worker.profile.interval": "5ms", + "distributed.worker.profile.cycle": "10ms", + }, +) +async def test_profile_server_disabled(c, s, a, b): + ptp = ProfileServer(s) + assert "disabled" in ptp.subtitle.text + start = time() + await asyncio.sleep(0.1) + ptp.trigger_update() + assert len(ptp.ts_source.data["time"]) == 0 + + @gen_cluster(client=True, scheduler_kwargs={"dashboard": True}) async def test_root_redirect(c, s, a, b): http_client = AsyncHTTPClient() diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 942eab8ff9a..ea74e045459 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -405,6 +405,10 @@ properties: This data gets collected into statistical profiling information, which is then periodically bundled together and sent along to the scheduler. properties: + enabled: + type: boolean + description: | + Whether or not to enable profiling interval: type: string description: | diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml index 74d59addb35..49645a9d049 100644 --- a/distributed/distributed.yaml +++ b/distributed/distributed.yaml @@ -91,6 +91,7 @@ distributed: restart: False # Do we ressurrect the worker after the lifetime deadline? profile: + enabled: True # Whether or not to enable profiling interval: 10ms # Time between statistical profiling queries cycle: 1000ms # Time between starting new profile low-level: False # Whether or not to include low-level functions diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 582180a8869..2906f83c0d8 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -530,7 +530,10 @@ def check(dask_worker): @gen_cluster( client=True, nthreads=[("127.0.0.1", 1)], - config={"distributed.worker.profile.interval": "1ms"}, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.interval": "1ms", + }, ) async def test_actors_in_profile(c, s, a): class Sleeper: diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index b82f36e27f5..fc785b9b005 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -5422,7 +5422,13 @@ async def test_call_stack_collections_all(c, s, a, b): @pytest.mark.flaky(condition=WINDOWS, reruns=10, reruns_delay=5) -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.cycle": "100ms", + }, +) async def test_profile(c, s, a, b): futures = c.map(slowinc, range(10), delay=0.05, workers=a.address) await wait(futures) @@ -5444,7 +5450,33 @@ async def test_profile(c, s, a, b): assert not result["count"] -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": False, + "distributed.worker.profile.cycle": "100ms", + }, +) +async def test_profile_disabled(c, s, a, b): + futures = c.map(slowinc, range(10), delay=0.05, workers=a.address) + await wait(futures) + + x = await c.profile(start=time() + 10, stop=time() + 20) + assert x["count"] == 0 + + x = await c.profile(start=0, stop=time()) + assert x["count"] == 0 + + y = await c.profile(start=time() - 0.300, stop=time()) + assert 0 == y["count"] == x["count"] + + +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.cycle": "100ms", + }, +) async def test_profile_keys(c, s, a, b): x = c.map(slowinc, range(10), delay=0.05, workers=a.address) y = c.map(slowdec, range(10), delay=0.05, workers=a.address) @@ -6169,7 +6201,13 @@ async def test_futures_of_sorted(c, s, a, b): @pytest.mark.flaky(reruns=10, reruns_delay=5) -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "10ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.cycle": "10ms", + }, +) async def test_profile_server(c, s, a, b): for i in range(5): try: @@ -6193,6 +6231,27 @@ async def test_profile_server(c, s, a, b): break +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": False, + "distributed.worker.profile.cycle": "10ms", + }, +) +async def test_profile_server_disabled(c, s, a, b): + x = c.map(slowinc, range(10), delay=0.01, workers=a.address, pure=False) + await wait(x) + await asyncio.gather( + c.run(slowinc, 1, delay=0.5), c.run_on_scheduler(slowdec, 1, delay=0.5) + ) + + p = await c.profile(server=True) # All worker servers + assert "slowinc" not in str(p) + + p = await c.profile(scheduler=True) # Scheduler + assert "slowdec" not in str(p) + + @gen_cluster(client=True) async def test_await_future(c, s, a, b): future = c.submit(inc, 1) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index e304a3959ec..90cd3f2c9b8 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1300,7 +1300,13 @@ async def test_profile_metadata(c, s, a, b): assert not meta["counts"][-1][1] -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.cycle": "100ms", + }, +) async def test_profile_metadata_timeout(c, s, a, b): start = time() - 1 @@ -1321,7 +1327,13 @@ def raise_timeout(*args, **kwargs): assert not meta["counts"][-1][1] -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "100ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.cycle": "100ms", + }, +) async def test_profile_metadata_keys(c, s, a, b): x = c.map(slowinc, range(10), delay=0.05) y = c.map(slowdec, range(10), delay=0.05) @@ -1337,6 +1349,7 @@ async def test_profile_metadata_keys(c, s, a, b): @gen_cluster( client=True, config={ + "distributed.worker.profile.enabled": True, "distributed.worker.profile.interval": "1ms", "distributed.worker.profile.cycle": "100ms", }, @@ -1353,6 +1366,7 @@ async def test_statistical_profiling(c, s, a, b): @gen_cluster( client=True, config={ + "distributed.worker.profile.enabled": True, "distributed.worker.profile.interval": "1ms", "distributed.worker.profile.cycle": "100ms", }, diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index d5b5966cc24..2488fbc447a 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1074,7 +1074,12 @@ async def test_scheduler_delay(c, s, a, b): @pytest.mark.flaky(reruns=10, reruns_delay=5) -@gen_cluster(client=True) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + }, +) async def test_statistical_profiling(c, s, a, b): futures = c.map(slowinc, range(10), delay=0.1) await wait(futures) @@ -1089,6 +1094,7 @@ async def test_statistical_profiling(c, s, a, b): client=True, timeout=30, config={ + "distributed.worker.profile.enabled": True, "distributed.worker.profile.interval": "1ms", "distributed.worker.profile.cycle": "100ms", }, @@ -1106,7 +1112,13 @@ async def test_statistical_profiling_2(c, s, a, b): break -@gen_cluster(client=True, worker_kwargs={"profile_cycle_interval": "50 ms"}) +@gen_cluster( + client=True, + config={ + "distributed.worker.profile.enabled": True, + "distributed.worker.profile.cycle": "100ms", + }, +) async def test_statistical_profiling_cycle(c, s, a, b): futures = c.map(slowinc, range(20), delay=0.05) await wait(futures) diff --git a/distributed/utils_test.py b/distributed/utils_test.py index a2780f15f1f..cd07e867da0 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -1888,6 +1888,7 @@ def _reconfigure(): { "distributed.comm.timeouts.connect": "5s", "distributed.admin.tick.interval": "500 ms", + "distributed.worker.profile.enabled": False, } ): # Restore default logging levels diff --git a/distributed/worker.py b/distributed/worker.py index e47d7bbf432..a8f4b6ba942 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -875,14 +875,15 @@ def __init__( setproctitle("dask-worker [not started]") - profile_trigger_interval = parse_timedelta( - dask.config.get("distributed.worker.profile.interval"), default="ms" - ) - pc = PeriodicCallback(self.trigger_profile, profile_trigger_interval * 1000) - self.periodic_callbacks["profile"] = pc + if dask.config.get("distributed.worker.profile.enabled"): + profile_trigger_interval = parse_timedelta( + dask.config.get("distributed.worker.profile.interval"), default="ms" + ) + pc = PeriodicCallback(self.trigger_profile, profile_trigger_interval * 1000) + self.periodic_callbacks["profile"] = pc - pc = PeriodicCallback(self.cycle_profile, profile_cycle_interval * 1000) - self.periodic_callbacks["profile-cycle"] = pc + pc = PeriodicCallback(self.cycle_profile, profile_cycle_interval * 1000) + self.periodic_callbacks["profile-cycle"] = pc self.plugins = {} self._pending_plugins = plugins