Worker State Machine refactor: redesign TaskState and scheduler messages #5922
Unit Test Results 12 files ± 0 12 suites ±0 5h 38m 49s ⏱️ + 3m 56s For more details on these failures and errors, see this check. Results for commit 297bed6. ± Comparison against base commit 85bf1be. ♻️ This comment has been updated with latest results.
CI still seems to be pretty upset, but otherwise the changes look good so far.
All test failures are unrelated. This is ready for review and merge.
docs/source/worker.rst
Outdated
.. autoclass:: distributed.worker_state_machine.UniqueTaskHeap
   :members:
I'm not sure if this should be publicly documented
makes sense - removing it
distributed/scheduler.py
Outdated
@@ -5516,7 +5520,7 @@ def handle_task_erred(self, key=None, **msg):
         recommendations: dict
         client_msgs: dict
         worker_msgs: dict
-        r: tuple = self.stimulus_task_erred(key=key, **msg)
+        r: tuple = self.stimulus_task_erred(key=key, status="error", **msg)
What is the `status` kwarg for?
Cleaned up and broken out to #5926, which this PR incorporates.
    TaskFinishedMsg,
    TaskState,
    UniqueTaskHeap,
)
Quite ugly but transitory. I expect that all `*Msg` classes and the state sets won't need to be imported after we move the state machine to the other module.
    "rescheduled",
    "resumed",
    "waiting",
]
I'd rather have #5444 but this is the next best thing
    Not to be confused with :class:`distributed.scheduler.TaskState`, which holds
    similar information on the scheduler side.
    """
Reduced the size of the attributes declaration by a factor of 3 (docstring + class annotations + init method -> just the class annotations)
+1 for the dataclass.
Re the docstring: I'm fine with this, but we should be aware that it removes any Sphinx rendering of the attributes. I know the scheduler task states are rendered atm.
    #: The previous state of the task. This is a state machine implementation detail.
    _previous: TaskStateState | None = None
    #: The next state of the task. This is a state machine implementation detail.
    _next: TaskStateState | None = None
@fjetter wanna chip in on these two?
There is some documentation about this here
distributed/distributed/worker.py
Lines 2415 to 2436 in 925c610
def _transition_from_resumed(
    self, ts: TaskState, finish: str, *, stimulus_id: str
) -> tuple[Recs, Smsgs]:
    """`resumed` is an intermediate degenerate state which splits further up
    into two states depending on what the last signal / next state is
    intended to be. There are only two viable choices depending on whether
    the task is required to be fetched from another worker `resumed(fetch)`
    or the task shall be computed on this worker `resumed(waiting)`.

    The only viable state transitions ending up here are

    flight -> cancelled -> resumed(waiting)

    or

    executing -> cancelled -> resumed(fetch)

    depending on the origin. Equally, only `fetch`, `waiting` or `released`
    are allowed output states.

    See also `transition_resumed_waiting`
    """
dc_slots = {"slots": True} if sys.version_info >= (3, 10) else {}


@dataclass(repr=False, eq=False, **dc_slots)
Only way I could find to get `__slots__` in Python 3.8/3.9 was not to use `@dataclass`, which in my opinion offers much bigger rewards in terms of readability.
slots are a nice performance boost for attribute access but I don't think it's required on the worker side. I'm fine with this, shouldn't cause any problems.
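A small self-contained sketch of what the conditional `dc_slots` argument does. `ExampleTask` is a hypothetical class used only for illustration. On Python 3.10 and later the dataclass gets `__slots__` and its instances have no `__dict__`; on older versions the `slots` argument is simply not passed and the class behaves like an ordinary dataclass.

```python
import sys
from dataclasses import dataclass

# `slots=True` is only accepted by @dataclass from Python 3.10 onwards.
dc_slots = {"slots": True} if sys.version_info >= (3, 10) else {}


@dataclass(repr=False, eq=False, **dc_slots)
class ExampleTask:
    """Hypothetical example class, not the real TaskState."""

    key: str
    state: str = "released"


ts = ExampleTask("x")
has_slots = hasattr(ExampleTask, "__slots__")
has_dict = hasattr(ts, "__dict__")
```

`has_slots` and `has_dict` are always opposites here: which one is true depends only on the interpreter version.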
Blocked by #5926
#5926 no longer blocks this issue
    traceback=ts.traceback,
    exception_text=ts.exception_text,
    traceback_text=ts.traceback_text,
)
Keys "status", "thread", and "startstops" were ignored by the scheduler
They are not. They are subtly picked up by extensions such as TaskStream and EventStream. -__-
All test failures are unrelated. Ready for final review and merge.
@lru_cache
def _default_data_size() -> int:
    return parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
What's the reason for this?
To read the config on first use instead of when loading the module like it was before, thus avoiding headaches related to module load order.
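A minimal sketch of the "read on first use" pattern described in the reply above. `FAKE_CONFIG` and `default_data_size` are hypothetical stand-ins; the sketch deliberately avoids the real `dask.config` API so that it runs on its own.

```python
from functools import lru_cache

# Hypothetical stand-in for a configuration store; not dask.config.
FAKE_CONFIG = {"default-data-size": 1024}
reads = []  # records every time the config is actually consulted


@lru_cache
def default_data_size() -> int:
    # Runs only on the first call; later calls return the cached value.
    reads.append(1)
    return int(FAKE_CONFIG["default-data-size"])


# The config changes after the function is defined but before first use,
# which is the situation a module-level constant would get wrong.
FAKE_CONFIG["default-data-size"] = 2048
first = default_data_size()

# A later change is not seen, because the first result is cached.
FAKE_CONFIG["default-data-size"] = 4096
second = default_data_size()
```

The trade-off is that later config changes go unnoticed unless the cache is cleared with `default_data_size.cache_clear()`.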
distributed/worker_state_machine.py
Outdated
# Note: as of Python 3.10.2, @dataclass(slots=True) doesn't work with __init_subclass__
# https://bugs.python.org/issue46970
@dataclass
class TaskFinishedMsg(SendMessageToScheduler, op="task-finished"):
Is there a functional difference to the case where I simply define the op in our subclasses, e.g.
class TaskFinishedMsg(SendMessageToScheduler):
    op = "task-finished"
The usage of metaclasses feels a bit complex. Is there anything else going on that I'm not aware of, or is this a style question?
No, it's just style. Happy to remove it.
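To illustrate why the two spellings are equivalent for this purpose, here is a sketch with hypothetical base classes and a hypothetical `to_dict` helper (neither is the actual `SendMessageToScheduler` implementation). In both variants `op` ends up as a plain class attribute; because it has no annotation in the subclass, `@dataclass` does not treat it as a field.

```python
from dataclasses import asdict, dataclass


class KeywordStyleBase:
    """Hypothetical base: `op` is passed as a class keyword argument."""

    def __init_subclass__(cls, op: str, **kwargs):
        super().__init_subclass__(**kwargs)
        cls.op = op

    def to_dict(self) -> dict:
        return {"op": self.op, **asdict(self)}


@dataclass
class FinishedKeywordStyle(KeywordStyleBase, op="task-finished"):
    key: str
    nbytes: int


class AttributeStyleBase:
    """Hypothetical base: subclasses set `op` in the class body."""

    def to_dict(self) -> dict:
        return {"op": self.op, **asdict(self)}


@dataclass
class FinishedAttributeStyle(AttributeStyleBase):
    op = "task-finished"  # unannotated, so not a dataclass field
    key: str
    nbytes: int


keyword_msg = FinishedKeywordStyle("x", 8).to_dict()
attribute_msg = FinishedAttributeStyle("x", 8).to_dict()
```

Both messages serialise to the same dictionary, so the choice comes down to style, as the reply says.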
Interesting. Thanks for verifying. This is indeed much better and more compact.
@fjetter are there any outstanding points?
In scope
`__slots__` in Python >=3.10. This should lead to substantial savings in unmanaged memory.
Out of scope
`instruction_id`, as described in (Worker) State Machine determinism and replayability #5736 - left to later discussion