forked from dask/distributed
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Redesign TaskState and scheduler messages (dask#5922)
- Loading branch information
1 parent
8d9b5c2
commit 183959c
Showing
9 changed files
with
629 additions
and
479 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
import pytest | ||
|
||
from distributed.utils import recursive_to_dict | ||
from distributed.worker_state_machine import ( | ||
ReleaseWorkerDataMsg, | ||
SendMessageToScheduler, | ||
TaskState, | ||
UniqueTaskHeap, | ||
) | ||
|
||
|
||
def test_TaskState_get_nbytes(): | ||
assert TaskState("x", nbytes=123).get_nbytes() == 123 | ||
# Default to distributed.scheduler.default-data-size | ||
assert TaskState("y").get_nbytes() == 1024 | ||
|
||
|
||
def test_TaskState__to_dict(): | ||
"""Tasks that are listed as dependencies or dependents of other tasks are dumped as | ||
a short repr and always appear in full directly under Worker.tasks. Uninteresting | ||
fields are omitted. | ||
""" | ||
x = TaskState("x", state="memory", done=True) | ||
y = TaskState("y", priority=(0,), dependencies={x}) | ||
x.dependents.add(y) | ||
actual = recursive_to_dict([x, y]) | ||
assert actual == [ | ||
{ | ||
"key": "x", | ||
"state": "memory", | ||
"done": True, | ||
"dependents": ["<TaskState 'y' released>"], | ||
}, | ||
{ | ||
"key": "y", | ||
"state": "released", | ||
"dependencies": ["<TaskState 'x' memory>"], | ||
"priority": [0], | ||
}, | ||
] | ||
|
||
|
||
def test_unique_task_heap(): | ||
heap = UniqueTaskHeap() | ||
|
||
for x in range(10): | ||
ts = TaskState(f"f{x}", priority=(0,)) | ||
ts.priority = (0, 0, 1, x % 3) | ||
heap.push(ts) | ||
|
||
heap_list = list(heap) | ||
# iteration does not empty heap | ||
assert len(heap) == 10 | ||
assert heap_list == sorted(heap_list, key=lambda ts: ts.priority) | ||
|
||
seen = set() | ||
last_prio = (0, 0, 0, 0) | ||
while heap: | ||
peeked = heap.peek() | ||
ts = heap.pop() | ||
assert peeked == ts | ||
seen.add(ts.key) | ||
assert ts.priority | ||
assert last_prio <= ts.priority | ||
last_prio = last_prio | ||
|
||
ts = TaskState("foo", priority=(0,)) | ||
heap.push(ts) | ||
heap.push(ts) | ||
assert len(heap) == 1 | ||
|
||
assert repr(heap) == "<UniqueTaskHeap: 1 items>" | ||
|
||
assert heap.pop() == ts | ||
assert not heap | ||
|
||
# Test that we're cleaning the seen set on pop | ||
heap.push(ts) | ||
assert len(heap) == 1 | ||
assert heap.pop() == ts | ||
|
||
assert repr(heap) == "<UniqueTaskHeap: 0 items>" | ||
|
||
|
||
@pytest.mark.parametrize("cls", SendMessageToScheduler.__subclasses__()) | ||
def test_sendmsg_slots(cls): | ||
smsg = cls(**dict.fromkeys(cls.__annotations__)) | ||
assert not hasattr(smsg, "__dict__") | ||
|
||
|
||
def test_sendmsg_to_dict(): | ||
# Arbitrary sample class | ||
smsg = ReleaseWorkerDataMsg(key="x") | ||
assert smsg.to_dict() == {"op": "release-worker-data", "key": "x"} |
Oops, something went wrong.