From 85afcbc10f39f9dd48ee7810e27d903d5009be46 Mon Sep 17 00:00:00 2001 From: Gabe Joseph Date: Thu, 14 Jul 2022 16:23:46 -0400 Subject: [PATCH] Add back broadcasting restart message to clients If multiple clients are connected, the ones that didn't call `restart` still need to release their keys. driveby: refcounts were not being reset on clients that didn't call `restart`. So after restart, if a client reused a key that was referenced before restart, it would never be releasable. --- distributed/client.py | 22 +++++++++++----------- distributed/scheduler.py | 1 + distributed/tests/test_failed_workers.py | 18 ++++++++++++++++++ 3 files changed, 30 insertions(+), 11 deletions(-) diff --git a/distributed/client.py b/distributed/client.py index 369be1bbe10..f5e8d28abfd 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -908,6 +908,7 @@ def __init__( "cancelled-key": self._handle_cancelled_key, "task-retried": self._handle_retried_key, "task-erred": self._handle_task_erred, + "restart": self._handle_restart, "error": self._handle_error, "event": self._handle_event, } @@ -1463,6 +1464,15 @@ def _handle_task_erred(self, key=None, exception=None, traceback=None): if state is not None: state.set_error(exception, traceback) + def _handle_restart(self): + logger.info("Receive restart signal from scheduler") + for state in self.futures.values(): + state.cancel() + self.futures.clear() + self.generation += 1 + with self._refcount_lock: + self.refcount.clear() + def _handle_error(self, exception=None): logger.warning("Scheduler exception:") logger.exception(exception) @@ -3316,17 +3326,7 @@ async def _restart(self, timeout=no_default): if timeout is not None: timeout = parse_timedelta(timeout, "s") - try: - await self.scheduler.restart(timeout=timeout) - finally: - for state in self.futures.values(): - state.cancel() - self.futures.clear() - - self.generation += 1 - with self._refcount_lock: - self.refcount.clear() - + await self.scheduler.restart(timeout=timeout) return self def restart(self, **kwargs): diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 8eb3aaae0bd..edd00f9af17 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5146,6 +5146,7 @@ async def restart(self, client=None, timeout=30): ) self.clear_task_state() + self.report({"op": "restart"}) start = time() diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py index 9aac7d6f4b7..754e2916c0c 100644 --- a/distributed/tests/test_failed_workers.py +++ b/distributed/tests/test_failed_workers.py @@ -19,6 +19,7 @@ from distributed.utils import CancelledError, sync from distributed.utils_test import ( BlockedGatherDep, + async_wait_for, captured_logger, cluster, div, @@ -238,6 +239,23 @@ async def test_multiple_clients_restart(s, a, b): await asyncio.sleep(0.01) assert time() < start + 5 + assert not c1.futures + assert not c2.futures + + # Ensure both clients still work after restart. + # Reusing a previous key has no effect. + x2 = c1.submit(inc, 1, key=x.key) + y2 = c2.submit(inc, 2, key=y.key) + + assert x2._generation != x._generation + assert y2._generation != y._generation + + assert await x2 == 2 + assert await y2 == 3 + + del x2, y2 + await async_wait_for(lambda: not s.tasks, timeout=5) + await c1.close() await c2.close()