Migrate ensure_computing transitions to new WorkerState event mechanism - part 2 #6062
Conversation
distributed/tests/test_steal.py
Outdated
("already-computing", victim_key, "executing", wsB.address, wsC.address), | ||
("already-aborted", victim_key, "executing", wsA.address), | ||
] | ||
assert msgs in (expect1, expect2) |
Before this PR, victim_ts.who_has != {wsC} was never true. After my change, it is always true. I don't think I fully understand what happened here, and if this is OK or if it warrants further investigation.
I'm wondering if this is because you're using an event instead of the sleep and the test assumptions are therefore much more deterministic. I'll check the code out and will have a look
I encountered a third possibility with ("already-aborted", victim_key, "memory", wsA.address)
We should move the event setting and fut gathering to the bottom. The steal responses are already logged at this point in time, such that we can assert them. We only set the event and collect the futures to allow the threadpool to properly shut down, but the test actually doesn't require them to finish.
This is an interesting change, indeed. It is expected.
Before, the key was sometimes in state ready, other times in state executing when the steal request (B->C) reaches the workers. I don't believe that on main this condition is never true; rather, it is unlikely.
Whether or not the task was in state ready or executing would depend on whether or not something called ensure_computing before the steal request could've been handled, e.g. if any other message comes over the stream before the steal request, we might've transitioned the key from ready to executing, since every stream message triggers an ensure_computing.
This PR removes this ambiguity by ensuring that a task is transitioned to executing in the very same tick as it was transitioned to ready. It removes the possibility of this race condition entirely. That's great 🎉
The ambiguity that's still in the test (expect1 vs expect2) is outside of our control, since it depends on which worker responds faster, which is almost impossible to control.
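A toy illustration of the same-tick behaviour described above (these are not Distributed's real classes, just a sketch of the idea):

```python
# Toy model, not Distributed's real classes: chaining _ensure_computing into the
# transition means a task that becomes "ready" is promoted to "executing" within
# the same call, instead of waiting for the next stream message to kick
# ensure_computing.
from heapq import heappop, heappush


class ToyWorkerState:
    def __init__(self, nthreads: int = 1):
        self.nthreads = nthreads
        self.ready: list[tuple[int, str]] = []  # heap of (priority, key)
        self.state: dict[str, str] = {}
        self.executing: set[str] = set()

    def _ensure_computing(self) -> dict[str, str]:
        # Promote ready tasks to executing while there is thread capacity
        recs = {}
        while self.ready and len(self.executing) < self.nthreads:
            _, key = heappop(self.ready)
            recs[key] = "executing"
            self.executing.add(key)
        return recs

    def transition_to_ready(self, key: str, priority: int = 0) -> dict[str, str]:
        self.state[key] = "ready"
        heappush(self.ready, (priority, key))
        recs = self._ensure_computing()  # same tick: no ready->executing window
        self.state.update(recs)
        return recs


ws = ToyWorkerState()
print(ws.transition_to_ready("x"))  # {'x': 'executing'}
```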
We should, however, move the ev.set() to the bottom to remove the third possibility of the task having already finished computing.
If you move ev.set() to the bottom, the story is empty
I encountered a third possibility with ("already-aborted", victim_key, "memory", wsA.address)

[EDIT] reproduced; thanks for spotting it
return merge_recs_instructions(
    ({ts: "released"}, [RescheduleMsg(key=ts.key, worker=self.address)]),
    self._ensure_computing(),
)
I did this in all transitions that invoke self._executing.discard(...)
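For context, a toy stand-in for merge_recs_instructions (the real helper lives in distributed.worker_state_machine and may differ in detail): each transition returns a (recommendations, instructions) pair, and chaining self._ensure_computing() is just one more pair folded into the result.

```python
# Toy stand-in for merge_recs_instructions; the real helper lives in
# distributed.worker_state_machine and may differ in detail.
def merge_recs_instructions(*pairs):
    recs, instructions = {}, []
    for r, i in pairs:
        recs.update(r)        # in this toy, later recommendations win on conflict
        instructions.extend(i)
    return recs, instructions


# A transition that releases ts and, in the same tick, lets _ensure_computing
# fill the freed thread:
out = merge_recs_instructions(
    ({"ts": "released"}, ["RescheduleMsg(ts)"]),  # the transition's own result
    ({"other_ts": "executing"}, []),              # pretend self._ensure_computing()
)
print(out)
```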
ts.state = "executing" | ||
self._executing.add(ts) |
These lines have moved to _ensure_computing. I'm not happy about it but I could not find a cleaner solution.
@@ -2347,7 +2357,6 @@ def transition_ready_executing(
        )

        ts.state = "executing"
        self._executing.add(ts)
This line has moved to _ensure_computing. I'm not happy about it but I could not find a cleaner solution.
assert ts not in self._executing

recs[ts] = "executing"
self._executing.add(ts)
Moved from transition_constrained_executing and transition_ready_executing. I'm not happy about it but I could not find a cleaner solution.
distributed/worker.py
Outdated
This is a temporary hack to be removed as part of
https://github.com/dask/distributed/issues/5894.
"""
return self._ensure_computing()
Called through self.ensure_computing from:
- Worker.handle_scheduler
- Worker.gather_dep
- Worker.find_missing
- Worker.release_key
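For reference, a sketch of the bridge being described here; the same pattern appears verbatim in the removal diff further down, but ToyWorker below is a stand-in, not the real distributed.Worker:

```python
# Sketch of the temporary bridge: legacy call sites invoke the public
# ensure_computing(), which wraps the request into an event; the event handler
# then delegates to the pure _ensure_computing(). ToyWorker is a stand-in,
# not the real distributed.Worker.
import time
from dataclasses import dataclass


@dataclass
class EnsureComputingEvent:
    stimulus_id: str


class ToyWorker:
    def handle_stimulus(self, ev) -> None:
        # The real worker dispatches via singledispatch and then applies the
        # returned recommendations and instructions.
        if isinstance(ev, EnsureComputingEvent):
            self._ensure_computing()

    def ensure_computing(self) -> None:
        self.handle_stimulus(
            EnsureComputingEvent(stimulus_id=f"ensure_computing-{time.time()}")
        )

    def _ensure_computing(self):
        return {}, []  # recommendations, instructions


ToyWorker().ensure_computing()
```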
Force-pushed from 24cd168 to ba7d031
Merged all blockers. This is ready for final review and merge.
Unit Test Results: 16 files ±0, 16 suites ±0, 7h 44m 38s ⏱️ +22m 29s. For more details on these failures and errors, see this check. Results for commit dbe891a; comparison against base commit a40f205. This comment has been updated with latest results.
It should be possible to remove the ensure_computing handler entirely. At least locally, the tests pass for me
```diff
diff --git a/distributed/worker.py b/distributed/worker.py
index 96d95373..df8fe680 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -110,7 +110,6 @@ from distributed.worker_state_machine import (
     AddKeysMsg,
     AlreadyCancelledEvent,
     CancelComputeEvent,
-    EnsureComputingEvent,
     Execute,
     ExecuteFailureEvent,
     ExecuteSuccessEvent,
@@ -1181,7 +1180,7 @@ class Worker(ServerNode):
     async def handle_scheduler(self, comm):
         try:
             await self.handle_stream(
-                comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
+                comm, every_cycle=[self.ensure_communicating]
             )
         except Exception as e:
             logger.exception(e)
@@ -2024,7 +2023,7 @@ class Worker(ServerNode):
             assert ts.key not in self.ready
         ts.state = "constrained"
         self.constrained.append(ts.key)
-        return {}, []
+        return self._ensure_computing()

     def transition_long_running_rescheduled(
         self, ts: TaskState, *, stimulus_id: str
@@ -3044,7 +3043,7 @@ class Worker(ServerNode):
                     recommendations[ts] = "fetch" if ts.who_has else "missing"
             del data, response
         self.transitions(recommendations, stimulus_id=stimulus_id)
-        self.ensure_computing()

         if not busy:
             self.repetitively_busy = 0
@@ -3085,7 +3083,6 @@ class Worker(ServerNode):
                     "find-missing"
                 ].callback_time = self.periodic_callbacks["heartbeat"].callback_time
             self.ensure_communicating()
-            self.ensure_computing()

     async def query_who_has(self, *deps: str) -> dict[str, Collection[str]]:
         with log_errors():
@@ -3218,7 +3215,6 @@ class Worker(ServerNode):
         self._executing.discard(ts)
         self._in_flight_tasks.discard(ts)
-        self.ensure_computing()
         self.ensure_communicating()

         self._notify_plugins(
@@ -3370,11 +3366,6 @@ class Worker(ServerNode):
             )
             raise

-    def ensure_computing(self) -> None:
-        self.handle_stimulus(
-            EnsureComputingEvent(stimulus_id=f"ensure_computing-{time()}")
-        )
-
     def _ensure_computing(self) -> RecsInstrs:
         if self.status in (Status.paused, Status.closing_gracefully):
             return {}, []
@@ -3558,14 +3549,6 @@ class Worker(ServerNode):
     def handle_event(self, ev: StateMachineEvent) -> RecsInstrs:
         raise TypeError(ev)  # pragma: nocover

-    @handle_event.register
-    def _(self, ev: EnsureComputingEvent) -> RecsInstrs:
-        """Let various methods of worker give an artificial 'kick' to _ensure_computing.
-        This is a temporary hack to be removed as part of
-        https://github.com/dask/distributed/issues/5894.
-        """
-        return self._ensure_computing()
-
     @handle_event.register
     def _(self, ev: UnpauseEvent) -> RecsInstrs:
         """Emerge from paused status. Do not send this event directly. Instead, just set
diff --git a/distributed/worker_state_machine.py b/distributed/worker_state_machine.py
index 51c5a768..5c07fced 100644
--- a/distributed/worker_state_machine.py
+++ b/distributed/worker_state_machine.py
@@ -358,17 +358,6 @@ class StateMachineEvent:
     __slots__ = ("stimulus_id",)
     stimulus_id: str
-
-@dataclass
-class EnsureComputingEvent(StateMachineEvent):
-    """Let various methods of worker give an artificial 'kick' to _ensure_computing.
-    This is a temporary hack to be removed as part of
-    https://github.com/dask/distributed/issues/5894.
-    """
-
-    __slots__ = ()
-
-
 @dataclass
 class UnpauseEvent(StateMachineEvent):
     __slots__ = ()
```
distributed/worker.py
Outdated
@@ -3370,41 +3371,60 @@ async def _maybe_deserialize_task(
            raise

    def ensure_computing(self) -> None:
I would've expected this to vanish entirely. I don't think "ensure_computing" should be an event on its own.
All review comments incorporated; let's see if it remains green
Most failures are unrelated and some are already fixed on main.
Edit: I see you're already on it: #6077
Force-pushed from b3cbedb to da8805b
@fjetter I'm a bit disconcerted that test_persisted started being flaky after the complete removal of ensure_computing (da8805b). After overhauling the test in #6077, it turns out that the linked commit was only affecting timings. Now the whole test module fails, before and after it. Take in particular the last test:

```python
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_last_in_first_out(c, s, a):
    xs = [c.submit(print, i, key=f"x{i}") for i in range(5)]
    await wait(xs)
    assert False
```

In main, the execution order is x4 -> x3 -> x2 -> x1 -> x0. Priorities on the worker have remained the same:
This PR is now blocked by and incorporates #6077.
Updated #6077 by moving the Worker init back to where it was, after the tasks have reached the scheduler.
Force-pushed from fabee12 to dd76552
I think I found an important design problem.

Patch to #6077:

```diff
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1831,6 +1831,7 @@ class Worker(ServerNode):
         annotations: dict | None = None,
         stimulus_id: str,
     ) -> None:
+        print(f"handle_compute_task({key}); {self.tasks=}")
         self.log.append((key, "compute-task", stimulus_id, time()))
         try:
             ts = self.tasks[key]
@@ -3372,6 +3373,8 @@ class Worker(ServerNode):
     def ensure_computing(self) -> None:
         if self.status in (Status.paused, Status.closing_gracefully):
             return
+        if self.ready and self.executing_count < self.nthreads:
+            print(f"ensure_computing: ready={sorted(self.ready)} {self.tasks=}")
         try:
             stimulus_id = f"ensure-computing-{time()}"
             while self.constrained and self.executing_count < self.nthreads:
```

Patch to this PR:

```diff
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1830,6 +1830,7 @@ class Worker(ServerNode):
         annotations: dict | None = None,
         stimulus_id: str,
     ) -> None:
+        print(f"handle_compute_task({key}); {self.tasks=}")
         self.log.append((key, "compute-task", stimulus_id, time()))
         try:
             ts = self.tasks[key]
@@ -3367,7 +3368,8 @@ class Worker(ServerNode):
     def _ensure_computing(self) -> RecsInstrs:
         if self.status in (Status.paused, Status.closing_gracefully):
             return {}, []
-
+        if self.ready and self.executing_count < self.nthreads:
+            print(f"ensure_computing: ready={sorted(self.ready)} {self.tasks=}")
         recs: Recs = {}
         while self.constrained and len(self._executing) < self.nthreads:
             key = self.constrained[0]
```

Output of test_priorities::test_compute():
This PR:
In both branches, the order of the first three handle_compute_task calls is random and changes at every run.

In main, ensure_computing is not executed until after all three calls to handle_compute_task. Hence, their order does not matter and ensure_computing finds a complete ready heap.

In this PR, ensure_computing is executed immediately after the first call to handle_compute_task (which sounds a lot more robust to me), but the obvious consequence is that the ready heap is incomplete and just contains a random task when this happens.

The correct solution IMHO is to ensure the scheduler invokes handle_compute_task in the correct order to begin with?

Side note 1

I can't understand why there are more than 3 calls to handle_compute_task and why the task states are flip-flopping from memory back to ready.

Side note 2

From this I reason that, as soon as you exceed the chunk size for batched sends, priority should go out of the window. This is green in main and I can't figure out how it's possible:

```python
@pytest.mark.parametrize("ntasks", [10, 100, 1000, 2000, 4000])
@gen_cluster(client=True, nthreads=[])
async def test_large_batches(c, s, ntasks):
    """Test that the scheduling order is unperturbed by the chunk size of the batched
    comms
    """
    futs = [c.submit(inc, key=str(i), priority=i) for i in range(ntasks)]
    while len(s.tasks) < ntasks:
        await asyncio.sleep(0.01)

    async with Worker(s.address, nthreads=1) as a:  # See comment in test_submit
        await wait(futs)

    story = a.story(*(str(i) for i in range(ntasks)))
    story = [
        int(ev[0])
        for ev in story
        if ev[1:3] == ("ready", "executing")
    ]
    assert story == list(reversed(range(ntasks)))
```
That's actually an interesting problem. By scheduling everything on the scheduler without having a worker, all tasks are put into the no-worker state.
Replacing this set with a heap should not be a big deal. However, there is one place where we're discarding a task, and this is not implemented on our heap (UniqueTaskHeap).
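A hypothetical sketch of how discard could be supported on a unique heap via lazy deletion; this is not the actual UniqueTaskHeap in distributed.worker_state_machine:

```python
# Hypothetical sketch: a unique heap that supports discard() via lazy deletion.
# Not the actual UniqueTaskHeap implementation.
import heapq


class DiscardableUniqueHeap:
    def __init__(self):
        self._heap: list[tuple[int, str]] = []  # (priority, key)
        self._keys: set[str] = set()            # keys currently in the heap

    def push(self, key: str, priority: int) -> None:
        if key in self._keys:
            return  # uniqueness: ignore duplicate pushes
        self._keys.add(key)
        heapq.heappush(self._heap, (priority, key))

    def discard(self, key: str) -> None:
        # O(1): forget the key; its stale heap entry is skipped at pop time
        self._keys.discard(key)

    def pop(self) -> str:
        while self._heap:
            _, key = heapq.heappop(self._heap)
            if key in self._keys:
                self._keys.discard(key)
                return key
        raise KeyError("pop from an empty heap")

    def __bool__(self) -> bool:
        return bool(self._keys)


h = DiscardableUniqueHeap()
h.push("x", 1)
h.push("y", 0)
h.discard("y")
print(h.pop())  # 'x'
```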
Is this test breaking or is this just something you noticed during debugging? If the test is OK, I would suggest filing an issue about this and moving on for now.
A much simpler approach is to just sort the set once by priority. This might be costly but I don't expect this to happen very frequently. (See distributed/scheduler.py, line 4586 at 8cf1196.)
All "ordinary" task assignments and priorities should be handled properly by this sort so I assume this is just us leaving the unrunnable state. I think sorting should be the way to go there |
I believe this prioritization issue is why #5443 does not work.
This test is not sensitive to how BatchedSend works. BatchedSend does not chunk based on buffer size; it accumulates messages in a buffer until a deadline passes and then submits the entire buffer, regardless of how large it is. I.e. regardless of how many tasks you are generating, they will all be submitted in the same batch unless a tick is allowed between task generations.
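A toy model of the behaviour described above (not the real BatchedSend class): messages accumulate in a buffer and the whole buffer is flushed when the interval deadline fires, so there is no size-based chunking.

```python
# Toy model of the deadline-based batching described above; not the real
# distributed BatchedSend class.
import asyncio


class ToyBatchedSend:
    def __init__(self, interval: float = 0.005):
        self.interval = interval
        self.buffer: list[dict] = []
        self.batches: list[list[dict]] = []

    def send(self, msg: dict) -> None:
        self.buffer.append(msg)  # no size-based chunking

    async def run(self, cycles: int = 3) -> None:
        for _ in range(cycles):
            await asyncio.sleep(self.interval)  # the "deadline"
            if self.buffer:
                self.batches.append(self.buffer)
                self.buffer = []


async def main() -> None:
    bs = ToyBatchedSend()
    for i in range(1000):  # all queued before the first deadline fires
        bs.send({"op": "compute-task", "key": str(i)})
    await bs.run()
    print([len(b) for b in bs.batches])  # [1000] -- a single batch


asyncio.run(main())
```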
Force-pushed from 7eb70ca to c017629
All green! Started tests one last time.
@@ -3611,6 +3611,24 @@ def _reevaluate_occupancy_worker(self, ws: WorkerState):
        for ts in ws._processing:
            steal.recalculate_cost(ts)

    @ccall
    def bulk_schedule_after_adding_worker(self, ws: WorkerState):
nit: should have an underscore
where?
distributed/worker.py
Outdated
@@ -3209,6 +3213,7 @@ def release_key(

        self._executing.discard(ts)
        self._in_flight_tasks.discard(ts)
        self.ensure_communicating()
why do we need this?
There was a change in _in_flight_tasks, so I thought it would be necessary to prod the comms. But I guess it can be removed.
I don't think this has an impact. It shouldn't be harmful, though. I was just curious
# This outcome appears only in ~2% of the runs
expect3 = [
    ("already-computing", victim_key, "executing", wsB.address, wsC.address),
    ("already-aborted", victim_key, "memory", wsA.address),
]
This can be avoided by moving the ev.set (and gather) after the stories are collected
If you do that, the story is completely empty.
I'm curious about the additional ensure_communicating. It should not be a big deal but I'm wondering if this accidentally slipped in or if there is a reason.
Otherwise LGTM
I just realised that release_key should never be called on an executing task.
The last commit is indeed delicate. However, the way the transition system is designed, i.e. by having the cancelled state, makes it impossible to arrive in a condition where release_key is called on a task that is still executing.
Code coverage, however flaky it is, also shows that these lines are indeed never hit.
For a similar argument, ts should also never be in _in_flight_tasks, but let's not push our luck; I'd prefer a dedicated PR for changes like this.
distributed/worker.py
Outdated
except FileNotFoundError:
    logger.error("Tried to delete %s but no file found", exc_info=True)
Off-topic and unrelated, obviously, but I would like this to be part of the spill API. Popping a mapping and catching a FileNotFoundError is just weird...
Yes I spotted that too. I agree this should not be here.
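A hypothetical sketch of pushing this into the spill layer: a delete helper that owns the FileNotFoundError handling, so Worker code just deletes a key. SpillBufferSketch is a made-up name; the real SpillBuffer API may look different.

```python
# Hypothetical sketch: let the spill layer own the FileNotFoundError handling.
# SpillBufferSketch is a made-up name; the real SpillBuffer API may differ.
import logging

logger = logging.getLogger(__name__)


class SpillBufferSketch(dict):
    def delete(self, key: str) -> None:
        try:
            del self[key]
        except FileNotFoundError:
            # The file backing a spilled key may already be gone; callers
            # should not have to care about that.
            logger.error("Tried to delete %s but no file found", key, exc_info=True)


buf = SpillBufferSketch({"x": 123})
buf.delete("x")
print("x" in buf)  # False
```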
Force-pushed from f53480c to 0a4902d
Are the changes to release_key really necessary? This is something I would much rather do in a dedicated PR.
I think I would prefer us to fail rather than ignore the issue, otherwise we will never have the chance to fix it. In the past we've had many, many of these "let's just ignore that this task should be in a different state" workarounds, which ultimately caused a lot of inconsistencies.
That basically happens if
distributed/worker.py
Outdated
# FIXME We should never have duplicates in self.constrained or self.ready;
# however replacing the block below and the matching ones later in
# this function for the ready queue with just 'assert ts not in recs'
# causes *sporadic* failures in the test suite.
See #6062 (comment).
Unless we're willing to pay for a delete-from-heap or disable stealing, we need to allow duplicates. We have similar problems with, for example, pending_data_per_worker, where I introduced the UniqueTaskHeap.
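A simplified illustration of why duplicates in the ready heap are tolerable (not the worker's actual code): entries are validated against the authoritative task state at pop time, so stale or duplicate entries are simply skipped.

```python
# Simplified illustration, not the worker's actual code: stale or duplicate
# heap entries are skipped because the task state is checked at pop time.
import heapq

tasks = {"x": "ready", "y": "released"}  # "y" was stolen/released meanwhile
ready = [(0, "y"), (1, "x"), (2, "y")]   # note the stale duplicate entry for "y"
heapq.heapify(ready)

to_execute = []
while ready:
    _, key = heapq.heappop(ready)
    if tasks.get(key) != "ready":
        continue  # stale entry or duplicate: ignore
    tasks[key] = "executing"
    to_execute.append(key)

print(to_execute)  # ['x']
```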
distributed/worker.py
Outdated
return merge_recs_instructions(
    self.transition_generic_released(ts, stimulus_id=stimulus_id),
    ({ts: next_state} if next_state != "released" else {}, []),
    self._ensure_computing(),
If we call _ensure_computing as part of transition_generic_released, we no longer need this here, do we?
I assume calling it twice is harmless. I'm OK with keeping it. I actually don't like it being part of transition_generic_released, but there is probably more work hidden so I'm good.
removed
Third and hopefully last ✅ if tests are cool with it
Great job @crusaderky ! I believe this is a big step towards a cleaner version of the worker even though it might still feel messy
Migrate ensure_computing transitions to new WorkerState event mechanism #5895

In scope for this PR
- Convert def ensure_computing() -> None to def _ensure_computing() -> RecsInstrs

Out of scope for this PR; in scope for the issue
- Worker.stimulus_history
- Track the asyncio tasks spawned by Worker._handle_instructions and cancel them in Worker.close

Out of scope for the issue