Use unittest.mock.patch for all os.environ tests #787

Merged 1 commit on Nov 15, 2021
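For context, the pattern this PR standardizes on is unittest.mock.patch.dict, which scopes an environment-variable override to a single test and undoes it automatically. A minimal sketch of the decorator form used in the diff below (the test name and device list here are illustrative, not taken from the PR):

import os
from unittest.mock import patch

# patch.dict applies the override for the duration of the test and
# restores os.environ afterwards, even if the test body raises.
@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"})
def test_visible_devices_example():
    assert os.environ["CUDA_VISIBLE_DEVICES"] == "0,1"
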
96 changes: 42 additions & 54 deletions dask_cuda/tests/test_dask_cuda_worker.py
@@ -24,47 +24,42 @@
_runtime_version = rmm._cuda.gpu.runtimeGetVersion()


@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,7,8"})
def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811
os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,7,8"
nthreads = 4
try:
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9359",
"--host",
"127.0.0.1",
"--device-memory-limit",
"1 MB",
"--nthreads",
str(nthreads),
"--no-dashboard",
"--worker-class",
"dask_cuda.utils.MockWorker",
]
):
with Client("127.0.0.1:9359", loop=loop) as client:
assert wait_workers(client, n_gpus=4)
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
"dask-cuda-worker",
"127.0.0.1:9359",
"--host",
"127.0.0.1",
"--device-memory-limit",
"1 MB",
"--nthreads",
str(nthreads),
"--no-dashboard",
"--worker-class",
"dask_cuda.utils.MockWorker",
]
):
with Client("127.0.0.1:9359", loop=loop) as client:
assert wait_workers(client, n_gpus=4)

def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]
def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]

# verify 4 workers with the 4 expected CUDA_VISIBLE_DEVICES
result = client.run(get_visible_devices)
expected = {"0,3,7,8": 1, "3,7,8,0": 1, "7,8,0,3": 1, "8,0,3,7": 1}
for v in result.values():
del expected[v]

workers = client.scheduler_info()["workers"]
for w in workers.values():
assert (
w["memory_limit"] == MEMORY_LIMIT // len(workers) * nthreads
)
# verify 4 workers with the 4 expected CUDA_VISIBLE_DEVICES
result = client.run(get_visible_devices)
expected = {"0,3,7,8": 1, "3,7,8,0": 1, "7,8,0,3": 1, "8,0,3,7": 1}
for v in result.values():
del expected[v]

workers = client.scheduler_info()["workers"]
for w in workers.values():
assert w["memory_limit"] == MEMORY_LIMIT // len(workers) * nthreads

assert len(expected) == 0
finally:
del os.environ["CUDA_VISIBLE_DEVICES"]
assert len(expected) == 0


def test_rmm_pool(loop): # noqa: F811
@@ -194,20 +189,18 @@ def test_unknown_argument():
assert b"Scheduler address: --my-argument" in ret.stderr


@patch.dict(os.environ, {"DASK_DISTRIBUTED__DIAGNOSTICS__NVML": "False"})
def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop): # noqa: F811
init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
try:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
uuids = get_gpu_count_mig(return_uuids=True)[1]
# test only with some MIG Instances assuming the test bed
# does not have a huge number of mig instances
if len(uuids) > 0:
uuids = [i.decode("utf-8") for i in uuids]
else:
pytest.skip("No MIG devices found")
CUDA_VISIBLE_DEVICES = ",".join(uuids)
os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
nthreads = len(CUDA_VISIBLE_DEVICES)
uuids = get_gpu_count_mig(return_uuids=True)[1]
# test only with some MIG Instances assuming the test bed
# does not have a huge number of mig instances
if len(uuids) > 0:
cuda_visible_devices = ",".join([i.decode("utf-8") for i in uuids])
else:
pytest.skip("No MIG devices found")

with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": cuda_visible_devices}):
nthreads = len(cuda_visible_devices)
with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
with popen(
[
Expand Down Expand Up @@ -237,11 +230,6 @@ def get_visible_devices():
assert set(v.split(",")[i] for v in result.values()) == set(
uuids
)
finally:
if "CUDA_VISIBLE_DEVICES" in os.environ:
del os.environ["CUDA_VISIBLE_DEVICES"]
if init_nvmlstatus:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus


def test_cuda_visible_devices_uuid(loop): # noqa: F811
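The MIG tests above switch to the context-manager form of patch.dict because the device list is only known at runtime. A small sketch of that shape (the UUID strings are placeholders, not real MIG identifiers):

import os
from unittest.mock import patch

# Placeholder UUIDs standing in for values discovered via NVML at runtime.
uuids = ["MIG-example-0", "MIG-example-1"]

with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": ",".join(uuids)}):
    # Inside the block the override is visible to anything reading os.environ.
    assert os.environ["CUDA_VISIBLE_DEVICES"] == "MIG-example-0,MIG-example-1"
# On exit the previous value (or absence) of CUDA_VISIBLE_DEVICES is restored.
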
74 changes: 31 additions & 43 deletions dask_cuda/tests/test_local_cuda_cluster.py
@@ -1,4 +1,5 @@
import os
from unittest.mock import patch

import pytest

@@ -51,35 +52,32 @@ def get_visible_devices():
# Notice, this test might raise errors when the number of available GPUs is less
# than 8 but as long as the test passes the errors can be ignored.
@pytest.mark.filterwarnings("ignore:Cannot get CPU affinity")
@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,6,8"})
@gen_test(timeout=20)
async def test_with_subset_of_cuda_visible_devices():
os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,6,8"
try:
async with LocalCUDACluster(
scheduler_port=0,
asynchronous=True,
device_memory_limit=1,
worker_class=MockWorker,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
assert len(cluster.workers) == 4
async with LocalCUDACluster(
scheduler_port=0,
asynchronous=True,
device_memory_limit=1,
worker_class=MockWorker,
) as cluster:
async with Client(cluster, asynchronous=True) as client:
assert len(cluster.workers) == 4

# CUDA_VISIBLE_DEVICES cycles properly
def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]
# CUDA_VISIBLE_DEVICES cycles properly
def get_visible_devices():
return os.environ["CUDA_VISIBLE_DEVICES"]

result = await client.run(get_visible_devices)
result = await client.run(get_visible_devices)

assert all(len(v.split(",")) == 4 for v in result.values())
for i in range(4):
assert {int(v.split(",")[i]) for v in result.values()} == {
0,
3,
6,
8,
}
finally:
del os.environ["CUDA_VISIBLE_DEVICES"]
assert all(len(v.split(",")) == 4 for v in result.values())
for i in range(4):
assert {int(v.split(",")[i]) for v in result.values()} == {
0,
3,
6,
8,
}


@pytest.mark.parametrize("protocol", ["ucx", None])
@@ -210,23 +208,18 @@ async def test_cluster_worker():
await new_worker.close()


@patch.dict(os.environ, {"DASK_DISTRIBUTED__DIAGNOSTICS__NVML": "False"})
@gen_test(timeout=20)
async def test_available_mig_workers():
import dask

init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
try:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
dask.config.refresh()
uuids = get_gpu_count_mig(return_uuids=True)[1]
if len(uuids) > 0:
uuids = [i.decode("utf-8") for i in uuids]
else:
pytest.skip("No MIG devices found")
CUDA_VISIBLE_DEVICES = ",".join(uuids)
os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
uuids = get_gpu_count_mig(return_uuids=True)[1]
if len(uuids) > 0:
cuda_visible_devices = ",".join([i.decode("utf-8") for i in uuids])
else:
pytest.skip("No MIG devices found")

with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": cuda_visible_devices}):
async with LocalCUDACluster(
CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES, asynchronous=True
CUDA_VISIBLE_DEVICES=cuda_visible_devices, asynchronous=True
) as cluster:
async with Client(cluster, asynchronous=True) as client:
len(cluster.workers) == len(uuids)
@@ -240,11 +233,6 @@ def get_visible_devices():
assert all(len(v.split(",")) == len(uuids) for v in result.values())
for i in range(len(uuids)):
assert set(v.split(",")[i] for v in result.values()) == set(uuids)
finally:
if "CUDA_VISIBLE_DEVICES" in os.environ:
del os.environ["CUDA_VISIBLE_DEVICES"]
if init_nvmlstatus:
os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus


@gen_test(timeout=20)
24 changes: 11 additions & 13 deletions dask_cuda/tests/test_utils.py
@@ -1,4 +1,5 @@
import os
from unittest.mock import patch

import pytest
from numba import cuda
@@ -20,14 +21,11 @@
)


@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1,2"})
def test_get_n_gpus():
assert isinstance(get_n_gpus(), int)

try:
os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"
assert get_n_gpus() == 3
finally:
del os.environ["CUDA_VISIBLE_DEVICES"]
assert get_n_gpus() == 3


@pytest.mark.parametrize(
@@ -220,16 +218,16 @@ def test_parse_visible_devices():
uuids.append(pynvml.nvmlDeviceGetUUID(handle).decode("utf-8"))

index_devices = ",".join(indices)
os.environ["CUDA_VISIBLE_DEVICES"] = index_devices
for index in range(get_gpu_count()):
visible = cuda_visible_devices(index)
assert visible.split(",")[0] == str(index)
with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": index_devices}):
for index in range(get_gpu_count()):
visible = cuda_visible_devices(index)
assert visible.split(",")[0] == str(index)

uuid_devices = ",".join(uuids)
os.environ["CUDA_VISIBLE_DEVICES"] = uuid_devices
for index in range(get_gpu_count()):
visible = cuda_visible_devices(index)
assert visible.split(",")[0] == str(uuids[index])
with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": uuid_devices}):
for index in range(get_gpu_count()):
visible = cuda_visible_devices(index)
assert visible.split(",")[0] == str(uuids[index])

with pytest.raises(ValueError):
parse_cuda_visible_device("Foo")
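As a follow-up note, the try/finally cleanup blocks could be dropped because patch.dict restores the dictionary to its prior state on exit, including deleting keys that were not set before. A self-contained check of that behavior (the variable name is arbitrary):

import os
from unittest.mock import patch

assert "EXAMPLE_UNSET_VAR" not in os.environ

with patch.dict(os.environ, {"EXAMPLE_UNSET_VAR": "1"}):
    assert os.environ["EXAMPLE_UNSET_VAR"] == "1"

# The key is removed again on exit, so no manual `del os.environ[...]` is needed.
assert "EXAMPLE_UNSET_VAR" not in os.environ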