Migrate ensure_computing transitions to new WorkerState event mechanism - part 1 #6003

Merged: 19 commits, Apr 4, 2022
Changes from 6 commits
14 changes: 14 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -2,8 +2,10 @@

from distributed.utils import recursive_to_dict
from distributed.worker_state_machine import (
Execute,
ReleaseWorkerDataMsg,
SendMessageToScheduler,
StateMachineEvent,
TaskState,
UniqueTaskHeap,
)
@@ -82,6 +84,12 @@ def test_unique_task_heap():
assert repr(heap) == "<UniqueTaskHeap: 0 items>"


@pytest.mark.parametrize("cls", [Execute])
def test_instruction_slots(cls):
instr = cls(**dict.fromkeys(cls.__annotations__))
assert not hasattr(instr, "__dict__")


@pytest.mark.parametrize("cls", SendMessageToScheduler.__subclasses__())
def test_sendmsg_slots(cls):
smsg = cls(**dict.fromkeys(cls.__annotations__))
@@ -92,3 +100,9 @@ def test_sendmsg_to_dict():
# Arbitrary sample class
smsg = ReleaseWorkerDataMsg(key="x")
assert smsg.to_dict() == {"op": "release-worker-data", "key": "x"}


@pytest.mark.parametrize("cls", StateMachineEvent.__subclasses__())
def test_event_slots(cls):
smsg = cls(**dict.fromkeys(cls.__annotations__), stimulus_id="test")
assert not hasattr(smsg, "__dict__")
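
For readers unfamiliar with what these __dict__ checks are about: a class that declares __slots__ for all of its fields gets no per-instance __dict__, which keeps the many small instruction/event objects compact and makes typo'd attribute assignments fail loudly. A minimal, standalone sketch of the pattern, assuming a manually declared __slots__ dataclass (not the actual definitions in worker_state_machine.py):

    from dataclasses import dataclass


    @dataclass
    class ExampleEvent:
        # Declaring __slots__ for every annotated field suppresses the
        # per-instance __dict__, which is what the tests above assert on.
        __slots__ = ("key", "stimulus_id")
        key: str
        stimulus_id: str


    ev = ExampleEvent(key="x", stimulus_id="test")
    assert not hasattr(ev, "__dict__")
    ev.key = "y"    # slot attributes remain writable
    # ev.typo = 1   # would raise AttributeError: there is no __dict__ to fall back on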
244 changes: 154 additions & 90 deletions distributed/worker.py
@@ -4,6 +4,7 @@
import bisect
import builtins
import errno
import functools
import heapq
import logging
import os
@@ -108,12 +109,18 @@
PROCESSING,
READY,
AddKeysMsg,
CancelComputeEvent,
Execute,
ExecuteFailureEvent,
ExecuteSuccessEvent,
InvalidTransition,
LongRunningMsg,
ReleaseWorkerDataMsg,
RescheduleEvent,
RescheduleMsg,
SendMessageToScheduler,
SerializedTask,
StateMachineEvent,
TaskErredMsg,
TaskFinishedMsg,
TaskState,
@@ -133,7 +140,9 @@
# {TaskState -> finish: TaskStateState | (finish: TaskStateState, transition *args)}
Recs: TypeAlias = "dict[TaskState, TaskStateState | tuple]"
Instructions: TypeAlias = "list[Instruction]"

else:
Recs = dict
Instructions = list

logger = logging.getLogger(__name__)

@@ -1773,14 +1782,7 @@ def handle_cancel_compute(self, key: str, stimulus_id: str) -> None:
is in state `waiting` or `ready`.
Nothing will happen otherwise.
"""
ts = self.tasks.get(key)
if ts and ts.state in READY | {"waiting"}:
self.log.append((key, "cancel-compute", stimulus_id, time()))
# All possible dependents of TS should not be in state Processing on
# scheduler side and therefore should not be assigned to a worker,
# yet.
assert not ts.dependents
self.transition(ts, "released", stimulus_id=stimulus_id)
self.handle_stimulus(CancelComputeEvent(key=key, stimulus_id=stimulus_id))
Collaborator Author: In the future we should consider sending event objects directly from the scheduler.

Member: Yes, I could see this very nicely being integrated in our RPC framework 👍
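
Purely illustrative, not an existing distributed API: the idea floated above amounts to the scheduler shipping a serialized event that the worker rehydrates and feeds straight into handle_stimulus. Every name below (ExampleCancelEvent, EVENT_TYPES, event_from_dict, the "op" field) is a hypothetical sketch:

    from dataclasses import asdict, dataclass


    @dataclass
    class ExampleCancelEvent:
        # Stand-in for CancelComputeEvent; not the real class.
        key: str
        stimulus_id: str


    # Hypothetical registry mapping a wire-level op name to an event class.
    EVENT_TYPES = {"cancel-compute": ExampleCancelEvent}


    def event_from_dict(msg: dict) -> object:
        # Hypothetical helper: rebuild an event from a plain dict received over the wire.
        cls = EVENT_TYPES[msg.pop("op")]
        return cls(**msg)


    # Scheduler side (sketch): flatten the event into a msgpack-friendly dict.
    wire_msg = {"op": "cancel-compute", **asdict(ExampleCancelEvent(key="x", stimulus_id="s1"))}

    # Worker side (sketch): rehydrate and hand straight to the state machine,
    # e.g. self.handle_stimulus(event), as handle_cancel_compute does above.
    event = event_from_dict(dict(wire_msg))
    assert event == ExampleCancelEvent(key="x", stimulus_id="s1")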


def handle_acquire_replicas(
self,
@@ -2337,8 +2339,8 @@ def transition_constrained_executing(
self.available_resources[resource] -= quantity
ts.state = "executing"
self._executing.add(ts)
self.loop.add_callback(self.execute, ts.key, stimulus_id=stimulus_id)
return {}, []
instr = Execute(key=ts.key, stimulus_id=stimulus_id)
return {}, [instr]

def transition_ready_executing(
self, ts: TaskState, *, stimulus_id: str
@@ -2355,8 +2357,8 @@ def transition_ready_executing(

ts.state = "executing"
self._executing.add(ts)
self.loop.add_callback(self.execute, ts.key, stimulus_id=stimulus_id)
return {}, []
instr = Execute(key=ts.key, stimulus_id=stimulus_id)
return {}, [instr]

def transition_flight_fetch(
self, ts: TaskState, *, stimulus_id: str
@@ -2595,12 +2597,35 @@ def transitions(self, recommendations: Recs, *, stimulus_id: str) -> None:
else:
self._handle_instructions(instructions)

def handle_stimulus(self, stim: StateMachineEvent) -> None:
with log_errors():
# self.stimulus_history.append(stim) # TODO
recs, instructions = self.handle_event(stim)
self.transitions(recs, stimulus_id=stim.stimulus_id)
self._handle_instructions(instructions)
self.ensure_computing()
self.ensure_communicating()

def _handle_stimulus_from_future(
self, future: asyncio.Future[StateMachineEvent | None]
) -> None:
with log_errors():
# This *should* never raise
stim = future.result()
if stim:
self.handle_stimulus(stim)

def _handle_instructions(self, instructions: list[Instruction]) -> None:
# TODO this method is temporary.
# See final design: https://github.com/dask/distributed/issues/5894
for inst in instructions:
if isinstance(inst, SendMessageToScheduler):
self.batched_stream.send(inst.to_dict())
elif isinstance(inst, Execute):
coro = self.execute(inst.key, stimulus_id=inst.stimulus_id)
task = asyncio.create_task(coro)
# TODO track task (at the moment it's fire-and-forget)
task.add_done_callback(self._handle_stimulus_from_future)
else:
raise TypeError(inst) # pragma: nocover
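
The Execute branch above closes a feedback loop: the coroutine runs in the background and, when it completes, the event it returns is pushed back into handle_stimulus via the done callback. A self-contained toy of that asyncio pattern (toy names, not the real worker objects):

    import asyncio


    async def fake_execute(key: str) -> str:
        # Stand-in for Worker.execute: do the work in the background and
        # *return* an event describing the outcome instead of mutating state directly.
        await asyncio.sleep(0)
        return f"executed-{key}"


    def on_done(future: asyncio.Future) -> None:
        # Stand-in for _handle_stimulus_from_future: feed the returned
        # event back into the state machine once the background task finishes.
        event = future.result()
        if event:
            print(f"handle_stimulus({event!r})")


    async def main() -> None:
        task = asyncio.create_task(fake_execute("x"))
        task.add_done_callback(on_done)  # fire-and-forget, as in the TODO above
        await task  # only so this toy exits after the callback has run


    asyncio.run(main())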

Expand Down Expand Up @@ -3392,24 +3417,16 @@ def ensure_computing(self) -> None:
pdb.set_trace()
raise

async def execute(self, key: str, *, stimulus_id: str) -> None:
async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | None:
if self.status in {Status.closing, Status.closed, Status.closing_gracefully}:
return
if key not in self.tasks:
return
ts = self.tasks[key]
return None
ts = self.tasks.get(key)
if not ts:
return None
if ts.state == "cancelled":
return CancelComputeEvent(key=ts.key, stimulus_id=stimulus_id)
Member: I would suggest a name like AlreadyCancelledEvent to distinguish this from Client.cancel events passed through the scheduler. We don't have this yet, but there is some potential for confusion.

Collaborator Author: I split them now.


try:
if ts.state == "cancelled":
# This might happen if keys are canceled
logger.debug(
"Trying to execute task %s which is not in executing state anymore",
ts,
)
ts.done = True
self.transition(ts, "released", stimulus_id=stimulus_id)
return

if self.validate:
assert not ts.waiting_for_data
assert ts.state == "executing"
@@ -3433,9 +3450,7 @@ async def execute(self, key: str, *, stimulus_id: str) -> None:
f"expected one of: {sorted(self.executors)}"
)

self.active_keys.add(ts.key)

result: dict
self.active_keys.add(key)
try:
ts.start_time = time()
if iscoroutinefunction(function):
@@ -3453,7 +3468,7 @@ async def execute(self, key: str, *, stimulus_id: str) -> None:
args2,
kwargs2,
self.execution_state,
ts.key,
key,
self.active_threads,
self.active_threads_lock,
self.scheduler_delay,
@@ -3468,75 +3483,124 @@ async def execute(self, key: str, *, stimulus_id: str) -> None:
self.scheduler_delay,
)
finally:
self.active_keys.discard(ts.key)

key = ts.key
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(key) # type: ignore
assert ts, self.story(key)
ts.done = True
result["key"] = ts.key
value = result.pop("result", None)
ts.startstops.append(
{"action": "compute", "start": result["start"], "stop": result["stop"]}
)
self.threads[ts.key] = result["thread"]
recommendations: Recs = {}
self.active_keys.discard(key)

self.threads[key] = result["thread"]
Member: Do we want to move this to the result handlers as well? It would be a duplicated line, but we'd cleanly separate result handling from execution and would only mutate state in a single place.

Collaborator Author: I understand that neither the Worker ABC nor the WorkerStateMachine will ever have a concept of threads. That's why I left it here.


if result["op"] == "task-finished":
ts.nbytes = result["nbytes"]
ts.type = result["type"]
recommendations[ts] = ("memory", value)
if self.digests is not None:
self.digests["task-duration"].add(result["stop"] - result["start"])
Member: Same here. Do we want to move this to the result/event handler?

Collaborator Author (@crusaderky, Mar 30, 2022): self.digests is populated with a wealth of information that should remain alien to the state machine:

  • latency
  • transfer-bandwidth
  • get-data-send-duration
  • disk-load-duration
  • profile-duration

so I think it should remain in Worker?

Member: We can also nuke the digests if they're slowing us down. They're not commonly used today.

elif isinstance(result.pop("actual-exception"), Reschedule):
recommendations[ts] = "rescheduled"
else:
logger.warning(
"Compute Failed\n"
"Key: %s\n"
"Function: %s\n"
"args: %s\n"
"kwargs: %s\n"
"Exception: %r\n",
ts.key,
str(funcname(function))[:1000],
convert_args_to_str(args2, max_len=1000),
convert_kwargs_to_str(kwargs2, max_len=1000),
result["exception_text"],
)
recommendations[ts] = (
"error",
result["exception"],
result["traceback"],
result["exception_text"],
result["traceback_text"],
return ExecuteSuccessEvent(
key=key,
value=result["result"],
start=result["start"],
stop=result["stop"],
nbytes=result["nbytes"],
type=result["type"],
stimulus_id=stimulus_id,
)

self.transitions(recommendations, stimulus_id=stimulus_id)

logger.debug("Send compute response to scheduler: %s, %s", ts.key, result)
if isinstance(result["actual-exception"], Reschedule):
return RescheduleEvent(key=ts.key, stimulus_id=stimulus_id)

if self.validate:
assert ts.state != "executing"
assert not ts.waiting_for_data
logger.warning(
"Compute Failed\n"
"Key: %s\n"
"Function: %s\n"
"args: %s\n"
"kwargs: %s\n"
"Exception: %r\n",
key,
str(funcname(function))[:1000],
convert_args_to_str(args2, max_len=1000),
convert_kwargs_to_str(kwargs2, max_len=1000),
result["exception_text"],
)
return ExecuteFailureEvent(
key=key,
start=result["start"],
stop=result["stop"],
exception=result["exception"],
traceback=result["traceback"],
exception_text=result["exception_text"],
traceback_text=result["traceback_text"],
stimulus_id=stimulus_id,
)

except Exception as exc:
assert ts
logger.error(
"Exception during execution of task %s.", ts.key, exc_info=True
logger.error("Exception during execution of task %s.", key, exc_info=True)
msg = error_message(exc)
return ExecuteFailureEvent(
key=key,
start=None,
stop=None,
exception=msg["exception"],
traceback=msg["traceback"],
exception_text=msg["exception_text"],
traceback_text=msg["traceback_text"],
stimulus_id=stimulus_id,
)
emsg = error_message(exc)
emsg.pop("status")
self.transition(
ts,

@functools.singledispatchmethod
def handle_event(self, ev: StateMachineEvent) -> tuple[Recs, Instructions]:
raise TypeError(ev) # pragma: nocover

@handle_event.register
def _(self, ev: CancelComputeEvent) -> tuple[Recs, Instructions]:
ts = self.tasks.get(ev.key)
Member: The tasks must always exist. There is no way this could've been dropped earlier. If so, that would be a severe bug.

Collaborator Author: I had conflated the implementation of handle_cancel_compute coming from the scheduler and the code that dealt with a task being already cancelled by the time execute starts. They are now separate.

if not ts or ts.state not in READY | {"waiting"}:
return {}, []

self.log.append((ev.key, "cancel-compute", ev.stimulus_id, time()))
# All possible dependents of ts should not be in state Processing on
# scheduler side and therefore should not be assigned to a worker, yet.
assert not ts.dependents
Member: I suggest not introducing any new assertions. For this specific handler there is no need to assert this condition, is there?

I'm not even sure this assert is correct. I've seen such race conditions happen. For instance, there is a network partition such that this worker briefly disconnects. Another worker gets this task assigned and finishes earlier. In the meantime, this worker reconnects, but the task is still executing (or not yet executing) and got a dependent assigned in the meantime.

See also the warning on the scheduler side: Unexpected worker completed task [...]

Collaborator Author: I had conflated the implementation of handle_cancel_compute coming from the scheduler and the code that dealt with a task being already cancelled by the time execute starts. They are now separate.

Collaborator Author: It should now be functionally identical to main.

ts.done = True
return {ts: "released"}, []

@handle_event.register
def _(self, ev: ExecuteSuccessEvent) -> tuple[Recs, Instructions]:
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
assert ts, self.story(ev.key)

ts.done = True
ts.startstops.append({"action": "compute", "start": ev.start, "stop": ev.stop})
ts.nbytes = ev.nbytes
ts.type = ev.type
return {ts: ("memory", ev.value)}, []
Member: We are losing stimulus_ids here, aren't we?

Collaborator Author: We aren't; see handle_stimulus:

            recs, instructions = self.handle_event(stim)
            self.transitions(recs, stimulus_id=stim.stimulus_id)


@handle_event.register
def _(self, ev: ExecuteFailureEvent) -> tuple[Recs, Instructions]:
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
assert ts, self.story(ev.key)

ts.done = True
if ev.start is not None and ev.stop is not None:
ts.startstops.append(
{"action": "compute", "start": ev.start, "stop": ev.stop}
)

return {
ts: (
"error",
**emsg,
stimulus_id=stimulus_id,
ev.exception,
ev.traceback,
ev.exception_text,
ev.traceback_text,
)
finally:
self.ensure_computing()
self.ensure_communicating()
}, []

@handle_event.register
def _(self, ev: RescheduleEvent) -> tuple[Recs, Instructions]:
# key *must* be still in tasks. Releasing it directly is forbidden
# without going through cancelled
ts = self.tasks.get(ev.key) # type: ignore
assert ts, self.story(ev.key)
return {ts: "rescheduled"}, []

def _prepare_args_for_execution(
self, ts: TaskState, args: tuple, kwargs: dict[str, Any]
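
For context on the dispatch mechanism used by handle_event above: functools.singledispatchmethod selects the implementation from the runtime type of the first argument after self, and every @handle_event.register overload can share the throwaway name _ because registration is keyed on the annotated parameter type, not the method name. A standalone toy of the pattern (toy classes, not the real events):

    import functools
    from dataclasses import dataclass


    @dataclass
    class BaseEvent:
        key: str


    @dataclass
    class SuccessEvent(BaseEvent):
        pass


    @dataclass
    class FailureEvent(BaseEvent):
        pass


    class ToyStateMachine:
        @functools.singledispatchmethod
        def handle_event(self, ev: BaseEvent):
            # Fallback for event types without a registered handler.
            raise TypeError(ev)

        @handle_event.register
        def _(self, ev: SuccessEvent):
            # Dispatch is keyed on the annotated type of `ev`.
            return {ev.key: "memory"}, []  # (recommendations, instructions)

        @handle_event.register
        def _(self, ev: FailureEvent):
            return {ev.key: "error"}, []


    sm = ToyStateMachine()
    assert sm.handle_event(SuccessEvent(key="x")) == ({"x": "memory"}, [])
    assert sm.handle_event(FailureEvent(key="x")) == ({"x": "error"}, [])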