diff --git a/dask_cuda/tests/test_dask_cuda_worker.py b/dask_cuda/tests/test_dask_cuda_worker.py
index efdb9ffb3..c7578417e 100644
--- a/dask_cuda/tests/test_dask_cuda_worker.py
+++ b/dask_cuda/tests/test_dask_cuda_worker.py
@@ -24,47 +24,42 @@
 _runtime_version = rmm._cuda.gpu.runtimeGetVersion()
 
 
+@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,7,8"})
 def test_cuda_visible_devices_and_memory_limit_and_nthreads(loop):  # noqa: F811
-    os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,7,8"
     nthreads = 4
-    try:
-        with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
-            with popen(
-                [
-                    "dask-cuda-worker",
-                    "127.0.0.1:9359",
-                    "--host",
-                    "127.0.0.1",
-                    "--device-memory-limit",
-                    "1 MB",
-                    "--nthreads",
-                    str(nthreads),
-                    "--no-dashboard",
-                    "--worker-class",
-                    "dask_cuda.utils.MockWorker",
-                ]
-            ):
-                with Client("127.0.0.1:9359", loop=loop) as client:
-                    assert wait_workers(client, n_gpus=4)
+    with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
+        with popen(
+            [
+                "dask-cuda-worker",
+                "127.0.0.1:9359",
+                "--host",
+                "127.0.0.1",
+                "--device-memory-limit",
+                "1 MB",
+                "--nthreads",
+                str(nthreads),
+                "--no-dashboard",
+                "--worker-class",
+                "dask_cuda.utils.MockWorker",
+            ]
+        ):
+            with Client("127.0.0.1:9359", loop=loop) as client:
+                assert wait_workers(client, n_gpus=4)
 
-                    def get_visible_devices():
-                        return os.environ["CUDA_VISIBLE_DEVICES"]
+                def get_visible_devices():
+                    return os.environ["CUDA_VISIBLE_DEVICES"]
 
-                    # verify 4 workers with the 4 expected CUDA_VISIBLE_DEVICES
-                    result = client.run(get_visible_devices)
-                    expected = {"0,3,7,8": 1, "3,7,8,0": 1, "7,8,0,3": 1, "8,0,3,7": 1}
-                    for v in result.values():
-                        del expected[v]
-
-                    workers = client.scheduler_info()["workers"]
-                    for w in workers.values():
-                        assert (
-                            w["memory_limit"] == MEMORY_LIMIT // len(workers) * nthreads
-                        )
+                # verify 4 workers with the 4 expected CUDA_VISIBLE_DEVICES
+                result = client.run(get_visible_devices)
+                expected = {"0,3,7,8": 1, "3,7,8,0": 1, "7,8,0,3": 1, "8,0,3,7": 1}
+                for v in result.values():
+                    del expected[v]
+
+                workers = client.scheduler_info()["workers"]
+                for w in workers.values():
+                    assert w["memory_limit"] == MEMORY_LIMIT // len(workers) * nthreads
 
-                    assert len(expected) == 0
-    finally:
-        del os.environ["CUDA_VISIBLE_DEVICES"]
+                assert len(expected) == 0
 
 
 def test_rmm_pool(loop):  # noqa: F811
@@ -194,20 +189,18 @@ def test_unknown_argument():
     assert b"Scheduler address: --my-argument" in ret.stderr
 
 
+@patch.dict(os.environ, {"DASK_DISTRIBUTED__DIAGNOSTICS__NVML": "False"})
 def test_cuda_mig_visible_devices_and_memory_limit_and_nthreads(loop):  # noqa: F811
-    init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
-    try:
-        os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
-        uuids = get_gpu_count_mig(return_uuids=True)[1]
-        # test only with some MIG Instances assuming the test bed
-        # does not have a huge number of mig instances
-        if len(uuids) > 0:
-            uuids = [i.decode("utf-8") for i in uuids]
-        else:
-            pytest.skip("No MIG devices found")
-        CUDA_VISIBLE_DEVICES = ",".join(uuids)
-        os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
-        nthreads = len(CUDA_VISIBLE_DEVICES)
+    uuids = get_gpu_count_mig(return_uuids=True)[1]
+    # test only with some MIG Instances assuming the test bed
+    # does not have a huge number of mig instances
+    if len(uuids) > 0:
+        cuda_visible_devices = ",".join([i.decode("utf-8") for i in uuids])
+    else:
+        pytest.skip("No MIG devices found")
+
+    with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": cuda_visible_devices}):
+        nthreads = len(cuda_visible_devices)
         with popen(["dask-scheduler", "--port", "9359", "--no-dashboard"]):
             with popen(
                 [
@@ -237,11 +230,6 @@ def get_visible_devices():
                         assert set(v.split(",")[i] for v in result.values()) == set(
                             uuids
                         )
-    finally:
-        if "CUDA_VISIBLE_DEVICES" in os.environ:
-            del os.environ["CUDA_VISIBLE_DEVICES"]
-        if init_nvmlstatus:
-            os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus
 
 
 def test_cuda_visible_devices_uuid(loop):  # noqa: F811
diff --git a/dask_cuda/tests/test_local_cuda_cluster.py b/dask_cuda/tests/test_local_cuda_cluster.py
index 33f007dac..edf73e4e4 100644
--- a/dask_cuda/tests/test_local_cuda_cluster.py
+++ b/dask_cuda/tests/test_local_cuda_cluster.py
@@ -1,4 +1,5 @@
 import os
+from unittest.mock import patch
 
 import pytest
 
@@ -51,35 +52,32 @@ def get_visible_devices():
 # Notice, this test might raise errors when the number of available GPUs is less
 # than 8 but as long as the test passes the errors can be ignored.
 @pytest.mark.filterwarnings("ignore:Cannot get CPU affinity")
+@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,3,6,8"})
 @gen_test(timeout=20)
 async def test_with_subset_of_cuda_visible_devices():
-    os.environ["CUDA_VISIBLE_DEVICES"] = "0,3,6,8"
-    try:
-        async with LocalCUDACluster(
-            scheduler_port=0,
-            asynchronous=True,
-            device_memory_limit=1,
-            worker_class=MockWorker,
-        ) as cluster:
-            async with Client(cluster, asynchronous=True) as client:
-                assert len(cluster.workers) == 4
+    async with LocalCUDACluster(
+        scheduler_port=0,
+        asynchronous=True,
+        device_memory_limit=1,
+        worker_class=MockWorker,
+    ) as cluster:
+        async with Client(cluster, asynchronous=True) as client:
+            assert len(cluster.workers) == 4
 
-                # CUDA_VISIBLE_DEVICES cycles properly
-                def get_visible_devices():
-                    return os.environ["CUDA_VISIBLE_DEVICES"]
+            # CUDA_VISIBLE_DEVICES cycles properly
+            def get_visible_devices():
+                return os.environ["CUDA_VISIBLE_DEVICES"]
 
-                result = await client.run(get_visible_devices)
+            result = await client.run(get_visible_devices)
 
-                assert all(len(v.split(",")) == 4 for v in result.values())
-                for i in range(4):
-                    assert {int(v.split(",")[i]) for v in result.values()} == {
-                        0,
-                        3,
-                        6,
-                        8,
-                    }
-    finally:
-        del os.environ["CUDA_VISIBLE_DEVICES"]
+            assert all(len(v.split(",")) == 4 for v in result.values())
+            for i in range(4):
+                assert {int(v.split(",")[i]) for v in result.values()} == {
+                    0,
+                    3,
+                    6,
+                    8,
+                }
 
 
 @pytest.mark.parametrize("protocol", ["ucx", None])
@@ -210,23 +208,18 @@ async def test_cluster_worker():
     await new_worker.close()
 
 
+@patch.dict(os.environ, {"DASK_DISTRIBUTED__DIAGNOSTICS__NVML": "False"})
 @gen_test(timeout=20)
 async def test_available_mig_workers():
-    import dask
-
-    init_nvmlstatus = os.environ.get("DASK_DISTRIBUTED__DIAGNOSTICS__NVML")
-    try:
-        os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = "False"
-        dask.config.refresh()
-        uuids = get_gpu_count_mig(return_uuids=True)[1]
-        if len(uuids) > 0:
-            uuids = [i.decode("utf-8") for i in uuids]
-        else:
-            pytest.skip("No MIG devices found")
-        CUDA_VISIBLE_DEVICES = ",".join(uuids)
-        os.environ["CUDA_VISIBLE_DEVICES"] = CUDA_VISIBLE_DEVICES
+    uuids = get_gpu_count_mig(return_uuids=True)[1]
+    if len(uuids) > 0:
+        cuda_visible_devices = ",".join([i.decode("utf-8") for i in uuids])
+    else:
+        pytest.skip("No MIG devices found")
+
+    with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": cuda_visible_devices}):
         async with LocalCUDACluster(
-            CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES, asynchronous=True
+            CUDA_VISIBLE_DEVICES=cuda_visible_devices, asynchronous=True
         ) as cluster:
             async with Client(cluster, asynchronous=True) as client:
                 len(cluster.workers) == len(uuids)
@@ -240,11 +233,6 @@ def get_visible_devices():
                 assert all(len(v.split(",")) == len(uuids) for v in result.values())
                 for i in range(len(uuids)):
                     assert set(v.split(",")[i] for v in result.values()) == set(uuids)
-    finally:
-        if "CUDA_VISIBLE_DEVICES" in os.environ:
-            del os.environ["CUDA_VISIBLE_DEVICES"]
-        if init_nvmlstatus:
-            os.environ["DASK_DISTRIBUTED__DIAGNOSTICS__NVML"] = init_nvmlstatus
 
 
 @gen_test(timeout=20)
diff --git a/dask_cuda/tests/test_utils.py b/dask_cuda/tests/test_utils.py
index c6838c323..4aeac6e4c 100644
--- a/dask_cuda/tests/test_utils.py
+++ b/dask_cuda/tests/test_utils.py
@@ -1,4 +1,5 @@
 import os
+from unittest.mock import patch
 
 import pytest
 from numba import cuda
@@ -20,14 +21,11 @@
 )
 
 
+@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1,2"})
 def test_get_n_gpus():
     assert isinstance(get_n_gpus(), int)
 
-    try:
-        os.environ["CUDA_VISIBLE_DEVICES"] = "0,1,2"
-        assert get_n_gpus() == 3
-    finally:
-        del os.environ["CUDA_VISIBLE_DEVICES"]
+    assert get_n_gpus() == 3
 
 
 @pytest.mark.parametrize(
@@ -220,16 +218,16 @@ def test_parse_visible_devices():
         uuids.append(pynvml.nvmlDeviceGetUUID(handle).decode("utf-8"))
 
     index_devices = ",".join(indices)
-    os.environ["CUDA_VISIBLE_DEVICES"] = index_devices
-    for index in range(get_gpu_count()):
-        visible = cuda_visible_devices(index)
-        assert visible.split(",")[0] == str(index)
+    with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": index_devices}):
+        for index in range(get_gpu_count()):
+            visible = cuda_visible_devices(index)
+            assert visible.split(",")[0] == str(index)
 
     uuid_devices = ",".join(uuids)
-    os.environ["CUDA_VISIBLE_DEVICES"] = uuid_devices
-    for index in range(get_gpu_count()):
-        visible = cuda_visible_devices(index)
-        assert visible.split(",")[0] == str(uuids[index])
+    with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": uuid_devices}):
+        for index in range(get_gpu_count()):
+            visible = cuda_visible_devices(index)
+            assert visible.split(",")[0] == str(uuids[index])
 
     with pytest.raises(ValueError):
         parse_cuda_visible_device("Foo")
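For reference, a minimal standalone sketch of the unittest.mock.patch.dict pattern the tests above move to, replacing manual os.environ bookkeeping in try/finally blocks. The function names below are illustrative only and do not exist in dask_cuda:

import os
from unittest.mock import patch


@patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"})
def decorated_example():
    # The variable is set only while the decorated function runs.
    assert os.environ["CUDA_VISIBLE_DEVICES"] == "0,1"


def context_manager_example():
    before = os.environ.get("CUDA_VISIBLE_DEVICES")
    with patch.dict(os.environ, {"CUDA_VISIBLE_DEVICES": "0,1"}):
        assert os.environ["CUDA_VISIBLE_DEVICES"] == "0,1"
    # The prior value (or absence) is restored on exit, even if the body
    # raised, which is what the removed try/finally cleanup used to do by hand.
    assert os.environ.get("CUDA_VISIBLE_DEVICES") == before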
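Also for reference, a rough sketch of the CUDA_VISIBLE_DEVICES cycling behavior these tests assert (each worker sees its own device first, followed by the remaining devices in order). The helper below is hypothetical and simplified, not dask_cuda.utils.cuda_visible_devices itself:

def cycled_visible_devices(index, devices):
    # Rotate the device list so that devices[index] comes first, then join
    # into the comma-separated form CUDA_VISIBLE_DEVICES expects.
    return ",".join(devices[index:] + devices[:index])


# With devices "0,3,7,8", rotation yields exactly the four strings the first
# test above expects: "0,3,7,8", "3,7,8,0", "7,8,0,3", "8,0,3,7".
assert cycled_visible_devices(1, ["0", "3", "7", "8"]) == "3,7,8,0"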