diff --git a/distributed/diagnostics/tests/test_worker_plugin.py b/distributed/diagnostics/tests/test_worker_plugin.py index 79d180c360e..858f8feedc2 100644 --- a/distributed/diagnostics/tests/test_worker_plugin.py +++ b/distributed/diagnostics/tests/test_worker_plugin.py @@ -76,6 +76,7 @@ async def test_create_on_construction(c, s, a, b): @gen_cluster(nthreads=[("127.0.0.1", 1)], client=True) async def test_normal_task_transitions_called(c, s, w): expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "memory"}, @@ -95,6 +96,7 @@ def failing(x): raise Exception() expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "error"}, @@ -113,6 +115,7 @@ def failing(x): ) async def test_superseding_task_transitions_called(c, s, w): expected_notifications = [ + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "constrained"}, {"key": "task", "start": "constrained", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "memory"}, @@ -131,9 +134,11 @@ async def test_release_dep_called(c, s, w): dsk = {"dep": 1, "task": (inc, "dep")} expected_notifications = [ + {"key": "dep", "start": "new", "finish": "waiting"}, {"key": "dep", "start": "waiting", "finish": "ready"}, {"key": "dep", "start": "ready", "finish": "executing"}, {"key": "dep", "start": "executing", "finish": "memory"}, + {"key": "task", "start": "new", "finish": "waiting"}, {"key": "task", "start": "waiting", "finish": "ready"}, {"key": "task", "start": "ready", "finish": "executing"}, {"key": "task", "start": "executing", "finish": "memory"}, diff --git a/distributed/pytest_resourceleaks.py b/distributed/pytest_resourceleaks.py index 348472892d6..f86117c7862 100644 --- a/distributed/pytest_resourceleaks.py +++ b/distributed/pytest_resourceleaks.py @@ -30,7 +30,7 @@ def pytest_addoption(parser): group.addoption( "--leaks-timeout", action="store", - type="float", + type=float, dest="leaks_timeout", default=0.5, help="""\ diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index ac37c0d5acf..de7ffda8f83 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -660,6 +660,9 @@ async def test_steal_twice(c, s, a, b): ) assert max(map(len, has_what.values())) < 30 + assert a.in_flight_tasks == 0 + assert b.in_flight_tasks == 0 + await c._close() await asyncio.gather(*[w.close() for w in workers]) diff --git a/distributed/worker.py b/distributed/worker.py index 7e282c2e46a..276b41fd853 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -114,13 +114,13 @@ class TaskState: The priority this task given by the scheduler. Determines run order. * **state**: ``str`` The current state of the task. One of ["waiting", "ready", "executing", - "memory", "flight", "long-running", "rescheduled", "error"] + "fetch", "memory", "flight", "long-running", "rescheduled", "error"] * **who_has**: ``set(worker)`` Workers that we believe have this data * **coming_from**: ``str`` The worker that current task data is coming from if task is in flight * **waiting_for_data**: ``set(keys of dependencies)`` - A dynamic verion of dependencies. 
All dependencies that we still don't + A dynamic version of dependencies. All dependencies that we still don't have for a particular key. * **resource_restrictions**: ``{str: number}`` Abstract resources required to run a task @@ -163,7 +163,7 @@ def __init__(self, key, runspec=None): self.dependents = set() self.duration = None self.priority = None - self.state = None + self.state = "new" self.who_has = set() self.coming_from = None self.waiting_for_data = set() @@ -261,7 +261,7 @@ class Worker(ServerNode): Dictionary mapping keys to actual values stored on disk. Only available if condition for **data** being a zict.Buffer is met. * **data_needed**: deque(keys) - The keys whose data we still lack, arranged in a deque + The keys which still require data in order to execute, arranged in a deque * **ready**: [keys] Keys that are ready to run. Stored in a LIFO stack * **constrained**: [keys] @@ -448,27 +448,27 @@ def __init__( self.validate = validate self._transitions = { + # Basic state transitions + ("new", "waiting"): self.transition_new_waiting, + ("new", "fetch"): self.transition_new_fetch, ("waiting", "ready"): self.transition_waiting_ready, - ("waiting", "memory"): self.transition_waiting_done, - ("waiting", "error"): self.transition_waiting_done, - ("waiting", "flight"): self.transition_waiting_flight, + ("fetch", "flight"): self.transition_fetch_flight, ("ready", "executing"): self.transition_ready_executing, - ("ready", "memory"): self.transition_ready_memory, - ("ready", "error"): self.transition_ready_error, - ("ready", "waiting"): self.transition_ready_waiting, - ("constrained", "waiting"): self.transition_ready_waiting, - ("constrained", "error"): self.transition_ready_error, - ("constrained", "executing"): self.transition_constrained_executing, ("executing", "memory"): self.transition_executing_done, + ("flight", "memory"): self.transition_flight_memory, + ("flight", "fetch"): self.transition_flight_fetch, + # Scheduler intercession (re-assignment) + ("fetch", "waiting"): self.transition_fetch_waiting, + ("flight", "waiting"): self.transition_flight_waiting, + # Errors, long-running, constrained + ("waiting", "error"): self.transition_waiting_done, + ("constrained", "executing"): self.transition_constrained_executing, ("executing", "error"): self.transition_executing_done, ("executing", "rescheduled"): self.transition_executing_done, ("executing", "long-running"): self.transition_executing_long_running, ("long-running", "error"): self.transition_executing_done, ("long-running", "memory"): self.transition_executing_done, ("long-running", "rescheduled"): self.transition_executing_done, - ("flight", "memory"): self.transition_flight_memory, - ("flight", "ready"): self.transition_flight_memory, - ("flight", "waiting"): self.transition_flight_waiting, } self.incoming_transfer_log = deque(maxlen=100000) @@ -1417,7 +1417,7 @@ def delete_data(self, comm=None, keys=None, report=True): if keys: for key in list(keys): self.log.append((key, "delete")) - self.release_key(key) + self.release_key(key, cause="delete data") logger.debug("Worker %s -- Deleted %d keys", self.name, len(keys)) return "OK" @@ -1459,7 +1459,6 @@ def add_task( runspec = SerializedTask(function, args, kwargs, task) if key in self.tasks: ts = self.tasks[key] - ts.runspec = runspec if ts.state == "memory": assert key in self.data or key in self.actors logger.debug( @@ -1473,13 +1472,20 @@ def add_task( ts.exception = None ts.traceback = None else: - ts.state = "waiting" + # This is a scheduler re-assignment + # Either 
`fetch` -> `waiting` or `flight` -> `waiting` + self.log.append((ts.key, "re-adding key, new TaskState")) + self.transition(ts, "waiting", runspec=runspec) else: self.log.append((key, "new")) self.tasks[key] = ts = TaskState( key=key, runspec=SerializedTask(function, args, kwargs, task) ) - ts.state = "waiting" + self.transition(ts, "waiting") + + # TODO: move transition of `ts` to end of `add_task` + # This will require a chained recommendation transition system like + # the scheduler if priority is not None: priority = tuple(priority) + (self.generation,) @@ -1499,26 +1505,42 @@ def add_task( for dependency, workers in who_has.items(): assert workers if dependency not in self.tasks: + # initial state is "new" + # this dependency does not already exist on worker self.tasks[dependency] = dep_ts = TaskState(key=dependency) - dep_ts.state = ( - "waiting" if dependency not in self.data else "memory" - ) - dep_ts = self.tasks[dependency] - self.log.append((dependency, "new-dep", dep_ts.state)) + # link up to child / parents + ts.dependencies.add(dep_ts) + dep_ts.dependents.add(ts) - if dep_ts.state != "memory": - ts.waiting_for_data.add(dep_ts.key) - self.waiting_for_data_count += 1 + # check to ensure task wasn't already executed and partially released + # # TODO: make this less bad + state = "fetch" if dependency not in self.data else "memory" + + # transition from new -> fetch handles adding dependency + # to waiting_for_data + self.transition(dep_ts, state) + + self.log.append( + (dependency, "new-dep", dep_ts.state, f"requested by {ts.key}") + ) + + else: + # task was already present on worker + dep_ts = self.tasks[dependency] - dep_ts.who_has.update(workers) + # link up to child / parents + ts.dependencies.add(dep_ts) + dep_ts.dependents.add(ts) - ts.dependencies.add(dep_ts) - dep_ts.dependents.add(ts) + if dep_ts.state in ("fetch", "flight"): + # if we _need_ to grab data or are in the process + ts.waiting_for_data.add(dep_ts.key) + # Ensure we know which workers to grab data from + dep_ts.who_has.update(workers) - for worker in workers: - self.has_what[worker].add(dep_ts.key) - if dep_ts.state != "memory": + for worker in workers: + self.has_what[worker].add(dep_ts.key) self.pending_data_per_worker[worker].append(dep_ts.key) if nbytes is not None: @@ -1530,6 +1552,9 @@ def add_task( else: self.transition(ts, "ready") if self.validate: + for worker, keys in self.has_what.items(): + for k in keys: + assert worker in self.tasks[k].who_has if who_has: assert all(self.tasks[dep] in ts.dependencies for dep in who_has) assert all(self.tasks[dep.key] for dep in ts.dependencies) @@ -1558,10 +1583,104 @@ def transition(self, ts, finish, **kwargs): self.validate_task(ts) self._notify_plugins("transition", ts.key, start, state or finish, **kwargs) - def transition_waiting_flight(self, ts, worker=None): + def transition_new_waiting(self, ts): + try: + if self.validate: + assert ts.state == "new" + assert ts.runspec is not None + assert not ts.who_has + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + + def transition_new_fetch(self, ts): + try: + if self.validate: + assert ts.state == "new" + assert ts.runspec is None + + for dependent in ts.dependents: + dependent.waiting_for_data.add(ts.key) + + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + + def transition_fetch_waiting(self, ts, runspec): + """This is a rescheduling transition that occurs after a worker failure. 
+ A task was available from another worker but that worker died and the + scheduler reassigned the task for computation here. + """ + try: + if self.validate: + assert ts.state == "fetch" + assert ts.runspec is None + assert runspec is not None + + ts.runspec = runspec + + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) + + # clear `who_has` of stale info + ts.who_has.clear() + + # remove entry from dependents to avoid a spurious `gather_dep` call`` + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + + def transition_flight_waiting(self, ts, runspec): + """This is a rescheduling transition that occurs after + a worker failure. A task was in flight from another worker to this + worker when that worker died and the scheduler reassigned the task for + computation here. + """ try: if self.validate: - assert ts.state != "flight" + assert ts.state == "flight" + assert ts.runspec is None + assert runspec is not None + + ts.runspec = runspec + + # remove any stale entries in `has_what` + for worker in self.has_what.keys(): + self.has_what[worker].discard(ts.key) + + # clear `who_has` of stale info + ts.who_has.clear() + + # remove entry from dependents to avoid a spurious `gather_dep` call`` + for dependent in ts.dependents: + dependent.waiting_for_data.discard(ts.key) + except Exception as e: + logger.exception(e) + if LOG_PDB: + import pdb + + pdb.set_trace() + raise + + def transition_fetch_flight(self, ts, worker=None): + try: + if self.validate: + assert ts.state == "fetch" assert ts.dependents ts.coming_from = worker @@ -1574,7 +1693,7 @@ def transition_waiting_flight(self, ts, worker=None): pdb.set_trace() raise - def transition_flight_waiting(self, ts, worker=None, remove=True, runspec=None): + def transition_flight_fetch(self, ts, worker=None, runspec=None): try: if self.validate: assert ts.state == "flight" @@ -1582,26 +1701,16 @@ def transition_flight_waiting(self, ts, worker=None, remove=True, runspec=None): self.in_flight_tasks -= 1 ts.coming_from = None ts.runspec = runspec or ts.runspec - if remove: - try: - ts.who_has.remove(worker) - self.has_what[worker].remove(ts.key) - except KeyError: - pass if not ts.who_has: if ts.key not in self._missing_dep_flight: self._missing_dep_flight.add(ts.key) self.loop.add_callback(self.handle_missing_dep, ts) for dependent in ts.dependents: + dependent.waiting_for_data.add(ts.key) if dependent.state == "waiting": - if remove: # try a new worker immediately - self.data_needed.appendleft(dependent.key) - else: # worker was probably busy, wait a while - self.data_needed.append(dependent.key) + self.data_needed.append(dependent.key) - if not ts.dependents: - self.release_key(ts.key) except Exception as e: logger.exception(e) if LOG_PDB: @@ -1617,18 +1726,15 @@ def transition_flight_memory(self, ts, value=None): self.in_flight_tasks -= 1 ts.coming_from = None - if ts.dependents: - self.put_key_in_memory(ts, value) - for dependent in ts.dependents: - try: - dependent.waiting_for_data.remove(ts.key) - self.waiting_for_data_count -= 1 - except KeyError: - pass + self.put_key_in_memory(ts, value) + for dependent in ts.dependents: + try: + dependent.waiting_for_data.remove(ts.key) + self.waiting_for_data_count -= 1 + except KeyError: + pass - self.batched_stream.send({"op": "add-keys", "keys": [ts.key]}) - else: - self.release_key(ts.key) + 
self.batched_stream.send({"op": "add-keys", "keys": [ts.key]}) except Exception as e: logger.exception(e) @@ -1650,7 +1756,7 @@ def transition_waiting_ready(self, ts): assert all(dep.state == "memory" for dep in ts.dependencies) assert ts.key not in self.ready - ts.waiting_for_data.clear() + self.has_what[self.address].discard(ts.key) if ts.resource_restrictions is not None: self.constrained.append(ts.key) @@ -1717,12 +1823,6 @@ def transition_ready_memory(self, ts, value=None): self.put_key_in_memory(ts, value=value) self.send_task_state_to_scheduler(ts) - def transition_ready_waiting(self, ts): - """ - This transition is common for work stealing - """ - pass - def transition_constrained_executing(self, ts): self.transition_ready_executing(ts) for resource, quantity in ts.resource_restrictions.items(): @@ -1867,7 +1967,7 @@ def ensure_communicating(self): if self.validate: assert all(dep.key in self.tasks for dep in deps) - deps = [dep for dep in deps if dep.state == "waiting"] + deps = {dep for dep in deps if dep.state == "fetch"} missing_deps = {dep for dep in deps if not dep.who_has} if missing_deps: @@ -1892,7 +1992,7 @@ def ensure_communicating(self): or self.comm_nbytes < self.total_comm_nbytes ): dep = deps.pop() - if dep.state != "waiting": + if dep.state != "fetch": continue if not dep.who_has: continue @@ -2019,7 +2119,7 @@ def select_keys_for_gather(self, worker, dep): while L: d = L.popleft() ts = self.tasks.get(d) - if ts is None or ts.state != "waiting": + if ts is None or ts.state != "fetch": continue if total_bytes + ts.get_nbytes() > self.target_message_size: break @@ -2069,7 +2169,7 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): self.log.append(("busy-gather", worker, deps)) for ts in deps_ts: if ts.state == "flight": - self.transition(ts, "waiting") + self.transition(ts, "fetch") return if cause: @@ -2149,10 +2249,10 @@ async def gather_dep(self, worker, dep, deps, total_nbytes, cause=None): if not busy and d in data: self.transition(ts, "memory", value=data[d]) elif ts is None or ts.state == "executing": - self.release_key(d) + self.release_key(d, cause="already executing at gather") continue elif ts.state not in ("ready", "memory"): - self.transition(ts, "waiting", worker=worker, remove=not busy) + self.transition(ts, "fetch", worker=worker) if not busy and d not in data and ts.dependents: self.log.append(("missing-dep", d)) @@ -2185,7 +2285,7 @@ def bad_dep(self, dep): ts.exception = msg["exception"] ts.traceback = msg["traceback"] self.transition(ts, "error") - self.release_key(dep.key) + self.release_key(dep.key, cause="bad dep") async def handle_missing_dep(self, *deps, **kwargs): self.log.append(("handle-missing", deps)) @@ -2281,17 +2381,12 @@ def steal_request(self, key): self.batched_stream.send(response) if state in ("ready", "waiting", "constrained"): - # Resetting the runspec should be reset by the transition. However, - # the waiting->waiting transition results in a no-op which would not - # reset. 
- ts.runspec = None - self.transition(ts, "waiting") - if not ts.dependents: - self.release_key(ts.key) - if self.validate: - assert ts.key not in self.tasks + # If task is marked as "constrained" we haven't yet assigned it an + # `available_resources` to run on, that happens in + # `transition_constrained_executing` + self.release_key(ts.key, cause="stolen") if self.validate: - assert ts.runspec is None + assert ts.key not in self.tasks def release_key(self, key, cause=None, reason=None, report=True): try: @@ -2314,11 +2409,14 @@ def release_key(self, key, cause=None, reason=None, report=True): # for any dependencies of key we are releasing remove task as dependent for dependency in ts.dependencies: dependency.dependents.discard(ts) + # don't boot keys that are in flight + # we don't know if they're already queued up for transit + # in a gather_dep callback if not dependency.dependents and dependency.state in ( "waiting", - "flight", + "fetch", ): - self.release_key(dependency.key) + self.release_key(dependency.key, cause=f"Dependent {ts} released") for worker in ts.who_has: self.has_what[worker].discard(ts.key) @@ -2335,7 +2433,9 @@ def release_key(self, key, cause=None, reason=None, report=True): for resource, quantity in ts.resource_restrictions.items(): self.available_resources[resource] += quantity - if report and ts.state in PROCESSING: # not finished + # Inform the scheduler of keys which will have gone missing + # We are releasing them before they have completed + if report and ts.state in PROCESSING: self.batched_stream.send({"op": "release", "key": key, "cause": cause}) self._notify_plugins("release_key", key, ts.state, cause, reason, report) @@ -2970,6 +3070,10 @@ def validate_task_flight(self, ts): assert not any(dep.key in self.ready for dep in ts.dependents) assert ts.key in self.in_flight_workers[ts.coming_from] + def validate_task_fetch(self, ts): + assert ts.runspec is None + assert ts.key not in self.data + def validate_task(self, ts): try: if ts.state == "memory": @@ -2982,6 +3086,8 @@ def validate_task(self, ts): self.validate_task_executing(ts) elif ts.state == "flight": self.validate_task_flight(ts) + elif ts.state == "fetch": + self.validate_task_fetch(ts) except Exception as e: logger.exception(e) if LOG_PDB: @@ -3012,7 +3118,7 @@ def validate_state(self): ts_wait = self.tasks[key] assert ( ts_wait.state == "flight" - or ts_wait.state == "waiting" + or ts_wait.state == "fetch" or ts_wait.key in self._missing_dep_flight or ts_wait.who_has.issubset(self.in_flight_workers) ) diff --git a/docs/source/images/worker-dep-state.dot b/docs/source/images/worker-dep-state.dot index 8da78a1c789..18a5e40cfac 100644 --- a/docs/source/images/worker-dep-state.dot +++ b/docs/source/images/worker-dep-state.dot @@ -3,6 +3,8 @@ digraph{ bgcolor="#FFFFFFF00", rankdir=LR, ]; - waiting -> flight; + new -> fetch; + fetch -> flight; + flight -> fetch; flight -> memory; } diff --git a/docs/source/images/worker-dep-state.svg b/docs/source/images/worker-dep-state.svg index 412d4dc7d3a..9b7b04e8d99 100644 --- a/docs/source/images/worker-dep-state.svg +++ b/docs/source/images/worker-dep-state.svg @@ -1,38 +1,61 @@ - - + %3 - - -waiting - -waiting + + + +new + +new + + + +fetch + +fetch + + + +new->fetch + + -flight - -flight + +flight + +flight + + + +fetch->flight + + - -waiting->flight - - + + +flight->fetch + + -memory - -memory + +memory + +memory -flight->memory - - + +flight->memory + + diff --git a/docs/source/images/worker-task-state.dot b/docs/source/images/worker-task-state.dot index 
5bd9ac93a8f..4a6fc8cbacf 100644 --- a/docs/source/images/worker-task-state.dot +++ b/docs/source/images/worker-task-state.dot @@ -3,6 +3,7 @@ digraph{ bgcolor="#FFFFFFF00", rankdir=LR, ]; + new -> waiting; waiting -> ready; ready -> executing; executing -> "long-running"; diff --git a/docs/source/images/worker-task-state.svg b/docs/source/images/worker-task-state.svg index 2b307d80d64..edc4b834044 100644 --- a/docs/source/images/worker-task-state.svg +++ b/docs/source/images/worker-task-state.svg @@ -1,78 +1,103 @@ - - + %3 - + + + +new + +new + -waiting - -waiting + +waiting + +waiting + + + +new->waiting + + -ready - -ready + +ready + +ready -waiting->ready - - + +waiting->ready + + -executing - -executing + +executing + +executing -ready->executing - - + +ready->executing + + -long-running - -long-running + +long-running + +long-running -executing->long-running - - + +executing->long-running + + -memory - -memory + +memory + +memory -executing->memory - - + +executing->memory + + -error - -error + +error + +error -executing->error - - + +executing->error + + -long-running->memory - - + +long-running->memory + + -long-running->error - - + +long-running->error + + diff --git a/docs/source/worker.rst b/docs/source/worker.rst index 77a361dc686..eaf547a8321 100644 --- a/docs/source/worker.rst +++ b/docs/source/worker.rst @@ -89,13 +89,18 @@ Internal Scheduling ------------------- Internally tasks that come to the scheduler proceed through the following -pipeline: +pipeline as :py:class:`distributed.worker.TaskState` objects. Tasks which +follow this path have a :py:attr:`distributed.worker.TaskState.runspec` defined +which instructs the worker how to execute them. .. image:: images/worker-task-state.svg :alt: Dask worker task states -The worker also tracks data dependencies that are required to run the tasks -above. These follow through a simpler pipeline: +Data dependencies are also represented as +:py:class:`distributed.worker.TaskState` objects and follow a simpler path +through the execution pipeline. These tasks do not have a +:py:attr:`distributed.worker.TaskState.runspec` defined and instead contain a +listing of workers to collect their result from. .. image:: images/worker-dep-state.svg @@ -108,9 +113,12 @@ dependency from that worker. To improve bandwidth we opportunistically gather other dependencies of other tasks that are known to be on that worker, up to a maximum of 200MB of data (too little data and bandwidth suffers, too much data and responsiveness suffers). We use a fixed number of connections (around -10-50) so as to avoid overly-fragmenting our network bandwidth. After all -dependencies for a task are in memory we transition the task to the ready state -and put the task again into a heap of tasks that are ready to run. +10-50) so as to avoid overly-fragmenting our network bandwidth. In the event +that the network comms between two workers are saturated, a dependency task may +cycle between ``fetch`` and ``flight`` until it is successfully collected. + +After all dependencies for a task are in memory we transition the task to the +ready state and put the task again into a heap of tasks that are ready to run. We collect from this heap and put the task into a thread from a local thread pool to execute. @@ -122,7 +130,17 @@ thread pool. A task either errs or its result is put into memory. In either case a response is sent back to the scheduler. -.. 
_memman::
+Tasks slated for execution and tasks marked for collection from other workers
+must follow their respective transition paths as defined above. The only
+exceptions to this are when:
+
+* A task is `stolen `_, in which case a task that would otherwise have
+  been collected is instead executed on the thieving worker
+* The scheduler intercedes, reassigning to this worker a task that was
+  previously assigned to a different worker. This most commonly occurs
+  when a `worker dies `_ during computation.
+
+.. _memman:
 
 Memory Management
 -----------------
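
As an illustration of the transition notifications that the updated
``test_worker_plugin.py`` asserts (not part of this diff), here is a minimal
``WorkerPlugin`` sketch that records them; the class name ``TransitionRecorder``
and the registration call in the trailing comment are assumptions for the
example::

    from distributed.diagnostics.plugin import WorkerPlugin


    class TransitionRecorder(WorkerPlugin):
        """Collect every (key, start, finish) transition seen by a worker."""

        def setup(self, worker):
            self.worker = worker
            self.notifications = []

        def transition(self, key, start, finish, **kwargs):
            # With this change, executable tasks begin life in the "new" state,
            # so their first recorded entry is ("new", "waiting"); dependencies
            # begin with ("new", "fetch").
            self.notifications.append(
                {"key": key, "start": start, "finish": finish}
            )


    # Hypothetical usage with an existing client:
    # client.register_worker_plugin(TransitionRecorder())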
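
The explicit ``fetch`` state also makes stalled transfers easy to introspect
from worker attributes that appear in this diff (``Worker.tasks``,
``TaskState.state``, ``Worker.in_flight_tasks``). A hedged debugging sketch;
the helper name ``pending_fetches`` is made up for illustration::

    def pending_fetches(worker):
        """Keys the given worker still intends to collect from its peers."""
        return [
            key
            for key, ts in worker.tasks.items()
            if ts.state in ("fetch", "flight")
        ]


    # In a test this pairs naturally with the assertions added to
    # test_steal_twice above:
    # assert worker.in_flight_tasks == 0
    # assert not pending_fetches(worker)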