Skip to content

Commit

Permalink
Cluster Dump SchedulerPlugin (dask#5983)
Browse files Browse the repository at this point in the history
Add SchedulerPlugin to dump state on cluster close

This also adds a new `before_close` method to SchedulerPlugin, which runs immediately before the Scheduler begins its shutdown logic
  • Loading branch information
sjperkins authored Mar 30, 2022
1 parent 4c4df91 commit cced80d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 3 deletions.
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3934,8 +3934,8 @@ async def _dump_cluster_state(
self,
filename: str = "dask-cluster-dump",
write_from_scheduler: bool | None = None,
exclude: Collection[str] = ("run_spec",),
format: Literal["msgpack", "yaml"] = "msgpack",
exclude: Collection[str] = cluster_dump.DEFAULT_CLUSTER_DUMP_EXCLUDE,
format: Literal["msgpack", "yaml"] = cluster_dump.DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options,
):
filename = str(filename)
Expand Down
5 changes: 4 additions & 1 deletion distributed/cluster_dump.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
from distributed.stories import scheduler_story as _scheduler_story
from distributed.stories import worker_story as _worker_story

# Defaults shared by the cluster-dump entry points so their signatures stay in sync.
# ``Literal`` takes its allowed values as comma-separated arguments; joining them
# with ``|`` is not a valid Literal (and ``str | str`` raises if ever evaluated).
DEFAULT_CLUSTER_DUMP_FORMAT: Literal["msgpack", "yaml"] = "msgpack"
DEFAULT_CLUSTER_DUMP_EXCLUDE: Collection[str] = ("run_spec",)


def _tuple_to_list(node):
if isinstance(node, (list, tuple)):
Expand All @@ -27,7 +30,7 @@ def _tuple_to_list(node):
async def write_state(
get_state: Callable[[], Awaitable[Any]],
url: str,
format: Literal["msgpack", "yaml"],
format: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
**storage_options: dict[str, Any],
) -> None:
"Await a cluster dump, then serialize and write it to a path"
Expand Down
38 changes: 38 additions & 0 deletions distributed/diagnostics/cluster_dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import Any, Collection, Dict, Literal

from distributed.cluster_dump import (
DEFAULT_CLUSTER_DUMP_EXCLUDE,
DEFAULT_CLUSTER_DUMP_FORMAT,
)
from distributed.diagnostics.plugin import SchedulerPlugin
from distributed.scheduler import Scheduler


class ClusterDump(SchedulerPlugin):
    """Dumps cluster state prior to Scheduler shutdown

    The Scheduler may shut down in cases where it is in an error state,
    or when it has been unexpectedly idle for long periods of time.
    This plugin dumps the cluster state prior to Scheduler shutdown
    for debugging purposes.

    Parameters
    ----------
    url : str
        Destination passed to ``Scheduler.dump_cluster_state_to_url``.
    exclude : Collection[str]
        Passed through to ``Scheduler.dump_cluster_state_to_url``.
        Defaults to ``DEFAULT_CLUSTER_DUMP_EXCLUDE``.
    format_ : {"msgpack", "yaml"}
        Dump format passed through to ``Scheduler.dump_cluster_state_to_url``.
        Defaults to ``DEFAULT_CLUSTER_DUMP_FORMAT``.
    **storage_options
        Extra keyword arguments forwarded unchanged to
        ``Scheduler.dump_cluster_state_to_url``.
    """

    def __init__(
        self,
        url: str,
        exclude: Collection[str] = DEFAULT_CLUSTER_DUMP_EXCLUDE,
        format_: Literal["msgpack", "yaml"] = DEFAULT_CLUSTER_DUMP_FORMAT,
        # The annotation on ``**kwargs`` describes each individual value,
        # not the resulting mapping, hence ``Any`` rather than a dict type.
        **storage_options: Any,
    ):
        self.url = url
        self.exclude = exclude
        self.format = format_
        self.storage_options = storage_options

    async def start(self, scheduler: Scheduler) -> None:
        """Remember the scheduler so ``before_close`` can dump its state"""
        self.scheduler = scheduler

    async def before_close(self) -> None:
        """Write the cluster dump using the scheduler stored by ``start``"""
        await self.scheduler.dump_cluster_state_to_url(
            self.url, self.exclude, self.format, **self.storage_options
        )
3 changes: 3 additions & 0 deletions distributed/diagnostics/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ async def start(self, scheduler: Scheduler) -> None:
This runs at the end of the Scheduler startup process
"""

async def before_close(self) -> None:
    """Runs prior to any Scheduler shutdown logic

    The default implementation does nothing; override it in a subclass
    to perform work before the Scheduler starts closing.
    """

async def close(self) -> None:
"""Run when the scheduler closes down
Expand Down
21 changes: 21 additions & 0 deletions distributed/diagnostics/tests/test_cluster_dump_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from distributed.cluster_dump import DumpArtefact
from distributed.diagnostics.cluster_dump import ClusterDump
from distributed.utils_test import gen_cluster, inc


@gen_cluster(client=True)
async def test_cluster_dump_plugin(c, s, *workers, tmp_path):
    """Closing the scheduler with ClusterDump registered leaves a loadable dump"""
    dump_url = str(tmp_path / "cluster_dump.msgpack.gz")
    await c.register_scheduler_plugin(ClusterDump(dump_url), name="cluster-dump")

    # Registration must have bound the plugin to this scheduler
    registered = s.plugins["cluster-dump"]
    assert registered.scheduler is s

    first = c.submit(inc, 1)
    second = c.submit(inc, first)
    assert (await second) == 3

    # The dump is produced as part of closing the scheduler
    await s.close(close_workers=True)

    artefact = DumpArtefact.from_url(dump_url)
    expected_keys = {first.key, second.key}
    scheduler_keys = set(artefact.scheduler_story(first.key, second.key).keys())
    assert expected_keys == scheduler_keys
    worker_keys = set(artefact.worker_story(first.key, second.key).keys())
    assert expected_keys == worker_keys
5 changes: 5 additions & 0 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4245,6 +4245,11 @@ async def close(self, fast=False, close_workers=False):
if self.status in (Status.closing, Status.closed):
await self.finished()
return

await asyncio.gather(
*[plugin.before_close() for plugin in list(self.plugins.values())]
)

self.status = Status.closing

logger.info("Scheduler closing...")
Expand Down

0 comments on commit cced80d

Please sign in to comment.