Skip to content

Commit

Permalink
Add back broadcasting restart message to clients
Browse files Browse the repository at this point in the history
If multiple clients are connected, the ones that didn't call `restart` still need to release their keys.

Drive-by fix: refcounts were not being reset on clients that didn't call `restart`. As a result, if such a client reused, after the restart, a key that it had referenced before the restart, that key could never be released.
  • Loading branch information
gjoseph92 committed Jul 14, 2022
1 parent 5a388d1 commit 85afcbc
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
22 changes: 11 additions & 11 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ def __init__(
"cancelled-key": self._handle_cancelled_key,
"task-retried": self._handle_retried_key,
"task-erred": self._handle_task_erred,
"restart": self._handle_restart,
"error": self._handle_error,
"event": self._handle_event,
}
Expand Down Expand Up @@ -1463,6 +1464,15 @@ def _handle_task_erred(self, key=None, exception=None, traceback=None):
if state is not None:
state.set_error(exception, traceback)

# Handler registered under the "restart" op in this client's handler table.
# Takes no message payload. Resets the client's local bookkeeping (tracked
# futures, generation counter, reference counts) so that a client which did
# not itself call `restart` still lets go of the keys it held beforehand.
def _handle_restart(self):
logger.info("Receive restart signal from scheduler")
# Cancel every tracked future state before forgetting it, so anything
# still holding one of these states sees it as cancelled.
for state in self.futures.values():
state.cancel()
self.futures.clear()
# Advance the generation counter.
# NOTE(review): presumably futures record the generation they were created
# in, so pre- and post-restart futures can be told apart — confirm.
self.generation += 1
# Reset all reference counts while holding the refcount lock; otherwise a
# key referenced before the restart and reused afterwards would keep a
# stale count and could never be released.
with self._refcount_lock:
self.refcount.clear()

def _handle_error(self, exception=None):
logger.warning("Scheduler exception:")
logger.exception(exception)
Expand Down Expand Up @@ -3316,17 +3326,7 @@ async def _restart(self, timeout=no_default):
if timeout is not None:
timeout = parse_timedelta(timeout, "s")

try:
await self.scheduler.restart(timeout=timeout)
finally:
for state in self.futures.values():
state.cancel()
self.futures.clear()

self.generation += 1
with self._refcount_lock:
self.refcount.clear()

await self.scheduler.restart(timeout=timeout)
return self

def restart(self, **kwargs):
Expand Down
1 change: 1 addition & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5146,6 +5146,7 @@ async def restart(self, client=None, timeout=30):
)

self.clear_task_state()
self.report({"op": "restart"})

start = time()

Expand Down
18 changes: 18 additions & 0 deletions distributed/tests/test_failed_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from distributed.utils import CancelledError, sync
from distributed.utils_test import (
BlockedGatherDep,
async_wait_for,
captured_logger,
cluster,
div,
Expand Down Expand Up @@ -238,6 +239,23 @@ async def test_multiple_clients_restart(s, a, b):
await asyncio.sleep(0.01)
assert time() < start + 5

assert not c1.futures
assert not c2.futures

# Ensure both clients still work after the restart.
# Reusing a key from before the restart must not be affected by pre-restart state.
x2 = c1.submit(inc, 1, key=x.key)
y2 = c2.submit(inc, 2, key=y.key)

assert x2._generation != x._generation
assert y2._generation != y._generation

assert await x2 == 2
assert await y2 == 3

del x2, y2
await async_wait_for(lambda: not s.tasks, timeout=5)

await c1.close()
await c2.close()

Expand Down

0 comments on commit 85afcbc

Please sign in to comment.