Skip to content

Commit

Permalink
Attempt removing ensure_computing() completely
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 5, 2022
1 parent 84ef18c commit 4be1c9d
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 30 deletions.
21 changes: 1 addition & 20 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@
AddKeysMsg,
AlreadyCancelledEvent,
CancelComputeEvent,
EnsureComputingEvent,
Execute,
ExecuteFailureEvent,
ExecuteSuccessEvent,
Expand Down Expand Up @@ -1180,9 +1179,7 @@ async def heartbeat(self):

async def handle_scheduler(self, comm):
try:
await self.handle_stream(
comm, every_cycle=[self.ensure_communicating, self.ensure_computing]
)
await self.handle_stream(comm, every_cycle=[self.ensure_communicating])
except Exception as e:
logger.exception(e)
raise
Expand Down Expand Up @@ -3044,7 +3041,6 @@ async def gather_dep(
recommendations[ts] = "fetch" if ts.who_has else "missing"
del data, response
self.transitions(recommendations, stimulus_id=stimulus_id)
self.ensure_computing()

if not busy:
self.repetitively_busy = 0
Expand Down Expand Up @@ -3085,7 +3081,6 @@ async def find_missing(self) -> None:
"find-missing"
].callback_time = self.periodic_callbacks["heartbeat"].callback_time
self.ensure_communicating()
self.ensure_computing()

async def query_who_has(self, *deps: str) -> dict[str, Collection[str]]:
with log_errors():
Expand Down Expand Up @@ -3218,7 +3213,6 @@ def release_key(

self._executing.discard(ts)
self._in_flight_tasks.discard(ts)
self.ensure_computing()
self.ensure_communicating()

self._notify_plugins(
Expand Down Expand Up @@ -3370,11 +3364,6 @@ async def _maybe_deserialize_task(
)
raise

def ensure_computing(self) -> None:
self.handle_stimulus(
EnsureComputingEvent(stimulus_id=f"ensure_computing-{time()}")
)

def _ensure_computing(self) -> RecsInstrs:
if self.status in (Status.paused, Status.closing_gracefully):
return {}, []
Expand Down Expand Up @@ -3558,14 +3547,6 @@ async def execute(self, key: str, *, stimulus_id: str) -> StateMachineEvent | No
def handle_event(self, ev: StateMachineEvent) -> RecsInstrs:
raise TypeError(ev) # pragma: nocover

@handle_event.register
def _(self, ev: EnsureComputingEvent) -> RecsInstrs:
"""Let various methods of worker give an artificial 'kick' to _ensure_computing.
This is a temporary hack to be removed as part of
https://github.com/dask/distributed/issues/5894.
"""
return self._ensure_computing()

@handle_event.register
def _(self, ev: UnpauseEvent) -> RecsInstrs:
"""Emerge from paused status. Do not send this event directly. Instead, just set
Expand Down
10 changes: 0 additions & 10 deletions distributed/worker_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,16 +357,6 @@ class StateMachineEvent:
stimulus_id: str


@dataclass
class EnsureComputingEvent(StateMachineEvent):
"""Let various methods of worker give an artificial 'kick' to _ensure_computing.
This is a temporary hack to be removed as part of
https://github.com/dask/distributed/issues/5894.
"""

__slots__ = ()


@dataclass
class UnpauseEvent(StateMachineEvent):
__slots__ = ()
Expand Down

0 comments on commit 4be1c9d

Please sign in to comment.