Skip to content

Commit

Permalink
enhance tests when spilling with custom data
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 9, 2022
1 parent 4badfe5 commit ee5ca53
Showing 1 changed file with 32 additions and 26 deletions.
58 changes: 32 additions & 26 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import logging
from collections import UserDict
from time import sleep

import pytest
Expand Down Expand Up @@ -513,53 +514,58 @@ async def test_avoid_memory_monitor_if_zero_limit_nanny(c, s, nanny):

@gen_cluster(nthreads=[])
async def test_override_data_worker(s):
async with Worker(s.address, data=dict) as w:
assert type(w.data) is dict
# Use a UserDict to sidestep potential special case handling for dict
async with Worker(s.address, data=UserDict) as w:
assert type(w.data) is UserDict

data = {"x": 1}
data = UserDict({"x": 1})
async with Worker(s.address, data=data) as w:
assert w.data is data
assert w.data == {"x": 1}

class Data(dict):
def __init__(self, x, y):
self.x = x
self.y = y

async with Worker(s.address, data=(Data, {"x": 123, "y": 456})) as w:
assert w.data.x == 123
assert w.data.y == 456


@gen_cluster(
client=True,
nthreads=[("", 1)],
Worker=Nanny,
worker_kwargs={"data": dict},
worker_kwargs={"data": UserDict},
)
async def test_override_data_nanny(c, s, n):
r = await c.run(lambda dask_worker: type(dask_worker.data))
assert r[n.worker_address] is dict
assert r[n.worker_address] is UserDict


@gen_cluster(
client=True,
nthreads=[("", 1)],
worker_kwargs={"memory_limit": 1000, "data": dict},
config={
"distributed.worker.memory.pause": False,
"distributed.worker.memory.monitor-interval.spill-pause": "10ms",
},
worker_kwargs={"memory_limit": "1 GB", "data": UserDict},
config={"distributed.worker.memory.monitor-interval.spill-pause": "10ms"},
)
async def test_override_data_does_not_spill(c, s, a):
async def test_override_data_vs_memory_monitor(c, s, a):
assert memory_monitor_running(a)
a.monitor.get_process_memory = lambda: 10000

# Push a key that would normally trip both the target and the spill thresholds
x = c.submit(lambda: "x" * 2000)
await wait(x)
await asyncio.sleep(0.05)
assert type(a.data) is dict
assert a.data == {x.key: "x" * 2000}
class C:
def __sizeof__(self):
return 1_000_000_000

# Capture output of log_errors()
with captured_logger(logging.getLogger("distributed.utils")) as logger:
x = c.submit(C)
await wait(x)
a.monitor.get_process_memory = lambda: 1_000_000_000

# The pause subsystem of the memory monitor has been tripped.
# The spill subsystem hasn't.
while a.status != Status.paused:
await asyncio.sleep(0.01)
await asyncio.sleep(0.05)

# This would happen if memory_monitor() tried to blindly call SpillBuffer.evict()
assert "Traceback" not in logger.getvalue()

assert type(a.data) is UserDict
assert a.data.keys() == {x.key}


@pytest.mark.slow
Expand Down

0 comments on commit ee5ca53

Please sign in to comment.