diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index e6e301905..f36be7478 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -1,6 +1,7 @@
import contextlib
import math
from collections import defaultdict
+from json import dumps
from time import perf_counter
from warnings import filterwarnings
@@ -278,6 +279,8 @@ def main(args):
print(f"broadcast | {broadcast}")
print(f"protocol | {args.protocol}")
print(f"device(s) | {args.devs}")
+ if args.device_memory_limit:
+ print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print(f"rmm-pool | {(not args.disable_rmm_pool)}")
print(f"frac-match | {args.frac_match}")
if args.protocol == "ucx":
@@ -304,18 +307,59 @@ def main(args):
if args.backend == "dask":
if args.markdown:
print("\nWorker-Worker Transfer Rates
\n\n```")
- print("(w1,w2) | 25% 50% 75% (total nbytes)")
+ print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("-------------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
- "(%s,%s) | %s %s %s (%s)"
+ "(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
- else "(%02d,%02d) | %s %s %s (%s)"
+ else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
if args.markdown:
print("```\n \n")
+ if args.benchmark_json:
+ bandwidths_json = {
+ f"bandwidth_({d1},{d2})_{i}"  # unique per pair/stat
+ if args.multi_node or args.sched_addr
+ else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+ for (d1, d2), bw in sorted(bandwidths.items())
+ for i, v in zip(
+ ["25%", "50%", "75%", "total_nbytes"],
+ [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+ )
+ }
+
+ with open(args.benchmark_json, "a") as fp:
+ for data_processed, took in took_list:
+ fp.write(
+ dumps(
+ dict(
+ {
+ "backend": args.backend,
+ "merge_type": args.type,
+ "rows_per_chunk": args.chunk_size,
+ "base_chunks": args.base_chunks,
+ "other_chunks": args.other_chunks,
+ "broadcast": broadcast,
+ "protocol": args.protocol,
+ "devs": args.devs,
+ "device_memory_limit": args.device_memory_limit,
+ "rmm_pool": not args.disable_rmm_pool,
+ "tcp": args.enable_tcp_over_ucx,
+ "ib": args.enable_infiniband,
+ "nvlink": args.enable_nvlink,
+ "data_processed": data_processed,
+ "wall_clock": took,
+ "throughput": data_processed / took,
+ },
+ **bandwidths_json,
+ )
+ )
+ + "\n"
+ )
+
if args.multi_node:
client.shutdown()
client.close()
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index f329aa92b..f2c812d08 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -1,5 +1,6 @@
import contextlib
from collections import defaultdict
+from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings
@@ -151,6 +152,8 @@ def main(args):
print(f"in-parts | {args.in_parts}")
print(f"protocol | {args.protocol}")
print(f"device(s) | {args.devs}")
+ if args.device_memory_limit:
+ print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print(f"rmm-pool | {(not args.disable_rmm_pool)}")
if args.protocol == "ucx":
print(f"tcp | {args.enable_tcp_over_ucx}")
@@ -176,18 +179,56 @@ def main(args):
if args.backend == "dask":
if args.markdown:
print("\nWorker-Worker Transfer Rates
\n\n```")
- print("(w1,w2) | 25% 50% 75% (total nbytes)")
+ print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("-------------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
- "(%s,%s) | %s %s %s (%s)"
+ "(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
- else "(%02d,%02d) | %s %s %s (%s)"
+ else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
if args.markdown:
print("```\n \n")
+ if args.benchmark_json:
+ bandwidths_json = {
+ f"bandwidth_({d1},{d2})_{i}"  # unique per pair/stat
+ if args.multi_node or args.sched_addr
+ else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+ for (d1, d2), bw in sorted(bandwidths.items())
+ for i, v in zip(
+ ["25%", "50%", "75%", "total_nbytes"],
+ [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+ )
+ }
+
+ with open(args.benchmark_json, "a") as fp:
+ for data_processed, took in took_list:
+ fp.write(
+ dumps(
+ dict(
+ {
+ "backend": args.backend,
+ "partition_size": args.partition_size,
+ "in_parts": args.in_parts,
+ "protocol": args.protocol,
+ "devs": args.devs,
+ "device_memory_limit": args.device_memory_limit,
+ "rmm_pool": not args.disable_rmm_pool,
+ "tcp": args.enable_tcp_over_ucx,
+ "ib": args.enable_infiniband,
+ "nvlink": args.enable_nvlink,
+ "data_processed": data_processed,
+ "wall_clock": took,
+ "throughput": data_processed / took,
+ },
+ **bandwidths_json,
+ )
+ )
+ + "\n"
+ )
+
if args.multi_node:
client.shutdown()
client.close()
diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py
index 9a07b2afe..a4bbc341a 100644
--- a/dask_cuda/benchmarks/local_cupy.py
+++ b/dask_cuda/benchmarks/local_cupy.py
@@ -1,6 +1,6 @@
import asyncio
from collections import defaultdict
-from json import dump
+from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings
@@ -246,6 +246,8 @@ async def run(args):
print(f"Ignore-size | {format_bytes(args.ignore_size)}")
print(f"Protocol | {args.protocol}")
print(f"Device(s) | {args.devs}")
+ if args.device_memory_limit:
+ print(f"Memory limit | {format_bytes(args.device_memory_limit)}")
print(f"Worker Thread(s) | {args.threads_per_worker}")
print("==========================")
print("Wall-clock | npartitions")
@@ -266,37 +268,46 @@ async def run(args):
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
if args.benchmark_json:
-
- d = {
- "operation": args.operation,
- "size": args.size,
- "second_size": args.second_size,
- "chunk_size": args.chunk_size,
- "compute_size": size,
- "compute_chunk_size": chunksize,
- "ignore_size": format_bytes(args.ignore_size),
- "protocol": args.protocol,
- "devs": args.devs,
- "threads_per_worker": args.threads_per_worker,
- "times": [
- {"wall_clock": took, "npartitions": npartitions}
- for (took, npartitions) in took_list
- ],
- "bandwidths": {
- f"({d1},{d2})"
- if args.multi_node or args.sched_addr
- else "(%02d,%02d)"
- % (d1, d2): {
- "25%": bw[0],
- "50%": bw[1],
- "75%": bw[2],
- "total_nbytes": total_nbytes[(d1, d2)],
- }
- for (d1, d2), bw in sorted(bandwidths.items())
- },
+ bandwidths_json = {
+ f"bandwidth_({d1},{d2})_{i}"  # unique per pair/stat
+ if args.multi_node or args.sched_addr
+ else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+ for (d1, d2), bw in sorted(bandwidths.items())
+ for i, v in zip(
+ ["25%", "50%", "75%", "total_nbytes"],
+ [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+ )
}
- with open(args.benchmark_json, "w") as fp:
- dump(d, fp, indent=2)
+
+ with open(args.benchmark_json, "a") as fp:
+ for took, npartitions in took_list:
+ fp.write(
+ dumps(
+ dict(
+ {
+ "operation": args.operation,
+ "user_size": args.size,
+ "user_second_size": args.second_size,
+ "user_chunk_size": args.chunk_size,
+ "compute_size": size,
+ "compute_chunk_size": chunksize,
+ "ignore_size": args.ignore_size,
+ "protocol": args.protocol,
+ "devs": args.devs,
+ "device_memory_limit": args.device_memory_limit,
+ "worker_threads": args.threads_per_worker,
+ "rmm_pool": not args.disable_rmm_pool,
+ "tcp": args.enable_tcp_over_ucx,
+ "ib": args.enable_infiniband,
+ "nvlink": args.enable_nvlink,
+ "wall_clock": took,
+ "npartitions": npartitions,
+ },
+ **bandwidths_json,
+ )
+ )
+ + "\n"
+ )
# An SSHCluster will not automatically shut down, we have to
# ensure it does.
@@ -353,12 +364,6 @@ def parse_args():
"type": int,
"help": "Number of runs (default 3).",
},
- {
- "name": "--benchmark-json",
- "default": None,
- "type": str,
- "help": "Dump a JSON report of benchmarks (optional).",
- },
]
return parse_benchmark_args(
diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py
index 374049ff7..077b212fb 100644
--- a/dask_cuda/benchmarks/local_cupy_map_overlap.py
+++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py
@@ -1,5 +1,6 @@
import asyncio
from collections import defaultdict
+from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings
@@ -125,29 +126,69 @@ async def run(args):
print("Roundtrip benchmark")
print("--------------------------")
- print(f"Size | {args.size}*{args.size}")
- print(f"Chunk-size | {args.chunk_size}")
- print(f"Ignore-size | {format_bytes(args.ignore_size)}")
- print(f"Protocol | {args.protocol}")
- print(f"Device(s) | {args.devs}")
+ print(f"Size | {args.size}*{args.size}")
+ print(f"Chunk-size | {args.chunk_size}")
+ print(f"Ignore-size | {format_bytes(args.ignore_size)}")
+ print(f"Protocol | {args.protocol}")
+ print(f"Device(s) | {args.devs}")
+ if args.device_memory_limit:
+ print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print("==========================")
- print("Wall-clock | npartitions")
+ print("Wall-clock | npartitions")
print("--------------------------")
for (took, npartitions) in took_list:
t = format_time(took)
- t += " " * (11 - len(t))
+ t += " " * (12 - len(t))
print(f"{t} | {npartitions}")
print("==========================")
- print("(w1,w2) | 25% 50% 75% (total nbytes)")
+ print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("--------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
- "(%s,%s) | %s %s %s (%s)"
+ "(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
- else "(%02d,%02d) | %s %s %s (%s)"
+ else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
+ if args.benchmark_json:
+ bandwidths_json = {
+ f"bandwidth_({d1},{d2})_{i}"  # unique per pair/stat
+ if args.multi_node or args.sched_addr
+ else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
+ for (d1, d2), bw in sorted(bandwidths.items())
+ for i, v in zip(
+ ["25%", "50%", "75%", "total_nbytes"],
+ [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
+ )
+ }
+
+ with open(args.benchmark_json, "a") as fp:
+ for took, npartitions in took_list:
+ fp.write(
+ dumps(
+ dict(
+ {
+ "size": args.size * args.size,
+ "chunk_size": args.chunk_size,
+ "ignore_size": args.ignore_size,
+ "protocol": args.protocol,
+ "devs": args.devs,
+ "device_memory_limit": args.device_memory_limit,
+ "worker_threads": args.threads_per_worker,
+ "rmm_pool": not args.disable_rmm_pool,
+ "tcp": args.enable_tcp_over_ucx,
+ "ib": args.enable_infiniband,
+ "nvlink": args.enable_nvlink,
+ "wall_clock": took,
+ "npartitions": npartitions,
+ },
+ **bandwidths_json,
+ )
+ )
+ + "\n"
+ )
+
# An SSHCluster will not automatically shut down, we have to
# ensure it does.
if args.multi_node:
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 9a185a81f..4cbe574c4 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -166,6 +166,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
type=str,
help="Generate plot output written to defined directory",
)
+ parser.add_argument(
+ "--benchmark-json",
+ default=None,
+ type=str,
+ help="Dump a line-delimited JSON report of benchmarks to this file (optional). "
+ "Creates file if it does not exist, appends otherwise.",
+ )
for args in args_list:
name = args.pop("name")