From c422b32c9859bebea0ce2331663f90a9eaa46869 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 25 Jan 2021 15:26:07 -0500 Subject: [PATCH 01/29] Add state "new" and "fetch" to worker --- distributed/worker.py | 72 +++++++++++++++++++++++++++---------------- 1 file changed, 45 insertions(+), 27 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 7e282c2e46a..798048e4b8f 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -163,7 +163,7 @@ def __init__(self, key, runspec=None): self.dependents = set() self.duration = None self.priority = None - self.state = None + self.state = "new" self.who_has = set() self.coming_from = None self.waiting_for_data = set() @@ -448,27 +448,26 @@ def __init__( self.validate = validate self._transitions = { + # Basic state transitions + ("new", "waiting"): self.transition_new_waiting, + ("new", "fetch"): self.transition_new_fetch, ("waiting", "ready"): self.transition_waiting_ready, - ("waiting", "memory"): self.transition_waiting_done, - ("waiting", "error"): self.transition_waiting_done, - ("waiting", "flight"): self.transition_waiting_flight, + ("fetch", "flight"): self.transition_fetch_flight, ("ready", "executing"): self.transition_ready_executing, - ("ready", "memory"): self.transition_ready_memory, - ("ready", "error"): self.transition_ready_error, - ("ready", "waiting"): self.transition_ready_waiting, - ("constrained", "waiting"): self.transition_ready_waiting, - ("constrained", "error"): self.transition_ready_error, - ("constrained", "executing"): self.transition_constrained_executing, ("executing", "memory"): self.transition_executing_done, + ("flight", "memory"): self.transition_flight_memory, + ("flight", "fetch"): self.transition_flight_fetch, + # Scheduler intercession + ("fetch", "waiting"): self.transition_fetch_waiting, + # Errors, long-running, constrained + ("waiting", "error"): self.transition_waiting_done, + ("constrained", "executing"): self.transition_constrained_executing, ("executing", "error"): self.transition_executing_done, ("executing", "rescheduled"): self.transition_executing_done, ("executing", "long-running"): self.transition_executing_long_running, ("long-running", "error"): self.transition_executing_done, ("long-running", "memory"): self.transition_executing_done, ("long-running", "rescheduled"): self.transition_executing_done, - ("flight", "memory"): self.transition_flight_memory, - ("flight", "ready"): self.transition_flight_memory, - ("flight", "waiting"): self.transition_flight_waiting, } self.incoming_transfer_log = deque(maxlen=100000) @@ -1473,13 +1472,18 @@ def add_task( ts.exception = None ts.traceback = None else: - ts.state = "waiting" + # self.log.append((ts.key, "re-add", ts.state, "waiting")) + self.log.append((ts.key, "re-adding key, new TaskState")) + self.tasks[key] = ts = TaskState( + key=key, runspec=SerializedTask(function, args, kwargs, task) + ) + self.transition(ts, "waiting") else: self.log.append((key, "new")) self.tasks[key] = ts = TaskState( key=key, runspec=SerializedTask(function, args, kwargs, task) ) - ts.state = "waiting" + self.transition(ts, "waiting") if priority is not None: priority = tuple(priority) + (self.generation,) @@ -1500,9 +1504,7 @@ def add_task( assert workers if dependency not in self.tasks: self.tasks[dependency] = dep_ts = TaskState(key=dependency) - dep_ts.state = ( - "waiting" if dependency not in self.data else "memory" - ) + dep_ts.state = "fetch" if dependency not in self.data else "memory" dep_ts = self.tasks[dependency] 
self.log.append((dependency, "new-dep", dep_ts.state)) @@ -1558,10 +1560,25 @@ def transition(self, ts, finish, **kwargs): self.validate_task(ts) self._notify_plugins("transition", ts.key, start, state or finish, **kwargs) - def transition_waiting_flight(self, ts, worker=None): + def transition_new_waiting(self, ts): + if self.validate: + assert ts.state == "new" + assert ts.runspec is not None + + def transition_new_fetch(self, ts): + if self.validate: + assert ts.state == "new" + assert ts.runspec is None + + def transition_fetch_waiting(self, ts): + if self.validate: + assert ts.state == "fetch" + assert ts.runspec is not None + + def transition_fetch_flight(self, ts, worker=None): try: if self.validate: - assert ts.state != "flight" + assert ts.state == "fetch" assert ts.dependents ts.coming_from = worker @@ -1574,7 +1591,7 @@ def transition_waiting_flight(self, ts, worker=None): pdb.set_trace() raise - def transition_flight_waiting(self, ts, worker=None, remove=True, runspec=None): + def transition_flight_fetch(self, ts, worker=None, remove=True): try: if self.validate: assert ts.state == "flight" @@ -1867,7 +1884,7 @@ def ensure_communicating(self): if self.validate: assert all(dep.key in self.tasks for dep in deps) - deps = [dep for dep in deps if dep.state == "waiting"] + deps = [dep for dep in deps if dep.state == "fetch"] missing_deps = {dep for dep in deps if not dep.who_has} if missing_deps: @@ -1892,7 +1909,7 @@ def ensure_communicating(self): or self.comm_nbytes < self.total_comm_nbytes ): dep = deps.pop() - if dep.state != "waiting": + if dep.state != "fetch": continue if not dep.who_has: continue @@ -2019,7 +2036,7 @@ def select_keys_for_gather(self, worker, dep): while L: d = L.popleft() ts = self.tasks.get(d) - if ts is None or ts.state != "waiting": + if ts is None or ts.state != "fetch": continue if total_bytes + ts.get_nbytes() > self.target_message_size: break @@ -2069,7 +2086,7 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): self.log.append(("busy-gather", worker, deps)) for ts in deps_ts: if ts.state == "flight": - self.transition(ts, "waiting") + self.transition(ts, "fetch") return if cause: @@ -2152,7 +2169,8 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): self.release_key(d) continue elif ts.state not in ("ready", "memory"): - self.transition(ts, "waiting", worker=worker, remove=not busy) + # "waiting" or "fetch"? 
+ self.transition(ts, "fetch", worker=worker, remove=not busy) if not busy and d not in data and ts.dependents: self.log.append(("missing-dep", d)) @@ -3012,7 +3030,7 @@ def validate_state(self): ts_wait = self.tasks[key] assert ( ts_wait.state == "flight" - or ts_wait.state == "waiting" + or ts_wait.state == "fetch" or ts_wait.key in self._missing_dep_flight or ts_wait.who_has.issubset(self.in_flight_workers) ) From 538f426d3698e9ccd9d7bff402cf60366aecde30 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Wed, 27 Jan 2021 13:01:08 -0500 Subject: [PATCH 02/29] Handle when tasks have been reassigned by scheduler This is usually a `fetch` -> `waiting` transition --- distributed/worker.py | 47 +++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 798048e4b8f..26278671514 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1472,7 +1472,6 @@ def add_task( ts.exception = None ts.traceback = None else: - # self.log.append((ts.key, "re-add", ts.state, "waiting")) self.log.append((ts.key, "re-adding key, new TaskState")) self.tasks[key] = ts = TaskState( key=key, runspec=SerializedTask(function, args, kwargs, task) @@ -1504,20 +1503,20 @@ def add_task( assert workers if dependency not in self.tasks: self.tasks[dependency] = dep_ts = TaskState(key=dependency) - dep_ts.state = "fetch" if dependency not in self.data else "memory" + state = "fetch" if dependency not in self.data else "memory" dep_ts = self.tasks[dependency] + self.transition(dep_ts, state, child=ts) self.log.append((dependency, "new-dep", dep_ts.state)) - if dep_ts.state != "memory": - ts.waiting_for_data.add(dep_ts.key) - self.waiting_for_data_count += 1 - dep_ts.who_has.update(workers) ts.dependencies.add(dep_ts) dep_ts.dependents.add(ts) + if dep_ts.state == "fetch": + ts.waiting_for_data.add(dep_ts.key) + for worker in workers: self.has_what[worker].add(dep_ts.key) if dep_ts.state != "memory": @@ -1527,9 +1526,17 @@ def add_task( for key, value in nbytes.items(): self.tasks[key].nbytes = value - if ts.waiting_for_data: - self.data_needed.append(ts.key) - else: + # If a worker dies there can be a scheduler-triggered reassignment + # of the task leading to a state change from `fetch` -> `waiting` + # This can cause problems if a worker goes looking for who has a + # dependency when the answer is suddenly "me" + # To stop this, update who has what after all dependencies + # have been (re)created. 
+ # TODO: move this into the appropriate transition functions + self.update_who_has(who_has) + # if ts.waiting_for_data: + # self.data_needed.append(ts.key) + if not ts.waiting_for_data: self.transition(ts, "ready") if self.validate: if who_has: @@ -1565,10 +1572,16 @@ def transition_new_waiting(self, ts): assert ts.state == "new" assert ts.runspec is not None - def transition_new_fetch(self, ts): + def transition_new_fetch(self, parent, child=None): if self.validate: - assert ts.state == "new" - assert ts.runspec is None + assert parent.state == "new" + assert parent.runspec is None + assert child.runspec is not None + assert child.state != "new" + + child.waiting_for_data.add(parent.key) + self.data_needed.append(child.key) + self.waiting_for_data_count += 1 def transition_fetch_waiting(self, ts): if self.validate: @@ -1663,7 +1676,7 @@ def transition_waiting_ready(self, ts): assert all( dep.key in self.data or dep.key in self.actors for dep in ts.dependencies - ) + ), breakpoint() assert all(dep.state == "memory" for dep in ts.dependencies) assert ts.key not in self.ready @@ -2165,9 +2178,9 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): if not busy and d in data: self.transition(ts, "memory", value=data[d]) - elif ts is None or ts.state == "executing": - self.release_key(d) - continue + # elif ts is None or ts.state == "executing": + # self.release_key(d) + # continue elif ts.state not in ("ready", "memory"): # "waiting" or "fetch"? self.transition(ts, "fetch", worker=worker, remove=not busy) @@ -3041,7 +3054,7 @@ def validate_state(self): for worker, keys in self.has_what.items(): for k in keys: - assert worker in self.tasks[k].who_has + assert worker in self.tasks[k].who_has, breakpoint() for ts in self.tasks.values(): self.validate_task(ts) From bfab13f1ee720ba6c3c53aaa8c9158c50c4cbdc9 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Wed, 27 Jan 2021 15:35:08 -0500 Subject: [PATCH 03/29] Add check that task has no who_has --- distributed/worker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 26278671514..ceec12b4b37 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1571,6 +1571,7 @@ def transition_new_waiting(self, ts): if self.validate: assert ts.state == "new" assert ts.runspec is not None + assert not ts.who_has, breakpoint() def transition_new_fetch(self, parent, child=None): if self.validate: @@ -2178,9 +2179,9 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): if not busy and d in data: self.transition(ts, "memory", value=data[d]) - # elif ts is None or ts.state == "executing": - # self.release_key(d) - # continue + elif ts is None or ts.state == "executing": + self.release_key(d) + continue elif ts.state not in ("ready", "memory"): # "waiting" or "fetch"? 
self.transition(ts, "fetch", worker=worker, remove=not busy) From df5631f606d5f3d97835226f634f9e4eb29172c3 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Fri, 29 Jan 2021 14:37:13 -0500 Subject: [PATCH 04/29] waiting_for_data isn't working right --- distributed/worker.py | 75 ++++++++++++++++++++++++++++++------------- 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index ceec12b4b37..186c1a2e869 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -468,6 +468,10 @@ def __init__( ("long-running", "error"): self.transition_executing_done, ("long-running", "memory"): self.transition_executing_done, ("long-running", "rescheduled"): self.transition_executing_done, + # Stealing transitions + # ("waiting", "new"): self.transition_waiting_new, + ("ready", "waiting"): self.transition_ready_waiting, + ("constrained", "waiting"): self.transition_constrained_waiting, } self.incoming_transfer_log = deque(maxlen=100000) @@ -1504,18 +1508,15 @@ def add_task( if dependency not in self.tasks: self.tasks[dependency] = dep_ts = TaskState(key=dependency) - state = "fetch" if dependency not in self.data else "memory" dep_ts = self.tasks[dependency] - self.transition(dep_ts, state, child=ts) - self.log.append((dependency, "new-dep", dep_ts.state)) - - dep_ts.who_has.update(workers) - ts.dependencies.add(dep_ts) dep_ts.dependents.add(ts) - if dep_ts.state == "fetch": - ts.waiting_for_data.add(dep_ts.key) + state = "fetch" if dependency not in self.data else "memory" + self.transition(dep_ts, state) + self.log.append((dependency, "new-dep", dep_ts.state, ts)) + + dep_ts.who_has.update(workers) for worker in workers: self.has_what[worker].add(dep_ts.key) @@ -1534,11 +1535,14 @@ def add_task( # have been (re)created. # TODO: move this into the appropriate transition functions self.update_who_has(who_has) - # if ts.waiting_for_data: - # self.data_needed.append(ts.key) if not ts.waiting_for_data: self.transition(ts, "ready") if self.validate: + for worker, keys in self.has_what.items(): + for k in keys: + # TODO: is this getting tripped up by stealing? 
+ # assert worker in self.tasks[k].who_has, breakpoint() + pass if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) assert all(self.tasks[dep.key] for dep in ts.dependencies) @@ -1559,7 +1563,11 @@ def transition(self, ts, finish, **kwargs): start = ts.state if start == finish: return - func = self._transitions[start, finish] + try: + func = self._transitions[start, finish] + except KeyError: + breakpoint() + pass state = func(ts, **kwargs) self.log.append((ts.key, start, state or finish)) ts.state = state or finish @@ -1573,15 +1581,15 @@ def transition_new_waiting(self, ts): assert ts.runspec is not None assert not ts.who_has, breakpoint() - def transition_new_fetch(self, parent, child=None): + def transition_new_fetch(self, ts): if self.validate: - assert parent.state == "new" - assert parent.runspec is None - assert child.runspec is not None - assert child.state != "new" + assert ts.state == "new" + assert ts.runspec is None + + for dependent in ts.dependents: + dependent.waiting_for_data.add(ts.key) - child.waiting_for_data.add(parent.key) - self.data_needed.append(child.key) + self.data_needed.append(ts.key) self.waiting_for_data_count += 1 def transition_fetch_waiting(self, ts): @@ -1589,6 +1597,9 @@ def transition_fetch_waiting(self, ts): assert ts.state == "fetch" assert ts.runspec is not None + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + def transition_fetch_flight(self, ts, worker=None): try: if self.validate: @@ -1625,6 +1636,7 @@ def transition_flight_fetch(self, ts, worker=None, remove=True): self._missing_dep_flight.add(ts.key) self.loop.add_callback(self.handle_missing_dep, ts) for dependent in ts.dependents: + dependent.waiting_for_data.add(ts.key) if dependent.state == "waiting": if remove: # try a new worker immediately self.data_needed.appendleft(dependent.key) @@ -1681,7 +1693,9 @@ def transition_waiting_ready(self, ts): assert all(dep.state == "memory" for dep in ts.dependencies) assert ts.key not in self.ready - ts.waiting_for_data.clear() + # ts.waiting_for_data.clear() + # TODO WHAT? + # self.has_what.update() if ts.resource_restrictions is not None: self.constrained.append(ts.key) @@ -1752,7 +1766,19 @@ def transition_ready_waiting(self, ts): """ This transition is common for work stealing """ - pass + ts.runspec = None + + def transition_waiting_new(self, ts): + """ + Common in work stealing + """ + ts.runspec = None + + def transition_constrained_waiting(self, ts): + """ + Common in work stealing + """ + ts.runspec = None def transition_constrained_executing(self, ts): self.transition_ready_executing(ts) @@ -2316,8 +2342,8 @@ def steal_request(self, key): # Resetting the runspec should be reset by the transition. However, # the waiting->waiting transition results in a no-op which would not # reset. 
- ts.runspec = None self.transition(ts, "waiting") + ts.runspec = None if not ts.dependents: self.release_key(ts.key) if self.validate: @@ -2995,7 +3021,9 @@ def validate_task_waiting(self, ts): assert ts.key not in self.data assert ts.state == "waiting" if ts.dependencies and ts.runspec: - assert not all(dep.key in self.data for dep in ts.dependencies) + assert not all( + dep.key in self.data for dep in ts.dependencies + ), breakpoint() def validate_task_flight(self, ts): assert ts.key not in self.data @@ -3055,7 +3083,8 @@ def validate_state(self): for worker, keys in self.has_what.items(): for k in keys: - assert worker in self.tasks[k].who_has, breakpoint() + # assert worker in self.tasks[k].who_has, breakpoint() + pass for ts in self.tasks.values(): self.validate_task(ts) From 8eef9d183111eb9e10418fc3041c7b682b3abb50 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Fri, 29 Jan 2021 15:26:47 -0500 Subject: [PATCH 05/29] Better handling for dependencies that get re-added A dependency might get created for the first time because it's required by an added task. Alternatively, a task may be added that requires a dependency that already has a TaskState object, in which case we want to be more careful about how we go about setting up dependency/dependent links and data comms requests. --- distributed/worker.py | 38 +++++++++++++++++++++++++++++++------- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 186c1a2e869..14c6438eb92 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1506,15 +1506,38 @@ def add_task( for dependency, workers in who_has.items(): assert workers if dependency not in self.tasks: + # initial state is "new" + # this dependency does not already exist on worker self.tasks[dependency] = dep_ts = TaskState(key=dependency) - dep_ts = self.tasks[dependency] - ts.dependencies.add(dep_ts) - dep_ts.dependents.add(ts) + # link up to child / parents + ts.dependencies.add(dep_ts) + dep_ts.dependents.add(ts) - state = "fetch" if dependency not in self.data else "memory" - self.transition(dep_ts, state) - self.log.append((dependency, "new-dep", dep_ts.state, ts)) + # check to ensure task wasn't already executed and partially released + # # TODO: make this less bad + state = "fetch" if dependency not in self.data else "memory" + + # transition from new -> fetch handles adding dependency + # to waiting_for_data + self.transition(dep_ts, state) + + self.log.append((dependency, "new-dep", dep_ts.state, ts)) + + else: + # task was already present on worker + dep_ts = self.tasks[dependency] + + # link up to child / parents + ts.dependencies.add(dep_ts) + dep_ts.dependents.add(ts) + + # possible that a new task has asked for a dependency we're already + # planning on fetching -- make sure that task knows it has to + # wait for its dependencies to arrive + if dep_ts.state in ("fetch", "flight"): + ts.waiting_for_data.add(dep_ts.key) + self.data_needed.append(ts.key) dep_ts.who_has.update(workers) @@ -1541,7 +1564,7 @@ def add_task( for worker, keys in self.has_what.items(): for k in keys: # TODO: is this getting tripped up by stealing? 
- # assert worker in self.tasks[k].who_has, breakpoint() + assert worker in self.tasks[k].who_has, breakpoint() pass if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) @@ -1621,6 +1644,7 @@ def transition_flight_fetch(self, ts, worker=None, remove=True): if self.validate: assert ts.state == "flight" + breakpoint() self.in_flight_tasks -= 1 ts.coming_from = None ts.runspec = runspec or ts.runspec From 36b0de52c27eec5a5f8dce72b6ec3151adac5772 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Fri, 29 Jan 2021 16:00:36 -0500 Subject: [PATCH 06/29] Release keys in `fetch` if no dependents --- distributed/worker.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 14c6438eb92..ab35d09fe55 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1550,13 +1550,8 @@ def add_task( for key, value in nbytes.items(): self.tasks[key].nbytes = value - # If a worker dies there can be a scheduler-triggered reassignment - # of the task leading to a state change from `fetch` -> `waiting` - # This can cause problems if a worker goes looking for who has a - # dependency when the answer is suddenly "me" - # To stop this, update who has what after all dependencies - # have been (re)created. # TODO: move this into the appropriate transition functions + # or remove it altogether self.update_who_has(who_has) if not ts.waiting_for_data: self.transition(ts, "ready") @@ -2359,6 +2354,7 @@ def steal_request(self, key): else: state = None + print(f"stealing {key}") response = {"op": "steal-response", "key": key, "state": state} self.batched_stream.send(response) @@ -2366,8 +2362,8 @@ def steal_request(self, key): # Resetting the runspec should be reset by the transition. However, # the waiting->waiting transition results in a no-op which would not # reset. 
- self.transition(ts, "waiting") ts.runspec = None + self.transition(ts, "waiting") if not ts.dependents: self.release_key(ts.key) if self.validate: @@ -2399,6 +2395,7 @@ def release_key(self, key, cause=None, reason=None, report=True): if not dependency.dependents and dependency.state in ( "waiting", "flight", + "fetch", ): self.release_key(dependency.key) From e051e277353fb25e7e4d986f734d3c87e7340d84 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Fri, 29 Jan 2021 16:32:42 -0500 Subject: [PATCH 07/29] Don't overload `data_needed` Only add task once to data_needed, not once for _every_ dep it might need (otherwise everything times out when you steal) --- distributed/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index ab35d09fe55..8f68815e978 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1537,7 +1537,6 @@ def add_task( # wait for its dependencies to arrive if dep_ts.state in ("fetch", "flight"): ts.waiting_for_data.add(dep_ts.key) - self.data_needed.append(ts.key) dep_ts.who_has.update(workers) @@ -1553,7 +1552,9 @@ def add_task( # TODO: move this into the appropriate transition functions # or remove it altogether self.update_who_has(who_has) - if not ts.waiting_for_data: + if ts.waiting_for_data: + self.data_needed.append(ts.key) + else: self.transition(ts, "ready") if self.validate: for worker, keys in self.has_what.items(): @@ -2354,7 +2355,6 @@ def steal_request(self, key): else: state = None - print(f"stealing {key}") response = {"op": "steal-response", "key": key, "state": state} self.batched_stream.send(response) From 2f313443f64f04cf4e7a23daf8bc3ad93fe96502 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Fri, 29 Jan 2021 16:36:40 -0500 Subject: [PATCH 08/29] Remove breakpoints --- distributed/worker.py | 21 ++++++--------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 8f68815e978..54a9981f65e 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1559,8 +1559,7 @@ def add_task( if self.validate: for worker, keys in self.has_what.items(): for k in keys: - # TODO: is this getting tripped up by stealing? 
- assert worker in self.tasks[k].who_has, breakpoint() + assert worker in self.tasks[k].who_has pass if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) @@ -1582,11 +1581,7 @@ def transition(self, ts, finish, **kwargs): start = ts.state if start == finish: return - try: - func = self._transitions[start, finish] - except KeyError: - breakpoint() - pass + func = self._transitions[start, finish] state = func(ts, **kwargs) self.log.append((ts.key, start, state or finish)) ts.state = state or finish @@ -1598,7 +1593,7 @@ def transition_new_waiting(self, ts): if self.validate: assert ts.state == "new" assert ts.runspec is not None - assert not ts.who_has, breakpoint() + assert not ts.who_has def transition_new_fetch(self, ts): if self.validate: @@ -1640,7 +1635,6 @@ def transition_flight_fetch(self, ts, worker=None, remove=True): if self.validate: assert ts.state == "flight" - breakpoint() self.in_flight_tasks -= 1 ts.coming_from = None ts.runspec = runspec or ts.runspec @@ -1709,7 +1703,7 @@ def transition_waiting_ready(self, ts): assert all( dep.key in self.data or dep.key in self.actors for dep in ts.dependencies - ), breakpoint() + ) assert all(dep.state == "memory" for dep in ts.dependencies) assert ts.key not in self.ready @@ -3042,9 +3036,7 @@ def validate_task_waiting(self, ts): assert ts.key not in self.data assert ts.state == "waiting" if ts.dependencies and ts.runspec: - assert not all( - dep.key in self.data for dep in ts.dependencies - ), breakpoint() + assert not all(dep.key in self.data for dep in ts.dependencies) def validate_task_flight(self, ts): assert ts.key not in self.data @@ -3104,8 +3096,7 @@ def validate_state(self): for worker, keys in self.has_what.items(): for k in keys: - # assert worker in self.tasks[k].who_has, breakpoint() - pass + assert worker in self.tasks[k].who_has for ts in self.tasks.values(): self.validate_task(ts) From 1c7db0430d96535542fe7254bfc7385fe560606e Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Sat, 30 Jan 2021 10:14:40 -0500 Subject: [PATCH 09/29] Add updated task transitions to test_worker_plugin --- distributed/diagnostics/tests/test_worker_plugin.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 79d180c360e..858f8feedc2 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -76,6 +76,7 @@ async def test_create_on_construction(c, s, a, b): @gen_cluster(nthreads=[("127.0.0.1", 1)], client=True) async def test_normal_task_transitions_called(c, s, w): expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "memory"}, @@ -95,6 +96,7 @@ def failing(x): raise Exception() expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "error"}, @@ -113,6 +115,7 @@ def failing(x): ) async def test_superseding_task_transitions_called(c, s, w): expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "constrained"}, {"key": "task", "start": "constrained", "finish": "executing"}, {"key": "task", "start": 
"executing", "finish": "memory"}, @@ -131,9 +134,11 @@ async def test_release_dep_called(c, s, w): dsk = {"dep": 1, "task": (inc, "dep")} expected_notifications = [ + {"key": "dep", "start": "new", "finish": "waiting"}, {"key": "dep", "start": "waiting", "finish": "ready"}, {"key": "dep", "start": "ready", "finish": "executing"}, {"key": "dep", "start": "executing", "finish": "memory"}, + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "memory"}, From 7e74a7d537b9bb206d27b010d356f0085dbb4186 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Sat, 30 Jan 2021 11:09:57 -0500 Subject: [PATCH 10/29] Remove bad pass --- distributed/worker.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 54a9981f65e..fc746ea3824 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1488,6 +1488,8 @@ def add_task( ) self.transition(ts, "waiting") + # TODO: move transition of `ts` to end of `add_task` + if priority is not None: priority = tuple(priority) + (self.generation,) self.generation -= 1 @@ -1560,7 +1562,6 @@ def add_task( for worker, keys in self.has_what.items(): for k in keys: assert worker in self.tasks[k].who_has - pass if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) assert all(self.tasks[dep.key] for dep in ts.dependencies) From 4b467d905e27aa52a84836b7acc98b27a615071f Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Sat, 30 Jan 2021 11:26:49 -0500 Subject: [PATCH 11/29] Only use `who_has` and `has_what` for non-local tasks Should only be set for `fetch` or `flight` -- possible following failed workers that a task will end up looking remote when it isn't, so we discard the key from `has_what` when a task is transitioned to `ready` --- distributed/worker.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index fc746ea3824..7505891a781 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1540,11 +1540,13 @@ def add_task( if dep_ts.state in ("fetch", "flight"): ts.waiting_for_data.add(dep_ts.key) - dep_ts.who_has.update(workers) + if dep_ts.state in ("fetch", "flight"): + # if we _need_ to grab data or are in the process + dep_ts.who_has.update(workers) - for worker in workers: - self.has_what[worker].add(dep_ts.key) - if dep_ts.state != "memory": + for worker in workers: + self.has_what[worker].add(dep_ts.key) + #if dep_ts.state != "memory": self.pending_data_per_worker[worker].append(dep_ts.key) if nbytes is not None: @@ -1553,7 +1555,7 @@ def add_task( # TODO: move this into the appropriate transition functions # or remove it altogether - self.update_who_has(who_has) + #self.update_who_has(who_has) if ts.waiting_for_data: self.data_needed.append(ts.key) else: @@ -1709,8 +1711,11 @@ def transition_waiting_ready(self, ts): assert ts.key not in self.ready # ts.waiting_for_data.clear() - # TODO WHAT? 
- # self.has_what.update() + # TODO this should probably only be set for tasks that are in `fetch` + # but there's some weirdness with `has_what` being out-of-sync + # when workers fail + #ts.who_has.add(self.address) + self.has_what[self.address].discard(ts.key) if ts.resource_restrictions is not None: self.constrained.append(ts.key) From 7ee6797a99dfdb516a90a456278f5d1086b75edb Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Sat, 30 Jan 2021 12:17:38 -0500 Subject: [PATCH 12/29] black --- distributed/worker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 7505891a781..a2c56f899ad 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1546,7 +1546,7 @@ def add_task( for worker in workers: self.has_what[worker].add(dep_ts.key) - #if dep_ts.state != "memory": + # if dep_ts.state != "memory": self.pending_data_per_worker[worker].append(dep_ts.key) if nbytes is not None: @@ -1555,7 +1555,7 @@ def add_task( # TODO: move this into the appropriate transition functions # or remove it altogether - #self.update_who_has(who_has) + # self.update_who_has(who_has) if ts.waiting_for_data: self.data_needed.append(ts.key) else: @@ -1714,7 +1714,7 @@ def transition_waiting_ready(self, ts): # TODO this should probably only be set for tasks that are in `fetch` # but there's some weirdness with `has_what` being out-of-sync # when workers fail - #ts.who_has.add(self.address) + # ts.who_has.add(self.address) self.has_what[self.address].discard(ts.key) if ts.resource_restrictions is not None: From 24cb8ab220e51c5ba5b338b425a939909b369b42 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 1 Feb 2021 10:57:33 -0500 Subject: [PATCH 13/29] Transitions for scheduler reassignment following broken workers Tasks can transition from `fetch` -> `waiting` or `flight` -> `waiting` if the worker that has the data dies before the transfer is completed. If that task is reassigned to a worker that was expecting just the data, add the `runspec` and clear out any references to where the data _was_ coming from since it will now be executed locally. 
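
For illustration, both new transitions boil down to the same cleanup.
The sketch below is not the patch itself (the real
`transition_fetch_waiting` / `transition_flight_waiting` added in this
diff also carry the usual validation and pdb error handling), and
`reassign_for_local_execution` is only a name for the sketch, not a
helper this patch adds:

    def reassign_for_local_execution(worker, ts, runspec):
        # The scheduler now wants *this* worker to compute the task, so
        # every record of remote replicas of the key is stale.
        ts.runspec = runspec              # we now know how to run it here
        for peer in worker.has_what:      # drop stale has_what entries
            worker.has_what[peer].discard(ts.key)
        ts.who_has.clear()                # nothing left to fetch remotely
        for dependent in ts.dependents:   # avoid a spurious gather_dep call
            dependent.waiting_for_data.discard(ts.key)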
--- distributed/worker.py | 56 ++++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index a2c56f899ad..83b08bd9483 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -459,6 +459,7 @@ def __init__( ("flight", "fetch"): self.transition_flight_fetch, # Scheduler intercession ("fetch", "waiting"): self.transition_fetch_waiting, + ("flight", "waiting"): self.transition_flight_waiting, # Errors, long-running, constrained ("waiting", "error"): self.transition_waiting_done, ("constrained", "executing"): self.transition_constrained_executing, @@ -469,7 +470,6 @@ def __init__( ("long-running", "memory"): self.transition_executing_done, ("long-running", "rescheduled"): self.transition_executing_done, # Stealing transitions - # ("waiting", "new"): self.transition_waiting_new, ("ready", "waiting"): self.transition_ready_waiting, ("constrained", "waiting"): self.transition_constrained_waiting, } @@ -1462,7 +1462,6 @@ def add_task( runspec = SerializedTask(function, args, kwargs, task) if key in self.tasks: ts = self.tasks[key] - ts.runspec = runspec if ts.state == "memory": assert key in self.data or key in self.actors logger.debug( @@ -1476,11 +1475,10 @@ def add_task( ts.exception = None ts.traceback = None else: + # This is a scheduler re-assignment + # Either `fetch` -> `waiting` or `flight` -> `waiting` self.log.append((ts.key, "re-adding key, new TaskState")) - self.tasks[key] = ts = TaskState( - key=key, runspec=SerializedTask(function, args, kwargs, task) - ) - self.transition(ts, "waiting") + self.transition(ts, "waiting", runspec=runspec) else: self.log.append((key, "new")) self.tasks[key] = ts = TaskState( @@ -1489,6 +1487,8 @@ def add_task( self.transition(ts, "waiting") # TODO: move transition of `ts` to end of `add_task` + # This will require a chained recommendation transition system like + # the scheduler if priority is not None: priority = tuple(priority) + (self.generation,) @@ -1553,9 +1553,6 @@ def add_task( for key, value in nbytes.items(): self.tasks[key].nbytes = value - # TODO: move this into the appropriate transition functions - # or remove it altogether - # self.update_who_has(who_has) if ts.waiting_for_data: self.data_needed.append(ts.key) else: @@ -1609,11 +1606,48 @@ def transition_new_fetch(self, ts): self.data_needed.append(ts.key) self.waiting_for_data_count += 1 - def transition_fetch_waiting(self, ts): + def transition_fetch_waiting(self, ts, runspec=None): + """This is a rescheduling transition that occurs after a worker failure. + A task was available from another worker but that worker died and the + scheduler reassigned the task for computation here. + """ if self.validate: assert ts.state == "fetch" - assert ts.runspec is not None + assert ts.runspec is None + + ts.runspec = runspec + + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) + + # clear `who_has` of stale info + ts.who_has.clear() + + # remove entry from dependents to avoid a spurious `gather_dep` call`` + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + + def transition_flight_waiting(self, ts, runspec=None): + """This is a rescheduling transition that occurs after + a worker failure. A task was in flight from another worker to this + worker when that worker died and the scheduler reassigned the task for + computation here. 
+ """ + if self.validate: + assert ts.state == "flight" + assert ts.runspec is None + + ts.runspec = runspec + + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) + + # clear `who_has` of stale info + ts.who_has.clear() + # remove entry from dependents to avoid a spurious `gather_dep` call`` for dependent in ts.dependents: dependent.waiting_for_data.discard(ts.key) From 7498285cb361c76bb16689702fb99df173590d2c Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 1 Feb 2021 11:07:32 -0500 Subject: [PATCH 14/29] validate method for fetch state --- distributed/worker.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/distributed/worker.py b/distributed/worker.py index 83b08bd9483..b702b456e78 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3083,6 +3083,10 @@ def validate_task_flight(self, ts): assert not any(dep.key in self.ready for dep in ts.dependents) assert ts.key in self.in_flight_workers[ts.coming_from] + def validate_task_fetch(self, ts): + assert ts.runspec is None + assert ts.key not in self.data + def validate_task(self, ts): try: if ts.state == "memory": @@ -3095,6 +3099,8 @@ def validate_task(self, ts): self.validate_task_executing(ts) elif ts.state == "flight": self.validate_task_flight(ts) + elif ts.state == "fetch": + self.validate_task_fetch(ts) except Exception as e: logger.exception(e) if LOG_PDB: From 40a5ae3c7ddf93a5b3d69c9cecba9a764cd64435 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 1 Feb 2021 11:11:25 -0500 Subject: [PATCH 15/29] add try/catch to new transition functions --- distributed/worker.py | 108 +++++++++++++++++++++++++++--------------- 1 file changed, 70 insertions(+), 38 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index b702b456e78..2987d2cc9b6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -114,13 +114,13 @@ class TaskState: The priority this task given by the scheduler. Determines run order. * **state**: ``str`` The current state of the task. One of ["waiting", "ready", "executing", - "memory", "flight", "long-running", "rescheduled", "error"] + "fetch", "memory", "flight", "long-running", "rescheduled", "error"] * **who_has**: ``set(worker)`` Workers that we believe have this data * **coming_from**: ``str`` The worker that current task data is coming from if task is in flight * **waiting_for_data**: ``set(keys of dependencies)`` - A dynamic verion of dependencies. All dependencies that we still don't + A dynamic version of dependencies. All dependencies that we still don't have for a particular key. 
* **resource_restrictions**: ``{str: number}`` Abstract resources required to run a task @@ -457,7 +457,7 @@ def __init__( ("executing", "memory"): self.transition_executing_done, ("flight", "memory"): self.transition_flight_memory, ("flight", "fetch"): self.transition_flight_fetch, - # Scheduler intercession + # Scheduler intercession (re-assignment) ("fetch", "waiting"): self.transition_fetch_waiting, ("flight", "waiting"): self.transition_flight_waiting, # Errors, long-running, constrained @@ -1590,43 +1590,67 @@ def transition(self, ts, finish, **kwargs): self._notify_plugins("transition", ts.key, start, state or finish, **kwargs) def transition_new_waiting(self, ts): - if self.validate: - assert ts.state == "new" - assert ts.runspec is not None - assert not ts.who_has + try: + if self.validate: + assert ts.state == "new" + assert ts.runspec is not None + assert not ts.who_has + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise def transition_new_fetch(self, ts): - if self.validate: - assert ts.state == "new" - assert ts.runspec is None + try: + if self.validate: + assert ts.state == "new" + assert ts.runspec is None - for dependent in ts.dependents: - dependent.waiting_for_data.add(ts.key) + for dependent in ts.dependents: + dependent.waiting_for_data.add(ts.key) + + self.data_needed.append(ts.key) + self.waiting_for_data_count += 1 + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb - self.data_needed.append(ts.key) - self.waiting_for_data_count += 1 + pdb.set_trace() + raise def transition_fetch_waiting(self, ts, runspec=None): """This is a rescheduling transition that occurs after a worker failure. A task was available from another worker but that worker died and the scheduler reassigned the task for computation here. """ - if self.validate: - assert ts.state == "fetch" - assert ts.runspec is None + try: + if self.validate: + assert ts.state == "fetch" + assert ts.runspec is None - ts.runspec = runspec + ts.runspec = runspec + + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) - # remove any stale entries in `has_what` - for worker in self.has_what.keys(): - self.has_what[worker].discard(ts.key) + # clear `who_has` of stale info + ts.who_has.clear() - # clear `who_has` of stale info - ts.who_has.clear() + # remove entry from dependents to avoid a spurious `gather_dep` call`` + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb - # remove entry from dependents to avoid a spurious `gather_dep` call`` - for dependent in ts.dependents: - dependent.waiting_for_data.discard(ts.key) + pdb.set_trace() + raise def transition_flight_waiting(self, ts, runspec=None): """This is a rescheduling transition that occurs after @@ -1634,22 +1658,30 @@ def transition_flight_waiting(self, ts, runspec=None): worker when that worker died and the scheduler reassigned the task for computation here. 
""" - if self.validate: - assert ts.state == "flight" - assert ts.runspec is None + try: + if self.validate: + assert ts.state == "flight" + assert ts.runspec is None - ts.runspec = runspec + ts.runspec = runspec - # remove any stale entries in `has_what` - for worker in self.has_what.keys(): - self.has_what[worker].discard(ts.key) + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) - # clear `who_has` of stale info - ts.who_has.clear() + # clear `who_has` of stale info + ts.who_has.clear() - # remove entry from dependents to avoid a spurious `gather_dep` call`` - for dependent in ts.dependents: - dependent.waiting_for_data.discard(ts.key) + # remove entry from dependents to avoid a spurious `gather_dep` call`` + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise def transition_fetch_flight(self, ts, worker=None): try: From 7224111a9bdec5ed5cd7eed138de9787f0c2a2b7 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 1 Feb 2021 14:40:41 -0500 Subject: [PATCH 16/29] Remove commented out code --- distributed/worker.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 2987d2cc9b6..852b881f8d4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1776,11 +1776,6 @@ def transition_waiting_ready(self, ts): assert all(dep.state == "memory" for dep in ts.dependencies) assert ts.key not in self.ready - # ts.waiting_for_data.clear() - # TODO this should probably only be set for tasks that are in `fetch` - # but there's some weirdness with `has_what` being out-of-sync - # when workers fail - # ts.who_has.add(self.address) self.has_what[self.address].discard(ts.key) if ts.resource_restrictions is not None: From 66171d784b9a0de783474d2e674ac5fae8933a1a Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Mon, 1 Feb 2021 14:40:58 -0500 Subject: [PATCH 17/29] Update docs on internal scheduling --- docs/source/images/worker-dep-state.dot | 4 +- docs/source/images/worker-dep-state.svg | 65 ++++++++----- docs/source/images/worker-task-state.dot | 1 + docs/source/images/worker-task-state.svg | 111 ++++++++++++++--------- docs/source/worker.rst | 32 +++++-- 5 files changed, 141 insertions(+), 72 deletions(-) diff --git a/docs/source/images/worker-dep-state.dot b/docs/source/images/worker-dep-state.dot index 8da78a1c789..18a5e40cfac 100644 --- a/docs/source/images/worker-dep-state.dot +++ b/docs/source/images/worker-dep-state.dot @@ -3,6 +3,8 @@ digraph{ bgcolor="#FFFFFFF00", rankdir=LR, ]; - waiting -> flight; + new -> fetch; + fetch -> flight; + flight -> fetch; flight -> memory; } diff --git a/docs/source/images/worker-dep-state.svg b/docs/source/images/worker-dep-state.svg index 412d4dc7d3a..9b7b04e8d99 100644 --- a/docs/source/images/worker-dep-state.svg +++ b/docs/source/images/worker-dep-state.svg @@ -1,38 +1,61 @@ - - + %3 - - -waiting - -waiting + + + +new + +new + + + +fetch + +fetch + + + +new->fetch + + -flight - -flight + +flight + +flight + + + +fetch->flight + + - -waiting->flight - - + + +flight->fetch + + -memory - -memory + +memory + +memory -flight->memory - - + +flight->memory + + diff --git a/docs/source/images/worker-task-state.dot b/docs/source/images/worker-task-state.dot index 5bd9ac93a8f..4a6fc8cbacf 100644 --- a/docs/source/images/worker-task-state.dot +++ b/docs/source/images/worker-task-state.dot @@ -3,6 +3,7 @@ digraph{ 
bgcolor="#FFFFFFF00", rankdir=LR, ]; + new -> waiting; waiting -> ready; ready -> executing; executing -> "long-running"; diff --git a/docs/source/images/worker-task-state.svg b/docs/source/images/worker-task-state.svg index 2b307d80d64..edc4b834044 100644 --- a/docs/source/images/worker-task-state.svg +++ b/docs/source/images/worker-task-state.svg @@ -1,78 +1,103 @@ - - + %3 - + + + +new + +new + -waiting - -waiting + +waiting + +waiting + + + +new->waiting + + -ready - -ready + +ready + +ready -waiting->ready - - + +waiting->ready + + -executing - -executing + +executing + +executing -ready->executing - - + +ready->executing + + -long-running - -long-running + +long-running + +long-running -executing->long-running - - + +executing->long-running + + -memory - -memory + +memory + +memory -executing->memory - - + +executing->memory + + -error - -error + +error + +error -executing->error - - + +executing->error + + -long-running->memory - - + +long-running->memory + + -long-running->error - - + +long-running->error + + diff --git a/docs/source/worker.rst b/docs/source/worker.rst index 77a361dc686..eaf547a8321 100644 --- a/docs/source/worker.rst +++ b/docs/source/worker.rst @@ -89,13 +89,18 @@ Internal Scheduling ------------------- Internally tasks that come to the scheduler proceed through the following -pipeline: +pipeline as :py:class:`distributed.worker.TaskState` objects. Tasks which +follow this path have a :py:attr:`distributed.worker.TaskState.runspec` defined +which instructs the worker how to execute them. .. image:: images/worker-task-state.svg :alt: Dask worker task states -The worker also tracks data dependencies that are required to run the tasks -above. These follow through a simpler pipeline: +Data dependencies are also represented as +:py:class:`distributed.worker.TaskState` objects and follow a simpler path +through the execution pipeline. These tasks do not have a +:py:attr:`distributed.worker.TaskState.runspec` defined and instead contain a +listing of workers to collect their result from. .. image:: images/worker-dep-state.svg @@ -108,9 +113,12 @@ dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around -10-50) so as to avoid overly-fragmenting our network bandwidth. After all -dependencies for a task are in memory we transition the task to the ready state -and put the task again into a heap of tasks that are ready to run. +10-50) so as to avoid overly-fragmenting our network bandwidth. In the event +that the network comms between two workers are saturated, a dependency task may +cycle between ``fetch`` and ``flight`` until it is successfully collected. + +After all dependencies for a task are in memory we transition the task to the +ready state and put the task again into a heap of tasks that are ready to run. We collect from this heap and put the task into a thread from a local thread pool to execute. @@ -122,7 +130,17 @@ thread pool. A task either errs or its result is put into memory. In either case a response is sent back to the scheduler. -.. _memman:: +Tasks slated for execution and tasks marked for collection from other workers +must follow their respective transition paths as defined above. 
The only +exceptions to this are when: + +* A task is `stolen `_, in which case a task which might have + been collected will instead be executed on the thieving worker +* Scheduler intercession, in which the scheduler reassigns a task that was + previously assigned to a separate worker to a new worker. This most commonly + occurs when a `worker dies `_ during computation. + +.. _memman: Memory Management ----------------- From 8f0007da8083fe4ba880093f8c8862512ccca9ec Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 2 Feb 2021 11:09:35 -0500 Subject: [PATCH 18/29] Fuse state checks and gate data_needed --- distributed/worker.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 852b881f8d4..bcf3e1dec8d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1534,19 +1534,14 @@ def add_task( ts.dependencies.add(dep_ts) dep_ts.dependents.add(ts) - # possible that a new task has asked for a dependency we're already - # planning on fetching -- make sure that task knows it has to - # wait for its dependencies to arrive - if dep_ts.state in ("fetch", "flight"): - ts.waiting_for_data.add(dep_ts.key) - if dep_ts.state in ("fetch", "flight"): # if we _need_ to grab data or are in the process + ts.waiting_for_data.add(dep_ts.key) + # Ensure we know which workers to grab data from dep_ts.who_has.update(workers) for worker in workers: self.has_what[worker].add(dep_ts.key) - # if dep_ts.state != "memory": self.pending_data_per_worker[worker].append(dep_ts.key) if nbytes is not None: @@ -1612,8 +1607,9 @@ def transition_new_fetch(self, ts): for dependent in ts.dependents: dependent.waiting_for_data.add(ts.key) - self.data_needed.append(ts.key) - self.waiting_for_data_count += 1 + if ts.key not in self.data_needed: + self.data_needed.append(ts.key) + self.waiting_for_data_count += 1 except Exception as e: logger.exception(e) if LOG_PDB: @@ -1631,6 +1627,7 @@ def transition_fetch_waiting(self, ts, runspec=None): if self.validate: assert ts.state == "fetch" assert ts.runspec is None + assert runspec is not None ts.runspec = runspec @@ -1662,6 +1659,7 @@ def transition_flight_waiting(self, ts, runspec=None): if self.validate: assert ts.state == "flight" assert ts.runspec is None + assert runspec is not None ts.runspec = runspec From d577751f27e8498441cb9f6792af7ff11bc00d4d Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 2 Feb 2021 11:16:09 -0500 Subject: [PATCH 19/29] Remove `remove=` kwarg from fetch transition Trying this out as it might not be needed anymore (but can revert if test complain) --- distributed/worker.py | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index bcf3e1dec8d..e07e450209d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1697,7 +1697,7 @@ def transition_fetch_flight(self, ts, worker=None): pdb.set_trace() raise - def transition_flight_fetch(self, ts, worker=None, remove=True): + def transition_flight_fetch(self, ts, worker=None, runspec=None): try: if self.validate: assert ts.state == "flight" @@ -1705,12 +1705,6 @@ def transition_flight_fetch(self, ts, worker=None, remove=True): self.in_flight_tasks -= 1 ts.coming_from = None ts.runspec = runspec or ts.runspec - if remove: - try: - ts.who_has.remove(worker) - self.has_what[worker].remove(ts.key) - except KeyError: - pass if not ts.who_has: if ts.key not in self._missing_dep_flight: @@ -1719,10 +1713,7 @@ def 
transition_flight_fetch(self, ts, worker=None, remove=True): for dependent in ts.dependents: dependent.waiting_for_data.add(ts.key) if dependent.state == "waiting": - if remove: # try a new worker immediately - self.data_needed.appendleft(dependent.key) - else: # worker was probably busy, wait a while - self.data_needed.append(dependent.key) + self.data_needed.append(dependent.key) if not ts.dependents: self.release_key(ts.key) @@ -2288,8 +2279,7 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): self.release_key(d) continue elif ts.state not in ("ready", "memory"): - # "waiting" or "fetch"? - self.transition(ts, "fetch", worker=worker, remove=not busy) + self.transition(ts, "fetch", worker=worker) if not busy and d not in data and ts.dependents: self.log.append(("missing-dep", d)) From 56ad76eaab9bdef0dda5d2fe09920bb070381fe9 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Tue, 2 Feb 2021 12:47:26 -0500 Subject: [PATCH 20/29] Set runspec to positional arg in rescheduling transitions --- distributed/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index e07e450209d..845bcb1a4e4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1618,7 +1618,7 @@ def transition_new_fetch(self, ts): pdb.set_trace() raise - def transition_fetch_waiting(self, ts, runspec=None): + def transition_fetch_waiting(self, ts, runspec): """This is a rescheduling transition that occurs after a worker failure. A task was available from another worker but that worker died and the scheduler reassigned the task for computation here. @@ -1649,7 +1649,7 @@ def transition_fetch_waiting(self, ts, runspec=None): pdb.set_trace() raise - def transition_flight_waiting(self, ts, runspec=None): + def transition_flight_waiting(self, ts, runspec): """This is a rescheduling transition that occurs after a worker failure. 
A task was in flight from another worker to this worker when that worker died and the scheduler reassigned the task for From da831f4aaed62d024622c71165cefc80ae17f506 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Wed, 3 Feb 2021 14:38:27 -0500 Subject: [PATCH 21/29] Remove `stealing` transitions --- distributed/worker.py | 36 +++++------------------------------- 1 file changed, 5 insertions(+), 31 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 845bcb1a4e4..5badca69b3d 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -469,9 +469,6 @@ def __init__( ("long-running", "error"): self.transition_executing_done, ("long-running", "memory"): self.transition_executing_done, ("long-running", "rescheduled"): self.transition_executing_done, - # Stealing transitions - ("ready", "waiting"): self.transition_ready_waiting, - ("constrained", "waiting"): self.transition_constrained_waiting, } self.incoming_transfer_log = deque(maxlen=100000) @@ -1832,24 +1829,6 @@ def transition_ready_memory(self, ts, value=None): self.put_key_in_memory(ts, value=value) self.send_task_state_to_scheduler(ts) - def transition_ready_waiting(self, ts): - """ - This transition is common for work stealing - """ - ts.runspec = None - - def transition_waiting_new(self, ts): - """ - Common in work stealing - """ - ts.runspec = None - - def transition_constrained_waiting(self, ts): - """ - Common in work stealing - """ - ts.runspec = None - def transition_constrained_executing(self, ts): self.transition_ready_executing(ts) for resource, quantity in ts.resource_restrictions.items(): @@ -2408,17 +2387,12 @@ def steal_request(self, key): self.batched_stream.send(response) if state in ("ready", "waiting", "constrained"): - # Resetting the runspec should be reset by the transition. However, - # the waiting->waiting transition results in a no-op which would not - # reset. 
- ts.runspec = None - self.transition(ts, "waiting") - if not ts.dependents: - self.release_key(ts.key) - if self.validate: - assert ts.key not in self.tasks + # If task is marked as "constrained" we haven't yet assigned it an + # `available_resources` to run on, that happens in + # `transition_constrained_executing` + self.release_key(ts.key) if self.validate: - assert ts.runspec is None + assert ts.key not in self.tasks def release_key(self, key, cause=None, reason=None, report=True): try: From 3f40266ac3c7bdd7f0a35ddfc7cc8a364c1e311b Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Wed, 3 Feb 2021 17:04:05 -0500 Subject: [PATCH 22/29] Fix for deprecation warning in tests --- distributed/pytest_resourceleaks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py index 348472892d6..f86117c7862 100644 --- a/distributed/pytest_resourceleaks.py +++ b/distributed/pytest_resourceleaks.py @@ -30,7 +30,7 @@ def pytest_addoption(parser): group.addoption( "--leaks-timeout", action="store", - type="float", + type=float, dest="leaks_timeout", default=0.5, help="""\ From f50ab4e6a5c62f27d469f7108b8142868db28185 Mon Sep 17 00:00:00 2001 From: Gil Forsyth Date: Wed, 3 Mar 2021 14:54:01 -0500 Subject: [PATCH 23/29] Add "cause" to most calls to release_key --- distributed/worker.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/distributed/worker.py b/distributed/worker.py index 5badca69b3d..448d4ed2cc8 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1417,7 +1417,7 @@ def delete_data(self, comm=None, keys=None, report=True): if keys: for key in list(keys): self.log.append((key, "delete")) - self.release_key(key) + self.release_key(key, cause="delete data") logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys)) return "OK" @@ -1521,7 +1521,9 @@ def add_task( # to waiting_for_data self.transition(dep_ts, state) - self.log.append((dependency, "new-dep", dep_ts.state, ts)) + self.log.append( + (dependency, "new-dep", dep_ts.state, f"requested by {ts.key}") + ) else: # task was already present on worker @@ -1713,7 +1715,8 @@ def transition_flight_fetch(self, ts, worker=None, runspec=None): self.data_needed.append(dependent.key) if not ts.dependents: - self.release_key(ts.key) + self.release_key(ts.key, cause="transition flight->fetch") + except Exception as e: logger.exception(e) if LOG_PDB: @@ -2255,7 +2258,7 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): if not busy and d in data: self.transition(ts, "memory", value=data[d]) elif ts is None or ts.state == "executing": - self.release_key(d) + self.release_key(d, cause="already executing at gather") continue elif ts.state not in ("ready", "memory"): self.transition(ts, "fetch", worker=worker) @@ -2291,7 +2294,7 @@ def bad_dep(self, dep): ts.exception = msg["exception"] ts.traceback = msg["traceback"] self.transition(ts, "error") - self.release_key(dep.key) + self.release_key(dep.key, cause="bad dep") async def handle_missing_dep(self, *deps, **kwargs): self.log.append(("handle-missing", deps)) @@ -2390,7 +2393,7 @@ def steal_request(self, key): # If task is marked as "constrained" we haven't yet assigned it an # `available_resources` to run on, that happens in # `transition_constrained_executing` - self.release_key(ts.key) + self.release_key(ts.key, cause="stolen") if self.validate: assert ts.key not in self.tasks From e7b86de73c03611ddd925393b7baba681750d260 Mon Sep 17 00:00:00 
From e7b86de73c03611ddd925393b7baba681750d260 Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 14:54:43 -0500
Subject: [PATCH 24/29] Don't release keys in flight

There's a very real chance for a task to be reassigned here, and the extra
task doesn't hurt anything.

---
 distributed/worker.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 448d4ed2cc8..ce5d828c0f0 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -2418,12 +2418,14 @@ def release_key(self, key, cause=None, reason=None, report=True):
             # for any dependencies of key we are releasing remove task as dependent
             for dependency in ts.dependencies:
                 dependency.dependents.discard(ts)
+                # don't boot keys that are in flight
+                # we don't know if they're already queued up for transit
+                # in a gather_dep callback
                 if not dependency.dependents and dependency.state in (
                     "waiting",
-                    "flight",
                     "fetch",
                 ):
-                    self.release_key(dependency.key)
+                    self.release_key(dependency.key, cause=f"Dependent {ts} released")

             for worker in ts.who_has:
                 self.has_what[worker].discard(ts.key)

From 625906358cb5f512512b08e84e202fa5427330e9 Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 14:55:51 -0500
Subject: [PATCH 25/29] Reserve `self.data_needed` for keys that will be executed

E.g. don't add keys which will be fetched from other workers -- that
information belongs in `TaskState.waiting_for_data`

---
 distributed/worker.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index ce5d828c0f0..030703c6dc7 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1606,9 +1606,6 @@ def transition_new_fetch(self, ts):
             for dependent in ts.dependents:
                 dependent.waiting_for_data.add(ts.key)

-            if ts.key not in self.data_needed:
-                self.data_needed.append(ts.key)
-                self.waiting_for_data_count += 1
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
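The patch above ([PATCH 25/29]) narrows `self.data_needed` to keys the worker will actually execute, while dependencies to be fetched from peers are tracked only in each consumer's `waiting_for_data`. Here is a toy sketch of that split; every name in it is illustrative, not the real worker internals.

```python
from collections import deque


class ToyTask:
    def __init__(self, key, runspec=None):
        self.key = key
        self.runspec = runspec          # None for keys we only fetch from peers
        self.waiting_for_data = set()   # dependency keys not yet held locally


class ToyWorker:
    """Illustrative only: two collections, two different jobs."""

    def __init__(self):
        self.tasks = {}
        self.data_needed = deque()      # only keys this worker will execute

    def add_execute_task(self, key, runspec, dependencies=()):
        ts = self.tasks.setdefault(key, ToyTask(key, runspec))
        ts.waiting_for_data.update(dependencies)
        self.data_needed.append(key)    # has a runspec, so it belongs here

    def add_fetch_task(self, key):
        # A key gathered from another worker is tracked as a task, but it is
        # *not* queued on data_needed -- its consumers remember it in their
        # own waiting_for_data sets instead.
        self.tasks.setdefault(key, ToyTask(key, runspec=None))


w = ToyWorker()
w.add_fetch_task("a")
w.add_execute_task("b", runspec=lambda a: a + 1, dependencies={"a"})
assert list(w.data_needed) == ["b"]
assert w.tasks["b"].waiting_for_data == {"a"}
```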
From cdeb85c736e01bbc5b1cf1b90913401dfacfbf98 Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 14:56:55 -0500
Subject: [PATCH 26/29] Don't discard result in transition

We shouldn't call `release_key` in transition functions at all; it is very
likely to cause mixups. There's an outside chance this leads to worker
memory problems over very long jobs, but I think that is better handled by
a cleanup callback.

---
 distributed/worker.py | 19 ++++++++-----------
 1 file changed, 8 insertions(+), 11 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 030703c6dc7..9db3e30d786 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1729,18 +1729,15 @@ def transition_flight_memory(self, ts, value=None):
             self.in_flight_tasks -= 1
             ts.coming_from = None

-            if ts.dependents:
-                self.put_key_in_memory(ts, value)
-                for dependent in ts.dependents:
-                    try:
-                        dependent.waiting_for_data.remove(ts.key)
-                        self.waiting_for_data_count -= 1
-                    except KeyError:
-                        pass
+            self.put_key_in_memory(ts, value)
+            for dependent in ts.dependents:
+                try:
+                    dependent.waiting_for_data.remove(ts.key)
+                    self.waiting_for_data_count -= 1
+                except KeyError:
+                    pass

-                self.batched_stream.send({"op": "add-keys", "keys": [ts.key]})
-            else:
-                self.release_key(ts.key)
+            self.batched_stream.send({"op": "add-keys", "keys": [ts.key]})

         except Exception as e:
             logger.exception(e)
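The commit above ([PATCH 26/29]) keeps `transition_flight_memory` to pure bookkeeping: store the value, update waiters, notify the scheduler, and leave any releasing to a separate path. Below is a stripped-down sketch of a transition table built on that rule, using toy classes rather than the real worker.

```python
class ToyTask:
    def __init__(self, key):
        self.key = key
        self.state = "flight"


class ToyWorker:
    """Illustrative only: transition handlers never release keys."""

    def __init__(self):
        self.data = {}
        self.log = []
        self._transitions = {
            ("flight", "memory"): self.transition_flight_memory,
        }

    def transition(self, ts, finish, **kwargs):
        start = ts.state
        self._transitions[(start, finish)](ts, **kwargs)
        ts.state = finish
        self.log.append((ts.key, start, finish))

    def transition_flight_memory(self, ts, value=None):
        # Always keep the result; whether the key is still wanted is decided
        # later by a dedicated cleanup path, never inside a transition.
        self.data[ts.key] = value


w = ToyWorker()
t = ToyTask("x")
w.transition(t, "memory", value=42)
assert w.data["x"] == 42 and t.state == "memory"
```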
From 94d184ac65122e2557fe2312f710e95047d06f27 Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 14:58:57 -0500
Subject: [PATCH 27/29] Add a few comments and swap in a set comprehension

---
 distributed/worker.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 9db3e30d786..524bfaeb412 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -261,7 +261,7 @@ class Worker(ServerNode):
         Dictionary mapping keys to actual values stored on disk. Only
         available if condition for **data** being a zict.Buffer is met.
     * **data_needed**: deque(keys)
-        The keys whose data we still lack, arranged in a deque
+        The keys which still require data in order to execute, arranged in a deque
     * **ready**: [keys]
         Keys that are ready to run.  Stored in a LIFO stack
     * **constrained**: [keys]
@@ -1970,7 +1970,7 @@ def ensure_communicating(self):
             if self.validate:
                 assert all(dep.key in self.tasks for dep in deps)

-            deps = [dep for dep in deps if dep.state == "fetch"]
+            deps = {dep for dep in deps if dep.state == "fetch"}

             missing_deps = {dep for dep in deps if not dep.who_has}
             if missing_deps:
@@ -2436,7 +2436,9 @@ def release_key(self, key, cause=None, reason=None, report=True):
             for resource, quantity in ts.resource_restrictions.items():
                 self.available_resources[resource] += quantity

-        if report and ts.state in PROCESSING:  # not finished
+        # Inform the scheduler of keys which will have gone missing
+        # We are releasing them before they have completed
+        if report and ts.state in PROCESSING:
             self.batched_stream.send({"op": "release", "key": key, "cause": cause})

         self._notify_plugins("release_key", key, ts.state, cause, reason, report)

From 47ada8e93816f2a94b1243420fdfa50054a1d56b Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 15:07:08 -0500
Subject: [PATCH 28/29] Check in-flight task count in steal test

---
 distributed/tests/test_steal.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index ac37c0d5acf..de7ffda8f83 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -660,6 +660,9 @@ async def test_steal_twice(c, s, a, b):
     )
     assert max(map(len, has_what.values())) < 30

+    assert a.in_flight_tasks == 0
+    assert b.in_flight_tasks == 0
+
     await c._close()
     await asyncio.gather(*[w.close() for w in workers])

From 14d290425f171f0ce098e766fa5e9c62bf68681e Mon Sep 17 00:00:00 2001
From: Gil Forsyth
Date: Wed, 3 Mar 2021 15:11:38 -0500
Subject: [PATCH 29/29] Remove in-transition call to `release_key`

---
 distributed/worker.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 524bfaeb412..276b41fd853 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1711,9 +1711,6 @@ def transition_flight_fetch(self, ts, worker=None, runspec=None):
                 if dependent.state == "waiting":
                     self.data_needed.append(dependent.key)

-            if not ts.dependents:
-                self.release_key(ts.key, cause="transition flight->fetch")
-
         except Exception as e:
             logger.exception(e)
             if LOG_PDB:
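In the same spirit as the `test_steal_twice` assertions added in [PATCH 28/29], here is a hypothetical standalone check that no tasks are left counted as in flight once work completes. This is a sketch only: it assumes `gen_cluster` and `inc` from `distributed.utils_test` and the integer `Worker.in_flight_tasks` counter used above, and the test name is made up.

```python
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_no_tasks_left_in_flight(c, s, a, b):
    # Run a small batch of tasks across both workers and wait for the results.
    futures = c.map(inc, range(10))
    await c.gather(futures)

    # After everything has finished, neither worker should still be counting
    # dependencies as in flight.
    assert a.in_flight_tasks == 0
    assert b.in_flight_tasks == 0
```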