Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add GPU memory usage to dask_memusage plugin. #11

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions dask_memusage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from time import sleep
from threading import Lock, Thread
from collections import defaultdict
from functools import reduce

from psutil import Process
import click
Expand All @@ -26,19 +27,30 @@


__all__ = ["install"]
__version__ = "1.1"
__version__ = "1.2"


def _process_memory():
def _process_memory() -> tuple:
    """Return memory usage as a ``(process_mb, gpu_mb)`` tuple.

    ``process_mb`` is resident-set memory in MB, including all
    (recursive) subprocesses of this process.  ``gpu_mb`` is the total
    memory used across all GPUs as reported by GPUtil, or 0 when GPUtil
    is not installed or GPU querying fails.
    """
    proc = Process(os.getpid())

    gpu_mem = 0
    try:
        # Imported lazily so that GPUtil stays an optional dependency;
        # a scheduler without it simply reports 0 GPU memory.
        import GPUtil

        gpu_mem = sum(gpu.memoryUsed for gpu in GPUtil.getGPUs())
    except Exception:
        # Best-effort: a missing GPUtil or a failing driver query must
        # not break CPU-memory polling.  Deliberately broad, but no
        # longer a bare except (which would swallow KeyboardInterrupt).
        pass

    rss_mb = sum(
        p.memory_info().rss / (1024 * 1024)
        for p in [proc] + list(proc.children(recursive=True))
    )
    return (rss_mb, gpu_mem)


class _WorkerMemory(object):
Expand Down Expand Up @@ -77,7 +89,7 @@ def memory_for_task(self, worker_address):
with self._lock:
result = self._worker_memory[worker_address]
if not result:
result = [0]
result = [(0, 0)]
del self._worker_memory[worker_address]
return result

Expand All @@ -98,7 +110,8 @@ def __init__(self, scheduler, csv_path):
SchedulerPlugin.__init__(self)
f = open(os.path.join(csv_path), "w", buffering=1)
self._csv = csv.writer(f)
self._csv.writerow(["task_key", "min_memory_mb", "max_memory_mb"])
self._csv.writerow(["task_key", "min_memory_mb", "max_memory_mb",
"min_memory_gpu_mb", "max_memory_gpu_mb"])
self._worker_memory = _WorkerMemory(scheduler.address)
self._worker_memory.start()

Expand All @@ -108,9 +121,16 @@ def transition(self, key, start, finish, *args, **kwargs):
if start == "processing" and finish in ("memory", "erred"):
worker_address = kwargs["worker"]
memory_usage = self._worker_memory.memory_for_task(worker_address)
max_memory_usage = max(memory_usage)
min_memory_usage = min(memory_usage)
self._csv.writerow([key, min_memory_usage, max_memory_usage])
max_memory_usage = reduce(lambda x, y: max(x, y[0]),
memory_usage, float("-inf"))
min_memory_usage = reduce(lambda x, y: min(x, y[0]),
memory_usage, float("inf"))
max_memory_gpu_usage = reduce(lambda x, y: max(x, y[1]),
memory_usage, float("-inf"))
min_memory_gpu_usage = reduce(lambda x, y: min(x, y[1]),
memory_usage, float("inf"))
self._csv.writerow([key, min_memory_usage, max_memory_usage,
min_memory_gpu_usage, max_memory_gpu_usage])


def install(scheduler: Scheduler, csv_path: str):
Expand Down
2 changes: 2 additions & 0 deletions test_memusage.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def test_highlevel_python_usage(tmpdir):
client = Client(cluster)
compute(make_bag())
check_csv(tempfile)
client.shutdown()


def test_commandline_usage(tmpdir):
Expand All @@ -77,6 +78,7 @@ def test_commandline_usage(tmpdir):
client = Client("tcp://127.0.0.1:3333")
compute(make_bag())
check_csv(tempfile)
client.shutdown()
finally:
worker.kill()
scheduler.kill()
Expand Down