Skip to content

Commit

Permalink
Track Event Loop intervals in dashboard plot (#5964)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin authored Mar 25, 2022
1 parent 6949c6d commit 2fbf9eb
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 5 deletions.
23 changes: 20 additions & 3 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,20 @@ def stop():
self.periodic_callbacks["monitor"] = pc

self._last_tick = time()
measure_tick_interval = parse_timedelta(
self._tick_counter = 0
self._tick_count = 0
self._tick_count_last = time()
self._tick_interval = parse_timedelta(
dask.config.get("distributed.admin.tick.interval"), default="ms"
)
pc = PeriodicCallback(self._measure_tick, measure_tick_interval * 1000)
self.periodic_callbacks["tick"] = pc
self._tick_interval_observed = self._tick_interval
self.periodic_callbacks["tick"] = PeriodicCallback(
self._measure_tick, self._tick_interval * 1000
)
self.periodic_callbacks["ticks"] = PeriodicCallback(
self._cycle_ticks,
parse_timedelta(dask.config.get("distributed.admin.tick.cycle")) * 1000,
)

self.thread_id = 0

Expand Down Expand Up @@ -351,6 +360,7 @@ def _measure_tick(self):
now = time()
diff = now - self._last_tick
self._last_tick = now
self._tick_counter += 1
if diff > tick_maximum_delay:
logger.info(
"Event loop was unresponsive in %s for %.2fs. "
Expand All @@ -363,6 +373,13 @@ def _measure_tick(self):
if self.digests is not None:
self.digests["tick-duration"].add(diff)

def _cycle_ticks(self):
if not self._tick_counter:
return
last, self._tick_count_last = self._tick_count_last, time()
count, self._tick_counter = self._tick_counter, 0
self._tick_interval_observed = (time() - last) / (count or 1)

@property
def address(self):
"""
Expand Down
53 changes: 53 additions & 0 deletions distributed/dashboard/components/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3018,6 +3018,59 @@ def update(self):
)


class EventLoop(DashboardComponent):
"""Event Loop Health"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
self.source = ColumnDataSource(
{
"names": ["Scheduler", "Workers"],
"values": [0, 0],
"text": ["0", "0"],
}
)

self.root = figure(
title="Event Loop Health",
x_range=["Scheduler", "Workers"],
y_range=[
0,
parse_timedelta(dask.config.get("distributed.admin.tick.interval"))
* 25,
],
tools="",
toolbar_location="above",
**kwargs,
)
self.root.vbar(x="names", top="values", width=0.9, source=self.source)

self.root.xaxis.minor_tick_line_alpha = 0
self.root.ygrid.visible = True
self.root.xgrid.visible = False

hover = HoverTool(tooltips=[("Interval", "@text s")], mode="vline")
self.root.add_tools(hover)

@without_property_validation
def update(self):
with log_errors():
s = self.scheduler

data = {
"names": ["Scheduler", "Workers"],
"values": [
s._tick_interval_observed,
sum([w.metrics["event_loop_interval"] for w in s.workers.values()])
/ (len(s.workers) or 1),
],
}
data["text"] = [format_time(x) for x in data["values"]]

update(self.source, data)


class WorkerTable(DashboardComponent):
"""Status of the current workers
Expand Down
2 changes: 2 additions & 0 deletions distributed/dashboard/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ClusterMemory,
ComputePerKey,
CurrentLoad,
EventLoop,
MemoryByKey,
Occupancy,
SystemMonitor,
Expand Down Expand Up @@ -97,6 +98,7 @@
"/individual-compute-time-per-key": individual_doc(ComputePerKey, 500),
"/individual-aggregate-time-per-action": individual_doc(AggregateAction, 500),
"/individual-scheduler-system": individual_doc(SystemMonitor, 500),
"/individual-event-loop": individual_doc(EventLoop, 500),
"/individual-profile": individual_profile_doc,
"/individual-profile-server": individual_profile_server_doc,
"/individual-gpu-memory": gpu_memory_doc,
Expand Down
9 changes: 8 additions & 1 deletion distributed/dashboard/tests/test_scheduler_bokeh.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
ClusterMemory,
ComputePerKey,
CurrentLoad,
EventLoop,
Events,
MemoryByKey,
Occupancy,
Expand Down Expand Up @@ -75,7 +76,13 @@ async def test_simple(c, s, a, b):

@gen_cluster(client=True, worker_kwargs={"dashboard": True})
async def test_basic(c, s, a, b):
for component in [TaskStream, SystemMonitor, Occupancy, StealingTimeSeries]:
for component in [
TaskStream,
SystemMonitor,
Occupancy,
StealingTimeSeries,
EventLoop,
]:
ss = component(s)

ss.update()
Expand Down
5 changes: 4 additions & 1 deletion distributed/distributed-schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ properties:
- {type: number, minimum: 0}
- enum: [false]
description: >-
Limit of number of bytes to be spilled on disk.
Limit of number of bytes to be spilled on disk.
monitor-interval:
type: string
Expand Down Expand Up @@ -976,6 +976,9 @@ properties:
limit :
type: string
description: The time allowed before triggering a warning
cycle :
type: string
description: The time in between verifying event loop speed

max-error-length:
type: integer
Expand Down
1 change: 1 addition & 0 deletions distributed/distributed.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ distributed:
tick:
interval: 20ms # time between event loop health checks
limit: 3s # time allowed before triggering a warning
cycle: 1s # time between checking event loop speed

max-error-length: 10000 # Maximum size traceback after error to return
log-length: 10000 # default length of logs to keep in memory
Expand Down
19 changes: 19 additions & 0 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3306,3 +3306,22 @@ async def test_Worker__to_dict(c, s, a):
}
assert d["tasks"]["x"]["key"] == "x"
assert d["data"] == ["x"]


@gen_cluster(
client=True,
config={
"distributed.admin.tick.interval": "5ms",
"distributed.admin.tick.cycle": "100ms",
},
)
async def test_tick_interval(c, s, a, b):
import time

await a.heartbeat()
x = s.workers[a.address].metrics["event_loop_interval"]
while s.workers[a.address].metrics["event_loop_interval"] > 0.050:
await asyncio.sleep(0.01)
while s.workers[a.address].metrics["event_loop_interval"] < 0.100:
await asyncio.sleep(0.01)
time.sleep(0.200)
1 change: 1 addition & 0 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,7 @@ async def get_metrics(self) -> dict:
"memory": spilled_memory,
"disk": spilled_disk,
},
event_loop_interval=self._tick_interval_observed,
)
out.update(self.monitor.recent())

Expand Down

0 comments on commit 2fbf9eb

Please sign in to comment.