From 5d1a9543ca7e4b64573ce679f5a009d1edd37fc3 Mon Sep 17 00:00:00 2001 From: fjetter Date: Tue, 21 Dec 2021 11:36:39 +0100 Subject: [PATCH 1/2] Ensure uniqueness of steal stimulus ID The stimulus ID is used to verify worker responses in order to distinguish concurrently running steal requests. The Windows clock in particular is not strictly monotonic, which caused duplicate IDs in this test setup. --- distributed/stealing.py | 7 +++++-- distributed/tests/test_steal.py | 37 +++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/distributed/stealing.py b/distributed/stealing.py index cc9737796a6..101a228ce04 100644 --- a/distributed/stealing.py +++ b/distributed/stealing.py @@ -2,6 +2,7 @@ import asyncio import logging +import uuid from collections import defaultdict, deque from math import log2 from time import time @@ -233,7 +234,9 @@ def move_task_request(self, ts, victim, thief) -> str: try: if ts in self.in_flight: return "in-flight" - stimulus_id = f"steal-{time()}" + # Stimulus IDs are used to verify the response, see + # `move_task_confirm`. Therefore, this must be truly unique. 
+ stimulus_id = f"steal-{uuid.uuid4().hex}" key = ts.key self.remove_key_from_stealable(ts) @@ -291,7 +294,7 @@ async def move_task_confirm(self, *, key, state, stimulus_id, worker=None): self.in_flight[ts] = d return except KeyError: - self.log(("already-aborted", key, state, stimulus_id)) + self.log(("already-aborted", key, state, worker, stimulus_id)) return thief = d["thief"] diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 3b6cd8497b5..168eafcdc2f 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1193,3 +1193,40 @@ async def test_correct_bad_time_estimate(c, s, *workers): assert any(s.tasks[f.key] in steal.key_stealable for f in futures) await wait(futures) assert all(w.data for w in workers), [sorted(w.data) for w in workers] + + +@gen_cluster(client=True) +async def test_steal_stimulus_id_unique(c, s, *workers): + steal = s.extensions["stealing"] + num_futs = 1_000 + from distributed import Lock + + lock = Lock() + async with lock: + + def blocked(x, lock): + lock.acquire() + + # Setup all tasks on worker 0 such that victim/thief relation is the + # same for all tasks. + futures = c.map( + blocked, range(num_futs), lock=lock, workers=[workers[0].address] + ) + # Ensure all tasks are assigned to the worker since otherwise the + # move_task_request fails. + while len(workers[0].tasks) != num_futs: + await asyncio.sleep(0.1) + tasks = [s.tasks[f.key] for f in futures] + w0 = s.workers[workers[0].address] + w1 = s.workers[workers[1].address] + # Generating the move task requests as fast as possible increases the + # chance of duplicates if the uniqueness is not guaranteed. + for ts in tasks: + steal.move_task_request(ts, w0, w1) + stimulus_ids = set() + # Values stored in in_flight are used for response verification. 
+ # Therefore all stimulus IDs are stored here and must be unique + for dct in steal.in_flight.values(): + stimulus_ids.add(dct["stimulus_id"]) + assert len(stimulus_ids) == num_futs + await c.cancel(futures) From f27c112a4f07e811fd8b8e8f024aa05b3d5df3fd Mon Sep 17 00:00:00 2001 From: James Bourbeau Date: Thu, 6 Jan 2022 14:30:19 -0600 Subject: [PATCH 2/2] Reviewer feedback --- distributed/tests/test_steal.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 168eafcdc2f..72bf63c1afb 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -13,7 +13,7 @@ import dask -from distributed import Nanny, Worker, wait, worker_client +from distributed import Lock, Nanny, Worker, wait, worker_client from distributed.compatibility import LINUX, WINDOWS from distributed.config import config from distributed.metrics import time @@ -1199,10 +1199,7 @@ async def test_correct_bad_time_estimate(c, s, *workers): async def test_steal_stimulus_id_unique(c, s, *workers): steal = s.extensions["stealing"] num_futs = 1_000 - from distributed import Lock - - lock = Lock() - async with lock: + async with Lock() as lock: def blocked(x, lock): lock.acquire() @@ -1223,10 +1220,8 @@ def blocked(x, lock): # chance of duplicates if the uniqueness is not guaranteed. for ts in tasks: steal.move_task_request(ts, w0, w1) - stimulus_ids = set() # Values stored in in_flight are used for response verification. # Therefore all stimulus IDs are stored here and must be unique - for dct in steal.in_flight.values(): - stimulus_ids.add(dct["stimulus_id"]) + stimulus_ids = {dct["stimulus_id"] for dct in steal.in_flight.values()} assert len(stimulus_ids) == num_futs await c.cancel(futures)