From c4e07a104bdc9d17518504ee8218e37b5b82d3f6 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 7 Apr 2022 14:59:36 +0100 Subject: [PATCH 01/10] Overhaul test_priorities.py (#6077) --- distributed/tests/test_priorities.py | 299 ++++++++++++++++++--------- 1 file changed, 202 insertions(+), 97 deletions(-) diff --git a/distributed/tests/test_priorities.py b/distributed/tests/test_priorities.py index 00a046b457e..a1d81e9e0ef 100644 --- a/distributed/tests/test_priorities.py +++ b/distributed/tests/test_priorities.py @@ -1,128 +1,233 @@ +from __future__ import annotations + import asyncio +from contextlib import asynccontextmanager import pytest import dask -from dask import delayed, persist -from dask.core import flatten -from dask.utils import stringify - -from distributed import Worker, wait -from distributed.utils_test import gen_cluster, inc, slowdec, slowinc - - -@gen_cluster(client=True, nthreads=[]) -async def test_submit(c, s): - low = c.submit(inc, 1, priority=-1) - futures = c.map(slowinc, range(10), delay=0.1) - high = c.submit(inc, 2, priority=1) - async with Worker(s.address, nthreads=1): - await wait(high) - assert all(s.processing.values()) - assert s.tasks[low.key].state == "processing" - - -@gen_cluster(client=True, nthreads=[]) -async def test_map(c, s): - low = c.map(inc, [1, 2, 3], priority=-1) - futures = c.map(slowinc, range(10), delay=0.1) - high = c.map(inc, [4, 5, 6], priority=1) - async with Worker(s.address, nthreads=1): - await wait(high) - assert all(s.processing.values()) - assert s.tasks[low[0].key].state == "processing" - - -@gen_cluster(client=True, nthreads=[]) -async def test_compute(c, s): - da = pytest.importorskip("dask.array") - x = da.random.random((10, 10), chunks=(5, 5)) - y = da.random.random((10, 10), chunks=(5, 5)) - - low = c.compute(x, priority=-1) - futures = c.map(slowinc, range(10), delay=0.1) - high = c.compute(y, priority=1) - async with Worker(s.address, nthreads=1): - await wait(high) - assert all(s.processing.values()) - assert s.tasks[stringify(low.key)].state in ("processing", "waiting") - - -@gen_cluster(client=True, nthreads=[]) -async def test_persist(c, s): - da = pytest.importorskip("dask.array") - x = da.random.random((10, 10), chunks=(5, 5)) - y = da.random.random((10, 10), chunks=(5, 5)) - - low = x.persist(priority=-1) - futures = c.map(slowinc, range(10), delay=0.1) - high = y.persist(priority=1) - async with Worker(s.address, nthreads=1): - await wait(high) - assert all(s.processing.values()) - assert all( - s.tasks[stringify(k)].state in ("processing", "waiting") - for k in flatten(low.__dask_keys__()) - ) - - -@gen_cluster(client=True) -async def test_annotate_compute(c, s, a, b): +from dask import delayed + +from distributed import Client, Event, Scheduler, Status, Worker, wait +from distributed.utils_test import gen_cluster, inc, slowinc + +dinc = delayed(inc) +dslowinc = delayed(slowinc) +dwait = delayed(lambda ev: ev.wait()) + + +@asynccontextmanager +async def block_worker( + c: Client, + s: Scheduler, + w: Worker, + pause: bool, + ntasks_on_scheduler: int | None = None, + ntasks_on_worker: int | None = None, +): + """Make sure that no tasks submitted inside this context manager start running on + the worker until the context manager exits. + Must be used together with the ``@gen_blockable_cluster`` test decorator. + + Parameters + ---------- + pause : bool + True + When entering the context manager, pause the worker. 
At exit, wait for all + tasks created inside the context manager to be added to Scheduler.unrunnable + and then unpause the worker. + False + When entering the context manager, send a dummy long task to the worker. At + exit, wait for all tasks created inside the context manager to reach the + scheduler and then terminate the dummy task. + + ntasks_on_scheduler : int, optional + Number of tasks that must appear on the scheduler. Defaults to the number of + futures held by the client. + ntasks_on_worker : int, optional + Number of tasks that must appear on the worker before any task is actually + started. Defaults to the number of futures held by the client. + """ + if pause: + w.status = Status.paused + while s.workers[w.address].status != Status.paused: + await asyncio.sleep(0.01) + else: + ev = Event() + clog = c.submit(lambda ev: ev.wait(), ev, key="block_worker") + while "block_worker" not in w.tasks: + await asyncio.sleep(0.01) + + yield + + if ntasks_on_scheduler is None: + ntasks_on_scheduler = len(c.futures) + if ntasks_on_worker is None: + ntasks_on_worker = len(c.futures) + while len(s.tasks) < ntasks_on_scheduler: + await asyncio.sleep(0.01) + + if pause: + assert len(s.unrunnable) == ntasks_on_worker + assert not w.tasks + w.status = Status.running + else: + while len(w.tasks) < ntasks_on_worker: + await asyncio.sleep(0.01) + await ev.set() + await clog + del clog + while "block_worker" in s.tasks: + await asyncio.sleep(0.01) + + +def gen_blockable_cluster(test_func): + """Generate a cluster with 1 worker and disabled memory monitor, + to be used together with ``async with block_worker(...):``. + """ + return pytest.mark.parametrize( + "pause", + [ + pytest.param(False, id="queue on worker"), + pytest.param(True, id="queue on scheduler"), + ], + )( + gen_cluster( + client=True, + nthreads=[("", 1)], + config={"distributed.worker.memory.pause": False}, + )(test_func) + ) + + +@gen_blockable_cluster +async def test_submit(c, s, a, pause): + async with block_worker(c, s, a, pause): + low = c.submit(inc, 1, key="low", priority=-1) + ev = Event() + clog = c.submit(lambda ev: ev.wait(), ev, key="clog") + high = c.submit(inc, 2, key="high", priority=1) + + await wait(high) + assert all(s.processing.values()) + assert s.tasks[low.key].state == "processing" + await ev.set() + await wait(low) + + +@gen_blockable_cluster +async def test_map(c, s, a, pause): + async with block_worker(c, s, a, pause): + low = c.map(inc, [1, 2, 3], key=["l1", "l2", "l3"], priority=-1) + ev = Event() + clog = c.submit(lambda ev: ev.wait(), ev, key="clog") + high = c.map(inc, [4, 5, 6], key=["h1", "h2", "h3"], priority=1) + + await wait(high) + assert all(s.processing.values()) + assert all(s.tasks[fut.key].state == "processing" for fut in low) + await ev.set() + await clog + await wait(low) + + +@gen_blockable_cluster +async def test_compute(c, s, a, pause): + async with block_worker(c, s, a, pause): + low = c.compute(dinc(1, dask_key_name="low"), priority=-1) + ev = Event() + clog = c.submit(lambda ev: ev.wait(), ev, key="clog") + high = c.compute(dinc(2, dask_key_name="high"), priority=1) + + await wait(high) + assert all(s.processing.values()) + assert s.tasks[low.key].state == "processing" + await ev.set() + await clog + await wait(low) + + +@gen_blockable_cluster +async def test_persist(c, s, a, pause): + async with block_worker(c, s, a, pause): + low = dinc(1, dask_key_name="low").persist(priority=-1) + ev = Event() + clog = c.submit(lambda ev: ev.wait(), ev, key="clog") + high = dinc(2, 
dask_key_name="high").persist(priority=1) + + await wait(high) + assert all(s.processing.values()) + assert s.tasks[low.key].state == "processing" + await ev.set() + await wait(clog) + await wait(low) + + +@gen_blockable_cluster +async def test_annotate_compute(c, s, a, pause): with dask.annotate(priority=-1): - low = delayed(inc)(1) + low = dinc(1, dask_key_name="low") + ev = Event() + clog = dwait(ev, dask_key_name="clog") with dask.annotate(priority=1): - high = delayed(inc)(2) - many = [delayed(slowinc)(i, delay=0.1) for i in range(10)] + high = dinc(2, dask_key_name="high") + + async with block_worker(c, s, a, pause): + low, clog, high = c.compute([low, clog, high], optimize_graph=False) - low, many, high = c.compute([low, many, high], optimize_graph=False) await wait(high) assert s.tasks[low.key].state == "processing" + await ev.set() + await wait(clog) + await wait(low) -@gen_cluster(client=True) -async def test_annotate_persist(c, s, a, b): +@gen_blockable_cluster +async def test_annotate_persist(c, s, a, pause): with dask.annotate(priority=-1): - low = delayed(inc)(1, dask_key_name="low") + low = dinc(1, dask_key_name="low") + ev = Event() + clog = dwait(ev, dask_key_name="clog") with dask.annotate(priority=1): - high = delayed(inc)(2, dask_key_name="high") - many = [delayed(slowinc)(i, delay=0.1) for i in range(4)] + high = dinc(2, dask_key_name="high") + + async with block_worker(c, s, a, pause): + low, clog, high = c.persist([low, clog, high], optimize_graph=False) - low, high, x, y, z, w = persist(low, high, *many, optimize_graph=False) await wait(high) assert s.tasks[low.key].state == "processing" + await ev.set() + await wait(clog) + await wait(low) -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) -async def test_repeated_persists_same_priority(c, s, w): - xs = [delayed(slowinc)(i, delay=0.05, dask_key_name="x-%d" % i) for i in range(10)] - ys = [ - delayed(slowinc)(x, delay=0.05, dask_key_name="y-%d" % i) - for i, x in enumerate(xs) - ] - zs = [ - delayed(slowdec)(x, delay=0.05, dask_key_name="z-%d" % i) - for i, x in enumerate(xs) - ] +@gen_blockable_cluster +async def test_repeated_persists_same_priority(c, s, a, pause): + xs = [delayed(slowinc)(i, delay=0.05, dask_key_name=f"x{i}") for i in range(10)] + ys = [delayed(slowinc)(xs[i], delay=0.05, dask_key_name=f"y{i}") for i in range(10)] + zs = [delayed(slowinc)(xs[i], delay=0.05, dask_key_name=f"z{i}") for i in range(10)] - ys = dask.persist(*ys) - zs = dask.persist(*zs) + async with block_worker(c, s, a, pause, 30, 10): + ys = dask.persist(*ys) + zs = dask.persist(*zs) while ( sum(t.state == "memory" for t in s.tasks.values()) < 5 ): # TODO: reduce this number await asyncio.sleep(0.01) - assert any(s.tasks[y.key].state == "memory" for y in ys) - assert any(s.tasks[z.key].state == "memory" for z in zs) + assert 0 < sum(s.tasks[fut.key].state == "memory" for fut in xs) < 10 + assert 0 < sum(s.tasks[fut.key].state == "memory" for fut in ys) < 10 + assert 0 < sum(s.tasks[fut.key].state == "memory" for fut in zs) < 10 -@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) -async def test_last_in_first_out(c, s, w): - xs = [c.submit(slowinc, i, delay=0.05) for i in range(5)] - ys = [c.submit(slowinc, x, delay=0.05) for x in xs] - zs = [c.submit(slowinc, y, delay=0.05) for y in ys] +@gen_blockable_cluster +async def test_last_in_first_out(c, s, a, pause): + async with block_worker(c, s, a, pause, 15, 5): + xs = [c.submit(slowinc, i, delay=0.05, key=f"x{i}") for i in range(5)] + ys = [c.submit(slowinc, xs[i], 
delay=0.05, key=f"y{i}") for i in range(5)]
+        zs = [c.submit(slowinc, ys[i], delay=0.05, key=f"z{i}") for i in range(5)]
 
-    while len(s.tasks) < 15 or not any(s.tasks[z.key].state == "memory" for z in zs):
+    while not any(s.tasks[z.key].state == "memory" for z in zs):
         await asyncio.sleep(0.01)
-
     assert not all(s.tasks[x.key].state == "memory" for x in xs)

From c017629604ff97f4c33088a4674d9b4bd8590fe3 Mon Sep 17 00:00:00 2001
From: crusaderky
Date: Thu, 7 Apr 2022 14:20:42 +0100
Subject: [PATCH 02/10] Refactor ensure_computing() -> None to _ensure_computing() -> RecsInstrs

---
 distributed/scheduler.py | 32 +--
 distributed/tests/test_cancelled_state.py | 2 +-
 distributed/tests/test_cluster_dump.py | 2 +-
 distributed/tests/test_steal.py | 30 +--
 distributed/tests/test_worker.py | 6 +-
 .../tests/test_worker_state_machine.py | 24 +++
 distributed/worker.py | 186 ++++++++++--------
 distributed/worker_state_machine.py | 22 +++
 8 files changed, 193 insertions(+), 111 deletions(-)

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index fc96d5d02ec..e9d8a09eb19 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3611,6 +3611,25 @@ def _reevaluate_occupancy_worker(self, ws: WorkerState):
         for ts in ws._processing:
             steal.recalculate_cost(ts)
 
+    @ccall
+    def bulk_schedule_after_adding_worker(self, ws: WorkerState):
+        """Send tasks with ts.state=='no-worker' in bulk to a worker that just joined.
+        Return recommendations. As the worker will start executing the new tasks
+        immediately, without waiting for the batch to end, we can't rely on worker-side
+        ordering, so the recommendations are sorted by priority order here.
+        """
+        ts: TaskState
+        tasks = []
+        for ts in self._unrunnable:
+            valid: set = self.valid_workers(ts)
+            if valid is None or ws in valid:
+                # id(ts) is to prevent calling TaskState.__gt__ given equal priority
+                tasks.append(ts)
+        # These recommendations will generate {"op": "compute-task"} messages
+        # to the worker in reversed order
+        tasks.sort(key=operator.attrgetter("priority"), reverse=True)
+        return {ts._key: "waiting" for ts in tasks}
+
 
 class Scheduler(SchedulerState, ServerNode):
     """Dynamic distributed task scheduler
@@ -4583,10 +4602,7 @@ async def add_worker(
             )
 
         if ws._status == Status.running:
-            for ts in parent._unrunnable:
-                valid: set = self.valid_workers(ts)
-                if valid is None or ws in valid:
-                    recommendations[ts._key] = "waiting"
+            recommendations.update(self.bulk_schedule_after_adding_worker(ws))
 
         if recommendations:
             parent._transitions(recommendations, client_msgs, worker_msgs)
@@ -5699,13 +5715,7 @@ def handle_worker_status_change(self, status: str, worker: str) -> None:
 
         if ws._status == Status.running:
             parent._running.add(ws)
-
-            recs = {}
-            ts: TaskState
-            for ts in parent._unrunnable:
-                valid: set = self.valid_workers(ts)
-                if valid is None or ws in valid:
-                    recs[ts._key] = "waiting"
+            recs = self.bulk_schedule_after_adding_worker(ws)
             if recs:
                 client_msgs: dict = {}
                 worker_msgs: dict = {}
diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 1a0df078376..92ac98587ae 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -87,7 +87,7 @@ def f(ev):
         [
             ("f1", "compute-task"),
             ("f1", "released", "waiting", "waiting", {"f1": "ready"}),
-            ("f1", "waiting", "ready", "ready", {}),
+            ("f1", "waiting", "ready", "ready", {"f1": "executing"}),
             ("f1", "ready", "executing", "executing", {}),
             ("free-keys", ("f1",)),
             ("f1", "executing", "released",
"cancelled", {}), diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index b01cf2611ca..c3912116c46 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -145,7 +145,7 @@ async def test_cluster_dump_story(c, s, a, b, tmp_path): [ (k, "compute-task"), (k, "released", "waiting", "waiting", {k: "ready"}), - (k, "waiting", "ready", "ready", {}), + (k, "waiting", "ready", "ready", {k: "executing"}), (k, "ready", "executing", "executing", {}), (k, "put-in-memory"), (k, "executing", "memory", "memory", {}), diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index 55f1bbf043a..98565bc2cfe 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -1182,17 +1182,25 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): await ev.set() await c.gather(futs1) - # If this turns out to be overly flaky, the following may be relaxed or - # removed. The point of this test is to not deadlock but verifying expected - # state is still a nice thing - - # Either the last request goes through or both have been rejected since the - # computation was already done by the time the request comes in. This is - # unfortunately not stable. - if victim_ts.who_has != {wsC}: - msgs = steal.story(victim_ts) - assert len(msgs) == 2 - assert all(msg[0] == "already-aborted" for msg in msgs), msgs + assert victim_ts.who_has != {wsC} + msgs = steal.story(victim_ts) + msgs = [msg[:-1] for msg in msgs] # Remove random IDs + + # There are three possible outcomes + expect1 = [ + ("stale-response", victim_key, "executing", wsA.address), + ("already-computing", victim_key, "executing", wsB.address, wsC.address), + ] + expect2 = [ + ("already-computing", victim_key, "executing", wsB.address, wsC.address), + ("already-aborted", victim_key, "executing", wsA.address), + ] + # This outcome appears only in ~2% of the runs + expect3 = [ + ("already-computing", victim_key, "executing", wsB.address, wsC.address), + ("already-aborted", victim_key, "memory", wsA.address), + ] + assert msgs in (expect1, expect2, expect3) @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index aa8549b07ba..c75cdb2c784 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1687,12 +1687,12 @@ async def test_story_with_deps(c, s, a, b): # Story now includes randomized stimulus_ids and timestamps. stimulus_ids = {ev[-2] for ev in story} - assert len(stimulus_ids) == 3, stimulus_ids + assert len(stimulus_ids) == 2, stimulus_ids # This is a simple transition log expected = [ ("res", "compute-task"), ("res", "released", "waiting", "waiting", {"dep": "fetch"}), - ("res", "waiting", "ready", "ready", {}), + ("res", "waiting", "ready", "ready", {"res": "executing"}), ("res", "ready", "executing", "executing", {}), ("res", "put-in-memory"), ("res", "executing", "memory", "memory", {}), @@ -3089,7 +3089,7 @@ async def test_task_flight_compute_oserror(c, s, a, b): # Now, we actually compute the task *once*. 
This must not cycle back ("f1", "compute-task"), ("f1", "released", "waiting", "waiting", {"f1": "ready"}), - ("f1", "waiting", "ready", "ready", {}), + ("f1", "waiting", "ready", "ready", {"f1": "executing"}), ("f1", "ready", "executing", "executing", {}), ("f1", "put-in-memory"), ("f1", "executing", "memory", "memory", {}), diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index 78597a37e67..87828a17cf7 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -6,10 +6,12 @@ from distributed.worker_state_machine import ( Instruction, ReleaseWorkerDataMsg, + RescheduleMsg, SendMessageToScheduler, StateMachineEvent, TaskState, UniqueTaskHeap, + merge_recs_instructions, ) @@ -109,3 +111,25 @@ def test_sendmsg_to_dict(): # Arbitrary sample class smsg = ReleaseWorkerDataMsg(key="x") assert smsg.to_dict() == {"op": "release-worker-data", "key": "x"} + + +def test_merge_recs_instructions(): + x = TaskState("x") + y = TaskState("y") + instr1 = RescheduleMsg(key="foo", worker="a") + instr2 = RescheduleMsg(key="bar", worker="b") + assert merge_recs_instructions( + ({x: "memory"}, [instr1]), + ({y: "released"}, [instr2]), + ) == ( + {x: "memory", y: "released"}, + [instr1, instr2], + ) + + # Identical recommendations are silently ignored; incompatible ones raise + assert merge_recs_instructions(({x: "memory"}, []), ({x: "memory"}, [])) == ( + {x: "memory"}, + [], + ) + with pytest.raises(ValueError): + merge_recs_instructions(({x: "memory"}, []), ({x: "released"}, [])) diff --git a/distributed/worker.py b/distributed/worker.py index 7a0628763dd..336dfa8bfd4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -129,6 +129,8 @@ TaskState, TaskStateState, UniqueTaskHeap, + UnpauseEvent, + merge_recs_instructions, ) if TYPE_CHECKING: @@ -921,8 +923,7 @@ def status(self, value): ServerNode.status.__set__(self, value) self._send_worker_status_change() if prev_status == Status.paused and value == Status.running: - self.ensure_computing() - self.ensure_communicating() + self.handle_stimulus(UnpauseEvent(stimulus_id=f"set-status-{time()}")) def _send_worker_status_change(self) -> None: if ( @@ -1178,9 +1179,7 @@ async def heartbeat(self): async def handle_scheduler(self, comm): try: - await self.handle_stream( - comm, every_cycle=[self.ensure_communicating, self.ensure_computing] - ) + await self.handle_stream(comm, every_cycle=[self.ensure_communicating]) except Exception as e: logger.exception(e) raise @@ -2022,7 +2021,7 @@ def transition_waiting_constrained( assert ts.key not in self.ready ts.state = "constrained" self.constrained.append(ts.key) - return {}, [] + return self._ensure_computing() def transition_long_running_rescheduled( self, ts: TaskState, *, stimulus_id: str @@ -2038,9 +2037,10 @@ def transition_executing_rescheduled( self.available_resources[resource] += quantity self._executing.discard(ts) - recs: Recs = {ts: "released"} - smsg = RescheduleMsg(key=ts.key, worker=self.address) - return recs, [smsg] + return merge_recs_instructions( + ({ts: "released"}, [RescheduleMsg(key=ts.key, worker=self.address)]), + self._ensure_computing(), + ) def transition_waiting_ready( self, ts: TaskState, *, stimulus_id: str @@ -2057,7 +2057,7 @@ def transition_waiting_ready( assert ts.priority is not None heapq.heappush(self.ready, (ts.priority, ts.key)) - return {}, [] + return self._ensure_computing() def transition_cancelled_error( self, @@ -2133,13 +2133,17 @@ def 
transition_executing_error( for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity self._executing.discard(ts) - return self.transition_generic_error( - ts, - exception, - traceback, - exception_text, - traceback_text, - stimulus_id=stimulus_id, + + return merge_recs_instructions( + self.transition_generic_error( + ts, + exception, + traceback, + exception_text, + traceback_text, + stimulus_id=stimulus_id, + ), + self._ensure_computing(), ) def _transition_from_resumed( @@ -2254,12 +2258,12 @@ def transition_cancelled_released( for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - recs, instructions = self.transition_generic_released( - ts, stimulus_id=stimulus_id + + return merge_recs_instructions( + self.transition_generic_released(ts, stimulus_id=stimulus_id), + ({ts: next_state} if next_state != "released" else {}, []), + self._ensure_computing(), ) - if next_state != "released": - recs[ts] = next_state - return recs, instructions def transition_executing_released( self, ts: TaskState, *, stimulus_id: str @@ -2269,7 +2273,7 @@ def transition_executing_released( # See https://github.com/dask/distributed/pull/5046#discussion_r685093940 ts.state = "cancelled" ts.done = False - return {}, [] + return self._ensure_computing() def transition_long_running_memory( self, ts: TaskState, value=no_value, *, stimulus_id: str @@ -2292,16 +2296,19 @@ def transition_generic_memory( self._executing.discard(ts) self._in_flight_tasks.discard(ts) ts.coming_from = None + + instructions: Instructions = [] try: recs = self._put_key_in_memory(ts, value, stimulus_id=stimulus_id) except Exception as e: msg = error_message(e) recs = {ts: tuple(msg.values())} - return recs, [] - if self.validate: - assert ts.key in self.data or ts.key in self.actors - smsg = self._get_task_finished_msg(ts) - return recs, [smsg] + else: + if self.validate: + assert ts.key in self.data or ts.key in self.actors + instructions.append(self._get_task_finished_msg(ts)) + + return recs, instructions def transition_executing_memory( self, ts: TaskState, value=no_value, *, stimulus_id: str @@ -2313,7 +2320,10 @@ def transition_executing_memory( self._executing.discard(ts) self.executed_count += 1 - return self.transition_generic_memory(ts, value=value, stimulus_id=stimulus_id) + return merge_recs_instructions( + self.transition_generic_memory(ts, value=value, stimulus_id=stimulus_id), + self._ensure_computing(), + ) def transition_constrained_executing( self, ts: TaskState, *, stimulus_id: str @@ -2326,10 +2336,7 @@ def transition_constrained_executing( for dep in ts.dependencies: assert dep.key in self.data or dep.key in self.actors - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] -= quantity ts.state = "executing" - self._executing.add(ts) instr = Execute(key=ts.key, stimulus_id=stimulus_id) return {}, [instr] @@ -2347,7 +2354,6 @@ def transition_ready_executing( ) ts.state = "executing" - self._executing.add(ts) instr = Execute(key=ts.key, stimulus_id=stimulus_id) return {}, [instr] @@ -2417,9 +2423,11 @@ def transition_executing_long_running( ts.state = "long-running" self._executing.discard(ts) self.long_running.add(ts.key) - smsg = LongRunningMsg(key=ts.key, compute_duration=compute_duration) - self.io_loop.add_callback(self.ensure_computing) - return {}, [smsg] + + return merge_recs_instructions( + ({}, [LongRunningMsg(key=ts.key, compute_duration=compute_duration)]), + 
self._ensure_computing(), + ) def transition_released_memory( self, ts: TaskState, value, *, stimulus_id: str @@ -2592,8 +2600,6 @@ def handle_stimulus(self, stim: StateMachineEvent) -> None: recs, instructions = self.handle_event(stim) self.transitions(recs, stimulus_id=stim.stimulus_id) self._handle_instructions(instructions) - self.ensure_computing() - self.ensure_communicating() def _handle_stimulus_from_future( self, future: asyncio.Future[StateMachineEvent | None] @@ -3035,7 +3041,6 @@ async def gather_dep( recommendations[ts] = "fetch" if ts.who_has else "missing" del data, response self.transitions(recommendations, stimulus_id=stimulus_id) - self.ensure_computing() if not busy: self.repetitively_busy = 0 @@ -3076,7 +3081,6 @@ async def find_missing(self) -> None: "find-missing" ].callback_time = self.periodic_callbacks["heartbeat"].callback_time self.ensure_communicating() - self.ensure_computing() async def query_who_has(self, *deps: str) -> dict[str, Collection[str]]: with log_errors(): @@ -3209,6 +3213,7 @@ def release_key( self._executing.discard(ts) self._in_flight_tasks.discard(ts) + self.ensure_communicating() self._notify_plugins( "release_key", key, state_before, cause, stimulus_id, report @@ -3327,16 +3332,6 @@ def actor_attribute(self, actor=None, attribute=None) -> dict[str, Any]: except Exception as ex: return {"status": "error", "exception": to_serialize(ex)} - def meets_resource_constraints(self, key: str) -> bool: - ts = self.tasks[key] - if not ts.resource_restrictions: - return True - for resource, needed in ts.resource_restrictions.items(): - if self.available_resources[resource] < needed: - return False - - return True - async def _maybe_deserialize_task( self, ts: TaskState, *, stimulus_id: str ) -> tuple[Callable, tuple, dict[str, Any]] | None: @@ -3369,42 +3364,56 @@ async def _maybe_deserialize_task( ) raise - def ensure_computing(self) -> None: + def _ensure_computing(self) -> RecsInstrs: if self.status in (Status.paused, Status.closing_gracefully): - return - try: - stimulus_id = f"ensure-computing-{time()}" - while self.constrained and self.executing_count < self.nthreads: - key = self.constrained[0] - ts = self.tasks.get(key, None) - if ts is None or ts.state != "constrained": - self.constrained.popleft() - continue - if self.meets_resource_constraints(key): - self.constrained.popleft() - self.transition(ts, "executing", stimulus_id=stimulus_id) - else: - break - while self.ready and self.executing_count < self.nthreads: - priority, key = heapq.heappop(self.ready) - ts = self.tasks.get(key) - if ts is None: - # It is possible for tasks to be released while still remaining on - # `ready` The scheduler might have re-routed to a new worker and - # told this worker to release. 
If the task has "disappeared" just - # continue through the heap - continue - elif ts.key in self.data: - self.transition(ts, "memory", stimulus_id=stimulus_id) - elif ts.state in READY: - self.transition(ts, "executing", stimulus_id=stimulus_id) - except Exception as e: # pragma: no cover - logger.exception(e) - if LOG_PDB: - import pdb + return {}, [] - pdb.set_trace() - raise + recs: Recs = {} + while self.constrained and len(self._executing) < self.nthreads: + key = self.constrained[0] + ts = self.tasks.get(key, None) + if ts is None or ts.state != "constrained": + self.constrained.popleft() + continue + + if any( + self.available_resources[resource] < needed + for resource, needed in ts.resource_restrictions.items() + ): + break + + self.constrained.popleft() + for resource, needed in ts.resource_restrictions.items(): + self.available_resources[resource] -= needed + + if self.validate: + assert ts not in recs + assert ts not in self._executing + + recs[ts] = "executing" + self._executing.add(ts) + + while self.ready and len(self._executing) < self.nthreads: + _, key = heapq.heappop(self.ready) + ts = self.tasks.get(key) + if ts is None: + # It is possible for tasks to be released while still remaining on + # `ready` The scheduler might have re-routed to a new worker and + # told this worker to release. If the task has "disappeared" just + # continue through the heap + continue + + if self.validate: + assert ts not in recs + assert ts not in self._executing + + if key in self.data: + recs[ts] = "memory" + elif ts.state in READY: + recs[ts] = "executing" + self._executing.add(ts) + + return recs, [] async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | None: if self.status in {Status.closing, Status.closed, Status.closing_gracefully}: @@ -3538,6 +3547,15 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | No def handle_event(self, ev: StateMachineEvent) -> RecsInstrs: raise TypeError(ev) # pragma: nocover + @handle_event.register + def _(self, ev: UnpauseEvent) -> RecsInstrs: + """Emerge from paused status. Do not send this event directly. Instead, just set + Worker.status back to running. + """ + assert self.status == Status.running + self.ensure_communicating() + return self._ensure_computing() + @handle_event.register def _(self, ev: CancelComputeEvent) -> RecsInstrs: """Scheduler requested to cancel a task""" diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py index 8ae454417c9..a21c2acb301 100644 --- a/distributed/worker_state_machine.py +++ b/distributed/worker_state_machine.py @@ -357,6 +357,11 @@ class StateMachineEvent: stimulus_id: str +@dataclass +class UnpauseEvent(StateMachineEvent): + __slots__ = () + + @dataclass class ExecuteSuccessEvent(StateMachineEvent): key: str @@ -410,3 +415,20 @@ class RescheduleEvent(StateMachineEvent): Recs = dict Instructions = list RecsInstrs = tuple + + +def merge_recs_instructions(*args: RecsInstrs) -> RecsInstrs: + """Merge multiple (recommendations, instructions) tuples. + Collisions in recommendations are only allowed if identical. + """ + recs: Recs = {} + instr: Instructions = [] + for recs_i, instr_i in args: + for k, v in recs_i.items(): + if k in recs and recs[k] != v: + raise ValueError( + f"Mismatched recommendations for {k}: {recs[k]} vs. 
{v}" + ) + recs[k] = v + instr += instr_i + return recs, instr From 916cc94ab76e752d5659ea1d6a96dd6b8874c276 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 7 Apr 2022 15:05:16 +0100 Subject: [PATCH 03/10] cleanup --- distributed/scheduler.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index e9d8a09eb19..0548aaa0942 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3623,7 +3623,6 @@ def bulk_schedule_after_adding_worker(self, ws: WorkerState): for ts in self._unrunnable: valid: set = self.valid_workers(ts) if valid is None or ws in valid: - # id(ts) is to prevent calling TaskState.__gt__ given equal priority tasks.append(ts) # These recommendations will generate {"op": "compute-task"} messages # to the worker in reversed order From 6d208df397712eae61426629c04f87d1d6929329 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 15:04:38 +0100 Subject: [PATCH 04/10] release_key() is never called on executing keys --- distributed/worker.py | 35 ++++++++++++++--------------------- 1 file changed, 14 insertions(+), 21 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 336dfa8bfd4..4f668a8c6d7 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3164,9 +3164,12 @@ def release_key( stimulus_id: str, ) -> None: try: - if self.validate: - assert not isinstance(key, TaskState) ts = self.tasks[key] + + if self.validate: + assert ts.state != "executing" + assert ts not in self._executing + # needed for legacy notification support state_before = ts.state ts.state = "released" @@ -3181,26 +3184,20 @@ def release_key( ) else: self.log.append((key, "release-key", stimulus_id, time())) - if key in self.data: - try: - del self.data[key] - except FileNotFoundError: - logger.error("Tried to delete %s but no file found", exc_info=True) - if key in self.actors: - del self.actors[key] + + try: + self.data.pop(key, None) + except FileNotFoundError: + logger.error("Tried to delete %s but no file found", exc_info=True) + + self.actors.pop(key, None) + self.threads.pop(key, None) + self._in_flight_tasks.discard(ts) for worker in ts.who_has: self.has_what[worker].discard(ts.key) ts.who_has.clear() - if key in self.threads: - del self.threads[key] - - if ts.resource_restrictions is not None: - if ts.state == "executing": - for resource, quantity in ts.resource_restrictions.items(): - self.available_resources[resource] += quantity - for d in ts.dependencies: ts.waiting_for_data.discard(d) d.waiters.discard(ts) @@ -3211,10 +3208,6 @@ def release_key( ts._next = None ts.done = False - self._executing.discard(ts) - self._in_flight_tasks.discard(ts) - self.ensure_communicating() - self._notify_plugins( "release_key", key, state_before, cause, stimulus_id, report ) From 3b91cf2c725c37d88682baf3d80c1d31a8089963 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 15:04:38 +0100 Subject: [PATCH 05/10] Revert "release_key() is never called on executing keys" This reverts commit 6d208df397712eae61426629c04f87d1d6929329. 
--- distributed/worker.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 4f668a8c6d7..336dfa8bfd4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3164,12 +3164,9 @@ def release_key( stimulus_id: str, ) -> None: try: - ts = self.tasks[key] - if self.validate: - assert ts.state != "executing" - assert ts not in self._executing - + assert not isinstance(key, TaskState) + ts = self.tasks[key] # needed for legacy notification support state_before = ts.state ts.state = "released" @@ -3184,20 +3181,26 @@ def release_key( ) else: self.log.append((key, "release-key", stimulus_id, time())) - - try: - self.data.pop(key, None) - except FileNotFoundError: - logger.error("Tried to delete %s but no file found", exc_info=True) - - self.actors.pop(key, None) - self.threads.pop(key, None) - self._in_flight_tasks.discard(ts) + if key in self.data: + try: + del self.data[key] + except FileNotFoundError: + logger.error("Tried to delete %s but no file found", exc_info=True) + if key in self.actors: + del self.actors[key] for worker in ts.who_has: self.has_what[worker].discard(ts.key) ts.who_has.clear() + if key in self.threads: + del self.threads[key] + + if ts.resource_restrictions is not None: + if ts.state == "executing": + for resource, quantity in ts.resource_restrictions.items(): + self.available_resources[resource] += quantity + for d in ts.dependencies: ts.waiting_for_data.discard(d) d.waiters.discard(ts) @@ -3208,6 +3211,10 @@ def release_key( ts._next = None ts.done = False + self._executing.discard(ts) + self._in_flight_tasks.discard(ts) + self.ensure_communicating() + self._notify_plugins( "release_key", key, state_before, cause, stimulus_id, report ) From f22099d329f073eece47e59bf3d7c1faad604d54 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 15:57:36 +0100 Subject: [PATCH 06/10] Deal with transition executing -> released --- distributed/worker.py | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 336dfa8bfd4..b2d51f0948c 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1957,7 +1957,12 @@ def transition_generic_released( if not ts.dependents: recs[ts] = "forgotten" - return recs, [] + return merge_recs_instructions( + (recs, []), + # FIXME this is only necessary if transitioning from executing, which + # should not be possible but it happens - see note in release_key(). + self._ensure_computing(), + ) def transition_released_waiting( self, ts: TaskState, *, stimulus_id: str @@ -3181,26 +3186,30 @@ def release_key( ) else: self.log.append((key, "release-key", stimulus_id, time())) - if key in self.data: - try: - del self.data[key] - except FileNotFoundError: - logger.error("Tried to delete %s but no file found", exc_info=True) - if key in self.actors: - del self.actors[key] - for worker in ts.who_has: - self.has_what[worker].discard(ts.key) - ts.who_has.clear() + try: + self.data.pop(key, None) + except OSError: + logger.error("Failed to delete %s", key, exc_info=True) - if key in self.threads: - del self.threads[key] + self.actors.pop(key, None) + self.threads.pop(key, None) + self._in_flight_tasks.discard(ts) - if ts.resource_restrictions is not None: - if ts.state == "executing": + if ts.state == "executing": + # FIXME this should never happen, but it does. + # e.g. 
replacing this block with `assert ts.state != "executing" + # makes test_occupancy_cleardown become flaky. + # Need to investigate and improve test coverage. + self._executing.discard(ts) + if ts.resource_restrictions is not None: for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity + for worker in ts.who_has: + self.has_what[worker].discard(ts.key) + ts.who_has.clear() + for d in ts.dependencies: ts.waiting_for_data.discard(d) d.waiters.discard(ts) @@ -3211,10 +3220,6 @@ def release_key( ts._next = None ts.done = False - self._executing.discard(ts) - self._in_flight_tasks.discard(ts) - self.ensure_communicating() - self._notify_plugins( "release_key", key, state_before, cause, stimulus_id, report ) From 0a4902d66a896fa2b25ab19e2bc42bce8562119d Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 16:31:06 +0100 Subject: [PATCH 07/10] Deal with duplicates in the ready queue --- distributed/worker.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index b2d51f0948c..0c0b68a33fa 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3381,6 +3381,13 @@ def _ensure_computing(self) -> RecsInstrs: self.constrained.popleft() continue + # FIXME We should never have duplicates in self.constrained or self.ready; + # however replacing the block below and the matching ones later in + # this function for the ready queue with just 'assert ts not in recs' + # causes *sporadic* failures in the test suite. + if ts in recs: + continue + if any( self.available_resources[resource] < needed for resource, needed in ts.resource_restrictions.items() @@ -3391,10 +3398,6 @@ def _ensure_computing(self) -> RecsInstrs: for resource, needed in ts.resource_restrictions.items(): self.available_resources[resource] -= needed - if self.validate: - assert ts not in recs - assert ts not in self._executing - recs[ts] = "executing" self._executing.add(ts) @@ -3403,18 +3406,20 @@ def _ensure_computing(self) -> RecsInstrs: ts = self.tasks.get(key) if ts is None: # It is possible for tasks to be released while still remaining on - # `ready` The scheduler might have re-routed to a new worker and - # told this worker to release. If the task has "disappeared" just - # continue through the heap + # `ready`. The scheduler might have re-routed to a new worker and + # told this worker to release. If the task has "disappeared", just + # continue through the heap. 
continue - if self.validate: - assert ts not in recs - assert ts not in self._executing - if key in self.data: + # FIXME see comment above about duplicates + if self.validate: + assert ts not in recs or recs[ts] == "memory" recs[ts] = "memory" elif ts.state in READY: + # FIXME see comment above about duplicates + if self.validate: + assert ts not in recs or recs[ts] == "executing" recs[ts] = "executing" self._executing.add(ts) From 708398b78b68def69efa2118b13a1458d45a2697 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 17:51:48 +0100 Subject: [PATCH 08/10] Revert release_key --- distributed/worker.py | 37 +++++++++++++++++-------------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 0c0b68a33fa..1a0c4eb872a 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1959,8 +1959,6 @@ def transition_generic_released( return merge_recs_instructions( (recs, []), - # FIXME this is only necessary if transitioning from executing, which - # should not be possible but it happens - see note in release_key(). self._ensure_computing(), ) @@ -3186,30 +3184,26 @@ def release_key( ) else: self.log.append((key, "release-key", stimulus_id, time())) + if key in self.data: + try: + del self.data[key] + except FileNotFoundError: + logger.error("Tried to delete %s but no file found", exc_info=True) + if key in self.actors: + del self.actors[key] - try: - self.data.pop(key, None) - except OSError: - logger.error("Failed to delete %s", key, exc_info=True) + for worker in ts.who_has: + self.has_what[worker].discard(ts.key) + ts.who_has.clear() - self.actors.pop(key, None) - self.threads.pop(key, None) - self._in_flight_tasks.discard(ts) + if key in self.threads: + del self.threads[key] - if ts.state == "executing": - # FIXME this should never happen, but it does. - # e.g. replacing this block with `assert ts.state != "executing" - # makes test_occupancy_cleardown become flaky. - # Need to investigate and improve test coverage. - self._executing.discard(ts) - if ts.resource_restrictions is not None: + if ts.resource_restrictions is not None: + if ts.state == "executing": for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - for worker in ts.who_has: - self.has_what[worker].discard(ts.key) - ts.who_has.clear() - for d in ts.dependencies: ts.waiting_for_data.discard(d) d.waiters.discard(ts) @@ -3220,6 +3214,9 @@ def release_key( ts._next = None ts.done = False + self._executing.discard(ts) + self._in_flight_tasks.discard(ts) + self._notify_plugins( "release_key", key, state_before, cause, stimulus_id, report ) From 9c9c75afe2d3b91ea94b1d7f15f5a39fd839e25c Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 18:23:02 +0100 Subject: [PATCH 09/10] comment --- distributed/worker.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 1a0c4eb872a..825893874a3 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3378,10 +3378,11 @@ def _ensure_computing(self) -> RecsInstrs: self.constrained.popleft() continue - # FIXME We should never have duplicates in self.constrained or self.ready; - # however replacing the block below and the matching ones later in - # this function for the ready queue with just 'assert ts not in recs' - # causes *sporadic* failures in the test suite. + # There may be duplicates in the self.constrained and self.ready queues. 
+ # This happens if a task: + # 1. is assigned to a Worker and transitioned to ready (heappush) + # 2. is stolen (no way to pop from heap, the task stays there) + # 3. is assigned to the worker again (heappush again) if ts in recs: continue @@ -3409,12 +3410,12 @@ def _ensure_computing(self) -> RecsInstrs: continue if key in self.data: - # FIXME see comment above about duplicates + # See comment above about duplicates if self.validate: assert ts not in recs or recs[ts] == "memory" recs[ts] = "memory" elif ts.state in READY: - # FIXME see comment above about duplicates + # See comment above about duplicates if self.validate: assert ts not in recs or recs[ts] == "executing" recs[ts] = "executing" From dbe891ad402ae8c7ec769a52e3b0f04163c46ef2 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 8 Apr 2022 18:44:15 +0100 Subject: [PATCH 10/10] cleanup --- distributed/worker.py | 1 - 1 file changed, 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 825893874a3..7c5bc61ca15 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2265,7 +2265,6 @@ def transition_cancelled_released( return merge_recs_instructions( self.transition_generic_released(ts, stimulus_id=stimulus_id), ({ts: next_state} if next_state != "released" else {}, []), - self._ensure_computing(), ) def transition_executing_released(