From 06170d564bbd89b0eec3a046fec012927250bf75 Mon Sep 17 00:00:00 2001
From: Matthew Rocklin
Date: Mon, 28 Mar 2022 04:49:14 -0500
Subject: [PATCH] Add a hardware benchmark to test memory, disk, and network bandwidths (#5966)

---
 distributed/client.py                         |  13 ++
 distributed/compatibility.py                  |  16 ++
 distributed/dashboard/components/scheduler.py | 165 +++++++++++++++++-
 distributed/dashboard/scheduler.py            |  20 ++-
 .../dashboard/tests/test_scheduler_bokeh.py   |   6 +
 distributed/http/templates/base.html          |   6 +-
 distributed/scheduler.py                      |  57 ++++++
 distributed/tests/test_client.py              |  14 ++
 distributed/tests/test_worker.py              |  21 ++-
 distributed/worker.py                         | 117 +++++++++++++
 docs/source/http_services.rst                 |   7 +-
 11 files changed, 431 insertions(+), 11 deletions(-)

diff --git a/distributed/client.py b/distributed/client.py
index a8b4cb57989..49afd3792e3 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -4041,6 +4041,19 @@ def get_worker_logs(self, n=None, workers=None, nanny=False):
         """
         return self.sync(self.scheduler.worker_logs, n=n, workers=workers, nanny=nanny)
 
+    def benchmark_hardware(self) -> dict:
+        """
+        Run a benchmark on the workers for memory, disk, and network bandwidths
+
+        Returns
+        -------
+        result: dict
+            A dictionary mapping the names "disk", "memory", and "network" to
+            dictionaries mapping sizes to bandwidths. These bandwidths are
+            averaged over many workers running computations across the cluster.
+        """
+        return self.sync(self.scheduler.benchmark_hardware)
+
     def log_event(self, topic, msg):
         """Log an event under a given topic
 
diff --git a/distributed/compatibility.py b/distributed/compatibility.py
index 32c94151d55..f0151267867 100644
--- a/distributed/compatibility.py
+++ b/distributed/compatibility.py
@@ -37,3 +37,19 @@ async def to_thread(func, /, *args, **kwargs):
     ctx = contextvars.copy_context()
     func_call = functools.partial(ctx.run, func, *args, **kwargs)
     return await loop.run_in_executor(None, func_call)
+
+
+if sys.version_info >= (3, 9):
+    from random import randbytes
+else:
+    try:
+        import numpy
+
+        def randbytes(size):
+            return numpy.random.randint(0, 256, size=size, dtype="u1").tobytes()
+
+    except ImportError:
+        import secrets
+
+        def randbytes(size):
+            return secrets.token_bytes(size)
diff --git a/distributed/dashboard/components/scheduler.py b/distributed/dashboard/components/scheduler.py
index 7fa639c799d..a19e2838f77 100644
--- a/distributed/dashboard/components/scheduler.py
+++ b/distributed/dashboard/components/scheduler.py
@@ -21,6 +21,7 @@
     ColumnDataSource,
     CustomJSHover,
     DataRange1d,
+    FactorRange,
     GroupFilter,
     HoverTool,
     NumberFormatter,
@@ -49,7 +50,14 @@
 import dask
 from dask import config
-from dask.utils import format_bytes, format_time, funcname, key_split, parse_timedelta
+from dask.utils import (
+    format_bytes,
+    format_time,
+    funcname,
+    key_split,
+    parse_bytes,
+    parse_timedelta,
+)
 
 from distributed.dashboard.components import add_periodic_callback
 from distributed.dashboard.components.shared import (
@@ -559,6 +567,148 @@ def update(self):
         update(self.source, d)
 
 
+class Hardware(DashboardComponent):
+    """Memory, disk, and network bandwidths, measured across the cluster"""
+
+    def __init__(self, scheduler, **kwargs):
+        with log_errors():
+            self.scheduler = scheduler
+            # Disk
+            self.disk_source = ColumnDataSource(
+                {
+                    "size": [],
+                    "bandwidth": [],
+                }
+            )
+
+            self.disk_figure = figure(
+                title="Disk Bandwidth -- Computing ...",
+                tools="",
+                toolbar_location="above",
+                x_range=FactorRange(factors=[]),
+                **kwargs,
+            )
+            self.disk_figure.vbar(
+                x="size", top="bandwidth",
width=0.9, source=self.disk_source + ) + hover = HoverTool( + mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.disk_figure.add_tools(hover) + self.disk_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") + self.disk_figure.xgrid.visible = False + + # Memory + self.memory_source = ColumnDataSource( + { + "size": [], + "bandwidth": [], + } + ) + + self.memory_figure = figure( + title="Memory Bandwidth -- Computing ...", + tools="", + toolbar_location="above", + x_range=FactorRange(factors=[]), + **kwargs, + ) + + self.memory_figure.vbar( + x="size", top="bandwidth", width=0.9, source=self.memory_source + ) + hover = HoverTool( + mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.memory_figure.add_tools(hover) + self.memory_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") + self.memory_figure.xgrid.visible = False + + # Network + self.network_source = ColumnDataSource( + { + "size": [], + "bandwidth": [], + } + ) + + self.network_figure = figure( + title="Network Bandwidth -- Computing ...", + tools="", + toolbar_location="above", + x_range=FactorRange(factors=[]), + **kwargs, + ) + + self.network_figure.vbar( + x="size", top="bandwidth", width=0.9, source=self.network_source + ) + hover = HoverTool( + mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] + ) + self.network_figure.add_tools(hover) + self.network_figure.yaxis[0].formatter = NumeralTickFormatter( + format="0.0 b" + ) + self.network_figure.xgrid.visible = False + + self.root = row( + self.memory_figure, + self.disk_figure, + self.network_figure, + ) + + self.memory_data = { + "size": [], + "bandwidth": [], + } + self.disk_data = { + "size": [], + "bandwidth": [], + } + self.network_data = { + "size": [], + "bandwidth": [], + } + + async def f(): + result = await self.scheduler.benchmark_hardware() + + for size in sorted(result["disk"], key=parse_bytes): + bandwidth = result["disk"][size] + self.disk_data["size"].append(size) + self.disk_data["bandwidth"].append(bandwidth) + + for size in sorted(result["memory"], key=parse_bytes): + bandwidth = result["memory"][size] + self.memory_data["size"].append(size) + self.memory_data["bandwidth"].append(bandwidth) + + for size in sorted(result["network"], key=parse_bytes): + bandwidth = result["network"][size] + self.network_data["size"].append(size) + self.network_data["bandwidth"].append(bandwidth) + + self.scheduler.loop.add_callback(f) + + def update(self): + if ( + not self.disk_data["size"] + or self.disk_figure.title.text == "Disk Bandwidth" + ): + return + + self.network_figure.x_range.factors = self.network_data["size"] + self.disk_figure.x_range.factors = self.disk_data["size"] + self.memory_figure.x_range.factors = self.memory_data["size"] + update(self.disk_source, self.disk_data) + update(self.memory_source, self.memory_data) + update(self.network_source, self.network_data) + self.memory_figure.title.text = "Memory Bandwidth" + self.disk_figure.title.text = "Disk Bandwidth" + self.network_figure.title.text = "Network Bandwidth" + + class BandwidthTypes(DashboardComponent): """Bar chart showing bandwidth per type""" @@ -3432,6 +3582,19 @@ def workers_doc(scheduler, extra, doc): doc.theme = BOKEH_THEME +def hardware_doc(scheduler, extra, doc): + with log_errors(): + hw = Hardware(scheduler) + hw.update() + doc.title = "Dask: Cluster Hardware Bandwidth" + doc.add_root(hw.root) + doc.template = env.get_template("simple.html") + doc.template_variables.update(extra) + doc.theme = BOKEH_THEME + + 
add_periodic_callback(doc, hw, 500)
+
+
 def tasks_doc(scheduler, extra, doc):
     with log_errors():
         ts = TaskStream(
diff --git a/distributed/dashboard/scheduler.py b/distributed/dashboard/scheduler.py
index 42c50b732bc..a7dc0a05331 100644
--- a/distributed/dashboard/scheduler.py
+++ b/distributed/dashboard/scheduler.py
@@ -31,6 +31,7 @@
     WorkerTable,
     events_doc,
     graph_doc,
+    hardware_doc,
     individual_doc,
     individual_profile_doc,
     individual_profile_server_doc,
@@ -57,6 +58,7 @@
     "/profile": profile_doc,
     "/profile-server": profile_server_doc,
     "/graph": graph_doc,
+    "/hardware": hardware_doc,
     "/groups": tg_graph_doc,
     "/gpu": gpu_doc,
     "/individual-task-stream": individual_doc(
@@ -106,7 +108,7 @@
 }
 
 
-template_variables = {
+template_variables: dict = {
     "pages": [
         "status",
         "workers",
@@ -117,8 +119,22 @@
         "groups",
         "info",
     ],
-    "plots": [x.replace("/", "") for x in applications if "individual" in x],
+    "plots": [
+        {
+            "url": x.strip("/"),
+            "name": " ".join(x.strip("/").split("-")[1:])
+            .title()
+            .replace("Cpu", "CPU")
+            .replace("Gpu", "GPU"),
+        }
+        for x in applications
+        if "individual" in x
+    ]
+    + [{"url": "hardware", "name": "Hardware"}],
 }
+template_variables["plots"] = sorted(
+    template_variables["plots"], key=lambda d: d["name"]
+)
 
 if NVML_ENABLED:
     template_variables["pages"].insert(4, "gpu")
diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py
index 566fe9377cf..415b566e3a0 100644
--- a/distributed/dashboard/tests/test_scheduler_bokeh.py
+++ b/distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -25,6 +25,7 @@
     CurrentLoad,
     EventLoop,
     Events,
+    Hardware,
     MemoryByKey,
     Occupancy,
     ProcessingHistogram,
@@ -1004,3 +1005,8 @@ async def test_prefix_bokeh(s, a, b):
     bokeh_app = s.http_application.applications[0]
     assert isinstance(bokeh_app, BokehTornado)
     assert bokeh_app.prefix == f"/{prefix}"
+
+
+@gen_cluster(client=True, scheduler_kwargs={"dashboard": True})
+async def test_hardware(c, s, a, b):
+    Hardware(s)  # don't call update, takes too long for a test
diff --git a/distributed/http/templates/base.html b/distributed/http/templates/base.html
index 73e252f08e1..a38b20830d0 100644
--- a/distributed/http/templates/base.html
+++ b/distributed/http/templates/base.html
@@ -39,9 +39,7 @@
 [template markup not captured in this extraction: this hunk updates the
 navbar "plots" dropdown to read the new {"url", "name"} entries from
 template_variables instead of deriving display names from raw path strings]
@@ -76,4 +74,4 @@
 [template markup not captured in this extraction]
-
\ No newline at end of file
+
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 3484840af34..7a1c65a9a90 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -40,6 +40,7 @@
     merge,
     merge_sorted,
     merge_with,
+    partition,
     pluck,
     second,
     valmap,
@@ -3977,6 +3978,7 @@ def __init__(
             "stop_task_metadata": self.stop_task_metadata,
             "get_cluster_state": self.get_cluster_state,
             "dump_cluster_state_to_url": self.dump_cluster_state_to_url,
+            "benchmark_hardware": self.benchmark_hardware,
         }
 
         connection_limit = get_fileno_limit() / 2
@@ -7327,6 +7329,61 @@ async def get_call_stack(self, keys=None):
         response = {w: r for w, r in zip(workers, results) if r}
         return response
 
+    async def benchmark_hardware(self) -> "dict[str, dict[str, float]]":
+        """
+        Run a benchmark on the workers for memory, disk, and network bandwidths
+
+        Returns
+        -------
+        result: dict
+            A dictionary mapping the names "disk", "memory", and "network" to
+            dictionaries mapping sizes to bandwidths. These bandwidths are
+            averaged over many workers running computations across the cluster.
+        """
+        out: "dict[str, defaultdict[str, list[float]]]" = {
+            name: defaultdict(list) for name in ["disk", "memory", "network"]
+        }
+
+        # disk
+        result = await self.broadcast(msg={"op": "benchmark_disk"})
+        for d in result.values():
+            for size, bandwidth in d.items():
+                out["disk"][size].append(bandwidth)
+
+        # memory
+        result = await self.broadcast(msg={"op": "benchmark_memory"})
+        for d in result.values():
+            for size, bandwidth in d.items():
+                out["memory"][size].append(bandwidth)
+
+        # network
+        workers = list(self.workers)
+        # On an adaptive cluster, if multiple workers are started on the same physical host,
+        # they are more likely to connect to the Scheduler in sequence, ending up next to
+        # each other in this list.
+        # The transfer speed within such clusters of workers will be effectively that of
+        # localhost. This could happen across different VMs and/or docker images, so
+        # implementing logic based on IP addresses would not necessarily help.
+        # Randomize the connections to even out the mean measurements.
+        random.shuffle(workers)
+        futures = [
+            self.rpc(a).benchmark_network(address=b) for a, b in partition(2, workers)
+        ]
+        responses = await asyncio.gather(*futures)
+
+        for d in responses:
+            for size, bandwidth in d.items():
+                out["network"][size].append(bandwidth)
+
+        result = {}
+        for mode in out:
+            result[mode] = {
+                size: sum(bandwidths) / len(bandwidths)
+                for size, bandwidths in out[mode].items()
+            }
+
+        return result
+
     def get_nbytes(self, keys=None, summary=True):
         parent: SchedulerState = cast(SchedulerState, self)
         ts: TaskState
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index f9558d2dd3f..7b419000327 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -7481,6 +7481,20 @@ async def test_security_loader_import_failed(self):
         pass
 
 
+@pytest.mark.avoid_ci(reason="This is slow and probably not worth the cost")
+@pytest.mark.slow
+@gen_cluster(client=True)
+async def test_benchmark_hardware(c, s, a, b):
+    result = await c.benchmark_hardware()
+    assert set(result) == {"disk", "memory", "network"}
+    assert all(isinstance(v, float) for d in result.values() for v in d.values())
+
+
+@gen_cluster(client=True, nthreads=[])
+async def test_benchmark_hardware_no_workers(c, s):
+    assert await c.benchmark_hardware() == {"memory": {}, "disk": {}, "network": {}}
+
+
 @gen_cluster(client=True, nthreads=[])
 async def test_wait_for_workers_updates_info(c, s):
     async with Worker(s.address):
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index b168592a14a..317ccad635f 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -57,7 +57,14 @@
     slowinc,
     slowsum,
 )
-from distributed.worker import Worker, error_message, logger
+from distributed.worker import (
+    Worker,
+    benchmark_disk,
+    benchmark_memory,
+    benchmark_network,
+    error_message,
+    logger,
+)
 
 pytestmark = pytest.mark.ci1
 
@@ -3308,6 +3315,18 @@ async def test_Worker__to_dict(c, s, a):
     assert d["data"] == ["x"]
 
 
+@gen_cluster()
+async def test_benchmark_hardware(s, a, b):
+    sizes = ["1 kiB", "10 kiB"]
+    disk = benchmark_disk(sizes=sizes, duration="1 ms")
+    memory = benchmark_memory(sizes=sizes, duration="1 ms")
+    network = await benchmark_network(
+        address=a.address, rpc=b.rpc, sizes=sizes, duration="1 ms"
+    )
+
+    assert set(disk) == set(memory) == set(network) == set(sizes)
+
+
 @gen_cluster(
     client=True,
     config={
diff --git a/distributed/worker.py b/distributed/worker.py
index 338e824f3c4..7a6e040303a 100644
---
a/distributed/worker.py +++ b/distributed/worker.py @@ -7,6 +7,7 @@ import heapq import logging import os +import pathlib import random import sys import threading @@ -41,6 +42,7 @@ parse_bytes, parse_timedelta, stringify, + tmpdir, typename, ) @@ -49,8 +51,10 @@ from distributed.comm import connect, get_address_host from distributed.comm.addressing import address_from_user_args, parse_address from distributed.comm.utils import OFFLOAD_THRESHOLD +from distributed.compatibility import randbytes from distributed.core import ( CommClosedError, + ConnectionPool, Status, coerce_to_address, error_message, @@ -738,6 +742,9 @@ def __init__( "plugin-add": self.plugin_add, "plugin-remove": self.plugin_remove, "get_monitor_info": self.get_monitor_info, + "benchmark_disk": self.benchmark_disk, + "benchmark_memory": self.benchmark_memory, + "benchmark_network": self.benchmark_network, } stream_handlers = { @@ -1197,6 +1204,7 @@ def func(data): with open(out_filename, "wb") as f: f.write(data) f.flush() + os.fsync(f.fileno()) return data if len(data) < 10000: @@ -3689,6 +3697,17 @@ def _notify_plugins(self, method_name, *args, **kwargs): "Plugin '%s' failed with exception", name, exc_info=True ) + async def benchmark_disk(self) -> dict[str, float]: + return await self.loop.run_in_executor( + self.executor, benchmark_disk, self.local_directory + ) + + async def benchmark_memory(self) -> dict[str, float]: + return await self.loop.run_in_executor(self.executor, benchmark_memory) + + async def benchmark_network(self, address: str) -> dict[str, float]: + return await benchmark_network(rpc=self.rpc, address=address) + ############## # Validation # ############## @@ -4568,3 +4587,101 @@ def warn(*args, **kwargs): worker.log_event("warn", {"args": args, "kwargs": kwargs}) warnings.warn(*args, **kwargs) + + +def benchmark_disk( + rootdir: str | None = None, + sizes: Iterable[str] = ("1 kiB", "100 kiB", "1 MiB", "10 MiB", "100 MiB"), + duration="1 s", +) -> dict[str, float]: + """ + Benchmark disk bandwidth + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + duration = parse_timedelta(duration) + + out = {} + for size_str in sizes: + with tmpdir(dir=rootdir) as dir: + dir = pathlib.Path(dir) + names = list(map(str, range(100))) + size = parse_bytes(size_str) + + data = randbytes(size) + + start = time() + total = 0 + while time() < start + duration: + with open(dir / random.choice(names), mode="ab") as f: + f.write(data) + f.flush() + os.fsync(f.fileno()) + total += size + + out[size_str] = total / (time() - start) + return out + + +def benchmark_memory( + sizes: Iterable[str] = ("2 kiB", "10 kiB", "100 kiB", "1 MiB", "10 MiB"), + duration="200 ms", +) -> dict[str, float]: + """ + Benchmark memory bandwidth + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + duration = parse_timedelta(duration) + out = {} + for size_str in sizes: + size = parse_bytes(size_str) + data = randbytes(size) + + start = time() + total = 0 + while time() < start + duration: + _ = data[:-1] + del _ + total += size + + out[size_str] = total / (time() - start) + return out + + +async def benchmark_network( + address: str, + rpc: ConnectionPool, + sizes: Iterable[str] = ("1 kiB", "10 kiB", "100 kiB", "1 MiB", "10 MiB", "50 MiB"), + duration="1 s", +) -> dict[str, float]: + """ + Benchmark network communications to another worker + + Returns + ------- + out: dict + Maps sizes of outputs to measured bandwidths + """ + + duration = parse_timedelta(duration) + out = {} + 
async with rpc(address) as r:
+        for size_str in sizes:
+            size = parse_bytes(size_str)
+            data = to_serialize(randbytes(size))
+
+            start = time()
+            total = 0
+            while time() < start + duration:
+                await r.echo(data=data)
+                total += size * 2
+
+            out[size_str] = total / (time() - start)
+    return out
diff --git a/docs/source/http_services.rst b/docs/source/http_services.rst
index 119733197f5..fe076a08653 100644
--- a/docs/source/http_services.rst
+++ b/docs/source/http_services.rst
@@ -21,6 +21,7 @@ others via a header navbar.
 - ``/graph``: currently processing graphs in a dependency tree view
 - ``/groups``: graph layout for task groups (dependencies, memory, output type, progress, tasks status)
 - ``/info``: redirect to ``/info/main/workers.html``
+- ``/hardware``: gathers bandwidth information on memory, disk, and network
 
 Scheduler HTTP
 --------------
@@ -81,11 +82,11 @@ Individual bokeh plots
 Worker HTTP
 -----------
 
-- ``/status``: 
-- ``/counters``: 
+- ``/status``:
+- ``/counters``:
 - ``/crossfilter``:
 - ``/sitemap.json``: list of available endpoints
-- ``/system``: 
+- ``/system``:
 - ``/health``: check server is alive
 - ``/metrics``: prometheus endpoint
 - ``/statics/()``: static file content (CSS, etc)
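
For reference, a minimal sketch of how the API added by this patch can be driven from user code. It assumes a cluster is already running; the scheduler address below is a placeholder, and the formatting helpers come from dask.utils:

    # Sketch: calling the Client.benchmark_hardware() API added in this patch.
    # "localhost:8786" is a placeholder scheduler address.
    from dask.utils import format_bytes, parse_bytes
    from distributed import Client

    client = Client("localhost:8786")
    result = client.benchmark_hardware()

    # result maps "disk"/"memory"/"network" to {size string: bandwidth in bytes/s}
    for mode, bandwidths in result.items():
        print(mode)
        # Size keys are strings like "1 kiB"; sort them numerically,
        # the same way the dashboard's Hardware component does.
        for size in sorted(bandwidths, key=parse_bytes):
            print(f"  {size:>8}: {format_bytes(bandwidths[size])}/s")

The dashboard's new /hardware page renders this same dictionary as three bar charts, one per mode.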