From 2fbf9ebef4ea862b72bd221f63d2785016d0086c Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Fri, 25 Mar 2022 13:38:03 -0500
Subject: [PATCH] Track Event Loop intervals in dashboard plot (#5964)

---
 distributed/core.py                           | 23 ++++++--
 distributed/dashboard/components/scheduler.py | 53 +++++++++++++++++++
 distributed/dashboard/scheduler.py            |  2 +
 .../dashboard/tests/test_scheduler_bokeh.py   |  9 +++-
 distributed/distributed-schema.yaml           |  5 +-
 distributed/distributed.yaml                  |  1 +
 distributed/tests/test_worker.py              | 19 +++++++
 distributed/worker.py                         |  1 +
 8 files changed, 108 insertions(+), 5 deletions(-)

diff --git a/distributed/core.py b/distributed/core.py
index 6d043c62b1d..419205ef4a0 100644
--- a/distributed/core.py
+++ b/distributed/core.py
@@ -234,11 +234,20 @@ def stop():
         self.periodic_callbacks["monitor"] = pc
 
         self._last_tick = time()
-        measure_tick_interval = parse_timedelta(
+        self._tick_counter = 0
+        self._tick_count = 0
+        self._tick_count_last = time()
+        self._tick_interval = parse_timedelta(
             dask.config.get("distributed.admin.tick.interval"), default="ms"
         )
-        pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000)
-        self.periodic_callbacks["tick"] = pc
+        self._tick_interval_observed = self._tick_interval
+        self.periodic_callbacks["tick"] = PeriodicCallback(
+            self._measure_tick, self._tick_interval * 1000
+        )
+        self.periodic_callbacks["ticks"] = PeriodicCallback(
+            self._cycle_ticks,
+            parse_timedelta(dask.config.get("distributed.admin.tick.cycle")) * 1000,
+        )
 
         self.thread_id = 0
 
@@ -351,6 +360,7 @@ def _measure_tick(self):
         now = time()
         diff = now - self._last_tick
         self._last_tick = now
+        self._tick_counter += 1
         if diff > tick_maximum_delay:
             logger.info(
                 "Event loop was unresponsive in %s for %.2fs. "
@@ -363,6 +373,13 @@ def _measure_tick(self):
         if self.digests is not None:
             self.digests["tick-duration"].add(diff)
 
+    def _cycle_ticks(self):
+        if not self._tick_counter:
+            return
+        last, self._tick_count_last = self._tick_count_last, time()
+        count, self._tick_counter = self._tick_counter, 0
+        self._tick_interval_observed = (time() - last) / (count or 1)
+
     @property
     def address(self):
         """
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index bab3cb2b1e5..7fa639c799d 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -3018,6 +3018,59 @@ def update(self):
         )
 
 
+class EventLoop(DashboardComponent):
+    """Event Loop Health"""
+
+    def __init__(self, scheduler, **kwargs):
+        with log_errors():
+            self.scheduler = scheduler
+            self.source = ColumnDataSource(
+                {
+                    "names": ["Scheduler", "Workers"],
+                    "values": [0, 0],
+                    "text": ["0", "0"],
+                }
+            )
+
+            self.root = figure(
+                title="Event Loop Health",
+                x_range=["Scheduler", "Workers"],
+                y_range=[
+                    0,
+                    parse_timedelta(dask.config.get("distributed.admin.tick.interval"))
+                    * 25,
+                ],
+                tools="",
+                toolbar_location="above",
+                **kwargs,
+            )
+            self.root.vbar(x="names", top="values", width=0.9, source=self.source)
+
+            self.root.xaxis.minor_tick_line_alpha = 0
+            self.root.ygrid.visible = True
+            self.root.xgrid.visible = False
+
+            hover = HoverTool(tooltips=[("Interval", "@text s")], mode="vline")
+            self.root.add_tools(hover)
+
+    @without_property_validation
+    def update(self):
+        with log_errors():
+            s = self.scheduler
+
+            data = {
+                "names": ["Scheduler", "Workers"],
+                "values": [
+                    s._tick_interval_observed,
+                    sum([w.metrics["event_loop_interval"] for w in s.workers.values()])
+                    / (len(s.workers) or 1),
+                ],
+            }
data["text"] = [format_time(x) for x in data["values"]] + + update(self.source, data) + + class WorkerTable(DashboardComponent): """Status of the current workers diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py index 3d8e62d95ff..42c50b732bc 100644 --- a/distributed/dashboard/scheduler.py +++ b/distributed/dashboard/scheduler.py @@ -16,6 +16,7 @@ ClusterMemory, ComputePerKey, CurrentLoad, + EventLoop, MemoryByKey, Occupancy, SystemMonitor, @@ -97,6 +98,7 @@ "/individual-compute-time-per-key": individual_doc(ComputePerKey, 500), "/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500), "/individual-scheduler-system": individual_doc(SystemMonitor, 500), + "/individual-event-loop": individual_doc(EventLoop, 500), "/individual-profile": individual_profile_doc, "/individual-profile-server": individual_profile_server_doc, "/individual-gpu-memory": gpu_memory_doc, diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py index 5242aa1eb80..566fe9377cf 100644 --- a/distributed/dashboard/tests/test_scheduler_bokeh.py +++ b/distributed/dashboard/tests/test_scheduler_bokeh.py @@ -23,6 +23,7 @@ ClusterMemory, ComputePerKey, CurrentLoad, + EventLoop, Events, MemoryByKey, Occupancy, @@ -75,7 +76,13 @@ async def test_simple(c, s, a, b): @gen_cluster(client=True, worker_kwargs={"dashboard": True}) async def test_basic(c, s, a, b): - for component in [TaskStream, SystemMonitor, Occupancy, StealingTimeSeries]: + for component in [ + TaskStream, + SystemMonitor, + Occupancy, + StealingTimeSeries, + EventLoop, + ]: ss = component(s) ss.update() diff --git a/distributed/distributed-schema.yaml b/distributed/distributed-schema.yaml index 2c6c0d5b566..6187fb342f9 100644 --- a/distributed/distributed-schema.yaml +++ b/distributed/distributed-schema.yaml @@ -510,7 +510,7 @@ properties: - {type: number, minimum: 0} - enum: [false] description: >- - Limit of number of bytes to be spilled on disk. + Limit of number of bytes to be spilled on disk. 
          monitor-interval:
            type: string
@@ -976,6 +976,9 @@ properties:
       limit :
         type: string
         description: The time allowed before triggering a warning
+      cycle :
+        type: string
+        description: The time between checks of event loop speed
 
       max-error-length:
         type: integer
diff --git a/distributed/distributed.yaml b/distributed/distributed.yaml
index 27642579409..61b6522c11e 100644
--- a/distributed/distributed.yaml
+++ b/distributed/distributed.yaml
@@ -270,6 +270,7 @@ distributed:
     tick:
       interval: 20ms  # time between event loop health checks
       limit: 3s  # time allowed before triggering a warning
+      cycle: 1s  # time between checks of event loop speed
 
     max-error-length: 10000  # Maximum size traceback after error to return
     log-length: 10000  # default length of logs to keep in memory
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 0caa128c02b..b168592a14a 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -3306,3 +3306,22 @@ async def test_Worker__to_dict(c, s, a):
     }
     assert d["tasks"]["x"]["key"] == "x"
     assert d["data"] == ["x"]
+
+
+@gen_cluster(
+    client=True,
+    config={
+        "distributed.admin.tick.interval": "5ms",
+        "distributed.admin.tick.cycle": "100ms",
+    },
+)
+async def test_tick_interval(c, s, a, b):
+    import time
+
+    await a.heartbeat()
+    x = s.workers[a.address].metrics["event_loop_interval"]
+    while s.workers[a.address].metrics["event_loop_interval"] > 0.050:
+        await asyncio.sleep(0.01)
+    while s.workers[a.address].metrics["event_loop_interval"] < 0.100:
+        await asyncio.sleep(0.01)
+        time.sleep(0.200)
diff --git a/distributed/worker.py b/distributed/worker.py
index 13e5adeff00..338e824f3c4 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -946,6 +946,7 @@ async def get_metrics(self) -> dict:
                 "memory": spilled_memory,
                 "disk": spilled_disk,
             },
+            event_loop_interval=self._tick_interval_observed,
        )
 
        out.update(self.monitor.recent())
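
A note on the mechanism, since the arithmetic in _cycle_ticks is easy to miss:
_measure_tick is scheduled every distributed.admin.tick.interval (20ms by
default) and counts how many times it actually fired, and every
distributed.admin.tick.cycle (1s by default) _cycle_ticks divides the elapsed
wall-clock time by that count. When the event loop is blocked, ticks fire late
or not at all, so the observed interval rises above the target. Below is a
minimal, self-contained sketch of that technique; the TickMonitor name and the
plain-asyncio scaffolding are hypothetical stand-ins, while the real
implementation lives on distributed.core.Server and is driven by tornado
PeriodicCallbacks.

import asyncio
import time


class TickMonitor:
    """Illustrative stand-in for the Server tick bookkeeping above."""

    def __init__(self, tick_interval=0.020):
        self.tick_interval = tick_interval   # ~ distributed.admin.tick.interval
        self._tick_counter = 0               # ticks seen in the current cycle
        self._tick_count_last = time.time()  # start of the current cycle
        self.interval_observed = tick_interval

    def measure_tick(self):
        # Ideally runs once per tick_interval; a blocked loop delays it.
        self._tick_counter += 1

    def cycle_ticks(self):
        # Same arithmetic as Server._cycle_ticks: wall-clock time elapsed in
        # this cycle divided by the number of ticks that actually fired.
        if not self._tick_counter:
            return
        last, self._tick_count_last = self._tick_count_last, time.time()
        count, self._tick_counter = self._tick_counter, 0
        self.interval_observed = (time.time() - last) / count


async def main():
    monitor = TickMonitor()

    async def ticker():
        while True:
            await asyncio.sleep(monitor.tick_interval)
            monitor.measure_tick()

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0.5)
    time.sleep(0.5)  # block the loop: fewer ticks fire per wall-clock second
    await asyncio.sleep(0.5)
    monitor.cycle_ticks()
    task.cancel()
    print(f"target {monitor.tick_interval:.3f}s, "
          f"observed {monitor.interval_observed:.3f}s")


asyncio.run(main())

Blocking the loop with time.sleep is exactly how test_tick_interval above
pushes the worker's observed interval past 100ms.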
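
To try the feature out: the new bar chart is registered above at
/individual-event-loop on the scheduler dashboard (port 8787 by default). The
raw numbers can also be read without the dashboard; a sketch, assuming a
default LocalCluster (values will vary from run to run):

from dask.distributed import Client

client = Client()  # starts a LocalCluster; dashboard on :8787

# Read the observed interval straight off each worker; the attribute is
# added in distributed/core.py above.
print(client.run(lambda dask_worker: dask_worker._tick_interval_observed))

Client.run passes the worker object to any function that accepts a dask_worker
argument, so the lambda runs once per worker; the same value reaches the
scheduler through heartbeats as the "event_loop_interval" metric (see the
worker.py hunk above), which is what the Workers bar averages.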