Skip to content

Commit

Permalink
Drop support for zict < 2.1.0 (#7456)
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky authored Jan 6, 2023
1 parent e1f1524 commit 24ba207
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 46 deletions.
29 changes: 6 additions & 23 deletions distributed/spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from distributed.sizeof import safe_sizeof

logger = logging.getLogger(__name__)
has_zict_210 = parse_version(zict.__version__) >= parse_version("2.1.0")
has_zict_220 = parse_version(zict.__version__) >= parse_version("2.2.0")
has_zict_230 = parse_version(zict.__version__) >= parse_version("2.3.0")

Expand Down Expand Up @@ -139,10 +138,6 @@ def __init__(
max_spill: int | Literal[False] = False,
min_log_interval: float = 2,
):

if max_spill is not False and not has_zict_210:
raise ValueError("zict >= 2.1.0 required to set max-spill")

slow: MutableMapping[str, Any] = Slow(spill_directory, max_spill)
if has_zict_220:
# If a value is still in use somewhere on the worker since the last time it
Expand Down Expand Up @@ -195,11 +190,7 @@ def handle_errors(self, key: str | None) -> Iterator[None]:
# This happens only when the key is individually larger than target.
# The exception will be caught by Worker and logged; the status of
# the task will be set to error.
if has_zict_210:
del self[key]
else:
assert key not in self.fast
assert key not in self.slow
del self[key]
raise orig_e
else:
# The key we just inserted is smaller than target, but it caused
Expand Down Expand Up @@ -236,11 +227,7 @@ def __setitem__(self, key: str, value: Any) -> None:
super().__setitem__(key, value)
self.logged_pickle_errors.discard(key)
except HandledError:
if has_zict_210:
assert key in self.fast
else:
assert key not in self.fast
logger.error("Key %s lost. Please upgrade to zict >= 2.1.0", key)
assert key in self.fast
assert key not in self.slow

def evict(self) -> int:
Expand Down Expand Up @@ -413,14 +400,10 @@ def __setitem__(self, key: str, value: Any) -> None:
)
t1 = perf_counter()

if has_zict_210:
# Thanks to Buffer.__setitem__, we never update existing
# keys in slow, but always delete them and reinsert them.
assert key not in self.d
assert key not in self.weight_by_key
else:
self.d.pop(key, None)
self.total_weight -= self.weight_by_key.pop(key, SpilledSize(0, 0))
# Thanks to Buffer.__setitem__, we never update existing
# keys in slow, but always delete them and reinsert them.
assert key not in self.d
assert key not in self.weight_by_key

if (
self.max_weight is not False
Expand Down
10 changes: 1 addition & 9 deletions distributed/tests/test_spill.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@

from distributed import profile
from distributed.compatibility import WINDOWS
from distributed.spill import SpillBuffer, has_zict_210, has_zict_220
from distributed.spill import SpillBuffer, has_zict_220
from distributed.utils_test import captured_logger

requires_zict_210 = pytest.mark.skipif(
not has_zict_210,
reason="requires zict version >= 2.1.0",
)
requires_zict_220 = pytest.mark.skipif(
not has_zict_220,
reason="requires zict version >= 2.2.0",
Expand Down Expand Up @@ -130,7 +126,6 @@ def test_disk_size_calculation(tmp_path):
assert_buf(buf, tmp_path, {}, {"a": a, "b": b})


@requires_zict_210
def test_spillbuffer_maxlim(tmp_path_factory):
buf_dir = tmp_path_factory.mktemp("buf")
buf = SpillBuffer(str(buf_dir), target=200, max_spill=600, min_log_interval=0)
Expand Down Expand Up @@ -213,7 +208,6 @@ def __sizeof__(self):
return self.size


@requires_zict_210
def test_spillbuffer_fail_to_serialize(tmp_path):
buf = SpillBuffer(str(tmp_path), target=200, max_spill=600, min_log_interval=0)

Expand Down Expand Up @@ -247,7 +241,6 @@ def test_spillbuffer_fail_to_serialize(tmp_path):
assert_buf(buf, tmp_path, {"b": b, "c": c}, {})


@requires_zict_210
@pytest.mark.skipif(WINDOWS, reason="Needs chmod")
def test_spillbuffer_oserror(tmp_path):
buf = SpillBuffer(str(tmp_path), target=200, max_spill=800, min_log_interval=0)
Expand Down Expand Up @@ -287,7 +280,6 @@ def test_spillbuffer_oserror(tmp_path):
assert_buf(buf, tmp_path, {"b": b, "d": d}, {"a": a})


@requires_zict_210
def test_spillbuffer_evict(tmp_path):
buf = SpillBuffer(str(tmp_path), target=300, min_log_interval=0)

Expand Down
15 changes: 1 addition & 14 deletions distributed/tests/test_worker_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from distributed.compatibility import MACOS, WINDOWS
from distributed.core import Status
from distributed.metrics import monotonic
from distributed.spill import has_zict_210
from distributed.utils_test import (
NO_AMM,
captured_logger,
Expand All @@ -39,11 +38,6 @@
TaskErredMsg,
)

requires_zict_210 = pytest.mark.skipif(
not has_zict_210,
reason="requires zict version >= 2.1.0",
)


def memory_monitor_running(dask_worker: Worker | Nanny) -> bool:
return "memory_monitor" in dask_worker.periodic_callbacks
Expand Down Expand Up @@ -282,17 +276,11 @@ async def test_fail_to_pickle_execute_2(c, s, a):

y = c.submit(lambda: "y" * 256, key="y")
await wait(y)
if has_zict_210:
assert set(a.data.memory) == {"x", "y"}
else:
assert set(a.data.memory) == {"y"}

assert set(a.data.memory) == {"x", "y"}
assert not a.data.disk

await assert_basic_futures(c)


@requires_zict_210
@gen_cluster(
client=True,
nthreads=[("", 1)],
Expand Down Expand Up @@ -373,7 +361,6 @@ async def test_spill_target_threshold(c, s, a):
assert set(a.data.disk) == {"y"}


@requires_zict_210
@gen_cluster(
client=True,
nthreads=[("", 1)],
Expand Down

0 comments on commit 24ba207

Please sign in to comment.