Skip to content

Commit

Permalink
Rework cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 30, 2022
1 parent e27d281 commit 0b25660
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
37 changes: 36 additions & 1 deletion distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
PROCESSING,
READY,
AddKeysMsg,
AlreadyCancelledEvent,
CancelComputeEvent,
Execute,
ExecuteFailureEvent,
Expand Down Expand Up @@ -3428,7 +3429,11 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | No
if not ts:
return None
if ts.state == "cancelled":
return CancelComputeEvent(key=ts.key, stimulus_id=stimulus_id)
logger.debug(
"Trying to execute task %s which is not in executing state anymore",
ts,
)
return AlreadyCancelledEvent(key=ts.key, stimulus_id=stimulus_id)

try:
if self.validate:
Expand Down Expand Up @@ -3549,8 +3554,26 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | No
def handle_event(self, ev: StateMachineEvent) -> RecsInstrs:
raise TypeError(ev) # pragma: nocover

@handle_event.register
def _(self, ev: EnsureComputingEvent) -> RecsInstrs:
"""Let various methods of worker give an artificial 'kick' to _ensure_computing.
This is a temporary hack to be removed as part of
https://github.com/dask/distributed/issues/5894.
"""
return self._ensure_computing()

@handle_event.register
def _(self, ev: UnpauseEvent) -> RecsInstrs:
"""Emerge from paused status. Do not send this event directly. Instead, just set
Worker.status back to running.
"""
assert self.status == Status.running
self.ensure_communicating()
return self._ensure_computing()

@handle_event.register
def _(self, ev: CancelComputeEvent) -> RecsInstrs:
"""Scheduler requested to cancel a task"""
ts = self.tasks.get(ev.key)
if not ts or ts.state not in READY | {"waiting"}:
return {}, []
Expand All @@ -3559,11 +3582,21 @@ def _(self, ev: CancelComputeEvent) -> RecsInstrs:
# All possible dependents of ts should not be in state Processing on
# scheduler side and therefore should not be assigned to a worker, yet.
assert not ts.dependents
return {ts: "released"}, []

@handle_event.register
def _(self, ev: AlreadyCancelledEvent) -> RecsInstrs:
"""Task is already cancelled by the time execute() runs"""
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
assert ts, self.story(ev.key)
ts.done = True
return {ts: "released"}, []

@handle_event.register
def _(self, ev: ExecuteSuccessEvent) -> RecsInstrs:
"""Task completed successfully"""
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
Expand All @@ -3577,6 +3610,7 @@ def _(self, ev: ExecuteSuccessEvent) -> RecsInstrs:

@handle_event.register
def _(self, ev: ExecuteFailureEvent) -> RecsInstrs:
"""Task execution failed"""
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
Expand All @@ -3600,6 +3634,7 @@ def _(self, ev: ExecuteFailureEvent) -> RecsInstrs:

@handle_event.register
def _(self, ev: RescheduleEvent) -> RecsInstrs:
"""Task raised Reschedule exception while it was running"""
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
Expand Down
6 changes: 6 additions & 0 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,12 @@ class CancelComputeEvent(StateMachineEvent):
key: str


@dataclass
class AlreadyCancelledEvent(StateMachineEvent):
__slots__ = ("key",)
key: str


# Not to be confused with RescheduleMsg above or the distributed.Reschedule Exception
@dataclass
class RescheduleEvent(StateMachineEvent):
Expand Down

0 comments on commit 0b25660

Please sign in to comment.