From cced80d5ea715a916415c6dc01c95343b1ec7af8 Mon Sep 17 00:00:00 2001
From: Simon Perkins
Date: Wed, 30 Mar 2022 15:42:42 +0200
Subject: [PATCH] Cluster Dump SchedulerPlugin (#5983)

Add SchedulerPlugin to dump state on cluster close

This also adds a new before_close method to SchedulerPlugin that runs
directly before the Scheduler's shutdown logic.
---
 distributed/client.py                       |  4 +-
 distributed/cluster_dump.py                 |  5 ++-
 distributed/diagnostics/cluster_dump.py     | 38 +++++++++++++++++++
 distributed/diagnostics/plugin.py           |  3 ++
 .../tests/test_cluster_dump_plugin.py       | 21 ++++++++++
 distributed/scheduler.py                    |  5 +++
 6 files changed, 73 insertions(+), 3 deletions(-)
 create mode 100644 distributed/diagnostics/cluster_dump.py
 create mode 100644 distributed/diagnostics/tests/test_cluster_dump_plugin.py

diff --git a/distributed/client.py b/distributed/client.py
index f68570f7762..d406ca6333a 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -3934,8 +3934,8 @@ async def _dump_cluster_state(
         self,
         filename: str = "dask-cluster-dump",
         write_from_scheduler: bool | None = None,
-        exclude: Collection[str] = ("run_spec",),
-        format: Literal["msgpack", "yaml"] = "msgpack",
+        exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE,
+        format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT,
         **storage_options,
     ):
         filename = str(filename)
diff --git a/distributed/cluster_dump.py b/distributed/cluster_dump.py
index 2d3a400b256..161f9091e7b 100644
--- a/distributed/cluster_dump.py
+++ b/distributed/cluster_dump.py
@@ -14,6 +14,9 @@
 from distributed.stories import scheduler_story as _scheduler_story
 from distributed.stories import worker_story as _worker_story
 
+DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack", "yaml"] = "msgpack"
+DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)
+
 
 def _tuple_to_list(node):
     if isinstance(node, (list, tuple)):
@@ -27,7 +30,7 @@ def _tuple_to_list(node):
 async def write_state(
     get_state: Callable[[], Awaitable[Any]],
     url: str,
-    format: Literal["msgpack", "yaml"],
+    format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
     **storage_options: dict[str, Any],
 ) -> None:
     "Await a cluster dump, then serialize and write it to a path"
diff --git a/distributed/diagnostics/cluster_dump.py b/distributed/diagnostics/cluster_dump.py
new file mode 100644
index 00000000000..c03b0293c05
--- /dev/null
+++ b/distributed/diagnostics/cluster_dump.py
@@ -0,0 +1,38 @@
+from typing import Any, Collection, Dict, Literal
+
+from distributed.cluster_dump import (
+    DEFAULT_CLUSTER_DUMP_EXCLUDE,
+    DEFAULT_CLUSTER_DUMP_FORMAT,
+)
+from distributed.diagnostics.plugin import SchedulerPlugin
+from distributed.scheduler import Scheduler
+
+
+class ClusterDump(SchedulerPlugin):
+    """Dumps cluster state prior to Scheduler shutdown
+
+    The Scheduler may shut down in cases where it is in an error state,
+    or when it has been unexpectedly idle for long periods of time.
+    This plugin dumps the cluster state prior to Scheduler shutdown
+    for debugging purposes.
+    """
+
+    def __init__(
+        self,
+        url: str,
+        exclude: "Collection[str]" = DEFAULT_CLUSTER_DUMP_EXCLUDE,
+        format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
+        **storage_options: Dict[str, Any],
+    ):
+        self.url = url
+        self.exclude = exclude
+        self.format = format_
+        self.storage_options = storage_options
+
+    async def start(self, scheduler: Scheduler) -> None:
+        self.scheduler = scheduler
+
+    async def before_close(self) -> None:
+        await self.scheduler.dump_cluster_state_to_url(
+            self.url, self.exclude, self.format, **self.storage_options
+        )
diff --git a/distributed/diagnostics/plugin.py b/distributed/diagnostics/plugin.py
index 9940102b6ed..cf573eed562 100644
--- a/distributed/diagnostics/plugin.py
+++ b/distributed/diagnostics/plugin.py
@@ -61,6 +61,9 @@ async def start(self, scheduler: Scheduler) -> None:
         This runs at the end of the Scheduler startup process
         """
 
+    async def before_close(self) -> None:
+        """Runs prior to any Scheduler shutdown logic"""
+
     async def close(self) -> None:
         """Run when the scheduler closes down
 
diff --git a/distributed/diagnostics/tests/test_cluster_dump_plugin.py b/distributed/diagnostics/tests/test_cluster_dump_plugin.py
new file mode 100644
index 00000000000..67ce815954d
--- /dev/null
+++ b/distributed/diagnostics/tests/test_cluster_dump_plugin.py
@@ -0,0 +1,21 @@
+from distributed.cluster_dump import DumpArtefact
+from distributed.diagnostics.cluster_dump import ClusterDump
+from distributed.utils_test import gen_cluster, inc
+
+
+@gen_cluster(client=True)
+async def test_cluster_dump_plugin(c, s, *workers, tmp_path):
+    dump_file = tmp_path / "cluster_dump.msgpack.gz"
+    await c.register_scheduler_plugin(ClusterDump(str(dump_file)), name="cluster-dump")
+    plugin = s.plugins["cluster-dump"]
+    assert plugin.scheduler is s
+
+    f1 = c.submit(inc, 1)
+    f2 = c.submit(inc, f1)
+
+    assert (await f2) == 3
+    await s.close(close_workers=True)
+
+    dump = DumpArtefact.from_url(str(dump_file))
+    assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys())
+    assert {f1.key, f2.key} == set(dump.worker_story(f1.key, f2.key).keys())
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 849ac7b3aa2..4882bf81919 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -4245,6 +4245,11 @@ async def close(self, fast=False, close_workers=False):
         if self.status in (Status.closing, Status.closed):
             await self.finished()
             return
+
+        await asyncio.gather(
+            *[plugin.before_close() for plugin in list(self.plugins.values())]
+        )
+
         self.status = Status.closing
 
         logger.info("Scheduler closing...")
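Appendix (not part of the diff): a minimal usage sketch of the plugin this patch adds, for anyone who wants to try it locally. The LocalCluster setup, the inc helper, and the local output path are illustrative assumptions; only ClusterDump, Client.register_scheduler_plugin, DumpArtefact, and the story accessors come from the code above.

from dask.distributed import Client, LocalCluster

from distributed.cluster_dump import DumpArtefact
from distributed.diagnostics.cluster_dump import ClusterDump


def inc(x):
    return x + 1


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    # Register the plugin on the scheduler. Scheduler.close() awaits the new
    # SchedulerPlugin.before_close() hook, which writes the dump to this URL
    # (a local path here; other fsspec-style URLs should also work).
    client.register_scheduler_plugin(
        ClusterDump("cluster_dump.msgpack.gz"), name="cluster-dump"
    )

    fut = client.submit(inc, 1)
    assert fut.result() == 2

    # Closing the cluster closes the scheduler, which runs before_close()
    # and writes the dump before the rest of the shutdown logic proceeds.
    client.close()
    cluster.close()

    # Inspect the dump offline, as the test above does.
    dump = DumpArtefact.from_url("cluster_dump.msgpack.gz")
    print(dump.scheduler_story(fut.key))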