Skip to content

Commit

Permalink
tweak fail to pickle tests
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 9, 2022
1 parent d948828 commit 5fd4754
Showing 1 changed file with 14 additions and 10 deletions.
24 changes: 14 additions & 10 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,16 @@ async def test_dict_data_if_no_spill_to_disk(s, w):
assert type(w.data) is dict


class CustomError(Exception):
pass


class FailToPickle:
def __init__(self, *, reported_size=0, actual_size=0):
def __init__(self, *, reported_size=0):
self.reported_size = int(reported_size)
self.data = "x" * int(actual_size)

def __getstate__(self):
raise TypeError()
raise CustomError()

def __sizeof__(self):
return self.reported_size
Expand All @@ -92,13 +95,13 @@ async def test_fail_write_to_disk_target_1(c, s, a, b):
than target. The data is lost and the task is marked as failed;
the worker remains in usable condition.
"""
future = c.submit(FailToPickle, reported_size=100e9)
await wait(future)
x = c.submit(FailToPickle, reported_size=100e9, key="x")
await wait(x)

assert future.status == "error"
assert x.status == "error"

with pytest.raises(TypeError, match="Could not serialize"):
await future
await x

await assert_basic_futures(c)

Expand Down Expand Up @@ -139,18 +142,19 @@ async def test_fail_write_to_disk_target_2(c, s, a):
@gen_cluster(
client=True,
nthreads=[("", 1)],
worker_kwargs={"memory_limit": "1 kiB"}, # Spill everything
worker_kwargs={"memory_limit": "1 kB"},
config={
"distributed.worker.memory.target": False,
"distributed.worker.memory.spill": 0.7,
"distributed.worker.memory.pause": False,
"distributed.worker.memory.monitor-interval.spill-pause": "10ms",
},
)
async def test_fail_write_to_disk_spill(c, s, a):
"""Test failure to evict a key, triggered by the spill threshold"""
a.monitor.get_process_memory = lambda: 701 if a.data.fast else 0

with captured_logger(logging.getLogger("distributed.spill")) as logs:
bad = c.submit(FailToPickle, actual_size=1_000_000, key="bad")
bad = c.submit(FailToPickle, key="bad")
await wait(bad)

# Must wait for memory monitor to kick in
Expand Down

0 comments on commit 5fd4754

Please sign in to comment.