Add a hardware benchmark to test memory, disk, and network bandwidths (
mrocklin authored Mar 28, 2022
1 parent 2fbf9eb commit 06170d5
Showing 11 changed files with 431 additions and 11 deletions.
13 changes: 13 additions & 0 deletions distributed/client.py
@@ -4041,6 +4041,19 @@ def get_worker_logs(self, n=None, workers=None, nanny=False):
"""
return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)

def benchmark_hardware(self) -> dict:
"""
        Run a benchmark on the workers for memory, disk, and network bandwidths

        Returns
-------
result: dict
A dictionary mapping the names "disk", "memory", and "network" to
dictionaries mapping sizes to bandwidths. These bandwidths are
averaged over many workers running computations across the cluster.
"""
return self.sync(self.scheduler.benchmark_hardware)

def log_event(self, topic, msg):
"""Log an event under a given topic
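A minimal usage sketch for the new client method, assuming a reachable cluster; the call blocks while the workers run their benchmarks, and the sizes and bandwidths printed are whatever the workers report:

from dask.utils import format_bytes
from distributed import Client

client = Client()  # or Client("tcp://<scheduler>:8786") for an existing cluster
result = client.benchmark_hardware()
for size, bandwidth in result["network"].items():
    print(size, format_bytes(bandwidth) + "/s")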
16 changes: 16 additions & 0 deletions distributed/compatibility.py
@@ -37,3 +37,19 @@ async def to_thread(func, /, *args, **kwargs):
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)


if sys.version_info >= (3, 9):
from random import randbytes
else:
try:
import numpy

        def randbytes(size):
            # "u1" (uint8) gives one byte per element; "u8" (uint64) would return 8x the requested size
            return numpy.random.randint(255, size=size, dtype="u1").tobytes()

except ImportError:
import secrets

def randbytes(size):
return secrets.token_bytes(size)
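A small usage sketch of the shim above; the worker-side benchmark routines (not shown in this excerpt) presumably use it to build payloads of a requested size, so all three code paths should return exactly size bytes:

from distributed.compatibility import randbytes

payload = randbytes(2**20)  # 1 MiB of random data to write to disk or send over the network
assert len(payload) == 2**20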
165 changes: 164 additions & 1 deletion distributed/dashboard/components/scheduler.py
@@ -21,6 +21,7 @@
ColumnDataSource,
CustomJSHover,
DataRange1d,
FactorRange,
GroupFilter,
HoverTool,
NumberFormatter,
@@ -49,7 +50,14 @@

import dask
from dask import config
from dask.utils import format_bytes, format_time, funcname, key_split, parse_timedelta
from dask.utils import (
format_bytes,
format_time,
funcname,
key_split,
parse_bytes,
parse_timedelta,
)

from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
@@ -559,6 +567,148 @@ def update(self):
update(self.source, d)


class Hardware(DashboardComponent):
"""Occupancy (in time) per worker"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
# Disk
self.disk_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.disk_figure = figure(
title="Disk Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)
self.disk_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.disk_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.disk_figure.add_tools(hover)
self.disk_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.disk_figure.xgrid.visible = False

# Memory
self.memory_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.memory_figure = figure(
title="Memory Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)

self.memory_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.memory_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.memory_figure.add_tools(hover)
self.memory_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.memory_figure.xgrid.visible = False

# Network
self.network_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.network_figure = figure(
title="Network Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)

self.network_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.network_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.network_figure.add_tools(hover)
self.network_figure.yaxis[0].formatter = NumeralTickFormatter(
format="0.0 b"
)
self.network_figure.xgrid.visible = False

self.root = row(
self.memory_figure,
self.disk_figure,
self.network_figure,
)

self.memory_data = {
"size": [],
"bandwidth": [],
}
self.disk_data = {
"size": [],
"bandwidth": [],
}
self.network_data = {
"size": [],
"bandwidth": [],
}

async def f():
result = await self.scheduler.benchmark_hardware()

for size in sorted(result["disk"], key=parse_bytes):
bandwidth = result["disk"][size]
self.disk_data["size"].append(size)
self.disk_data["bandwidth"].append(bandwidth)

for size in sorted(result["memory"], key=parse_bytes):
bandwidth = result["memory"][size]
self.memory_data["size"].append(size)
self.memory_data["bandwidth"].append(bandwidth)

for size in sorted(result["network"], key=parse_bytes):
bandwidth = result["network"][size]
self.network_data["size"].append(size)
self.network_data["bandwidth"].append(bandwidth)

self.scheduler.loop.add_callback(f)

def update(self):
if (
not self.disk_data["size"]
or self.disk_figure.title.text == "Disk Bandwidth"
):
return

self.network_figure.x_range.factors = self.network_data["size"]
self.disk_figure.x_range.factors = self.disk_data["size"]
self.memory_figure.x_range.factors = self.memory_data["size"]
update(self.disk_source, self.disk_data)
update(self.memory_source, self.memory_data)
update(self.network_source, self.network_data)
self.memory_figure.title.text = "Memory Bandwidth"
self.disk_figure.title.text = "Disk Bandwidth"
self.network_figure.title.text = "Network Bandwidth"


class BandwidthTypes(DashboardComponent):
"""Bar chart showing bandwidth per type"""

@@ -3432,6 +3582,19 @@ def workers_doc(scheduler, extra, doc):
doc.theme = BOKEH_THEME


def hardware_doc(scheduler, extra, doc):
with log_errors():
hw = Hardware(scheduler)
hw.update()
doc.title = "Dask: Cluster Hardware Bandwidth"
doc.add_root(hw.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME

add_periodic_callback(doc, hw, 500)


def tasks_doc(scheduler, extra, doc):
with log_errors():
ts = TaskStream(
20 changes: 18 additions & 2 deletions distributed/dashboard/scheduler.py
@@ -31,6 +31,7 @@
WorkerTable,
events_doc,
graph_doc,
hardware_doc,
individual_doc,
individual_profile_doc,
individual_profile_server_doc,
@@ -57,6 +58,7 @@
"/profile": profile_doc,
"/profile-server": profile_server_doc,
"/graph": graph_doc,
"/hardware": hardware_doc,
"/groups": tg_graph_doc,
"/gpu": gpu_doc,
"/individual-task-stream": individual_doc(
@@ -106,7 +108,7 @@
}


template_variables = {
template_variables: dict = {
"pages": [
"status",
"workers",
@@ -117,8 +119,22 @@
"groups",
"info",
],
"plots": [x.replace("/", "") for x in applications if "individual" in x],
"plots": [
{
"url": x.strip("/"),
"name": " ".join(x.strip("/").split("-")[1:])
.title()
.replace("Cpu", "CPU")
.replace("Gpu", "GPU"),
}
for x in applications
if "individual" in x
]
+ [{"url": "hardware", "name": "Hardware"}],
}
template_variables["plots"] = sorted(
template_variables["plots"], key=lambda d: d["name"]
)

if NVML_ENABLED:
template_variables["pages"].insert(4, "gpu")
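To illustrate the name derivation in the comprehension above with a hypothetical entry (not part of the diff):

x = "/individual-cpu"
name = " ".join(x.strip("/").split("-")[1:]).title().replace("Cpu", "CPU").replace("Gpu", "GPU")
# name == "CPU", so the menu entry becomes {"url": "individual-cpu", "name": "CPU"}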
6 changes: 6 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -25,6 +25,7 @@
CurrentLoad,
EventLoop,
Events,
Hardware,
MemoryByKey,
Occupancy,
ProcessingHistogram,
@@ -1004,3 +1005,8 @@ async def test_prefix_bokeh(s, a, b):
bokeh_app = s.http_application.applications[0]
assert isinstance(bokeh_app, BokehTornado)
assert bokeh_app.prefix == f"/{prefix}"


@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_hardware(c, s, a, b):
Hardware(s) # don't call update, takes too long for a test
6 changes: 2 additions & 4 deletions distributed/http/templates/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@
<div class="dropdown-content">
<ul>
{% for plot in plots %}
<li><a href="{{ plot }}">
{{ " ".join(plot.split("-")[1:]).title().replace("Cpu", "CPU").replace("Gpu", "GPU") }}
</a></li>
<li><a href="{{ plot["url"] }}">{{ plot["name"] }}</a></li>
{% endfor %}
</ul>
</div>
@@ -76,4 +74,4 @@
</script>
</body>

</html>
</html>
57 changes: 57 additions & 0 deletions distributed/scheduler.py
@@ -40,6 +40,7 @@
merge,
merge_sorted,
merge_with,
partition,
pluck,
second,
valmap,
@@ -3977,6 +3978,7 @@ def __init__(
"stop_task_metadata": self.stop_task_metadata,
"get_cluster_state": self.get_cluster_state,
"dump_cluster_state_to_url": self.dump_cluster_state_to_url,
"benchmark_hardware": self.benchmark_hardware,
}

connection_limit = get_fileno_limit() / 2
@@ -7327,6 +7329,61 @@ async def get_call_stack(self, keys=None):
response = {w: r for w, r in zip(workers, results) if r}
return response

async def benchmark_hardware(self) -> "dict[str, dict[str, float]]":
"""
        Run a benchmark on the workers for memory, disk, and network bandwidths

        Returns
-------
result: dict
A dictionary mapping the names "disk", "memory", and "network" to
dictionaries mapping sizes to bandwidths. These bandwidths are
averaged over many workers running computations across the cluster.
"""
out: "dict[str, defaultdict[str, list[float]]]" = {
name: defaultdict(list) for name in ["disk", "memory", "network"]
}

# disk
result = await self.broadcast(msg={"op": "benchmark_disk"})
for d in result.values():
for size, duration in d.items():
out["disk"][size].append(duration)

# memory
result = await self.broadcast(msg={"op": "benchmark_memory"})
for d in result.values():
for size, duration in d.items():
out["memory"][size].append(duration)

# network
workers = list(self.workers)
# On an adaptive cluster, if multiple workers are started on the same physical host,
# they are more likely to connect to the Scheduler in sequence, ending up next to
# each other in this list.
# The transfer speed within such clusters of workers will be effectively that of
# localhost. This could happen across different VMs and/or docker images, so
# implementing logic based on IP addresses would not necessarily help.
# Randomize the connections to even out the mean measures.
random.shuffle(workers)
futures = [
self.rpc(a).benchmark_network(address=b) for a, b in partition(2, workers)
]
responses = await asyncio.gather(*futures)

for d in responses:
for size, duration in d.items():
out["network"][size].append(duration)

result = {}
for mode in out:
result[mode] = {
size: sum(durations) / len(durations)
for size, durations in out[mode].items()
}

return result

def get_nbytes(self, keys=None, summary=True):
parent: SchedulerState = cast(SchedulerState, self)
ts: TaskState
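To make the worker pairing in benchmark_hardware concrete, a small sketch of how partition groups the shuffled worker list (addresses are made up; with an odd number of workers the last one simply sits out):

from tlz import partition  # the same helper the scheduler imports above

workers = [
    "tcp://10.0.0.1:40000",
    "tcp://10.0.0.2:40000",
    "tcp://10.0.0.3:40000",
    "tcp://10.0.0.4:40000",
    "tcp://10.0.0.5:40000",
]
pairs = list(partition(2, workers))
# [("tcp://10.0.0.1:40000", "tcp://10.0.0.2:40000"),
#  ("tcp://10.0.0.3:40000", "tcp://10.0.0.4:40000")]
# each pair drives one worker-to-worker benchmark_network call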