Make Dask CUDA work with the new WorkerMemoryManager abstraction #870

Merged: 10 commits, Mar 21, 2022
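
This PR adapts Dask CUDA to distributed's refactor that moved worker memory handling into the new WorkerMemoryManager abstraction: parse_memory_limit now lives in distributed.worker_memory, and per-worker memory attributes are reached through worker.memory_manager. The diffs below make exactly those two adjustments. A minimal compatibility sketch for the import change, using a try/except fallback that is illustrative only and not part of this PR:

```python
# Illustrative only: prefer the new location of parse_memory_limit and fall
# back to the pre-refactor one so the code also runs on older distributed.
try:
    from distributed.worker_memory import parse_memory_limit
except ImportError:  # older distributed without WorkerMemoryManager
    from distributed.worker import parse_memory_limit
```
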
2 changes: 1 addition & 1 deletion dask_cuda/cuda_worker.py
@@ -17,7 +17,7 @@
enable_proctitle_on_children,
enable_proctitle_on_current,
)
-from distributed.worker import parse_memory_limit
+from distributed.worker_memory import parse_memory_limit

from .device_host_file import DeviceHostFile
from .initialize import initialize
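
Only the import path changes here; the function itself is used as before. A hedged usage sketch, assuming the pre-refactor signature parse_memory_limit(memory_limit, nthreads, total_cores=...) and hypothetical argument values:

```python
from distributed.worker_memory import parse_memory_limit

# Hypothetical example: resolve a human-readable limit to a byte count for a
# worker with 4 threads. "auto" and fractions of total memory are accepted
# as well (behaviour assumed unchanged from the pre-refactor function).
limit_in_bytes = parse_memory_limit("24 GiB", 4)
print(limit_in_bytes)
```
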
2 changes: 1 addition & 1 deletion dask_cuda/local_cuda_cluster.py
@@ -5,7 +5,7 @@
import dask
from dask.utils import parse_bytes
from distributed import LocalCluster, Nanny, Worker
-from distributed.worker import parse_memory_limit
+from distributed.worker_memory import parse_memory_limit

from .device_host_file import DeviceHostFile
from .initialize import initialize
2 changes: 1 addition & 1 deletion dask_cuda/tests/test_local_cuda_cluster.py
@@ -38,7 +38,7 @@ def get_visible_devices():
)

# Use full memory, checked with some buffer to ignore rounding difference
-full_mem = sum(w.memory_limit for w in cluster.workers.values())
+full_mem = sum(w.memory_manager.memory_limit for w in cluster.workers.values())
assert full_mem >= MEMORY_LIMIT - 1024 and full_mem < MEMORY_LIMIT + 1024

for w, devices in result.items():
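
The assertion now reads the limit from the worker's memory manager rather than from the worker itself. A version-agnostic accessor, sketched here as an illustration rather than anything this PR adds:

```python
def get_memory_limit(worker):
    """Illustrative helper: return the worker's memory limit whether or not
    the WorkerMemoryManager abstraction is present."""
    manager = getattr(worker, "memory_manager", None)
    if manager is not None:
        return manager.memory_limit  # distributed with WorkerMemoryManager
    return worker.memory_limit  # older distributed


# Mirroring the test above (hypothetical usage):
# full_mem = sum(get_memory_limit(w) for w in cluster.workers.values())
```
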
14 changes: 8 additions & 6 deletions dask_cuda/tests/test_proxify_host_file.py
@@ -382,7 +382,7 @@ async def test_worker_force_spill_to_disk():
"""Test Dask triggering CPU-to-Disk spilling """
cudf = pytest.importorskip("cudf")

-with dask.config.set({"distributed.worker.memory.terminate": 0}):
+with dask.config.set({"distributed.worker.memory.terminate": None}):
async with dask_cuda.LocalCUDACluster(
n_workers=1, device_memory_limit="1MB", jit_unspill=True, asynchronous=True
) as cluster:
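
The hunk above switches the disabled value for the terminate threshold from 0 to None, which is how this PR turns that threshold off under the new memory manager. A small sketch of the same configuration pattern; the spill value is hypothetical and not part of this PR:

```python
import dask

# Keep spilling active but never terminate the worker on high memory usage.
# None (rather than 0) is the disabled value used by this PR; 0.8 is a
# made-up spill fraction for illustration.
with dask.config.set(
    {
        "distributed.worker.memory.terminate": None,
        "distributed.worker.memory.spill": 0.8,
    }
):
    ...  # start the cluster and run the workload here
```
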
@@ -392,18 +392,20 @@ async def test_worker_force_spill_to_disk():
ddf = dask.dataframe.from_pandas(df, npartitions=1).persist()
await ddf


async def f():
"""Trigger a memory_monitor() and reset memory_limit"""
w = get_worker()
# Set a host memory limit that triggers spilling to disk
-w.memory_pause_fraction = False
+w.memory_manager.memory_pause_fraction = False
memory = w.monitor.proc.memory_info().rss
-w.memory_limit = memory - 10 ** 8
-w.memory_target_fraction = 1
-await w.memory_monitor()
+w.memory_manager.memory_limit = memory - 10 ** 8
+w.memory_manager.memory_target_fraction = 1

Review comment: This doesn't work: dask/distributed#5367

+print(w.memory_manager.data)
+await w.memory_manager.memory_monitor(w)
# Check that host memory is freed
assert w.monitor.proc.memory_info().rss < memory - 10 ** 7
-w.memory_limit = memory * 10  # Un-limit
+w.memory_manager.memory_limit = memory * 10  # Un-limit

await client.submit(f)
log = str(await client.get_worker_logs())
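
Taken together, the test now drives spilling through the worker's memory manager instead of through attributes on the worker itself. A condensed sketch of that pattern, following the diff above at this point in the PR (the final merged version may differ, and cluster setup and error handling are omitted):

```python
from distributed import get_worker


async def force_spill():
    """Illustrative only: shrink the host memory limit below current RSS so a
    memory_monitor() pass spills to disk, then lift the limit again."""
    w = get_worker()
    manager = w.memory_manager  # the new WorkerMemoryManager
    manager.memory_pause_fraction = False  # never pause the worker
    rss = w.monitor.proc.memory_info().rss
    manager.memory_limit = rss - 10 ** 8  # push the limit below current usage
    manager.memory_target_fraction = 1
    await manager.memory_monitor(w)  # trigger a spill pass, as in the test
    manager.memory_limit = rss * 10  # un-limit afterwards
```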