Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add --benchmark-json option to all benchmarks #700

Merged
merged 1 commit into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 47 additions & 3 deletions dask_cuda/benchmarks/local_cudf_merge.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import math
from collections import defaultdict
from json import dumps
from time import perf_counter
from warnings import filterwarnings

Expand Down Expand Up @@ -278,6 +279,8 @@ def main(args):
print(f"broadcast | {broadcast}")
print(f"protocol | {args.protocol}")
print(f"device(s) | {args.devs}")
if args.device_memory_limit:
print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print(f"rmm-pool | {(not args.disable_rmm_pool)}")
print(f"frac-match | {args.frac_match}")
if args.protocol == "ucx":
Expand All @@ -304,18 +307,59 @@ def main(args):
if args.backend == "dask":
if args.markdown:
print("<details>\n<summary>Worker-Worker Transfer Rates</summary>\n\n```")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("-------------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
"(%s,%s) | %s %s %s (%s)"
"(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
else "(%02d,%02d) | %s %s %s (%s)"
else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
if args.markdown:
print("```\n</details>\n")

if args.benchmark_json:
bandwidths_json = {
f"bandwidth_({d1},{d2})_{i}"
if args.multi_node or args.sched_addr
else "bandwidth_(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
for (d1, d2), bw in sorted(bandwidths.items())
for i, v in zip(
["25%", "50%", "75%", "total_nbytes"],
[bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
)
}

with open(args.benchmark_json, "a") as fp:
for data_processed, took in took_list:
fp.write(
dumps(
dict(
{
"backend": args.backend,
"merge_type": args.type,
"rows_per_chunk": args.chunk_size,
"base_chunks": args.base_chunks,
"other_chunks": args.other_chunks,
"broadcast": broadcast,
"protocol": args.protocol,
"devs": args.devs,
"device_memory_limit": args.device_memory_limit,
"rmm_pool": not args.disable_rmm_pool,
"tcp": args.enable_tcp_over_ucx,
"ib": args.enable_infiniband,
"nvlink": args.enable_nvlink,
"data_processed": data_processed,
"wall_clock": took,
"throughput": data_processed / took,
},
**bandwidths_json,
)
)
+ "\n"
)

if args.multi_node:
client.shutdown()
client.close()
Expand Down
47 changes: 44 additions & 3 deletions dask_cuda/benchmarks/local_cudf_shuffle.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import contextlib
from collections import defaultdict
from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings

Expand Down Expand Up @@ -151,6 +152,8 @@ def main(args):
print(f"in-parts | {args.in_parts}")
print(f"protocol | {args.protocol}")
print(f"device(s) | {args.devs}")
if args.device_memory_limit:
print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print(f"rmm-pool | {(not args.disable_rmm_pool)}")
if args.protocol == "ucx":
print(f"tcp | {args.enable_tcp_over_ucx}")
Expand All @@ -176,18 +179,56 @@ def main(args):
if args.backend == "dask":
if args.markdown:
print("<details>\n<summary>Worker-Worker Transfer Rates</summary>\n\n```")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("-------------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
"(%s,%s) | %s %s %s (%s)"
"(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
else "(%02d,%02d) | %s %s %s (%s)"
else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))
if args.markdown:
print("```\n</details>\n")

if args.benchmark_json:
bandwidths_json = {
f"bandwidth_({d1},{d2})_{i}"
if args.multi_node or args.sched_addr
else "bandwidth_(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
for (d1, d2), bw in sorted(bandwidths.items())
for i, v in zip(
["25%", "50%", "75%", "total_nbytes"],
[bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
)
}

with open(args.benchmark_json, "a") as fp:
for data_processed, took in took_list:
fp.write(
dumps(
dict(
{
"backend": args.backend,
"partition_size": args.partition_size,
"in_parts": args.in_parts,
"protocol": args.protocol,
"devs": args.devs,
"device_memory_limit": args.device_memory_limit,
"rmm_pool": not args.disable_rmm_pool,
"tcp": args.enable_tcp_over_ucx,
"ib": args.enable_infiniband,
"nvlink": args.enable_nvlink,
"data_processed": data_processed,
"wall_clock": took,
"throughput": data_processed / took,
},
**bandwidths_json,
)
)
+ "\n"
)

if args.multi_node:
client.shutdown()
client.close()
Expand Down
79 changes: 42 additions & 37 deletions dask_cuda/benchmarks/local_cupy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from collections import defaultdict
from json import dump
from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings

Expand Down Expand Up @@ -246,6 +246,8 @@ async def run(args):
print(f"Ignore-size | {format_bytes(args.ignore_size)}")
print(f"Protocol | {args.protocol}")
print(f"Device(s) | {args.devs}")
if args.device_memory_limit:
print(f"Memory limit | {format_bytes(args.device_memory_limit)}")
print(f"Worker Thread(s) | {args.threads_per_worker}")
print("==========================")
print("Wall-clock | npartitions")
Expand All @@ -266,37 +268,46 @@ async def run(args):
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))

if args.benchmark_json:

d = {
"operation": args.operation,
"size": args.size,
"second_size": args.second_size,
"chunk_size": args.chunk_size,
"compute_size": size,
"compute_chunk_size": chunksize,
"ignore_size": format_bytes(args.ignore_size),
"protocol": args.protocol,
"devs": args.devs,
"threads_per_worker": args.threads_per_worker,
"times": [
{"wall_clock": took, "npartitions": npartitions}
for (took, npartitions) in took_list
],
"bandwidths": {
f"({d1},{d2})"
if args.multi_node or args.sched_addr
else "(%02d,%02d)"
% (d1, d2): {
"25%": bw[0],
"50%": bw[1],
"75%": bw[2],
"total_nbytes": total_nbytes[(d1, d2)],
}
for (d1, d2), bw in sorted(bandwidths.items())
},
bandwidths_json = {
f"bandwidth_({d1},{d2})_{i}"
if args.multi_node or args.sched_addr
else "bandwidth_(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
for (d1, d2), bw in sorted(bandwidths.items())
for i, v in zip(
["25%", "50%", "75%", "total_nbytes"],
[bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
)
}
with open(args.benchmark_json, "w") as fp:
dump(d, fp, indent=2)

with open(args.benchmark_json, "a") as fp:
for took, npartitions in took_list:
fp.write(
dumps(
dict(
{
"operation": args.operation,
"user_size": args.size,
"user_second_size": args.second_size,
"user_chunk_size": args.chunk_size,
"compute_size": size,
"compute_chunk_size": chunksize,
"ignore_size": args.ignore_size,
"protocol": args.protocol,
"devs": args.devs,
"device_memory_limit": args.device_memory_limit,
"worker_threads": args.threads_per_worker,
"rmm_pool": not args.disable_rmm_pool,
"tcp": args.enable_tcp_over_ucx,
"ib": args.enable_infiniband,
"nvlink": args.enable_nvlink,
"wall_clock": took,
"npartitions": npartitions,
},
**bandwidths_json,
)
)
+ "\n"
)

# An SSHCluster will not automatically shut down, we have to
# ensure it does.
Expand Down Expand Up @@ -353,12 +364,6 @@ def parse_args():
"type": int,
"help": "Number of runs (default 3).",
},
{
"name": "--benchmark-json",
"default": None,
"type": str,
"help": "Dump a JSON report of benchmarks (optional).",
},
]

return parse_benchmark_args(
Expand Down
61 changes: 51 additions & 10 deletions dask_cuda/benchmarks/local_cupy_map_overlap.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from collections import defaultdict
from json import dumps
from time import perf_counter as clock
from warnings import filterwarnings

Expand Down Expand Up @@ -125,29 +126,69 @@ async def run(args):

print("Roundtrip benchmark")
print("--------------------------")
print(f"Size | {args.size}*{args.size}")
print(f"Chunk-size | {args.chunk_size}")
print(f"Ignore-size | {format_bytes(args.ignore_size)}")
print(f"Protocol | {args.protocol}")
print(f"Device(s) | {args.devs}")
print(f"Size | {args.size}*{args.size}")
print(f"Chunk-size | {args.chunk_size}")
print(f"Ignore-size | {format_bytes(args.ignore_size)}")
print(f"Protocol | {args.protocol}")
print(f"Device(s) | {args.devs}")
if args.device_memory_limit:
print(f"memory-limit | {format_bytes(args.device_memory_limit)}")
print("==========================")
print("Wall-clock | npartitions")
print("Wall-clock | npartitions")
print("--------------------------")
for (took, npartitions) in took_list:
t = format_time(took)
t += " " * (11 - len(t))
t += " " * (12 - len(t))
print(f"{t} | {npartitions}")
print("==========================")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("(w1,w2) | 25% 50% 75% (total nbytes)")
print("--------------------------")
for (d1, d2), bw in sorted(bandwidths.items()):
fmt = (
"(%s,%s) | %s %s %s (%s)"
"(%s,%s) | %s %s %s (%s)"
if args.multi_node or args.sched_addr
else "(%02d,%02d) | %s %s %s (%s)"
else "(%02d,%02d) | %s %s %s (%s)"
)
print(fmt % (d1, d2, bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]))

if args.benchmark_json:
bandwidths_json = {
f"bandwidth_({d1},{d2})_{i}"
if args.multi_node or args.sched_addr
else "bandwidth_(%02d,%02d)_%s" % (d1, d2, i): parse_bytes(v.rstrip("/s"))
for (d1, d2), bw in sorted(bandwidths.items())
for i, v in zip(
["25%", "50%", "75%", "total_nbytes"],
[bw[0], bw[1], bw[2], total_nbytes[(d1, d2)]],
)
}

with open(args.benchmark_json, "a") as fp:
for took, npartitions in took_list:
fp.write(
dumps(
dict(
{
"size": args.size * args.size,
"chunk_size": args.chunk_size,
"ignore_size": args.ignore_size,
"protocol": args.protocol,
"devs": args.devs,
"device_memory_limit": args.device_memory_limit,
"worker_threads": args.threads_per_worker,
"rmm_pool": not args.disable_rmm_pool,
"tcp": args.enable_tcp_over_ucx,
"ib": args.enable_infiniband,
"nvlink": args.enable_nvlink,
"wall_clock": took,
"npartitions": npartitions,
},
**bandwidths_json,
)
)
+ "\n"
)

# An SSHCluster will not automatically shut down, we have to
# ensure it does.
if args.multi_node:
Expand Down
7 changes: 7 additions & 0 deletions dask_cuda/benchmarks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]
type=str,
help="Generate plot output written to defined directory",
)
parser.add_argument(
"--benchmark-json",
default=None,
type=str,
help="Dump a line-delimited JSON report of benchmarks to this file (optional). "
"Creates file if it does not exist, appends otherwise.",
)

for args in args_list:
name = args.pop("name")
Expand Down