Migrate ensure_computing transitions to new WorkerState event mechanism - part 1 #6003
Conversation
distributed/worker.py
Outdated
    self._async_instruction_callback,
    self.execute(inst.key, stimulus_id=inst.stimulus_id),
    stimulus_id=inst.stimulus_id,
)
This differs from the design document, which instead was creating a fire-and-forget asyncio.Task. As far as I understand, the difference between the two is purely cosmetic.
Added value would be given from spawning a task, tracking it e.g. in a set Worker.running_asyncio_tasks, and then cancelling it in Worker.close(). Even if desirable, however, I think this is best left to a future PR.
Added value would be given from spawning a task, tracking it e.g. in a set Worker.running_asyncio_tasks, and then cancelling it in Worker.close(). Even if desirable, however, I think this is best left to a future PR.
That was my intention, I just didn't specify everything in my pseudo code. I was actually hoping we'd implement this already as part of #5922. I'm OK with postponing this to a follow-up but I would like to get this done sooner rather than later.
I would like to avoid using add_callback if at all possible, since tracking the tasks would actually allow us to, e.g., deal with the exception.
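For illustration, a minimal sketch of the task-tracking alternative discussed above; the attribute name Worker.running_asyncio_tasks comes from the comment, while the method names and the shape of the inst object are assumptions:

# Illustrative sketch only; method names and the `inst` object are assumptions.
import asyncio


class Worker:
    def __init__(self):
        # Hypothetical set of in-flight instruction tasks (see comment above).
        self.running_asyncio_tasks = set()

    def _start_async_instruction(self, inst) -> None:
        # Spawn a fire-and-forget task instead of IOLoop.add_callback,
        # but keep a reference so it can later be cancelled.
        task = asyncio.create_task(
            self.execute(inst.key, stimulus_id=inst.stimulus_id)
        )
        self.running_asyncio_tasks.add(task)
        # Drop the reference once the task finishes.
        task.add_done_callback(self.running_asyncio_tasks.discard)

    async def close(self) -> None:
        # Cancel and await any still-running instruction tasks on shutdown,
        # so their exceptions are not silently dropped.
        for task in self.running_asyncio_tasks:
            task.cancel()
        await asyncio.gather(*self.running_asyncio_tasks, return_exceptions=True)

    async def execute(self, key, *, stimulus_id):
        ...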
distributed/worker.py
Outdated
msg = error_message(exc)
recommendations = {ts: tuple(msg.values())}

return recommendations, []
This differs from #5895:
- The Worker.execute method is modified such that it no longer performs any transition but instead returns appropriate StateMachineEvents that trigger the necessary handlers. For instance:
  - TaskFinished
  - Rescheduled
  - TaskErred

I could not find any benefit in implementing those events vs. just returning recommendations?
The benefit is that we want to log the events and keep the recommendations as an internal detail of the to-be-defined WorkerState class. Not using recommendations here is one of the more important points of the design proposals.
Please have a look now
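To make the trade-off above concrete, here is a rough, non-authoritative sketch of what an event-returning execute() could look like; the event names come from #5895, while the base class, the fields, and the _run_task helper are illustrative assumptions:

from dataclasses import dataclass


@dataclass
class StateMachineEvent:
    stimulus_id: str


@dataclass
class TaskFinished(StateMachineEvent):
    key: str
    nbytes: int


@dataclass
class TaskErred(StateMachineEvent):
    key: str
    exception: BaseException


@dataclass
class Rescheduled(StateMachineEvent):
    key: str


async def execute(self, key, *, stimulus_id):
    # Method-body sketch (would live on Worker): perform no transitions here,
    # only report what happened as an event. handle_stimulus() then logs the
    # event and lets the registered handler translate it into recommendations,
    # which stay an internal detail of the WorkerState class.
    try:
        _, nbytes = await self._run_task(key)  # hypothetical helper
    except Exception as exc:
        return TaskErred(key=key, exception=exc, stimulus_id=stimulus_id)
    return TaskFinished(key=key, nbytes=nbytes, stimulus_id=stimulus_id)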
distributed/worker.py
Outdated
self._async_instruction_callback, | ||
self.execute(inst.key, stimulus_id=inst.stimulus_id), | ||
stimulus_id=inst.stimulus_id, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added value would be given from spawning a task, track it e.g. in a set Worker.running_asyncio_tasks, and then cancel it in Worker.close(). Even if desirable, however, I think this is best left to a future PR.
That was my intention, I just didn't specify everything in my pseudo code. I was actually hoping we'd implement this already as part of #5922. I'm OK with postponing this to a follow up but I would like to get this done rather sooner than later.
I would like to avoid using add_callback
if at all possible since tracking the tasks would actually allow us to, e.g. deal with the exception
distributed/worker.py
Outdated
msg = error_message(exc) | ||
recommendations = {ts: tuple(msg.values())} | ||
|
||
return recommendations, [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The benefit is that we want to log the events and keep the recommendations as an internal detail of the to-be-defined WorkerState
class. Not using recommendations here is one of the more important points of the design proposals
Force-pushed from b96cb5a to 70a7ba6
Unit Test Results: 17 files (-1), 17 suites (-1), 8h 42m 40s ⏱️ (-28m 20s). For more details on these failures, see this check. Results for commit f83df7a. ± Comparison against base commit ccb0362.
Force-pushed from 5dcee7e to fdb59c5
# yet.
assert not ts.dependents
self.transition(ts, "released", stimulus_id=stimulus_id)
self.handle_stimulus(CancelComputeEvent(key=key, stimulus_id=stimulus_id))
In the future we should consider sending event objects directly from the scheduler
Yes, I could see this very nicely being integrated in our RPC framework 👍
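Purely as an illustration of what sending events from the scheduler could look like (none of these helpers exist; names and message shapes are assumptions): the event travels as a plain dict over the existing comms and the worker reconstructs the dataclass before feeding it to handle_stimulus().

from dataclasses import asdict, dataclass, fields


@dataclass
class CancelComputeEvent:
    key: str
    stimulus_id: str


# Hypothetical registry of event classes that may travel over the wire.
EVENT_TYPES = {cls.__name__: cls for cls in (CancelComputeEvent,)}


def event_to_msg(ev) -> dict:
    # Scheduler side: serialize the event into a comm-friendly dict.
    return {"cls": type(ev).__name__, **asdict(ev)}


def msg_to_event(msg: dict):
    # Worker side: rebuild the event, then call worker.handle_stimulus(event).
    cls = EVENT_TYPES[msg["cls"]]
    return cls(**{f.name: msg[f.name] for f in fields(cls)})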
I think there is some confusion about the CancelComputeEvent, but otherwise this already looks good.
distributed/worker.py
Outdated
if not ts:
    return None
if ts.state == "cancelled":
    return CancelComputeEvent(key=ts.key, stimulus_id=stimulus_id)
I would suggest a name like AlreadyCancelledEvent to distinguish this from Client.cancel events passed through the scheduler. We don't have this yet, but there is some potential for confusion.
I split them now
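For context, a minimal sketch of what the split could look like, assuming dataclass-based events; the AlreadyCancelledEvent name comes from the suggestion above and the fields are illustrative:

from dataclasses import dataclass


@dataclass
class CancelComputeEvent:
    # The scheduler explicitly asked this worker to cancel the compute.
    key: str
    stimulus_id: str


@dataclass
class AlreadyCancelledEvent:
    # execute() started and found the task already in state "cancelled".
    key: str
    stimulus_id: str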
recommendations: Recs = {}
self.active_keys.discard(key)

self.threads[key] = result["thread"]
Do we want to move this to the result handlers as well? This will be a duplicated line, but we'd cleanly separate result handling from the execution and would only mutate state in a single place.
I understand that neither the Worker ABC nor the WorkerStateMachine will ever have a concept of threads. That's why I left it here.
if result["op"] == "task-finished": | ||
ts.nbytes = result["nbytes"] | ||
ts.type = result["type"] | ||
recommendations[ts] = ("memory", value) | ||
if self.digests is not None: | ||
self.digests["task-duration"].add(result["stop"] - result["start"]) |
Same here. Do we want to move this to the result/event handler?
self.digests is populated with a wealth of information that should remain alien to the state machine:
- latency
- transfer-bandwidth
- get-data-send-duration
- disk-load-duration
- profile-duration
so I think it should remain in Worker?
We can also nuke the digests if they're slowing us down. They're not commonly used today.
distributed/worker.py
Outdated
raise TypeError(ev)  # pragma: nocover

# TODO Set return type annotation of all handle_event implementations
# to tuple[Recs, Instructions] (requires Python >=3.9)
Is this a functools limitation or why do we need py3.9? Do we even need individual annotations if they are all the same?
Is this a functools limitation or why do we need py3.9?

@functools.singledispatchmethod exec()'s the delayed annotation, which however requires Python 3.9+. I worked around it better now.

Do we even need individual annotations if they are all the same?

mypy is not smart enough to reserve special treatment for singledispatch functions, so yes.
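To illustrate the workaround (a sketch, not the exact code in this PR): with postponed annotation evaluation, register() presumably resolves the registered method's annotations via typing.get_type_hints(), which evaluates all of them, so keeping the return type in a comment avoids evaluating tuple[Recs, Instructions] on Python 3.8, where builtin generics cannot be subscripted.

from __future__ import annotations

import functools
from dataclasses import dataclass


@dataclass
class CancelComputeEvent:
    key: str
    stimulus_id: str


class WorkerState:
    @functools.singledispatchmethod
    def handle_event(self, ev):
        raise TypeError(ev)  # pragma: nocover

    # register() inspects the annotation of `ev`; with postponed evaluation
    # this resolves every annotation on the function, so the return type is
    # kept in a comment rather than written as tuple[Recs, Instructions].
    @handle_event.register
    def _(self, ev: CancelComputeEvent):  # -> tuple[Recs, Instructions]
        return {}, []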
# to tuple[Recs, Instructions] (requires Python >=3.9)
@handle_event.register
def _(self, ev: CancelComputeEvent):  # -> tuple[Recs, Instructions]:
    ts = self.tasks.get(ev.key)
The task must always exist. There is no way it could've been dropped earlier. If it has been, that would be a severe bug.
I had conflated the implementation of handle_cancel_compute coming from the scheduler and the code that dealt with a task being already cancelled by the time execute starts. They are now separate.
self.log.append((ev.key, "cancel-compute", ev.stimulus_id, time()))
# All possible dependents of ts should not be in state Processing on
# scheduler side and therefore should not be assigned to a worker, yet.
assert not ts.dependents
I suggest not introducing any new assertions. For this specific handler there is no need to assert this condition, is there?
I'm not even sure this assert is correct; I've seen such race conditions happening. For instance, there is a network partition such that this worker briefly disconnects. Another worker gets this task assigned and finishes earlier. In the meantime, this worker reconnects; the task is still executing (or not yet executing) but has had a dependent assigned.
See also the warning on the scheduler side: Unexpected worker completed task [...]
I had conflated the implementation of handle_cancel_compute coming from the scheduler and the code that dealt with a task being already cancelled by the time execute starts. They are now separate.
It should now be functionally identical to main
distributed/worker.py
Outdated
# TODO Set return type annotation of all handle_event implementations
# to tuple[Recs, Instructions] (requires Python >=3.9)
@handle_event.register
def _(self, ev: CancelComputeEvent):  # -> tuple[Recs, Instructions]:
I'm a bit confused about the body of this handler. I don't think we should mix up the "CancelTask" event with an execute response that says "Didn't Execute. Task already cancelled".
I split them now
ts.startstops.append({"action": "compute", "start": ev.start, "stop": ev.stop})
ts.nbytes = ev.nbytes
ts.type = ev.type
return {ts: ("memory", ev.value)}, []
We are losing stimulus_ids here, aren't we?
We aren't; see handle_stimulus:
recs, instructions = self.handle_event(stim)
self.transitions(recs, stimulus_id=stim.stimulus_id)
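For completeness, a condensed sketch of that flow; the surrounding method and attribute names (stimulus_log, _handle_instructions) are illustrative:

def handle_stimulus(self, stim) -> None:
    # Method sketch: the event itself carries the stimulus_id, so handlers
    # don't need to return it. Every recommendation produced by the event
    # is applied under that same stimulus_id.
    self.stimulus_log.append(stim)  # hypothetical event log
    recs, instructions = self.handle_event(stim)
    self.transitions(recs, stimulus_id=stim.stimulus_id)
    self._handle_instructions(instructions)  # hypothetical helper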
Force-pushed from 9f18059 to 0b25660
Force-pushed from d056752 to 9d28bde
This reverts commit 9d28bde.
@fjetter I did the work on _ensure_computing (see 9d28bde) but there are still some pretty conceptual failures caused by resource counting, so I'm moderately inclined to merge this PR as it is. I reverted it for now.

In scope for this PR
Out of scope for the PR but in scope for the issue (to be implemented immediately after this PR)
Out of scope for the issue
LGTM
Partially closes #5895
See comment below for scope of this PR