From 92aa3d2ef4fe27e5740bd504399825bb42445a79 Mon Sep 17 00:00:00 2001 From: Charles Blackmon-Luca <20627856+charlesbluca@users.noreply.github.com> Date: Mon, 9 Aug 2021 09:54:59 -0700 Subject: [PATCH] Make --benchmark-json a global benchmark arg --- dask_cuda/benchmarks/local_cudf_merge.py | 50 +++++++++++- dask_cuda/benchmarks/local_cudf_shuffle.py | 47 ++++++++++- dask_cuda/benchmarks/local_cupy.py | 79 ++++++++++--------- .../benchmarks/local_cupy_map_overlap.py | 61 +++++++++++--- dask_cuda/benchmarks/utils.py | 7 ++ 5 files changed, 191 insertions(+), 53 deletions(-) diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py index e6e301905..f36be7478 100644 --- a/dask_cuda/benchmarks/local_cudf_merge.py +++ b/dask_cuda/benchmarks/local_cudf_merge.py @@ -1,6 +1,7 @@ import contextlib import math from collections import defaultdict +from json import dumps from time import perf_counter from warnings import filterwarnings @@ -278,6 +279,8 @@ def main(args): print(f"broadcast | {broadcast}") print(f"protocol | {args.protocol}") print(f"device(s) | {args.devs}") + if args.device_memory_limit: + print(f"memory-limit | {format_bytes(args.device_memory_limit)}") print(f"rmm-pool | {(not args.disable_rmm_pool)}") print(f"frac-match | {args.frac_match}") if args.protocol == "ucx": @@ -304,18 +307,59 @@ def main(args): if args.backend == "dask": if args.markdown: print("
\nWorker-Worker Transfer Rates\n\n```") - print("(w1,w2) | 25% 50% 75% (total nbytes)") + print("(w1,w2) | 25% 50% 75% (total nbytes)") print("-------------------------------") for (d1, d2), bw in sorted(bandwidths.items()): fmt = ( - "(%s,%s) | %s %s %s (%s)" + "(%s,%s) | %s %s %s (%s)" if args.multi_node or args.sched_addr - else "(%02d,%02d) | %s %s %s (%s)" + else "(%02d,%02d) | %s %s %s (%s)" ) print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) if args.markdown: print("```\n
\n") + if args.benchmark_json: + bandwidths_json = { + "bandwidth_({d1},{d2})_{i}" + if args.multi_node or args.sched_addr + else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s")) + for (d1, d2), bw in sorted(bandwidths.items()) + for i, v in zip( + ["25%", "50%", "75%", "total_nbytes"], + [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]], + ) + } + + with open(args.benchmark_json, "a") as fp: + for data_processed, took in took_list: + fp.write( + dumps( + dict( + { + "backend": args.backend, + "merge_type": args.type, + "rows_per_chunk": args.chunk_size, + "base_chunks": args.base_chunks, + "other_chunks": args.other_chunks, + "broadcast": broadcast, + "protocol": args.protocol, + "devs": args.devs, + "device_memory_limit": args.device_memory_limit, + "rmm_pool": not args.disable_rmm_pool, + "tcp": args.enable_tcp_over_ucx, + "ib": args.enable_infiniband, + "nvlink": args.enable_nvlink, + "data_processed": data_processed, + "wall_clock": took, + "throughput": data_processed / took, + }, + **bandwidths_json, + ) + ) + + "\n" + ) + if args.multi_node: client.shutdown() client.close() diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py index f329aa92b..f2c812d08 100644 --- a/dask_cuda/benchmarks/local_cudf_shuffle.py +++ b/dask_cuda/benchmarks/local_cudf_shuffle.py @@ -1,5 +1,6 @@ import contextlib from collections import defaultdict +from json import dumps from time import perf_counter as clock from warnings import filterwarnings @@ -151,6 +152,8 @@ def main(args): print(f"in-parts | {args.in_parts}") print(f"protocol | {args.protocol}") print(f"device(s) | {args.devs}") + if args.device_memory_limit: + print(f"memory-limit | {format_bytes(args.device_memory_limit)}") print(f"rmm-pool | {(not args.disable_rmm_pool)}") if args.protocol == "ucx": print(f"tcp | {args.enable_tcp_over_ucx}") @@ -176,18 +179,56 @@ def main(args): if args.backend == "dask": if args.markdown: print("
\nWorker-Worker Transfer Rates\n\n```") - print("(w1,w2) | 25% 50% 75% (total nbytes)") + print("(w1,w2) | 25% 50% 75% (total nbytes)") print("-------------------------------") for (d1, d2), bw in sorted(bandwidths.items()): fmt = ( - "(%s,%s) | %s %s %s (%s)" + "(%s,%s) | %s %s %s (%s)" if args.multi_node or args.sched_addr - else "(%02d,%02d) | %s %s %s (%s)" + else "(%02d,%02d) | %s %s %s (%s)" ) print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) if args.markdown: print("```\n
\n") + if args.benchmark_json: + bandwidths_json = { + "bandwidth_({d1},{d2})_{i}" + if args.multi_node or args.sched_addr + else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s")) + for (d1, d2), bw in sorted(bandwidths.items()) + for i, v in zip( + ["25%", "50%", "75%", "total_nbytes"], + [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]], + ) + } + + with open(args.benchmark_json, "a") as fp: + for data_processed, took in took_list: + fp.write( + dumps( + dict( + { + "backend": args.backend, + "partition_size": args.partition_size, + "in_parts": args.in_parts, + "protocol": args.protocol, + "devs": args.devs, + "device_memory_limit": args.device_memory_limit, + "rmm_pool": not args.disable_rmm_pool, + "tcp": args.enable_tcp_over_ucx, + "ib": args.enable_infiniband, + "nvlink": args.enable_nvlink, + "data_processed": data_processed, + "wall_clock": took, + "throughput": data_processed / took, + }, + **bandwidths_json, + ) + ) + + "\n" + ) + if args.multi_node: client.shutdown() client.close() diff --git a/dask_cuda/benchmarks/local_cupy.py b/dask_cuda/benchmarks/local_cupy.py index 9a07b2afe..a4bbc341a 100644 --- a/dask_cuda/benchmarks/local_cupy.py +++ b/dask_cuda/benchmarks/local_cupy.py @@ -1,6 +1,6 @@ import asyncio from collections import defaultdict -from json import dump +from json import dumps from time import perf_counter as clock from warnings import filterwarnings @@ -246,6 +246,8 @@ async def run(args): print(f"Ignore-size | {format_bytes(args.ignore_size)}") print(f"Protocol | {args.protocol}") print(f"Device(s) | {args.devs}") + if args.device_memory_limit: + print(f"Memory limit | {format_bytes(args.device_memory_limit)}") print(f"Worker Thread(s) | {args.threads_per_worker}") print("==========================") print("Wall-clock | npartitions") @@ -266,37 +268,46 @@ async def run(args): print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) if args.benchmark_json: - - d = { - "operation": args.operation, - "size": args.size, - "second_size": args.second_size, - "chunk_size": args.chunk_size, - "compute_size": size, - "compute_chunk_size": chunksize, - "ignore_size": format_bytes(args.ignore_size), - "protocol": args.protocol, - "devs": args.devs, - "threads_per_worker": args.threads_per_worker, - "times": [ - {"wall_clock": took, "npartitions": npartitions} - for (took, npartitions) in took_list - ], - "bandwidths": { - f"({d1},{d2})" - if args.multi_node or args.sched_addr - else "(%02d,%02d)" - % (d1, d2): { - "25%": bw[0], - "50%": bw[1], - "75%": bw[2], - "total_nbytes": total_nbytes[(d1, d2)], - } - for (d1, d2), bw in sorted(bandwidths.items()) - }, + bandwidths_json = { + "bandwidth_({d1},{d2})_{i}" + if args.multi_node or args.sched_addr + else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s")) + for (d1, d2), bw in sorted(bandwidths.items()) + for i, v in zip( + ["25%", "50%", "75%", "total_nbytes"], + [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]], + ) } - with open(args.benchmark_json, "w") as fp: - dump(d, fp, indent=2) + + with open(args.benchmark_json, "a") as fp: + for took, npartitions in took_list: + fp.write( + dumps( + dict( + { + "operation": args.operation, + "user_size": args.size, + "user_second_size": args.second_size, + "user_chunk_size": args.chunk_size, + "compute_size": size, + "compute_chunk_size": chunksize, + "ignore_size": args.ignore_size, + "protocol": args.protocol, + "devs": args.devs, + "device_memory_limit": args.device_memory_limit, + "worker_threads": args.threads_per_worker, + "rmm_pool": not 
args.disable_rmm_pool, + "tcp": args.enable_tcp_over_ucx, + "ib": args.enable_infiniband, + "nvlink": args.enable_nvlink, + "wall_clock": took, + "npartitions": npartitions, + }, + **bandwidths_json, + ) + ) + + "\n" + ) # An SSHCluster will not automatically shut down, we have to # ensure it does. @@ -353,12 +364,6 @@ def parse_args(): "type": int, "help": "Number of runs (default 3).", }, - { - "name": "--benchmark-json", - "default": None, - "type": str, - "help": "Dump a JSON report of benchmarks (optional).", - }, ] return parse_benchmark_args( diff --git a/dask_cuda/benchmarks/local_cupy_map_overlap.py b/dask_cuda/benchmarks/local_cupy_map_overlap.py index 374049ff7..077b212fb 100644 --- a/dask_cuda/benchmarks/local_cupy_map_overlap.py +++ b/dask_cuda/benchmarks/local_cupy_map_overlap.py @@ -1,5 +1,6 @@ import asyncio from collections import defaultdict +from json import dumps from time import perf_counter as clock from warnings import filterwarnings @@ -125,29 +126,69 @@ async def run(args): print("Roundtrip benchmark") print("--------------------------") - print(f"Size | {args.size}*{args.size}") - print(f"Chunk-size | {args.chunk_size}") - print(f"Ignore-size | {format_bytes(args.ignore_size)}") - print(f"Protocol | {args.protocol}") - print(f"Device(s) | {args.devs}") + print(f"Size | {args.size}*{args.size}") + print(f"Chunk-size | {args.chunk_size}") + print(f"Ignore-size | {format_bytes(args.ignore_size)}") + print(f"Protocol | {args.protocol}") + print(f"Device(s) | {args.devs}") + if args.device_memory_limit: + print(f"memory-limit | {format_bytes(args.device_memory_limit)}") print("==========================") - print("Wall-clock | npartitions") + print("Wall-clock | npartitions") print("--------------------------") for (took, npartitions) in took_list: t = format_time(took) - t += " " * (11 - len(t)) + t += " " * (12 - len(t)) print(f"{t} | {npartitions}") print("==========================") - print("(w1,w2) | 25% 50% 75% (total nbytes)") + print("(w1,w2) | 25% 50% 75% (total nbytes)") print("--------------------------") for (d1, d2), bw in sorted(bandwidths.items()): fmt = ( - "(%s,%s) | %s %s %s (%s)" + "(%s,%s) | %s %s %s (%s)" if args.multi_node or args.sched_addr - else "(%02d,%02d) | %s %s %s (%s)" + else "(%02d,%02d) | %s %s %s (%s)" ) print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)])) + if args.benchmark_json: + bandwidths_json = { + "bandwidth_({d1},{d2})_{i}" + if args.multi_node or args.sched_addr + else "(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s")) + for (d1, d2), bw in sorted(bandwidths.items()) + for i, v in zip( + ["25%", "50%", "75%", "total_nbytes"], + [bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]], + ) + } + + with open(args.benchmark_json, "a") as fp: + for took, npartitions in took_list: + fp.write( + dumps( + dict( + { + "size": args.size * args.size, + "chunk_size": args.chunk_size, + "ignore_size": args.ignore_size, + "protocol": args.protocol, + "devs": args.devs, + "device_memory_limit": args.device_memory_limit, + "worker_threads": args.threads_per_worker, + "rmm_pool": not args.disable_rmm_pool, + "tcp": args.enable_tcp_over_ucx, + "ib": args.enable_infiniband, + "nvlink": args.enable_nvlink, + "wall_clock": took, + "npartitions": npartitions, + }, + **bandwidths_json, + ) + ) + + "\n" + ) + # An SSHCluster will not automatically shut down, we have to # ensure it does. 
if args.multi_node: diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py index 9a185a81f..4cbe574c4 100644 --- a/dask_cuda/benchmarks/utils.py +++ b/dask_cuda/benchmarks/utils.py @@ -166,6 +166,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[] type=str, help="Generate plot output written to defined directory", ) + parser.add_argument( + "--benchmark-json", + default=None, + type=str, + help="Dump a line-delimited JSON report of benchmarks to this file (optional). " + "Creates file if it does not exist, appends otherwise.", + ) for args in args_list: name = args.pop("name")
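For reference, a minimal sketch of consuming the report this patch produces. Every benchmark run appends one JSON object per line to the path given via `--benchmark-json`, so the file is line-delimited JSON that accumulates across runs (and across benchmarks, if they share a file). The file name `report.json`, the use of pandas, and the columns selected below are illustrative assumptions, not part of the patch.

```python
import json

import pandas as pd  # optional; only used for the tabular view below

# Each benchmark run appends one JSON object per line to the file passed via
# --benchmark-json, so the report is line-delimited JSON that grows across runs.
records = []
with open("report.json") as fp:  # "report.json" is an assumed example path
    for line in fp:
        if line.strip():
            records.append(json.loads(line))

df = pd.DataFrame(records)
# Equivalent shortcut: df = pd.read_json("report.json", lines=True)

# Fields common to all four benchmarks include "protocol" and "wall_clock";
# merge/shuffle additionally report "throughput", the cupy benchmarks "npartitions".
print(df.groupby("protocol")["wall_clock"].mean())

# Per worker-pair bandwidth quartiles are flattened into columns such as
# "(00,01)_25%" (single-node runs), with the human-readable values parsed back
# to numbers via dask's parse_bytes before being written.
bw_cols = [c for c in df.columns if c.endswith(("_25%", "_50%", "_75%"))]
print(df[bw_cols].head())
```

Because the file is opened in append mode, mixing several benchmarks in one report simply yields records with different key sets; a DataFrame built from them fills the missing fields with NaN.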