From 608338516f7eb0e6f8205cdf70a579c3d4f92480 Mon Sep 17 00:00:00 2001
From: Thomas Grainger
Date: Wed, 16 Mar 2022 10:13:11 +0000
Subject: [PATCH 1/5] drop pkg_resources in favour of importlib.metadata
 (#5923)

---
 .pre-commit-config.yaml              |  1 -
 distributed/comm/registry.py         | 67 +++++++++++++++-------------
 distributed/comm/tests/test_comms.py | 41 ++++++-----------
 distributed/utils.py                 |  2 -
 requirements.txt                     |  1 -
 5 files changed, 50 insertions(+), 62 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0d1689ae23f..329103e7d02 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -39,7 +39,6 @@ repos:
           - types-docutils
           - types-requests
           - types-paramiko
-          - types-pkg_resources
           - types-PyYAML
           - types-setuptools
           - types-psutil
diff --git a/distributed/comm/registry.py b/distributed/comm/registry.py
index 00b10336a70..47ba730a7d9 100644
--- a/distributed/comm/registry.py
+++ b/distributed/comm/registry.py
@@ -1,6 +1,29 @@
 from __future__ import annotations
 
+import importlib.metadata
+import sys
 from abc import ABC, abstractmethod
+from collections.abc import Iterable
+from typing import Protocol
+
+
+class _EntryPoints(Protocol):
+    def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]:
+        ...
+
+
+if sys.version_info >= (3, 10):
+    # py3.10 importlib.metadata type annotations are not in mypy yet
+    # https://github.com/python/typeshed/pull/7331
+    _entry_points: _EntryPoints = importlib.metadata.entry_points  # type: ignore[assignment]
+else:
+
+    def _entry_points(
+        *, group: str, name: str
+    ) -> Iterable[importlib.metadata.EntryPoint]:
+        for ep in importlib.metadata.entry_points().get(group, []):
+            if ep.name == name:
+                yield ep
 
 
 class Backend(ABC):
@@ -59,40 +82,24 @@ def get_local_address_for(self, loc):
 backends: dict[str, Backend] = {}
 
 
-def get_backend(scheme: str, require: bool = True) -> Backend:
+def get_backend(scheme: str) -> Backend:
     """
     Get the Backend instance for the given *scheme*.
     It looks for matching scheme in dask's internal cache, and falls-back to
     package metadata for the group name ``distributed.comm.backends``
-
-    Parameters
-    ----------
-
-    require : bool
-        Verify that the backends requirements are properly installed. See
-        https://setuptools.readthedocs.io/en/latest/pkg_resources.html for more
-        information.
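The new test further down exercises this lookup through real package metadata written
to a temporary dist-info directory. As a rough, hypothetical sketch of what the same
mechanism means for third-party code (not part of this patch; the ``dask_udp`` module,
the ``UDPBackend`` class and the pyproject snippet in the comments are invented for
illustration), a package could register its own comm backend and let ``get_backend``
resolve it lazily:

# dask_udp.py -- hypothetical third-party module
from distributed.comm.registry import Backend


class UDPBackend(Backend):
    # A real backend would return working connector/listener objects here;
    # the address helpers below just pass locations through unchanged.
    def get_connector(self):
        raise NotImplementedError

    def get_listener(self, loc, handle_comm, deserialize, **connection_args):
        raise NotImplementedError

    def get_address_host(self, loc):
        return loc

    def resolve_address(self, loc):
        return loc

    def get_local_address_for(self, loc):
        return loc


# The package would advertise the backend in its own metadata, for example:
#
#   [project.entry-points."distributed.comm.backends"]
#   udp = "dask_udp:UDPBackend"
#
# Once installed, get_backend("udp") finds the entry point via _entry_points,
# calls load()() to instantiate UDPBackend, and caches the instance in
# ``backends`` for subsequent lookups.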
""" backend = backends.get(scheme) - if backend is None: - import pkg_resources - - backend = None - for backend_class_ep in pkg_resources.iter_entry_points( - "distributed.comm.backends", scheme - ): - # resolve and require are equivalent to load - backend_factory = backend_class_ep.resolve() - if require: - backend_class_ep.require() - backend = backend_factory() - - if backend is None: - raise ValueError( - "unknown address scheme %r (known schemes: %s)" - % (scheme, sorted(backends)) - ) - else: - backends[scheme] = backend - return backend + if backend is not None: + return backend + + for backend_class_ep in _entry_points( + name=scheme, group="distributed.comm.backends" + ): + backend = backend_class_ep.load()() + backends[scheme] = backend + return backend + + raise ValueError( + f"unknown address scheme {scheme!r} (known schemes: {sorted(backends)})" + ) diff --git a/distributed/comm/tests/test_comms.py b/distributed/comm/tests/test_comms.py index 46b5ab8d03b..f9ecf5e0726 100644 --- a/distributed/comm/tests/test_comms.py +++ b/distributed/comm/tests/test_comms.py @@ -2,18 +2,15 @@ import os import sys import threading -import types import warnings from functools import partial -import pkg_resources import pytest from tornado import ioloop from tornado.concurrent import Future import dask -import distributed from distributed.comm import ( CommClosedError, asyncio_tcp, @@ -30,7 +27,7 @@ from distributed.comm.registry import backends, get_backend from distributed.metrics import time from distributed.protocol import Serialized, deserialize, serialize, to_serialize -from distributed.utils import get_ip, get_ipv6 +from distributed.utils import get_ip, get_ipv6, mp_context from distributed.utils_test import ( get_cert, get_client_ssl_context, @@ -1313,30 +1310,18 @@ async def test_inproc_adresses(): await check_addresses(a, b) -def test_register_backend_entrypoint(): - # Code adapted from pandas backend entry point testing - # https://github.com/pandas-dev/pandas/blob/2470690b9f0826a8feb426927694fa3500c3e8d2/pandas/tests/plotting/test_backend.py#L50-L76 +def _get_backend_on_path(path): + sys.path.append(os.fsdecode(path)) + return get_backend("udp") - dist = pkg_resources.get_distribution("distributed") - if dist.module_path not in distributed.__file__: - # We are running from a non-installed distributed, and this test is invalid - pytest.skip("Testing a non-installed distributed") - mod = types.ModuleType("dask_udp") - mod.UDPBackend = lambda: 1 - sys.modules[mod.__name__] = mod - - entry_point_name = "distributed.comm.backends" - backends_entry_map = pkg_resources.get_entry_map("distributed") - if entry_point_name not in backends_entry_map: - backends_entry_map[entry_point_name] = dict() - backends_entry_map[entry_point_name]["udp"] = pkg_resources.EntryPoint( - "udp", mod.__name__, attrs=["UDPBackend"], dist=dist +def test_register_backend_entrypoint(tmp_path): + (tmp_path / "dask_udp.py").write_bytes(b"def udp_backend():\n return 1\n") + dist_info = tmp_path / "dask_udp-0.0.0.dist-info" + dist_info.mkdir() + (dist_info / "entry_points.txt").write_bytes( + b"[distributed.comm.backends]\nudp = dask_udp:udp_backend\n" ) - - # The require is disabled here since particularly unit tests may install - # dirty or dev versions which are conflicting with backend entrypoints if - # they are demanding for exact, stable versions. 
From 3492ef1ea308af5b3850fd0d11205168d38ad28f Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Wed, 16 Mar 2022 06:39:34 -0600
Subject: [PATCH 2/5] Make log_event threadsafe (#5946)

---
 distributed/worker.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/distributed/worker.py b/distributed/worker.py
index 50fc33795e1..918f01d2e5c 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -922,12 +922,13 @@ def logs(self):
         return self._deque_handler.deque
 
     def log_event(self, topic, msg):
-        self.batched_stream.send(
+        self.loop.add_callback(
+            self.batched_stream.send,
             {
                 "op": "log-event",
                 "topic": topic,
                 "msg": msg,
-            }
+            },
         )
 
     @property
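A brief usage sketch of the situation this change addresses (not from the patch; the
topic name and payload are invented): ``Worker.log_event`` is commonly called from
inside a task, which runs on a worker thread rather than on the worker's event loop,
so the batched send is now handed over to the loop with ``add_callback``:

from distributed import Client, get_worker


def process(x):
    # Runs in a worker thread, not on the worker's event loop; the patched
    # log_event schedules the actual send on the loop via add_callback.
    get_worker().log_event("custom-topic", {"input": x, "status": "processed"})
    return x + 1


client = Client(processes=False)
assert client.submit(process, 1).result() == 2
# The event reaches the scheduler shortly afterwards via the batched stream.
print(client.get_events("custom-topic"))
client.close()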
From 1b4d13fe4f6898bb825bd9e58dc3e795a7230be5 Mon Sep 17 00:00:00 2001
From: jakirkham
Date: Wed, 16 Mar 2022 05:40:51 -0700
Subject: [PATCH 3/5] Only run `gh-pages` workflow on `dask/distributed` (#5942)

---
 .github/workflows/test-report.yaml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/.github/workflows/test-report.yaml b/.github/workflows/test-report.yaml
index 008ef404e46..4aae39e0308 100644
--- a/.github/workflows/test-report.yaml
+++ b/.github/workflows/test-report.yaml
@@ -8,6 +8,8 @@ on:
 jobs:
   test-report:
     name: Test Report
+    # Do not run the report job on forks
+    if: github.repository == 'dask/distributed' || github.event_name != 'workflow_dispatch'
     runs-on: ubuntu-latest
     env:
       GITHUB_TOKEN: ${{ github.token }}

From 8957c147574f15aeb13de0a700a11c176f344d62 Mon Sep 17 00:00:00 2001
From: Gabe Joseph
Date: Thu, 17 Mar 2022 10:55:31 -0600
Subject: [PATCH 4/5] Some updates to scheduling policies docs (#5911)

---
 docs/source/scheduling-policies.rst | 58 ++++++++++++++++++++---------
 1 file changed, 40 insertions(+), 18 deletions(-)

diff --git a/docs/source/scheduling-policies.rst b/docs/source/scheduling-policies.rst
index 63fa9fa41f4..4375e4b38fe 100644
--- a/docs/source/scheduling-policies.rst
+++ b/docs/source/scheduling-policies.rst
@@ -50,7 +50,7 @@ would all belong to the TaskGroup ``random-a1b2c3``.)
 To identify the root(ish) tasks, we use this heuristic:
 
 1. The TaskGroup has 2x more tasks than there are threads in the cluster
-2. The TaskGroup has fewer than 5 dependencies across *all* tasks in the group.
+2. The TaskGroup has fewer than 5 unique dependencies across *all* tasks in the group.
 
 We don't just say "The task has no dependencies", because real-world cases like
 :obj:`dask.array.from_zarr` and :obj:`dask.array.from_array` produce graphs like the one
@@ -82,18 +82,36 @@ where can the task run the soonest, considering both data transfer and worker bu
 Tasks that don't meet the root-ish criteria described above are selected as follows:
 
+First, we identify the pool of viable workers:
+
 1. If the task has no dependencies and no restrictions, then we find the
    least-occupied worker.
 2. Otherwise, if a task has user-provided restrictions (for example it must
    run on a machine with a GPU) then we restrict the available pool of workers
-   to just that set, otherwise we consider all workers.
-3. From among this pool of workers, we determine the workers to whom the least
-   amount of data would need to be transferred.
-4. We break ties by choosing the worker that currently has the fewest tasks,
-   counting both those tasks in memory and those tasks processing currently.
+   to just that set. Otherwise, we consider all workers.
+3. We restrict the above set to just workers that hold at least one dependency
+   of the task.
+
+From among this pool of workers, we then determine the worker where we think the task will
+start running the soonest, using :meth:`Scheduler.worker_objective`. For each worker:
+
+1. We consider the estimated runtime of other tasks already queued on that worker.
+   Then, we add how long it will take to transfer any dependencies to that worker that
+   it doesn't already have, based on their size, in bytes, and the measured network
+   bandwidth between workers. Note that this does *not* consider (de)serialization
+   time, time to retrieve the data from disk if it was spilled, or potential differences
+   between size in memory and serialized size. In practice, the
+   queue-wait-time (known as *occupancy*) usually dominates, so data will usually be
+   transferred to a different worker if it means the task can start any sooner.
+2. It's possible for ties to occur with the "start soonest" metric, though uncommon
+   when all workers are busy. We break ties by choosing the worker that has the
+   fewest number of bytes of Dask data stored (including spilled data). Note that
+   this is the same as :ref:`managed ` plus :ref:`spilled `
+   memory, not the :ref:`process ` memory.
 
 This process is easy to change (and indeed this document may be outdated). We
-encourage readers to inspect the ``decide_worker`` functions in ``scheduler.py``.
+encourage readers to inspect the ``decide_worker`` and ``worker_objective``
+functions in ``scheduler.py``.
 
 .. currentmodule:: distributed.scheduler
 
 .. autosummary:: Scheduler.decide_worker
 
+.. autosummary:: Scheduler.worker_objective
+
 Choosing Tasks
 --------------
@@ -125,11 +145,11 @@ all of these (they all come up in important workloads) quickly.
 Last in, first out
 ~~~~~~~~~~~~~~~~~~
 
-When a worker finishes a task the immediate dependencies of that task get top
+When a worker finishes a task, the immediate dependencies of that task get top
 priority. This encourages a behavior of finishing ongoing work immediately
-before starting new work. This often conflicts with the
-first-come-first-served objective but often results in shorter total runtimes
-and significantly reduced memory footprints.
+before starting new work (depth-first graph traversal). This often conflicts with
+the first-come-first-served objective, but often results in significantly reduced
+memory footprints and, due to avoiding data spillage to disk, better overall runtimes.
 
 .. _priority-break-ties:
 
@@ -162,10 +182,11 @@ However, workers inevitably run out of tasks that were related to tasks they
 were just working on and the last-in-first-out policy eventually exhausts
 itself. In these cases workers often pull tasks from the common task pool. The
 tasks in this pool *are* ordered in a first-come-first-served basis and so
-workers do behave in a fair scheduling manner at a *coarse* level if not a fine
-grained one.
+workers do behave in a scheduling manner that's fair to multiple submissions
+at a *coarse* level, if not a fine-grained one.
 
-Dask's scheduling policies are short-term-efficient and long-term-fair.
+Dask's scheduling policies are short-term-efficient and long-term-fair
+to multiple clients.
 
 
 Where these decisions are made
@@ -187,9 +208,10 @@ scheduler, and workers at various points in the computation.
    policy between computations. All tasks from a previous call to compute
    have a higher priority than all tasks from a subsequent call to compute (or
    submit, persist, map, or any operation that generates futures).
-3. Whenever a task is ready to run the scheduler assigns it to a worker. The
-   scheduler does not wait based on priority.
-4. However when the worker receives these tasks it considers their priorities
-   when determining which tasks to prioritize for communication or for
+3. Whenever a task is ready to run (its dependencies, if any, are complete),
+   the scheduler assigns it to a worker. When multiple tasks are ready at once,
+   they are all submitted to workers, in priority order.
+4. However, when the worker receives these tasks, it considers their priorities
+   when determining which tasks to prioritize for fetching data or for
    computation. The worker maintains a heap of all ready-to-run tasks ordered
    by this priority.
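As a rough illustration of the "start soonest" objective described in the updated
text above (a simplified sketch, not the actual ``Scheduler.worker_objective``
implementation; the attribute names are stand-ins):

def estimated_start_time(task, worker, bandwidth):
    # Bytes that would have to cross the network: dependencies the candidate
    # worker does not already hold in memory.
    missing_bytes = sum(
        dep.nbytes for dep in task.dependencies if worker not in dep.who_has
    )
    transfer_time = missing_bytes / bandwidth
    # Occupancy: estimated runtime of work already queued on the worker.
    # In practice this term usually dominates the decision.
    return worker.occupancy + transfer_time


def pick_worker(task, candidates, bandwidth):
    # Earliest estimated start wins; ties are broken by the total number of
    # bytes already stored on the worker (managed plus spilled memory).
    return min(
        candidates,
        key=lambda w: (estimated_start_time(task, w, bandwidth), w.nbytes),
    )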
From 9f7027ef30fa67acb98d20edd719a7bd167724fd Mon Sep 17 00:00:00 2001
From: jakirkham
Date: Thu, 17 Mar 2022 14:45:48 -0700
Subject: [PATCH 5/5] Invert `event_name` check in `test-report` job (#5959)

---
 .github/workflows/test-report.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/test-report.yaml b/.github/workflows/test-report.yaml
index 4aae39e0308..e5d925cea33 100644
--- a/.github/workflows/test-report.yaml
+++ b/.github/workflows/test-report.yaml
@@ -9,7 +9,7 @@ jobs:
   test-report:
     name: Test Report
     # Do not run the report job on forks
-    if: github.repository == 'dask/distributed' || github.event_name != 'workflow_dispatch'
+    if: github.repository == 'dask/distributed' || github.event_name == 'workflow_dispatch'
     runs-on: ubuntu-latest
     env:
       GITHUB_TOKEN: ${{ github.token }}