
Worker State Machine refactor: redesign TaskState and scheduler messages #5922

Merged: 14 commits, Mar 14, 2022
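For orientation, the tests added in this diff exercise a keyword-constructible, dataclass-style worker-side `TaskState`. Below is a minimal sketch consistent with those tests; the field names and the repr format come from the test code further down, while the defaults, the `-1` "unknown nbytes" sentinel, and the `DEFAULT_DATA_SIZE` constant are assumptions, not copied from the implementation.

```python
from __future__ import annotations

from dataclasses import dataclass, field

DEFAULT_DATA_SIZE = 1024  # assumed fallback; the tests expect get_nbytes() == 1024


@dataclass(repr=False, eq=False)  # eq=False keeps the default identity hash, so instances can live in sets
class TaskState:
    """Sketch only: fields inferred from test_worker_state_machine.py in this PR."""

    key: str
    state: str = "released"
    priority: tuple[int, ...] | None = None
    nbytes: int = -1  # assumed sentinel meaning "size unknown"
    done: bool = False
    dependencies: set[TaskState] = field(default_factory=set)
    dependents: set[TaskState] = field(default_factory=set)

    def get_nbytes(self) -> int:
        # Fall back to the default data size when the real size is unknown
        return self.nbytes if self.nbytes >= 0 else DEFAULT_DATA_SIZE

    def __repr__(self) -> str:
        # Matches the short form asserted in test_TaskState__to_dict
        return f"<TaskState {self.key!r} {self.state}>"
```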
2 changes: 1 addition & 1 deletion distributed/active_memory_manager.py
@@ -14,7 +14,7 @@
from .metrics import time
from .utils import import_term, log_errors

-if TYPE_CHECKING:  # pragma: nocover
+if TYPE_CHECKING:
from .client import Client
from .scheduler import Scheduler, TaskState, WorkerState

2 changes: 1 addition & 1 deletion distributed/batched.py
@@ -128,7 +128,7 @@ def _background_send(self):
self.stopped.set()
self.abort()

-    def send(self, *msgs):
+    def send(self, *msgs: dict) -> None:
"""Schedule a message for sending to the other side

This completes quickly and synchronously
2 changes: 2 additions & 0 deletions distributed/scheduler.py
@@ -1210,6 +1210,8 @@ def _to_dict_no_nest(self, *, exclude: "Container[str]" = ()) -> dict:
class TaskState:
"""
A simple object holding information about a task.
Not to be confused with :class:`distributed.worker_state_machine.TaskState`, which
holds similar information on the Worker side.

.. attribute:: key: str

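Since the scheduler and the new worker state machine module both expose a class named `TaskState` (as the docstring addition above points out), code that needs both can disambiguate with import aliases, e.g.:

```python
# Illustrative only: aliasing avoids shadowing when both classes are needed at once.
from distributed.scheduler import TaskState as SchedulerTaskState
from distributed.worker_state_machine import TaskState as WorkerTaskState
```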
17 changes: 6 additions & 11 deletions distributed/tests/test_failed_workers.py
@@ -25,6 +25,7 @@
slowadd,
slowinc,
)
from distributed.worker_state_machine import TaskState

pytestmark = pytest.mark.ci1

@@ -494,19 +495,13 @@ async def test_worker_time_to_live(c, s, a, b):

@gen_cluster()
async def test_forget_data_not_supposed_to_have(s, a, b):
"""
If a depednecy fetch finishes on a worker after the scheduler already
released everything, the worker might be stuck with a redundant replica
which is never cleaned up.
"""If a dependency fetch finishes on a worker after the scheduler already released
everything, the worker might be stuck with a redundant replica which is never
cleaned up.
"""
# FIXME: Replace with "blackbox test" which shows an actual example where
# this situation is provoked if this is even possible.
# If this cannot be constructed, the entire superfuous_data handler and its
# corresponding pieces on the scheduler side may be removed
from distributed.worker import TaskState

ts = TaskState("key")
ts.state = "flight"
# this situation is provoked if this is even possible.
ts = TaskState("key", state="flight")
a.tasks["key"] = ts
recommendations = {ts: ("memory", 123)}
a.transitions(recommendations, stimulus_id="test")
70 changes: 1 addition & 69 deletions distributed/tests/test_worker.py
@@ -59,14 +59,7 @@
slowinc,
slowsum,
)
-from distributed.worker import (
-    TaskState,
-    UniqueTaskHeap,
-    Worker,
-    error_message,
-    logger,
-    parse_memory_limit,
-)
+from distributed.worker import Worker, error_message, logger, parse_memory_limit

pytestmark = pytest.mark.ci1

@@ -3747,67 +3740,6 @@ async def test_Worker__to_dict(c, s, a):
assert d["tasks"]["x"]["key"] == "x"


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_TaskState__to_dict(c, s, a):
"""tasks that are listed as dependencies of other tasks are dumped as a short repr
and always appear in full under Worker.tasks
"""
x = c.submit(inc, 1, key="x")
y = c.submit(inc, x, key="y")
z = c.submit(inc, 2, key="z")
await wait([x, y, z])

tasks = a._to_dict()["tasks"]

assert isinstance(tasks["x"], dict)
assert isinstance(tasks["y"], dict)
assert isinstance(tasks["z"], dict)
assert tasks["x"]["dependents"] == ["<TaskState 'y' memory>"]
assert tasks["y"]["dependencies"] == ["<TaskState 'x' memory>"]


def test_unique_task_heap():
heap = UniqueTaskHeap()

for x in range(10):
ts = TaskState(f"f{x}")
ts.priority = (0, 0, 1, x % 3)
heap.push(ts)

heap_list = list(heap)
# iteration does not empty heap
assert len(heap) == 10
assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)

seen = set()
last_prio = (0, 0, 0, 0)
while heap:
peeked = heap.peek()
ts = heap.pop()
assert peeked == ts
seen.add(ts.key)
assert ts.priority
assert last_prio <= ts.priority
last_prio = last_prio

ts = TaskState("foo")
heap.push(ts)
heap.push(ts)
assert len(heap) == 1

assert repr(heap) == "<UniqueTaskHeap: 1 items>"

assert heap.pop() == ts
assert not heap

# Test that we're cleaning the seen set on pop
heap.push(ts)
assert len(heap) == 1
assert heap.pop() == ts

assert repr(heap) == "<UniqueTaskHeap: 0 items>"


@gen_cluster(nthreads=[])
async def test_do_not_block_event_loop_during_shutdown(s):
loop = asyncio.get_running_loop()
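The `test_unique_task_heap` test deleted above reappears in the new `test_worker_state_machine.py` module below. As a reading aid, here is a minimal sketch of a structure that would satisfy the behaviour the test exercises (push deduplicates by key, pop/peek return tasks in priority order, popping forgets the key so it can be re-pushed). The method names and the repr format come from the test; the heapq-based internals, the tie-breaking counter, and the key set are assumptions, not the actual `UniqueTaskHeap` implementation.

```python
import heapq
from collections.abc import Iterator


class UniqueTaskHeap:
    """Sketch: a priority heap of TaskState objects that ignores duplicate keys."""

    def __init__(self) -> None:
        self._known: set[str] = set()
        self._heap: list[tuple[tuple, int, object]] = []
        self._counter = 0  # tie-breaker so TaskState objects are never compared directly

    def push(self, ts) -> None:
        # Ignore a task whose key is already in the heap
        if ts.key not in self._known:
            self._known.add(ts.key)
            heapq.heappush(self._heap, (ts.priority, self._counter, ts))
            self._counter += 1

    def peek(self):
        # Smallest-priority task without removing it
        return self._heap[0][2]

    def pop(self):
        _, _, ts = heapq.heappop(self._heap)
        self._known.discard(ts.key)  # allow the same key to be pushed again later
        return ts

    def __len__(self) -> int:
        return len(self._heap)

    def __iter__(self) -> Iterator:
        # Iterate in priority order without consuming the heap
        return (ts for _, _, ts in sorted(self._heap))

    def __repr__(self) -> str:
        return f"<UniqueTaskHeap: {len(self._heap)} items>"
```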
94 changes: 94 additions & 0 deletions distributed/tests/test_worker_state_machine.py
@@ -0,0 +1,94 @@
import pytest

from distributed.utils import recursive_to_dict
from distributed.worker_state_machine import (
ReleaseWorkerDataMsg,
SendMessageToScheduler,
TaskState,
UniqueTaskHeap,
)


def test_TaskState_get_nbytes():
assert TaskState("x", nbytes=123).get_nbytes() == 123
# Default to distributed.scheduler.default-data-size
assert TaskState("y").get_nbytes() == 1024


def test_TaskState__to_dict():
"""Tasks that are listed as dependencies or dependents of other tasks are dumped as
a short repr and always appear in full directly under Worker.tasks. Uninteresting
fields are omitted.
"""
x = TaskState("x", state="memory", done=True)
y = TaskState("y", priority=(0,), dependencies={x})
x.dependents.add(y)
actual = recursive_to_dict([x, y])
assert actual == [
{
"key": "x",
"state": "memory",
"done": True,
"dependents": ["<TaskState 'y' released>"],
},
{
"key": "y",
"state": "released",
"dependencies": ["<TaskState 'x' memory>"],
"priority": [0],
},
]


def test_unique_task_heap():
heap = UniqueTaskHeap()

for x in range(10):
ts = TaskState(f"f{x}", priority=(0,))
ts.priority = (0, 0, 1, x % 3)
heap.push(ts)

heap_list = list(heap)
# iteration does not empty heap
assert len(heap) == 10
assert heap_list == sorted(heap_list, key=lambda ts: ts.priority)

seen = set()
last_prio = (0, 0, 0, 0)
while heap:
peeked = heap.peek()
ts = heap.pop()
assert peeked == ts
seen.add(ts.key)
assert ts.priority
assert last_prio <= ts.priority
last_prio = ts.priority

ts = TaskState("foo", priority=(0,))
heap.push(ts)
heap.push(ts)
assert len(heap) == 1

assert repr(heap) == "<UniqueTaskHeap: 1 items>"

assert heap.pop() == ts
assert not heap

# Test that we're cleaning the seen set on pop
heap.push(ts)
assert len(heap) == 1
assert heap.pop() == ts

assert repr(heap) == "<UniqueTaskHeap: 0 items>"


@pytest.mark.parametrize("cls", SendMessageToScheduler.__subclasses__())
def test_sendmsg_slots(cls):
smsg = cls(**dict.fromkeys(cls.__annotations__))
assert not hasattr(smsg, "__dict__")


def test_sendmsg_to_dict():
# Arbitrary sample class
smsg = ReleaseWorkerDataMsg(key="x")
assert smsg.to_dict() == {"op": "release-worker-data", "key": "x"}
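For context, the last two tests imply that scheduler-bound messages are small `__slots__`-based classes, each carrying an op name, whose `to_dict()` output is the dict handed to batched comms. Below is a sketch of that pattern; only the `op` string, the `key` field, and the `to_dict()` output are taken from the tests, while the dataclass base and the use of `asdict` are assumptions about how such a hierarchy could be written.

```python
from dataclasses import asdict, dataclass


@dataclass
class SendMessageToScheduler:
    # Sketch: subclasses declare their payload fields; `op` is the wire-level opcode.
    op = ""  # class attribute, not a dataclass field
    __slots__ = ()

    def to_dict(self) -> dict:
        # Merge the opcode with the message's payload fields
        return {"op": self.op, **asdict(self)}


@dataclass
class ReleaseWorkerDataMsg(SendMessageToScheduler):
    op = "release-worker-data"
    __slots__ = ("key",)
    key: str


# Matches the behaviour asserted in test_sendmsg_to_dict above
assert ReleaseWorkerDataMsg(key="x").to_dict() == {"op": "release-worker-data", "key": "x"}
```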