Merge branch 'main' into spill_extension
crusaderky committed Mar 18, 2022
2 parents b82da4d + 9f7027e commit 5883c96
Showing 8 changed files with 95 additions and 82 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test-report.yaml
@@ -8,6 +8,8 @@ on:
jobs:
test-report:
name: Test Report
# Do not run the report job on forks
if: github.repository == 'dask/distributed' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
env:
GITHUB_TOKEN: ${{ github.token }}
1 change: 0 additions & 1 deletion .pre-commit-config.yaml
@@ -39,7 +39,6 @@ repos:
- types-docutils
- types-requests
- types-paramiko
- types-pkg_resources
- types-PyYAML
- types-setuptools
- types-psutil
67 changes: 37 additions & 30 deletions distributed/comm/registry.py
@@ -1,6 +1,29 @@
from __future__ import annotations

import importlib.metadata
import sys
from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import Protocol


class _EntryPoints(Protocol):
def __call__(self, **kwargs: str) -> Iterable[importlib.metadata.EntryPoint]:
...


if sys.version_info >= (3, 10):
# py3.10 importlib.metadata type annotations are not in mypy yet
# https://github.com/python/typeshed/pull/7331
_entry_points: _EntryPoints = importlib.metadata.entry_points # type: ignore[assignment]
else:

def _entry_points(
*, group: str, name: str
) -> Iterable[importlib.metadata.EntryPoint]:
for ep in importlib.metadata.entry_points().get(group, []):
if ep.name == name:
yield ep


class Backend(ABC):
@@ -59,40 +82,24 @@ def get_local_address_for(self, loc):
backends: dict[str, Backend] = {}


def get_backend(scheme: str, require: bool = True) -> Backend:
def get_backend(scheme: str) -> Backend:
"""
Get the Backend instance for the given *scheme*.
It looks for a matching scheme in dask's internal cache, and falls back to
package metadata for the group name ``distributed.comm.backends``
Parameters
----------
require : bool
Verify that the backends requirements are properly installed. See
https://setuptools.readthedocs.io/en/latest/pkg_resources.html for more
information.
"""

backend = backends.get(scheme)
if backend is None:
import pkg_resources

backend = None
for backend_class_ep in pkg_resources.iter_entry_points(
"distributed.comm.backends", scheme
):
# resolve and require are equivalent to load
backend_factory = backend_class_ep.resolve()
if require:
backend_class_ep.require()
backend = backend_factory()

if backend is None:
raise ValueError(
"unknown address scheme %r (known schemes: %s)"
% (scheme, sorted(backends))
)
else:
backends[scheme] = backend
return backend
if backend is not None:
return backend

for backend_class_ep in _entry_points(
name=scheme, group="distributed.comm.backends"
):
backend = backend_class_ep.load()()
backends[scheme] = backend
return backend

raise ValueError(
f"unknown address scheme {scheme!r} (known schemes: {sorted(backends)})"
)
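
As a side note on how this lookup is consumed: the sketch below is hypothetical (the package name dask_foo and class FooBackend are invented for illustration), but the entry-point group distributed.comm.backends and the lazy load-then-cache behaviour match the code above.

# Hypothetical third-party package "dask_foo" shipping a Backend subclass
# FooBackend, whose packaging metadata declares (e.g. in entry_points.txt):
#
#     [distributed.comm.backends]
#     foo = dask_foo:FooBackend
#
# With such a distribution installed, get_backend resolves it lazily:
from distributed.comm.registry import get_backend

backend = get_backend("foo")          # loads dask_foo:FooBackend, instantiates, caches
assert get_backend("foo") is backend  # second lookup is served from the `backends` cache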
41 changes: 13 additions & 28 deletions distributed/comm/tests/test_comms.py
@@ -2,18 +2,15 @@
import os
import sys
import threading
import types
import warnings
from functools import partial

import pkg_resources
import pytest
from tornado import ioloop
from tornado.concurrent import Future

import dask

import distributed
from distributed.comm import (
CommClosedError,
asyncio_tcp,
@@ -30,7 +27,7 @@
from distributed.comm.registry import backends, get_backend
from distributed.metrics import time
from distributed.protocol import Serialized, deserialize, serialize, to_serialize
from distributed.utils import get_ip, get_ipv6
from distributed.utils import get_ip, get_ipv6, mp_context
from distributed.utils_test import (
get_cert,
get_client_ssl_context,
@@ -1313,30 +1310,18 @@ async def test_inproc_adresses():
await check_addresses(a, b)


def test_register_backend_entrypoint():
# Code adapted from pandas backend entry point testing
# https://github.com/pandas-dev/pandas/blob/2470690b9f0826a8feb426927694fa3500c3e8d2/pandas/tests/plotting/test_backend.py#L50-L76
def _get_backend_on_path(path):
sys.path.append(os.fsdecode(path))
return get_backend("udp")

dist = pkg_resources.get_distribution("distributed")
if dist.module_path not in distributed.__file__:
# We are running from a non-installed distributed, and this test is invalid
pytest.skip("Testing a non-installed distributed")

mod = types.ModuleType("dask_udp")
mod.UDPBackend = lambda: 1
sys.modules[mod.__name__] = mod

entry_point_name = "distributed.comm.backends"
backends_entry_map = pkg_resources.get_entry_map("distributed")
if entry_point_name not in backends_entry_map:
backends_entry_map[entry_point_name] = dict()
backends_entry_map[entry_point_name]["udp"] = pkg_resources.EntryPoint(
"udp", mod.__name__, attrs=["UDPBackend"], dist=dist
def test_register_backend_entrypoint(tmp_path):
(tmp_path / "dask_udp.py").write_bytes(b"def udp_backend():\n return 1\n")
dist_info = tmp_path / "dask_udp-0.0.0.dist-info"
dist_info.mkdir()
(dist_info / "entry_points.txt").write_bytes(
b"[distributed.comm.backends]\nudp = dask_udp:udp_backend\n"
)

# The require is disabled here since particularly unit tests may install
# dirty or dev versions which are conflicting with backend entrypoints if
# they are demanding for exact, stable versions. This should not fail the
# test
result = get_backend("udp", require=False)
assert result == 1
with mp_context.Pool(1) as pool:
assert pool.apply(_get_backend_on_path, args=(tmp_path,)) == 1
pool.join()
2 changes: 0 additions & 2 deletions distributed/utils.py
@@ -74,8 +74,6 @@ def _initialize_mp_context():
if method == "forkserver":
# Makes the test suite much faster
preload = ["distributed"]
if "pkg_resources" in sys.modules:
preload.append("pkg_resources")

from distributed.versions import optional_packages, required_packages

5 changes: 3 additions & 2 deletions distributed/worker.py
@@ -875,12 +875,13 @@ def logs(self):
return self._deque_handler.deque

def log_event(self, topic, msg):
self.batched_stream.send(
self.loop.add_callback(
self.batched_stream.send,
{
"op": "log-event",
"topic": topic,
"msg": msg,
}
},
)

@property
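
A brief, hedged note on the log_event change above: Tornado's IOLoop.add_callback is safe to call from threads other than the event-loop thread, so routing the batched send through it lets log_event be called off the loop thread. A minimal sketch of the pattern (the helper name and arguments are illustrative, not Worker internals):

from tornado.ioloop import IOLoop


def send_event_threadsafe(loop: IOLoop, batched_stream, topic, msg) -> None:
    # add_callback schedules the send on the IOLoop's own thread; it is the
    # documented thread-safe way to hand work to a running Tornado loop.
    loop.add_callback(
        batched_stream.send,
        {"op": "log-event", "topic": topic, "msg": msg},
    )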
58 changes: 40 additions & 18 deletions docs/source/scheduling-policies.rst
@@ -50,7 +50,7 @@ would all belong to the TaskGroup ``random-a1b2c3``.)
To identify the root(ish) tasks, we use this heuristic:

1. The TaskGroup has 2x more tasks than there are threads in the cluster
2. The TaskGroup has fewer than 5 dependencies across *all* tasks in the group.
2. The TaskGroup has fewer than 5 unique dependencies across *all* tasks in the group.
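
As a rough sketch of the heuristic above (the attribute names are assumptions for illustration, not the scheduler's actual bookkeeping):

def looks_rootish(task_group, total_nthreads: int) -> bool:
    # 1. the group is large relative to the cluster's total thread count
    has_many_tasks = len(task_group.tasks) > 2 * total_nthreads
    # 2. the union of dependencies across all tasks in the group is tiny
    has_few_deps = len(task_group.dependencies) < 5
    return has_many_tasks and has_few_deps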

We don't just say "The task has no dependencies", because real-world cases like
:obj:`dask.array.from_zarr` and :obj:`dask.array.from_array` produce graphs like the one
@@ -82,25 +82,45 @@ where can the task run the soonest, considering both data transfer and worker bu

Tasks that don't meet the root-ish criteria described above are selected as follows:

First, we identify the pool of viable workers:

1. If the task has no dependencies and no restrictions, then we find the
least-occupied worker.
2. Otherwise, if a task has user-provided restrictions (for example it must
run on a machine with a GPU) then we restrict the available pool of workers
to just that set, otherwise we consider all workers.
3. From among this pool of workers, we determine the workers to whom the least
amount of data would need to be transferred.
4. We break ties by choosing the worker that currently has the fewest tasks,
counting both those tasks in memory and those tasks processing currently.
to just that set. Otherwise, we consider all workers.
3. We restrict the above set to just workers that hold at least one dependency
of the task.

From among this pool of workers, we then determine the worker where we think the task will
start running the soonest, using :meth:`Scheduler.worker_objective`. For each worker:

1. We consider the estimated runtime of other tasks already queued on that worker.
Then, we add how long it will take to transfer any dependencies to that worker that
it doesn't already have, based on their size, in bytes, and the measured network
bandwidth between workers. Note that this does *not* consider (de)serialization
time, time to retrieve the data from disk if it was spilled, or potential differences
between size in memory and serialized size. In practice, the
queue-wait-time (known as *occupancy*) usually dominates, so data will usually be
transferred to a different worker if it means the task can start any sooner.
2. It's possible for ties to occur with the "start soonest" metric, though uncommon
when all workers are busy. We break ties by choosing the worker that has the
fewest number of bytes of Dask data stored (including spilled data). Note that
this is the same as :ref:`managed <memtypes>` plus :ref:`spilled <memtypes>`
memory, not the :ref:`process <memtypes>` memory.

This process is easy to change (and indeed this document may be outdated). We
encourage readers to inspect the ``decide_worker`` functions in ``scheduler.py``.
encourage readers to inspect the ``decide_worker`` and ``worker_objective``
functions in ``scheduler.py``.

.. currentmodule:: distributed.scheduler

.. autosummary:: decide_worker

.. autosummary:: Scheduler.decide_worker

.. autosummary:: Scheduler.worker_objective


Choosing Tasks
--------------
@@ -125,11 +145,11 @@
Last in, first out
~~~~~~~~~~~~~~~~~~

When a worker finishes a task the immediate dependencies of that task get top
When a worker finishes a task, the immediate dependencies of that task get top
priority. This encourages a behavior of finishing ongoing work immediately
before starting new work. This often conflicts with the
first-come-first-served objective but often results in shorter total runtimes
and significantly reduced memory footprints.
before starting new work (depth-first graph traversal). This often conflicts with
the first-come-first-served objective, but often results in significantly reduced
memory footprints and, due to avoiding data spillage to disk, better overall runtimes.

.. _priority-break-ties:

@@ -162,10 +182,11 @@ However, workers inevitably run out of tasks that were related to tasks they
were just working on and the last-in-first-out policy eventually exhausts
itself. In these cases workers often pull tasks from the common task pool.
The tasks in this pool *are* ordered in a first-come-first-served basis and so
workers do behave in a fair scheduling manner at a *coarse* level if not a fine
grained one.
workers do behave in a scheduling manner that's fair to multiple submissions
at a *coarse* level, if not a fine-grained one.

Dask's scheduling policies are short-term-efficient and long-term-fair.
Dask's scheduling policies are short-term-efficient and long-term-fair
to multiple clients.


Where these decisions are made
Expand All @@ -187,9 +208,10 @@ scheduler, and workers at various points in the computation.
policy between computations. All tasks from a previous call to compute
have a higher priority than all tasks from a subsequent call to compute (or
submit, persist, map, or any operation that generates futures).
3. Whenever a task is ready to run the scheduler assigns it to a worker. The
scheduler does not wait based on priority.
4. However when the worker receives these tasks it considers their priorities
when determining which tasks to prioritize for communication or for
3. Whenever a task is ready to run (its dependencies, if any, are complete),
the scheduler assigns it to a worker. When multiple tasks are ready at once,
they are all submitted to workers, in priority order.
4. However, when the worker receives these tasks, it considers their priorities
when determining which tasks to prioritize for fetching data or for
computation. The worker maintains a heap of all ready-to-run tasks ordered
by this priority.
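
To make item 4 concrete, here is a toy sketch of a priority heap of ready tasks. The two-field key below is a simplification of the worker's real composite priority; smaller sorts first, and tasks from an earlier compute() call come before tasks from a later one, matching the behaviour described above.

import heapq

# (generation, graph_order): earlier submissions first, then graph ordering.
ready: list[tuple[tuple[int, int], str]] = []
heapq.heappush(ready, ((0, 3), "sum-partial"))
heapq.heappush(ready, ((0, 1), "read-block"))
heapq.heappush(ready, ((1, 0), "later-submission"))

order = []
while ready:
    _priority, key = heapq.heappop(ready)
    order.append(key)

assert order == ["read-block", "sum-partial", "later-submission"]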
1 change: 0 additions & 1 deletion requirements.txt
@@ -11,4 +11,3 @@ toolz >= 0.8.2
tornado >= 6.0.3
zict >= 0.1.3
pyyaml
setuptools
