diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index e304a3959ec..c8a8d3acd7b 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -132,115 +132,104 @@ async def test_decide_worker_with_restrictions(client, s, a, b, c):
 
 
 @pytest.mark.parametrize("ndeps", [0, 1, 4])
-@pytest.mark.parametrize(
-    "nthreads",
-    [
-        [("127.0.0.1", 1)] * 5,
-        [("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
-    ],
+@gen_cluster(
+    client=True,
+    nthreads=[("127.0.0.1", 3), ("127.0.0.1", 2), ("127.0.0.1", 1)],
+    config={"distributed.scheduler.work-stealing": False},
 )
-def test_decide_worker_coschedule_order_neighbors(ndeps, nthreads):
-    @gen_cluster(
-        client=True,
-        nthreads=nthreads,
-        config={"distributed.scheduler.work-stealing": False},
-    )
-    async def test_decide_worker_coschedule_order_neighbors_(c, s, *workers):
-        r"""
-        Ensure that sibling root tasks are scheduled to the same node, reducing future
-        data transfer.
-
-        We generate a wide layer of "root" tasks (random NumPy arrays). All of those
-        tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
-        most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
-        inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
-        (though the number of tasks and workers is different):
-
-            |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
-
-              q       r       s       t      < --- `sum-aggregate-`
-             / \     / \     / \     / \
-            i   j   k   l   m   n   o   p    < --- `sum-`
-            |   |   |   |   |   |   |   |
-            a   b   c   d   e   f   g   h    < --- `random-`
-            \   \   \   |   |   /   /   /
-                   TRIVIAL * 0..5
-
-        Neighboring `random-` tasks should be scheduled on the same worker. We test that
-        generally, only one worker holds each row of the array, that the `random-` tasks
-        are never transferred, and that there are few transfers overall.
-        """
-        da = pytest.importorskip("dask.array")
-        np = pytest.importorskip("numpy")
+async def test_decide_worker_coschedule_order_neighbors(c, s, *workers, ndeps):
+    r"""
+    Ensure that sibling root tasks are scheduled to the same node, reducing future
+    data transfer.
+
+    We generate a wide layer of "root" tasks (random NumPy arrays). All of those
+    tasks share 0-5 trivial dependencies. The ``ndeps=0`` and ``ndeps=1`` cases are
+    most common in real-world use (``ndeps=1`` is basically ``da.from_array(...,
+    inline_array=False)`` or ``da.from_zarr``). The graph is structured like this
+    (though the number of tasks and workers is different):
+
+        |-W1-|  |-W2-|  |-W3-|  |-W4-|   < ---- ideal task scheduling
+
+          q       r       s       t      < --- `sum-aggregate-`
+         / \     / \     / \     / \
+        i   j   k   l   m   n   o   p    < --- `sum-`
+        |   |   |   |   |   |   |   |
+        a   b   c   d   e   f   g   h    < --- `random-`
+        \   \   \   |   |   /   /   /
+               TRIVIAL * 0..5
+
+    Neighboring `random-` tasks should be scheduled on the same worker. We test that
+    generally, only one worker holds each row of the array, that the `random-` tasks
+    are never transferred, and that there are few transfers overall.
+    """
+    da = pytest.importorskip("dask.array")
+    np = pytest.importorskip("numpy")
 
-        if ndeps == 0:
-            x = da.random.random((100, 100), chunks=(10, 10))
-        else:
+    if ndeps == 0:
+        x = da.random.random((100, 100), chunks=(10, 10))
+    else:
 
-            def random(**kwargs):
-                assert len(kwargs) == ndeps
-                return np.random.random((10, 10))
+        def random(**kwargs):
+            assert len(kwargs) == ndeps
+            return np.random.random((10, 10))
 
-            trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}
+        trivial_deps = {f"k{i}": delayed(object()) for i in range(ndeps)}
 
-            # TODO is there a simpler (non-blockwise) way to make this sort of graph?
-            x = da.blockwise(
-                random,
-                "yx",
-                new_axes={"y": (10,) * 10, "x": (10,) * 10},
-                dtype=float,
-                **trivial_deps,
-            )
+        # TODO is there a simpler (non-blockwise) way to make this sort of graph?
+        x = da.blockwise(
+            random,
+            "yx",
+            new_axes={"y": (10,) * 10, "x": (10,) * 10},
+            dtype=float,
+            **trivial_deps,
+        )
 
-        xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
-        await xsum
-
-        # Check that each chunk-row of the array is (mostly) stored on the same worker
-        primary_worker_key_fractions = []
-        secondary_worker_key_fractions = []
-        for i, keys in enumerate(x.__dask_keys__()):
-            # Iterate along rows of the array.
-            keys = {stringify(k) for k in keys}
-
-            # No more than 2 workers should have any keys
-            assert sum(any(k in w.data for k in keys) for w in workers) <= 2
-
-            # What fraction of the keys for this row does each worker hold?
-            key_fractions = [
-                len(set(w.data).intersection(keys)) / len(keys) for w in workers
-            ]
-            key_fractions.sort()
-            # Primary worker: holds the highest percentage of keys
-            # Secondary worker: holds the second highest percentage of keys
-            primary_worker_key_fractions.append(key_fractions[-1])
-            secondary_worker_key_fractions.append(key_fractions[-2])
-
-        # There may be one or two rows that were poorly split across workers,
-        # but the vast majority of rows should only be on one worker.
-        assert np.mean(primary_worker_key_fractions) >= 0.9
-        assert np.median(primary_worker_key_fractions) == 1.0
-        assert np.mean(secondary_worker_key_fractions) <= 0.1
-        assert np.median(secondary_worker_key_fractions) == 0.0
-
-        # Check that there were few transfers
-        unexpected_transfers = []
-        for worker in workers:
-            for log in worker.incoming_transfer_log:
-                keys = log["keys"]
-                # The root-ish tasks should never be transferred
-                assert not any(k.startswith("random") for k in keys), keys
-                # `object-` keys (the trivial deps of the root random tasks) should be
-                # transferred
-                if any(not k.startswith("object") for k in keys):
-                    # But not many other things should be
-                    unexpected_transfers.append(list(keys))
-
-        # A transfer at the very end to move aggregated results is fine (necessary with
-        # unbalanced workers in fact), but generally there should be very very few
-        # transfers.
-        assert len(unexpected_transfers) <= 3, unexpected_transfers
-
-    test_decide_worker_coschedule_order_neighbors_()
+    xx, xsum = dask.persist(x, x.sum(axis=1, split_every=20))
+    await xsum
+
+    # Check that each chunk-row of the array is (mostly) stored on the same worker
+    primary_worker_key_fractions = []
+    secondary_worker_key_fractions = []
+    for i, keys in enumerate(x.__dask_keys__()):
+        # Iterate along rows of the array.
+        keys = {stringify(k) for k in keys}
+
+        # No more than 2 workers should have any keys
+        assert sum(any(k in w.data for k in keys) for w in workers) <= 2
+
+        # What fraction of the keys for this row does each worker hold?
+        key_fractions = [
+            len(set(w.data).intersection(keys)) / len(keys) for w in workers
+        ]
+        key_fractions.sort()
+        # Primary worker: holds the highest percentage of keys
+        # Secondary worker: holds the second highest percentage of keys
+        primary_worker_key_fractions.append(key_fractions[-1])
+        secondary_worker_key_fractions.append(key_fractions[-2])
+
+    # There may be one or two rows that were poorly split across workers,
+    # but the vast majority of rows should only be on one worker.
+    assert np.mean(primary_worker_key_fractions) >= 0.9
+    assert np.median(primary_worker_key_fractions) == 1.0
+    assert np.mean(secondary_worker_key_fractions) <= 0.1
+    assert np.median(secondary_worker_key_fractions) == 0.0
+
+    # Check that there were few transfers
+    unexpected_transfers = []
+    for worker in workers:
+        for log in worker.incoming_transfer_log:
+            keys = log["keys"]
+            # The root-ish tasks should never be transferred
+            assert not any(k.startswith("random") for k in keys), keys
+            # `object-` keys (the trivial deps of the root random tasks) should be
+            # transferred
+            if any(not k.startswith("object") for k in keys):
+                # But not many other things should be
+                unexpected_transfers.append(list(keys))
+
+    # A transfer at the very end to move aggregated results is fine (necessary with
+    # unbalanced workers in fact), but generally there should be very few transfers.
+    assert len(unexpected_transfers) <= 6, unexpected_transfers
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 3)
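The rewrite above depends on `gen_cluster` forwarding pytest-injected arguments to the test body, which is why the outer `@pytest.mark.parametrize("ndeps", [0, 1, 4])` can stay while the nested closure goes away: `ndeps` now arrives as a keyword-only argument after the workers. Below is a minimal sketch of that pattern under the same decorator ordering; the test name `test_param_passthrough` and the parameter `my_param` are illustrative only (not from this diff), while `gen_cluster` and `inc` come from `distributed.utils_test`.

import pytest

from distributed.utils_test import gen_cluster, inc


@pytest.mark.parametrize("my_param", [1, 2])
@gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 2)
async def test_param_passthrough(c, s, *workers, my_param):
    # gen_cluster supplies the client `c`, scheduler `s`, and the two workers;
    # pytest supplies `my_param`, which gen_cluster forwards as a keyword argument,
    # mirroring how `ndeps` reaches test_decide_worker_coschedule_order_neighbors.
    result = await c.submit(inc, my_param)
    assert result == my_param + 1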