Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit fetch state to worker TaskState #4470

Merged
merged 29 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c422b32
Add state "new" and "fetch" to worker
gforsyth Jan 25, 2021
538f426
Handle when tasks have been reassigned by scheduler
gforsyth Jan 27, 2021
bfab13f
Add check that task has no who_has
gforsyth Jan 27, 2021
df5631f
waiting_for_data isn't working right
gforsyth Jan 29, 2021
8eef9d1
Better handling for dependencies that get re-added
gforsyth Jan 29, 2021
36b0de5
Release keys in `fetch` if no dependents
gforsyth Jan 29, 2021
e051e27
Don't overload `data_needed`
gforsyth Jan 29, 2021
2f31344
Remove breakpoints
gforsyth Jan 29, 2021
1c7db04
Add updated task transitions to test_worker_plugin
gforsyth Jan 30, 2021
7e74a7d
Remove bad pass
gforsyth Jan 30, 2021
4b467d9
Only use `who_has` and `has_what` for non-local tasks
gforsyth Jan 30, 2021
7ee6797
black
gforsyth Jan 30, 2021
24cb8ab
Transitions for scheduler reassignment following broken workers
gforsyth Feb 1, 2021
7498285
validate method for fetch state
gforsyth Feb 1, 2021
40a5ae3
add try/catch to new transition functions
gforsyth Feb 1, 2021
7224111
Remove commented out code
gforsyth Feb 1, 2021
66171d7
Update docs on internal scheduling
gforsyth Feb 1, 2021
8f0007d
Fuse state checks and gate data_needed
gforsyth Feb 2, 2021
d577751
Remove `remove=` kwarg from fetch transition
gforsyth Feb 2, 2021
56ad76e
Set runspec to positional arg in rescheduling transitions
gforsyth Feb 2, 2021
da831f4
Remove `stealing` transitions
gforsyth Feb 3, 2021
3f40266
Fix for deprecation warning in tests
gforsyth Feb 3, 2021
f50ab4e
Add "cause" to most calls to release_key
gforsyth Mar 3, 2021
e7b86de
Don't release keys in flight
gforsyth Mar 3, 2021
6259063
Reserve `self.data_needed` for keys that will be executed
gforsyth Mar 3, 2021
cdeb85c
Don't discard result in transition
gforsyth Mar 3, 2021
94d184a
Add a few comments and swap in a set comprehension
gforsyth Mar 3, 2021
47ada8e
Check in-flight task count in steal test
gforsyth Mar 3, 2021
14d2904
Remove in-transition call to `release_key`
gforsyth Mar 3, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions distributed/diagnostics/tests/test_worker_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ async def test_create_on_construction(c, s, a, b):
@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
async def test_normal_task_transitions_called(c, s, w):
expected_notifications = [
{"key": "task", "start": "new", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
Expand All @@ -95,6 +96,7 @@ def failing(x):
raise Exception()

expected_notifications = [
{"key": "task", "start": "new", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "error"},
Expand All @@ -113,6 +115,7 @@ def failing(x):
)
async def test_superseding_task_transitions_called(c, s, w):
expected_notifications = [
{"key": "task", "start": "new", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "constrained"},
{"key": "task", "start": "constrained", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
Expand All @@ -131,9 +134,11 @@ async def test_release_dep_called(c, s, w):
dsk = {"dep": 1, "task": (inc, "dep")}

expected_notifications = [
{"key": "dep", "start": "new", "finish": "waiting"},
{"key": "dep", "start": "waiting", "finish": "ready"},
{"key": "dep", "start": "ready", "finish": "executing"},
{"key": "dep", "start": "executing", "finish": "memory"},
{"key": "task", "start": "new", "finish": "waiting"},
{"key": "task", "start": "waiting", "finish": "ready"},
{"key": "task", "start": "ready", "finish": "executing"},
{"key": "task", "start": "executing", "finish": "memory"},
Expand Down
2 changes: 1 addition & 1 deletion distributed/pytest_resourceleaks.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def pytest_addoption(parser):
group.addoption(
"--leaks-timeout",
action="store",
type="float",
type=float,
dest="leaks_timeout",
default=0.5,
help="""\
Expand Down
3 changes: 3 additions & 0 deletions distributed/tests/test_steal.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,9 @@ async def test_steal_twice(c, s, a, b):
)
assert max(map(len, has_what.values())) < 30

assert a.in_flight_tasks == 0
assert b.in_flight_tasks == 0

await c._close()
await asyncio.gather(*[w.close() for w in workers])

Expand Down
Loading