Add a hardware benchmark to test memory, disk, and network bandwidths #5966

Merged — 23 commits, merged on Mar 28, 2022. This view shows the changes from 14 of the 23 commits.

Commits (23):
b5f9eca  Add first draft of benchmark code — mrocklin, Mar 19, 2022
15a411b  set up scheduler and worker routes — mrocklin, Mar 19, 2022
e26ae0d  Add hardware benchmark page to dashboard — mrocklin, Mar 20, 2022
fb1ffcf  cleanup hardware benchmark test — mrocklin, Mar 20, 2022
609ffdc  add reference to /hardware in http_services page — mrocklin, Mar 21, 2022
b463a7d  Respond to feedback — mrocklin, Mar 22, 2022
685ada0  add randbytes to compatibility — mrocklin, Mar 22, 2022
04ba089  Have workers that only send or receive, not both — mrocklin, Mar 22, 2022
3177fc6  Apply suggestions from code review — mrocklin, Mar 23, 2022
6ea0675  add docstrings — mrocklin, Mar 23, 2022
4ee3a2b  Merge branch 'main' of github.com:dask/distributed into benchmark — mrocklin, Mar 23, 2022
50bd49e  add hardware to More... plots — mrocklin, Mar 24, 2022
d3b5baa  Update factor ranges dynamically — mrocklin, Mar 24, 2022
6b53f4a  fixup test — mrocklin, Mar 24, 2022
5c484ae  change update — mrocklin, Mar 25, 2022
f24f2ad  Apply suggestions from code review — mrocklin, Mar 25, 2022
1916f0c  Update distributed/dashboard/components/scheduler.py — crusaderky, Mar 25, 2022
ca61cac  add docstring — mrocklin, Mar 25, 2022
b31b76f  Update More... handling in dashboard to include explicit extra pages — mrocklin, Mar 25, 2022
9d7d4d6  Merge branch 'benchmark' of github.com:mrocklin/distributed into benc… — mrocklin, Mar 25, 2022
8a0fb9c  Merge branch 'main' of github.com:dask/distributed into benchmark — mrocklin, Mar 25, 2022
1b97b5d  types — mrocklin, Mar 25, 2022
67f7514  Use local executor again — mrocklin, Mar 25, 2022
4 changes: 4 additions & 0 deletions distributed/client.py
@@ -4041,6 +4041,10 @@ def get_worker_logs(self, n=None, workers=None, nanny=False):
"""
return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)

def benchmark_hardware(self) -> dict:
"""Run a hardware benchmark"""
return self.sync(self.scheduler.benchmark_hardware)

def log_event(self, topic, msg):
"""Log an event under a given topic

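For orientation, a hedged sketch of how the new Client.benchmark_hardware method might be called from user code — the address is illustrative, and the result shape and bytes-per-second values are inferred from the scheduler and dashboard code further down in this diff:

from dask.distributed import Client
from dask.utils import format_bytes

client = Client("tcp://scheduler-address:8786")  # illustrative address

result = client.benchmark_hardware()
# result is {"memory": {...}, "disk": {...}, "network": {...}}, each mapping a
# payload size string such as "1 kiB" to an average bandwidth in bytes per second
for mode, bandwidths in result.items():
    for size, bandwidth in bandwidths.items():
        print(f"{mode:8} {size:>8}  {format_bytes(bandwidth)}/s")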
16 changes: 16 additions & 0 deletions distributed/compatibility.py
@@ -37,3 +37,19 @@ async def to_thread(func, /, *args, **kwargs):
ctx = contextvars.copy_context()
func_call = functools.partial(ctx.run, func, *args, **kwargs)
return await loop.run_in_executor(None, func_call)


if sys.version_info >= (3, 9):
from random import randbytes
else:
try:
import numpy

def randbytes(size):
return numpy.random.randint(255, size=size, dtype="u1").tobytes()  # "u1" is numpy's 1-byte unsigned integer, so this yields exactly `size` bytes

except ImportError:
import secrets

def randbytes(size):
return secrets.token_bytes(size)
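random.randbytes only exists on Python 3.9+, hence the shim above. A small, hedged usage sketch (the payload size is arbitrary):

from dask.utils import parse_bytes

from distributed.compatibility import randbytes

# Pseudo-random payload suitable for benchmark traffic; on older Pythons this
# transparently falls back to the numpy or secrets implementations above
payload = randbytes(parse_bytes("1 kiB"))
print(type(payload), len(payload))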
163 changes: 162 additions & 1 deletion distributed/dashboard/components/scheduler.py
@@ -21,6 +21,7 @@
ColumnDataSource,
CustomJSHover,
DataRange1d,
FactorRange,
GroupFilter,
HoverTool,
NumberFormatter,
@@ -49,7 +50,14 @@

import dask
from dask import config
from dask.utils import format_bytes, format_time, funcname, key_split, parse_timedelta
from dask.utils import (
format_bytes,
format_time,
funcname,
key_split,
parse_bytes,
parse_timedelta,
)

from distributed.dashboard.components import add_periodic_callback
from distributed.dashboard.components.shared import (
@@ -559,6 +567,146 @@ def update(self):
update(self.source, d)


class Hardware(DashboardComponent):
"""Occupancy (in time) per worker"""

def __init__(self, scheduler, **kwargs):
with log_errors():
self.scheduler = scheduler
# Disk
self.disk_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.disk_figure = figure(
title="Disk Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)
self.disk_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.disk_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.disk_figure.add_tools(hover)
self.disk_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.disk_figure.xgrid.visible = False

# Memory
self.memory_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.memory_figure = figure(
title="Memory Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)

self.memory_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.memory_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.memory_figure.add_tools(hover)
self.memory_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")
self.memory_figure.xgrid.visible = False

# Network
self.network_source = ColumnDataSource(
{
"size": [],
"bandwidth": [],
}
)

self.network_figure = figure(
title="Network Bandwidth -- Computing ...",
tools="",
toolbar_location="above",
x_range=FactorRange(factors=[]),
**kwargs,
)

self.network_figure.vbar(
x="size", top="bandwidth", width=0.9, source=self.network_source
)
hover = HoverTool(
mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]
)
self.network_figure.add_tools(hover)
self.network_figure.yaxis[0].formatter = NumeralTickFormatter(
format="0.0 b"
)
self.network_figure.xgrid.visible = False

self.root = row(
self.memory_figure,
self.disk_figure,
self.network_figure,
)

self.memory_data = {
"size": [],
"bandwidth": [],
}
self.disk_data = {
"size": [],
"bandwidth": [],
}
self.network_data = {
"size": [],
"bandwidth": [],
}

async def f():
result = await self.scheduler.benchmark_hardware()

for size in sorted(result["disk"], key=parse_bytes):
bandwidth = result["disk"][size]
self.disk_data["size"].append(size)
self.disk_data["bandwidth"].append(bandwidth)

for size in sorted(result["memory"], key=parse_bytes):
bandwidth = result["memory"][size]
self.memory_data["size"].append(size)
self.memory_data["bandwidth"].append(bandwidth)

for size in sorted(result["network"], key=parse_bytes):
bandwidth = result["network"][size]
self.network_data["size"].append(size)
self.network_data["bandwidth"].append(bandwidth)

self.scheduler.loop.add_callback(f)

def update(self):
if any(self.disk_data.values()):
if self.memory_figure.title.text == "Memory Bandwidth":
return
else:
self.network_figure.x_range.factors = self.network_data["size"]
self.disk_figure.x_range.factors = self.disk_data["size"]
self.memory_figure.x_range.factors = self.memory_data["size"]
update(self.disk_source, self.disk_data)
update(self.memory_source, self.memory_data)
update(self.network_source, self.network_data)
self.memory_figure.title.text = "Memory Bandwidth"
self.disk_figure.title.text = "Disk Bandwidth"
self.network_figure.title.text = "Network Bandwidth"


class BandwidthTypes(DashboardComponent):
"""Bar chart showing bandwidth per type"""

@@ -3379,6 +3527,19 @@ def workers_doc(scheduler, extra, doc):
doc.theme = BOKEH_THEME


def hardware_doc(scheduler, extra, doc):
with log_errors():
hw = Hardware(scheduler)
hw.update()
doc.title = "Dask: Cluster Hardware Bandwidth"
doc.add_root(hw.root)
doc.template = env.get_template("simple.html")
doc.template_variables.update(extra)
doc.theme = BOKEH_THEME

add_periodic_callback(doc, hw, 500)


def tasks_doc(scheduler, extra, doc):
with log_errors():
ts = TaskStream(
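For readers unfamiliar with the Bokeh pattern the Hardware component relies on, here is a minimal, standalone sketch of a categorical bandwidth bar chart; the sizes and bandwidth values are made up for illustration and are not produced by this PR:

from bokeh.models import ColumnDataSource, FactorRange, HoverTool, NumeralTickFormatter
from bokeh.plotting import figure, show

# Illustrative data: payload size category -> bandwidth in bytes per second
source = ColumnDataSource(
    {"size": ["1 kiB", "100 kiB", "10 MiB"], "bandwidth": [2.0e8, 9.0e8, 2.5e9]}
)

fig = figure(
    title="Bandwidth",
    tools="",
    toolbar_location="above",
    x_range=FactorRange(factors=source.data["size"]),
)
fig.vbar(x="size", top="bandwidth", width=0.9, source=source)
fig.add_tools(HoverTool(mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")]))
fig.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b")  # human-readable byte units
fig.xgrid.visible = False
show(fig)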
3 changes: 3 additions & 0 deletions distributed/dashboard/scheduler.py
@@ -30,6 +30,7 @@
WorkerTable,
events_doc,
graph_doc,
hardware_doc,
individual_doc,
individual_profile_doc,
individual_profile_server_doc,
@@ -56,6 +57,7 @@
"/profile": profile_doc,
"/profile-server": profile_server_doc,
"/graph": graph_doc,
"/hardware": hardware_doc,
"/groups": tg_graph_doc,
"/gpu": gpu_doc,
"/individual-task-stream": individual_doc(
@@ -117,6 +119,7 @@
],
"plots": [x.replace("/", "") for x in applications if "individual" in x],
}
template_variables["plots"].append("hardware")

if NVML_ENABLED:
template_variables["pages"].insert(4, "gpu")
6 changes: 6 additions & 0 deletions distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -24,6 +24,7 @@
ComputePerKey,
CurrentLoad,
Events,
Hardware,
MemoryByKey,
Occupancy,
ProcessingHistogram,
@@ -997,3 +998,8 @@ async def test_prefix_bokeh(s, a, b):
bokeh_app = s.http_application.applications[0]
assert isinstance(bokeh_app, BokehTornado)
assert bokeh_app.prefix == f"/{prefix}"


@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
async def test_hardware(c, s, a, b):
Hardware(s) # don't call update, takes too long for a test
2 changes: 1 addition & 1 deletion distributed/http/templates/base.html
@@ -76,4 +76,4 @@
</script>
</body>

</html>
</html>
47 changes: 47 additions & 0 deletions distributed/scheduler.py
@@ -40,6 +40,7 @@
merge,
merge_sorted,
merge_with,
partition,
pluck,
second,
valmap,
@@ -3977,6 +3978,7 @@ def __init__(
"stop_task_metadata": self.stop_task_metadata,
"get_cluster_state": self.get_cluster_state,
"dump_cluster_state_to_url": self.dump_cluster_state_to_url,
"benchmark_hardware": self.benchmark_hardware,
}

connection_limit = get_fileno_limit() / 2
@@ -7329,6 +7331,51 @@ async def get_call_stack(self, keys=None):
response = {w: r for w, r in zip(workers, results) if r}
return response

async def benchmark_hardware(self) -> "dict[str, dict[str, float]]":
Review comment (Collaborator): Could you add a docstring and explain the output?

out: "dict[str, defaultdict[str, list[float]]]" = {
name: defaultdict(list) for name in ["disk", "memory", "network"]
}

# disk
result = await self.broadcast(msg={"op": "benchmark_disk"})
for d in result.values():
for size, duration in d.items():
out["disk"][size].append(duration)

# memory
result = await self.broadcast(msg={"op": "benchmark_memory"})
for d in result.values():
for size, duration in d.items():
out["memory"][size].append(duration)

# network
workers = list(self.workers)
# On an adaptive cluster, if multiple workers are started on the same physical host,
# they are more likely to connect to the Scheduler in sequence, ending up next to
# each other in this list.
# The transfer speed within such clusters of workers will be effectively that of
# localhost. This could happen across different VMs and/or docker images, so
# implementing logic based on IP addresses would not necessarily help.
# Randomize the connections to even out the mean measures.
random.shuffle(workers)
futures = [
self.rpc(a).benchmark_network(address=b) for a, b in partition(2, workers)
]
responses = await asyncio.gather(*futures)

for d in responses:
for size, duration in d.items():
out["network"][size].append(duration)

result = {}
for mode in out:
result[mode] = {
size: sum(durations) / len(durations)
for size, durations in out[mode].items()
}

return result

def get_nbytes(self, keys=None, summary=True):
parent: SchedulerState = cast(SchedulerState, self)
ts: TaskState
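A quick, hedged illustration of the pairing step in benchmark_hardware above: toolz's partition yields non-overlapping tuples and silently drops a trailing incomplete group, so with an odd number of workers one worker simply sits out the network benchmark:

from toolz import partition

workers = ["w1", "w2", "w3", "w4", "w5"]
pairs = list(partition(2, workers))
# [("w1", "w2"), ("w3", "w4")] -- "w5" has no partner and is dropped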
14 changes: 14 additions & 0 deletions distributed/tests/test_client.py
@@ -7481,6 +7481,20 @@ async def test_security_loader_import_failed(self):
pass


@pytest.mark.avoid_ci(reason="This is slow and probably not worth the cost")
@pytest.mark.slow
@gen_cluster(client=True)
async def test_benchmark_hardware(c, s, a, b):
result = await c.benchmark_hardware()
assert set(result) == {"disk", "memory", "network"}
assert all(isinstance(v, float) for d in result.values() for v in d.values())


@gen_cluster(client=True, nthreads=[])
async def test_benchmark_hardware_no_workers(c, s):
assert await c.benchmark_hardware() == {"memory": {}, "disk": {}, "network": {}}


@gen_cluster(client=True, nthreads=[])
async def test_wait_for_workers_updates_info(c, s):
async with Worker(s.address):
21 changes: 20 additions & 1 deletion distributed/tests/test_worker.py
@@ -57,7 +57,14 @@
slowinc,
slowsum,
)
from distributed.worker import Worker, error_message, logger
from distributed.worker import (
Worker,
benchmark_disk,
benchmark_memory,
benchmark_network,
error_message,
logger,
)

pytestmark = pytest.mark.ci1

@@ -3306,3 +3313,15 @@ async def test_Worker__to_dict(c, s, a):
}
assert d["tasks"]["x"]["key"] == "x"
assert d["data"] == ["x"]


@gen_cluster()
async def test_benchmark_hardware(s, a, b):
sizes = ["1 kiB", "10 kiB"]
disk = benchmark_disk(sizes=sizes, duration="1 ms")
memory = benchmark_memory(sizes=sizes, duration="1 ms")
network = await benchmark_network(
address=a.address, rpc=b.rpc, sizes=sizes, duration="1 ms"
)

assert set(disk) == set(memory) == set(network) == set(sizes)
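The benchmark_disk, benchmark_memory, and benchmark_network helpers exercised above live in distributed/worker.py (not shown in this diff view). As a rough, hedged sketch of the shape of such a helper — not the PR's actual implementation — a disk benchmark of this kind could look like:

import os
import tempfile
import time

from dask.utils import parse_bytes, parse_timedelta

from distributed.compatibility import randbytes


def benchmark_disk_sketch(sizes=("1 kiB", "100 kiB"), duration="1 s") -> dict:
    """Write random payloads of each size for roughly `duration` and report
    the achieved bandwidth in bytes per second, keyed by the size string."""
    duration = parse_timedelta(duration)
    out = {}
    with tempfile.TemporaryDirectory() as tmpdir:
        for size_str in sizes:
            size = parse_bytes(size_str)
            data = randbytes(size)
            total = 0
            start = time.perf_counter()
            with open(os.path.join(tmpdir, "payload"), "wb") as f:
                while True:
                    f.write(data)
                    f.flush()
                    total += size
                    if time.perf_counter() - start > duration:
                        break
            out[size_str] = total / (time.perf_counter() - start)
    return out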