diff --git a/distributed/dashboard/tests/test_components.py b/distributed/dashboard/tests/test_components.py
index 0d526739d94..a3e444e17e6 100644
--- a/distributed/dashboard/tests/test_components.py
+++ b/distributed/dashboard/tests/test_components.py
@@ -1,9 +1,10 @@
+import asyncio
+
 import pytest
 
 pytest.importorskip("bokeh")
 
 from bokeh.models import ColumnDataSource, Model
-from tornado import gen
 
 from distributed.utils_test import slowinc, gen_cluster
 from distributed.dashboard.components.shared import (
@@ -45,4 +46,4 @@ async def test_profile_time_plot(c, s, a, b):
         await c.gather(c.map(slowinc, range(10), delay=0.05))
         ap.trigger_update()
         sp.trigger_update()
-        await gen.sleep(0.05)
+        await asyncio.sleep(0.05)
diff --git a/distributed/dashboard/tests/test_scheduler_bokeh.py b/distributed/dashboard/tests/test_scheduler_bokeh.py
index a8caf724ed9..fe4988c5d23 100644
--- a/distributed/dashboard/tests/test_scheduler_bokeh.py
+++ b/distributed/dashboard/tests/test_scheduler_bokeh.py
@@ -9,7 +9,6 @@
 pytest.importorskip("bokeh")
 
 from tlz import first
-from tornado import gen
 from tornado.httpclient import AsyncHTTPClient, HTTPRequest
 
 import dask
@@ -46,7 +45,7 @@ async def test_simple(c, s, a, b):
     port = s.http_server.port
 
     future = c.submit(sleep, 1)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     http_client = AsyncHTTPClient()
     for suffix in applications:
@@ -78,16 +77,16 @@ def test_basic(c, s, a, b):
 async def test_counters(c, s, a, b):
     pytest.importorskip("crick")
     while "tick-duration" not in s.digests:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     ss = Counters(s)
 
     ss.update()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     ss.update()
 
     start = time()
     while not len(ss.digest_sources["tick-duration"][0].data["x"]):
-        await gen.sleep(1)
+        await asyncio.sleep(1)
     assert time() < start + 5
@@ -100,7 +99,7 @@ async def test_stealing_events(c, s, a, b):
     )
 
     while not b.task_state:  # will steal soon
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     se.update()
@@ -116,7 +115,7 @@ async def test_events(c, s, a, b):
     )
 
     while not b.task_state:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     e.update()
     d = dict(e.source.data)
@@ -177,7 +176,7 @@ async def test_task_stream_clear_interval(c, s, a, b):
 
     await wait(c.map(inc, range(10)))
     ts.update()
-    await gen.sleep(0.010)
+    await asyncio.sleep(0.010)
 
     await wait(c.map(dec, range(10)))
     ts.update()
@@ -185,7 +184,7 @@ async def test_task_stream_clear_interval(c, s, a, b):
     assert ts.source.data["name"].count("inc") == 10
     assert ts.source.data["name"].count("dec") == 10
 
-    await gen.sleep(0.300)
+    await asyncio.sleep(0.300)
 
     await wait(c.map(inc, range(10, 20)))
     ts.update()
@@ -217,7 +216,7 @@ async def test_TaskProgress(c, s, a, b):
 
     del futures, futures2
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     tp.update()
     assert not tp.source.data["all"]
@@ -234,7 +233,7 @@ async def test_TaskProgress_empty(c, s, a, b):
 
     del futures
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     tp.update()
     assert not any(len(v) for v in tp.source.data.values())
@@ -264,7 +263,7 @@ async def test_ProcessingHistogram(c, s, a, b):
 
     futures = c.map(slowinc, range(10), delay=0.050)
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     ph.update()
     assert ph.source.data["right"][-1] > 2
@@ -460,7 +459,7 @@ async def test_TaskGraph(c, s, a, b):
     key = future.key
     del future, future2
     while key in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert "memory" in gp.node_source.data["state"]
@@ -482,14 +481,14 @@ async def test_TaskGraph_clear(c, s, a, b):
 
     del total, futures
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     gp.update()
     gp.update()
 
     start = time()
     while any(gp.node_source.data.values()) or any(gp.edge_source.data.values()):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         gp.update()
     assert time() < start + 5
@@ -533,7 +532,7 @@ async def test_TaskGraph_complex(c, s, a, b):
     assert len(gp.layout.index) == len(gp.node_source.data["x"])
     assert len(gp.layout.index) == len(s.tasks)
     del z
-    await gen.sleep(0.2)
+    await asyncio.sleep(0.2)
     gp.update()
     assert len(gp.layout.index) == sum(
         v == "True" for v in gp.node_source.data["visible"]
     )
@@ -570,9 +569,9 @@ async def test_TaskGraph_order(c, s, a, b):
 async def test_profile_server(c, s, a, b):
     ptp = ProfileServer(s)
     start = time()
-    await gen.sleep(0.100)
+    await asyncio.sleep(0.100)
     while len(ptp.ts_source.data["time"]) < 2:
-        await gen.sleep(0.100)
+        await asyncio.sleep(0.100)
         ptp.trigger_update()
     assert time() < start + 2
diff --git a/distributed/dashboard/tests/test_worker_bokeh.py b/distributed/dashboard/tests/test_worker_bokeh.py
index 9beb492683b..47ac89c6b0a 100644
--- a/distributed/dashboard/tests/test_worker_bokeh.py
+++ b/distributed/dashboard/tests/test_worker_bokeh.py
@@ -1,12 +1,12 @@
-from operator import add, sub
+import asyncio
 import re
+from operator import add, sub
 from time import sleep
 
 import pytest
 
 pytest.importorskip("bokeh")
 from tlz import first
-from tornado import gen
 from tornado.httpclient import AsyncHTTPClient
 
 from distributed.client import wait
@@ -32,7 +32,7 @@ async def test_routes(c, s, a, b):
     port = a.http_server.port
 
     future = c.submit(sleep, 1)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     http_client = AsyncHTTPClient()
     for suffix in ["status", "counters", "system", "profile", "profile-server"]:
@@ -54,7 +54,7 @@ async def test_simple(c, s, a, b):
     assert s.workers[b.address].services == {"dashboard": b.http_server.port}
 
     future = c.submit(sleep, 1)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     http_client = AsyncHTTPClient()
     for suffix in ["crossfilter", "system"]:
@@ -92,7 +92,7 @@ def slowall(*args):
 
     x = c.submit(slowall, xs, ys, 1, workers=a.address)
     y = c.submit(slowall, xs, ys, 2, workers=b.address)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     aa.update()
     bb.update()
@@ -106,16 +106,16 @@ def slowall(*args):
 async def test_counters(c, s, a, b):
     pytest.importorskip("crick")
     while "tick-duration" not in a.digests:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     aa = Counters(a)
 
     aa.update()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     aa.update()
 
     start = time()
     while not len(aa.digest_sources["tick-duration"][0].data["x"]):
-        await gen.sleep(1)
+        await asyncio.sleep(1)
     assert time() < start + 5
 
     a.digests["foo"].add(1)
diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py
index 9055e910c9d..eeb32dec089 100644
--- a/distributed/deploy/tests/test_adaptive.py
+++ b/distributed/deploy/tests/test_adaptive.py
@@ -1,9 +1,9 @@
+import asyncio
 import math
 from time import sleep
 
 import dask
 import pytest
-from tornado import gen
 
 from distributed import Client, wait, Adaptive, LocalCluster, SpecCluster, Worker
 from distributed.utils_test import gen_test, slowinc, clean
@@ -39,13 +39,13 @@ def scale_down(self, workers):
 
     future = c.map(slowinc, [1, 1, 1], key=["a-4", "b-4", "c-1"])
     while len(s.rprocessing) < 3:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
 
     ta = cluster.adapt(
         interval="100 ms", scale_factor=2, Adaptive=TestAdaptive
     )
-    await gen.sleep(0.3)
+    await asyncio.sleep(0.3)
 
 
 def test_adaptive_local_cluster(loop):
@@ -90,7 +90,7 @@ async def test_adaptive_local_cluster_multi_workers(cleanup):
 
             start = time()
             while not cluster.scheduler.workers:
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
             assert time() < start + 15, adapt.log
 
             await c.gather(futures)
@@ -99,13 +99,13 @@ async def test_adaptive_local_cluster_multi_workers(cleanup):
 
             start = time()
             # while cluster.workers:
             while cluster.scheduler.workers:
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
             assert time() < start + 15, adapt.log
 
             # no workers for a while
             for i in range(10):
                 assert not cluster.scheduler.workers
-                await gen.sleep(0.05)
+                await asyncio.sleep(0.05)
 
             futures = c.map(slowinc, range(100), delay=0.01)
             await c.gather(futures)
@@ -135,7 +135,7 @@ def scale_up(self, n, **kwargs):
     ta = cluster.adapt(
         min_size=2, interval=0.1, scale_factor=2, Adaptive=TestAdaptive
     )
-    await gen.sleep(0.3)
+    await asyncio.sleep(0.3)
 
     # Assert that adaptive cycle does not reduce cluster below minimum size
     # as determined via override.
@@ -158,10 +158,10 @@ async def test_min_max():
 
         start = time()
         while not cluster.scheduler.workers:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 1
 
-        await gen.sleep(0.2)
+        await asyncio.sleep(0.2)
         assert len(cluster.scheduler.workers) == 1
         assert len(adapt.log) == 1 and adapt.log[-1][1] == {"status": "up", "n": 1}
@@ -169,11 +169,11 @@ async def test_min_max():
 
        start = time()
         while len(cluster.scheduler.workers) < 2:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 1
         assert len(cluster.scheduler.workers) == 2
 
-        await gen.sleep(0.5)
+        await asyncio.sleep(0.5)
         assert len(cluster.scheduler.workers) == 2
         assert len(cluster.workers) == 2
         assert len(adapt.log) == 2 and all(d["status"] == "up" for _, d in adapt.log)
@@ -182,7 +182,7 @@ async def test_min_max():
 
         start = time()
         while len(cluster.scheduler.workers) != 1:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 2
         assert adapt.log[-1][1]["status"] == "down"
     finally:
@@ -210,7 +210,7 @@ async def test_avoid_churn(cleanup):
 
             for i in range(10):
                 await client.submit(slowinc, i, delay=0.040)
-                await gen.sleep(0.040)
+                await asyncio.sleep(0.040)
 
             assert len(adapt.log) == 1
@@ -240,24 +240,24 @@ async def test_adapt_quickly():
         # Scale up when there is plenty of available work
         futures = client.map(slowinc, range(1000), delay=0.100)
         while len(adapt.log) == 1:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert len(adapt.log) == 2
         assert adapt.log[-1][1]["status"] == "up"
         d = [x for x in adapt.log[-1] if isinstance(x, dict)][0]
         assert 2 < d["n"] <= adapt.maximum
 
         while len(cluster.workers) < adapt.maximum:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
         del futures
 
         while len(cluster.scheduler.tasks) > 1:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
         await cluster
 
         while len(cluster.scheduler.workers) > 1 or len(cluster.worker_spec) > 1:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
         # Don't scale up for large sequential computations
         x = await client.scatter(1)
@@ -265,7 +265,7 @@ async def test_adapt_quickly():
         for i in range(100):
             x = client.submit(slowinc, x)
-            await gen.sleep(0.1)
+            await asyncio.sleep(0.1)
 
         assert len(cluster.workers) == 1
     finally:
         await client.close()
@@ -288,13 +288,13 @@ async def test_adapt_down():
 
             futures = client.map(slowinc, range(1000), delay=0.1)
             while len(cluster.scheduler.workers) < 5:
-                await gen.sleep(0.1)
+                await asyncio.sleep(0.1)
 
             cluster.adapt(maximum=2)
 
             start = time()
             while len(cluster.scheduler.workers) != 2:
-                await gen.sleep(0.1)
+                await asyncio.sleep(0.1)
 
             assert time() < start + 1
@@ -349,12 +349,12 @@ async def test_target_duration():
         adapt = cluster.adapt(interval="20ms", minimum=2, target_duration="5s")
         async with Client(cluster, asynchronous=True) as client:
             while len(cluster.scheduler.workers) < 2:
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
 
             futures = client.map(slowinc, range(100), delay=0.3)
 
             while len(adapt.log) < 2:
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
 
             assert adapt.log[0][1] == {"status": "up", "n": 2}
             assert adapt.log[1][1] == {"status": "up", "n": 20}
@@ -382,7 +382,7 @@ def key(ws):
             await adaptive.adapt()
 
             while len(cluster.scheduler.workers) == 4:
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
 
             names = {ws.name for ws in cluster.scheduler.workers.values()}
             assert names == {"a-1", "a-2"} or names == {"b-1", "b-2"}
diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index 05a3dabbc65..94a6016dd2a 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -10,7 +10,6 @@
 from distutils.version import LooseVersion
 
 from tornado.ioloop import IOLoop
-from tornado import gen
 import tornado
 from tornado.httpclient import AsyncHTTPClient
 import pytest
@@ -784,14 +783,14 @@ def scale_down(self, *args, **kwargs):
 
     start = time()
     while len(cluster.scheduler.workers) != 2:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
 
     await cluster.scale(1)
 
     start = time()
     while len(cluster.scheduler.workers) != 1:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
 
     await c.close()
diff --git a/distributed/diagnostics/tests/test_eventstream.py b/distributed/diagnostics/tests/test_eventstream.py
index 59c872d3f2f..4af97799893 100644
--- a/distributed/diagnostics/tests/test_eventstream.py
+++ b/distributed/diagnostics/tests/test_eventstream.py
@@ -1,7 +1,7 @@
+import asyncio
 import collections
 
 import pytest
-from tornado import gen
 
 from distributed.client import wait
 from distributed.diagnostics.eventstream import EventStream, eventstream
@@ -49,7 +49,7 @@ async def test_eventstream_remote(c, s, a, b):
 
     start = time()
     while len(s.plugins) == base_plugins:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     futures = c.map(div, [1] * 10, range(10))
@@ -65,5 +65,5 @@ async def test_eventstream_remote(c, s, a, b):
     await comm.close()
     start = time()
     while len(s.plugins) > base_plugins:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
diff --git a/distributed/diagnostics/tests/test_graph_layout.py b/distributed/diagnostics/tests/test_graph_layout.py
index 553ed9789b8..b63311f8432 100644
--- a/distributed/diagnostics/tests/test_graph_layout.py
+++ b/distributed/diagnostics/tests/test_graph_layout.py
@@ -1,9 +1,9 @@
+import asyncio
 import operator
 
 from distributed.utils_test import gen_cluster, inc
 from distributed.diagnostics import GraphLayout
 from distributed import wait
-from tornado import gen
 
 
 @gen_cluster(client=True)
@@ -60,7 +60,7 @@ async def test_release_tasks(c, s, a, b):
     key = total.key
     del total
     while key in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert len(gl.visible_updates) == 1
     assert len(gl.visible_edge_updates) == 5
@@ -75,7 +75,7 @@ async def test_forget(c, s, a, b):
     await wait(futures)
     del futures
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert not gl.x
     assert not gl.y
diff --git a/distributed/diagnostics/tests/test_progress.py b/distributed/diagnostics/tests/test_progress.py
index 9ca69bfe42e..871dcb0c5a5 100644
--- a/distributed/diagnostics/tests/test_progress.py
+++ b/distributed/diagnostics/tests/test_progress.py
@@ -1,7 +1,6 @@
 import asyncio
 
 import pytest
-from tornado import gen
 
 from distributed import Nanny
 from distributed.client import wait
@@ -42,7 +41,7 @@ async def test_many_Progress(c, s, a, b):
 
     start = time()
     while not all(b.status == "finished" for b in bars):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 5
@@ -127,7 +126,7 @@ async def test_AllProgress(c, s, a, b):
     gc.collect()
 
     while any(k in s.who_has for k in keys):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert p.state["released"]["inc"] == keys
     assert p.all["inc"] == keys
@@ -146,7 +145,7 @@ async def test_AllProgress(c, s, a, b):
     gc.collect()
 
     while tkey in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     for coll in [p.all, p.nbytes] + list(p.state.values()):
         assert "inc" not in coll
@@ -161,7 +160,7 @@ def f(x):
 
     gc.collect()
-    await gen.sleep(1)
+    await asyncio.sleep(1)
 
     await wait([future])
     assert p.state["memory"] == {"f": {future.key}}
@@ -189,7 +188,7 @@ async def test_AllProgress_lost_key(c, s, a, b, timeout=None):
 
     start = time()
     while len(p.state["memory"]["inc"]) > 0:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 5
@@ -213,6 +212,6 @@ async def test_GroupProgress(c, s, a, b):
     del x, y, z
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert not fp.groups
diff --git a/distributed/node.py b/distributed/node.py
index 11645e86317..740776bed68 100644
--- a/distributed/node.py
+++ b/distributed/node.py
@@ -3,9 +3,9 @@
 import warnings
 import weakref
 
+from tornado import gen
 from tornado.ioloop import IOLoop
 from tornado.httpserver import HTTPServer
-from tornado import gen
 import tlz
 
 import dask
diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py
index 4419e2935cf..89233eaca24 100644
--- a/distributed/tests/test_actor.py
+++ b/distributed/tests/test_actor.py
@@ -1,7 +1,6 @@
 import asyncio
 import operator
 from time import sleep
-from tornado import gen
 
 import pytest
@@ -80,7 +79,7 @@ async def test(c, s, a, b):
         counter.add(10)
         while (await counter.n) != 10 + 2:
             n = await counter.n
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
     await c.close()
@@ -143,7 +142,7 @@ async def test_linear_access(c, s, a, b):
         actor.append(i)
 
     while True:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         L = await actor.L
         if len(L) == 100:
             break
@@ -187,7 +186,7 @@ async def test_gc(c, s, a, b):
     del actor
 
     while a.actors or b.actors:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
@@ -198,7 +197,7 @@ async def test_track_dependencies(c, s, a, b):
     y = c.submit(lambda x, y: x, x, actor)
     del actor
 
-    await gen.sleep(0.3)
+    await asyncio.sleep(0.3)
 
     assert a.actors or b.actors
@@ -214,7 +213,7 @@ async def test_future(c, s, a, b):
     assert isinstance(counter, Actor)
     assert counter._address
 
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert counter.key in c.futures  # don't lose future
@@ -343,7 +342,7 @@ def add(n, counter):
 
     while not done.done():
         assert len(s.processing) <= a.nthreads + b.nthreads
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     await done
@@ -474,7 +473,7 @@ def check(counter, blanks):
 
     start = time()
     while a.data or b.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -545,7 +544,7 @@ async def wait(self):
     futures = [waiter.wait() for _ in range(5)]  # way more than we have actor threads
 
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert not any(future.done() for future in futures)
 
     await waiter.set()
diff --git a/distributed/tests/test_as_completed.py b/distributed/tests/test_as_completed.py
index 68a4f2acc48..ae257f9bb8e 100644
--- a/distributed/tests/test_as_completed.py
+++ b/distributed/tests/test_as_completed.py
@@ -6,7 +6,6 @@
 from time import sleep
 
 import pytest
-from tornado import gen
 
 from distributed.client import _as_completed, as_completed, _first_completed, wait
 from distributed.metrics import time
@@ -130,7 +129,7 @@ def test_as_completed_cancel_last(client):
     y = client.submit(inc, 0.3)
 
     async def _():
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         await w.cancel(asynchronous=True)
         await y.cancel(asynchronous=True)
diff --git a/distributed/tests/test_asyncprocess.py b/distributed/tests/test_asyncprocess.py
index 13682032ce6..3923d81cf2c 100644
--- a/distributed/tests/test_asyncprocess.py
+++ b/distributed/tests/test_asyncprocess.py
@@ -1,11 +1,12 @@
-from datetime import timedelta
+import asyncio
 import gc
 import os
 import signal
 import sys
 import threading
-from time import sleep
 import weakref
+from datetime import timedelta
+from time import sleep
 
 import pytest
 from tornado import gen
@@ -133,7 +134,7 @@ async def test_simple():
         pytest.fail("AsyncProcess should have been destroyed")
     t1 = time()
     while wr2() is not None:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         gc.collect()
     dt = time() - t1
     assert dt < 2.0
@@ -236,7 +237,7 @@ def on_stop(_proc):
     proc.daemon = True
     await proc.start()
-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
 
     assert proc.is_alive()
     assert not evt.is_set()
@@ -252,7 +253,7 @@ def on_stop(_proc):
     proc.daemon = True
     await proc.start()
-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
 
     assert proc.is_alive()
     assert not evt.is_set()
@@ -304,7 +305,7 @@ async def test_num_fds():
 
     start = time()
     while p.num_fds() > before:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         print("fds:", before, p.num_fds())
     assert time() < start + 10
@@ -313,7 +314,7 @@ async def test_terminate_after_stop():
     proc = AsyncProcess(target=sleep, args=(0,))
     await proc.start()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     await proc.terminate()
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index ec742cbf261..bad0ff2399c 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -20,7 +20,6 @@
 
 import pytest
 from tlz import identity, isdistinct, concat, pluck, valmap, first, merge
-from tornado import gen
 
 import dask
 from dask import delayed
@@ -387,18 +386,18 @@ async def test_Future_release(c, s, a, b):
     x = c.submit(div, 1, 1)
     await x
     x.release()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert not c.futures
 
     x = c.submit(slowinc, 1, delay=0.5)
     x.release()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert not c.futures
 
     x = c.submit(div, 1, 0)
     await x.exception()
     x.release()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert not c.futures
@@ -718,19 +717,19 @@ async def test_garbage_collection(c, s, a, b):
     assert c.refcount[x.key] == 2
     x.__del__()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert c.refcount[x.key] == 1
 
     z = c.submit(inc, y)
     y.__del__()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
 
     result = await z
     assert result == 3
 
     ykey = y.key
     y.__del__()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert ykey not in c.futures
@@ -744,7 +743,7 @@ async def test_garbage_collection_with_scatter(c, s, a, b):
     key = future.key
     assert c.refcount[key] == 1
     future.__del__()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert c.refcount[key] == 0
 
     start = time()
@@ -753,7 +752,7 @@ async def test_garbage_collection_with_scatter(c, s, a, b):
             break
         else:
             assert time() < start + 3
-            await gen.sleep(0.1)
+            await asyncio.sleep(0.1)
 
 
 @gen_cluster(timeout=1000, client=True)
@@ -765,12 +764,12 @@ async def test_recompute_released_key(c, s, a, b):
     import gc
 
     gc.collect()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     assert c.refcount[xkey] == 0
 
     # 1 second batching needs a second action to trigger
     while xkey in s.tasks and s.tasks[xkey].who_has or xkey in a.data or xkey in b.data:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
 
     x = c.submit(inc, 100)
     assert x.key in c.futures
@@ -805,7 +804,7 @@ async def test_missing_data_heals(c, s, a, b):
     if y.key in b.data:
         del b.data[y.key]
         b.release_key(y.key)
-    await gen.sleep(0)
+    await asyncio.sleep(0)
 
     w = c.submit(add, y, z)
@@ -825,7 +824,7 @@ async def test_gather_robust_to_missing_data(c, s, a, b):
     for w in [a, b]:
         if f.key in w.data:
             del w.data[f.key]
-            await gen.sleep(0)
+            await asyncio.sleep(0)
             w.release_key(f.key)
 
     xx, yy, zz = await c.gather([x, y, z])
@@ -848,7 +847,7 @@ async def test_gather_robust_to_nested_missing_data(c, s, a, b):
         for datum in [y, z]:
             if datum.key in worker.data:
                 del worker.data[datum.key]
-                await gen.sleep(0)
+                await asyncio.sleep(0)
                 worker.release_key(datum.key)
 
     result = await c.gather([z])
@@ -968,7 +967,7 @@ async def test_errors_dont_block(c, s, w):
     start = time()
     while not (L[0].status == L[2].status == "finished"):
         assert time() < start + 5
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     result = await c.gather([L[0], L[2]])
     assert result == [2, 3]
@@ -1058,7 +1057,7 @@ async def test_aliases_2(c, s, a, b):
     for dsk, keys in dsk_keys:
         result = await c.gather(c.get(dsk, keys, sync=False))
         assert list(result) == list(dask.get(dsk, keys))
-        await gen.sleep(0)
+        await asyncio.sleep(0)
@@ -1182,7 +1181,7 @@ async def test_get_releases_data(c, s, a, b):
 
     start = time()
     while c.refcount["x"]:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2
@@ -1980,7 +1979,7 @@ async def test_forget_complex(e, s, A, B):
 
     start = time()
     while b.key in A.data or b.key in B.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 10
 
     s.client_releases_keys(keys=[ac.key], client=e.id)
@@ -2000,7 +1999,7 @@ async def test_forget_in_flight(e, s, A, B):
     s.validate_state()
 
     for i in range(5):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         s.validate_state()
 
     s.client_releases_keys(keys=[y.key], client=e.id)
@@ -2091,7 +2090,7 @@ async def test_multi_client(s, a, b):
 
     start = time()
     while c.id in s.wants_what:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     assert c.id not in s.wants_what
@@ -2102,7 +2101,7 @@ async def test_multi_client(s, a, b):
 
     start = time()
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2, s.tasks
@@ -2122,14 +2121,14 @@ async def test_cleanup_after_broken_client_connection(s, a, b):
 
     start = time()
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     proc.terminate()
 
     start = time()
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -2150,7 +2149,7 @@ async def test_multi_garbage_collection(s, a, b):
     x.__del__()
     start = time()
     while x.key in a.data or x.key in b.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     assert s.wants_what == {c.id: {y.key}, f.id: {y.key}, "fire-and-forget": set()}
@@ -2159,10 +2158,10 @@ async def test_multi_garbage_collection(s, a, b):
     y.__del__()
     start = time()
     while x.key in s.wants_what[f.id]:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert y.key in a.data or y.key in b.data
     assert s.wants_what == {c.id: {y.key}, f.id: set(), "fire-and-forget": set()}
     assert s.who_wants == {y.key: {c.id}}
@@ -2170,7 +2169,7 @@ async def test_multi_garbage_collection(s, a, b):
     y2.__del__()
     start = time()
     while y.key in a.data or y.key in b.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     assert not any(v for v in s.wants_what.values())
@@ -2230,7 +2229,7 @@ async def test__cancel(c, s, a, b):
     y = c.submit(slowinc, x)
 
     while y.key not in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     await c.cancel([x])
@@ -2240,7 +2239,7 @@ async def test__cancel(c, s, a, b):
 
     start = time()
     while not y.cancelled():
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     assert not s.tasks
@@ -2273,7 +2272,7 @@ async def test_cancel_multi_client(s, a, b):
 
     start = time()
     while y.key not in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     out = await y
@@ -2372,7 +2371,7 @@ async def test_async_persist(c, s, a, b):
     assert w.__dask_keys__() == ww.__dask_keys__()
 
     while y.key not in s.tasks and w.key not in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.who_wants[y.key] == {c.id}
     assert s.who_wants[w.key] == {c.id}
@@ -2505,17 +2504,17 @@ async def test_dont_delete_recomputed_results(c, s, w):
     x = c.submit(inc, 1)  # compute first time
     await wait([x])
     x.__del__()  # trigger garbage collection
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     xx = c.submit(inc, 1)  # compute second time
 
     start = time()
     while xx.key not in w.data:  # data shows up
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1
 
     while time() < start + (s.delete_interval + 100) / 1000:  # and stays
         assert xx.key in w.data
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
@@ -2525,7 +2524,7 @@ async def test_fatally_serialized_input(c, s):
     future = c.submit(inc, o)
 
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
 
 @pytest.mark.skip(reason="Use fast random selection now")
@@ -2728,7 +2727,7 @@ async def test_persist_get(c, s, a, b):
     xxyy2 = c.persist(xxyy)
     xxyy3 = delayed(add)(xxyy2, 10)
 
-    await gen.sleep(0.5)
+    await asyncio.sleep(0.5)
     result = await c.gather(c.get(xxyy3.dask, xxyy3.__dask_keys__(), sync=False))
     assert result[0] == ((1 + 1) + (2 + 2)) + 10
@@ -2864,7 +2863,7 @@ def test_rebalance_sync(c, s, a, b):
 @gen_cluster(client=True)
 async def test_rebalance_unprepared(c, s, a, b):
     futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     await c.rebalance(futures)
     s.validate_state()
@@ -2887,7 +2886,7 @@ async def test_receive_lost_key(c, s, a, b):
     start = time()
     while x.status == "finished":
         assert time() < start + 5
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
 
 @pytest.mark.skipif(
@@ -2902,7 +2901,7 @@ async def test_unrunnable_task_runs(c, s, a, b):
     start = time()
     while x.status == "finished":
         assert time() < start + 5
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.tasks[x.key] in s.unrunnable
     assert s.get_task_status(keys=[x.key]) == {x.key: "no-worker"}
@@ -2912,7 +2911,7 @@ async def test_unrunnable_task_runs(c, s, a, b):
 
     start = time()
     while x.status != "finished":
         assert time() < start + 2
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.tasks[x.key] not in s.unrunnable
     result = await x
@@ -3172,7 +3171,7 @@ async def test_scheduler_saturates_cores(c, s, a, b):
                     for w in s.workers.values()
                     for p in w.processing.values()
                 )
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 20)] * 2)
@@ -3186,7 +3185,7 @@ async def test_scheduler_saturates_cores_random(c, s, a, b):
                     for w in s.workers.values()
                     for p in w.processing.values()
                 )
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
 
 
 @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)] * 4)
@@ -3194,14 +3193,14 @@ async def test_cancel_clears_processing(c, s, *workers):
     da = pytest.importorskip("dask.array")
     x = c.submit(slowinc, 1, delay=0.2)
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     await c.cancel(x)
 
     start = time()
     while any(v for w in s.workers.values() for v in w.processing):
         assert time() < start + 0.2
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     s.validate_state()
@@ -3252,7 +3251,7 @@ async def test_get_processing(c, s, a, b):
         slowinc, range(10), delay=0.1, workers=[a.address], allow_other_workers=True
     )
 
-    await gen.sleep(0.2)
+    await asyncio.sleep(0.2)
 
     x = await c.processing()
     assert set(x) == {a.address, b.address}
@@ -3403,7 +3402,7 @@ async def test_Client_clears_references_after_restart(c, s, a, b):
     import gc
 
     gc.collect()
-    await gen.sleep(0)
+    await asyncio.sleep(0)
 
     assert key not in c.refcount
@@ -3573,7 +3572,7 @@ async def test_reconnect_timeout(c, s):
    start = time()
     while c.status != "closed":
         await c._update_scheduler_info()
-        await gen.sleep(0.05)
+        await asyncio.sleep(0.05)
     assert time() < start + 5, "Timeout waiting for reconnect to fail"
     text = logger.getvalue()
     assert "Failed to reconnect" in text
@@ -3597,7 +3596,7 @@ def test_open_close_many_workers(loop, worker, count, repeat):
 
     async def start_worker(sleep, duration, repeat=1):
         for i in range(repeat):
-            await gen.sleep(sleep)
+            await asyncio.sleep(sleep)
             if not status:
                 return
             w = worker(s["address"], loop=loop)
@@ -3606,10 +3605,10 @@ async def start_worker(sleep, duration, repeat=1):
             await w
             addr = w.worker_address
             running[w] = addr
-            await gen.sleep(duration)
+            await asyncio.sleep(duration)
             await w.close()
             del w
-            await gen.sleep(0)
+            await asyncio.sleep(0)
             done.release()
 
     for i in range(count):
@@ -3659,7 +3658,7 @@ async def test_idempotence(s, a, b):
     y = f.submit(inc, 1)
     assert x.key == y.key
     await y
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     log2 = list(s.transition_log)
     assert log == log2
@@ -3672,7 +3671,7 @@ async def test_idempotence(s, a, b):
     b = f.submit(div, 1, 0)
     assert a.key == b.key
     await wait(b)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     log2 = list(s.transition_log)
     assert log == log2
@@ -3748,7 +3747,7 @@ async def test_lose_scattered_data(c, s, a, b):
     [x] = await c.scatter([1], workers=a.address)
 
     await a.close()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     assert x.status == "cancelled"
     assert x.key not in s.tasks
@@ -3760,7 +3759,7 @@ async def test_partially_lose_scattered_data(e, s, a, b, c):
     await e.replicate(x, n=2)
 
     await a.close()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     assert x.status == "finished"
"memory"} @@ -3772,7 +3771,7 @@ async def test_scatter_compute_lose(c, s, a, b): y = c.submit(inc, 1, workers=b.address) z = c.submit(slowadd, x, y, delay=0.2) - await gen.sleep(0.1) + await asyncio.sleep(0.1) await a.close() @@ -3804,7 +3803,7 @@ async def test_scatter_compute_store_lose(c, s, a, b): start = time() while x.status == "finished": - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert time() < start + 2 # assert xx.status == 'finished' @@ -3819,7 +3818,7 @@ async def test_scatter_compute_store_lose(c, s, a, b): start = time() while s.get_task_status(keys=[zkey]) != {zkey: "released"}: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert time() < start + 2 xxkey = xx.key @@ -3827,7 +3826,7 @@ async def test_scatter_compute_store_lose(c, s, a, b): start = time() while x.key in s.tasks and zkey not in s.tasks and xxkey not in s.tasks: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert time() < start + 2 @@ -3844,12 +3843,12 @@ async def test_scatter_compute_store_lose_processing(c, s, a, b): y = c.submit(slowinc, x, delay=0.2) z = c.submit(inc, y) - await gen.sleep(0.1) + await asyncio.sleep(0.1) await a.close() start = time() while x.status == "finished": - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert time() < start + 2 assert y.status == "cancelled" @@ -4007,7 +4006,7 @@ async def test_retire_many_workers(c, s, *workers): assert results == list(range(100)) while len(s.workers) != 3: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert len(s.has_what) == len(s.nthreads) == 3 @@ -4085,7 +4084,7 @@ def g(future): t = time() while len(S) < 4 and time() - t < 2.0: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert S == {(f.key, f.status) for f in (u, v, w, x)} @@ -4211,14 +4210,14 @@ async def test_interleave_computations(c, s, a, b): done = ("memory", "released") - await gen.sleep(0.1) + await asyncio.sleep(0.1) x_keys = [x.key for x in xs] y_keys = [y.key for y in ys] z_keys = [z.key for z in zs] while not s.tasks or any(w.processing for w in s.workers.values()): - await gen.sleep(0.05) + await asyncio.sleep(0.05) x_done = sum(state in done for state in s.get_task_status(keys=x_keys).values()) y_done = sum(state in done for state in s.get_task_status(keys=y_keys).values()) z_done = sum(state in done for state in s.get_task_status(keys=z_keys).values()) @@ -4244,7 +4243,7 @@ async def test_interleave_computations_map(c, s, a, b): z_keys = [z.key for z in zs] while not s.tasks or any(w.processing for w in s.workers.values()): - await gen.sleep(0.05) + await asyncio.sleep(0.05) x_done = sum(state in done for state in s.get_task_status(keys=x_keys).values()) y_done = sum(state in done for state in s.get_task_status(keys=y_keys).values()) z_done = sum(state in done for state in s.get_task_status(keys=z_keys).values()) @@ -4266,7 +4265,7 @@ async def test_client_timeout(): c = Client("127.0.0.1:57484", asynchronous=True) s = Scheduler(loop=c.loop, port=57484) - await gen.sleep(4) + await asyncio.sleep(4) try: await s except EnvironmentError: # port in use @@ -4312,12 +4311,12 @@ async def test_dont_clear_waiting_data(c, s, a, b): x = await c.scatter(1) y = c.submit(slowinc, x, delay=0.5) while y.key not in s.tasks: - await gen.sleep(0.01) + await asyncio.sleep(0.01) key = x.key del x for i in range(5): assert s.waiting_data[key] - await gen.sleep(0) + await asyncio.sleep(0) @gen_cluster(client=True) @@ -4461,7 +4460,7 @@ async def test_retire_workers(c, s, a, b): start = time() while a.status != "closed": - await gen.sleep(0.01) + await 
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -4541,7 +4540,7 @@ def f(x):
 
         start = time()
         while not hasattr(distributed, "foo"):
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 2
         assert distributed.foo == 123
     finally:
@@ -4549,7 +4548,7 @@ def f(x):
 
     start = time()
     while len(s.tasks) > 1:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2
 
     assert set(s.who_wants) == {future.key}
@@ -4559,12 +4558,12 @@ def f(x):
 @gen_cluster(client=True)
 async def test_fire_and_forget_err(c, s, a, b):
     fire_and_forget(c.submit(div, 1, 0))
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     # erred task should clear out quickly
     start = time()
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1
@@ -4609,7 +4608,7 @@ async def test_close(s, a, b):
 
     start = time()
     while c.id in s.wants_what or s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -4755,7 +4754,7 @@ def f(x):
     futures = c.map(f, range(100))
     start = time()
     while not all(f.status == "finished" for f in futures):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert threading.active_count() < count + 50
 
     assert len(a.log) < 2 * len(b.log)
@@ -4772,7 +4771,7 @@ def f():
         client.submit(slowinc, 1, delay=0.2, key="slowinc")
 
     future = c.submit(f, key="f")
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     if len(s.tasks) == 2:
         assert (
             s.priorities["f"] > s.priorities["slowinc"]
@@ -4931,7 +4930,7 @@ def test_warn_executor(loop, s, a, b):
 async def test_call_stack_future(c, s, a, b):
     x = c.submit(slowdec, 1, delay=0.5)
     future = c.submit(slowinc, 1, delay=0.5)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     results = await asyncio.gather(
         c.call_stack(future), c.call_stack(keys=[future.key])
     )
@@ -4949,7 +4948,7 @@ async def test_call_stack_all(c, s, a, b):
     future = c.submit(slowinc, 1, delay=0.8)
     while not a.executing and not b.executing:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     result = await c.call_stack()
     w = a if a.executing else b
     assert list(result) == [w.address]
@@ -4962,7 +4961,7 @@ async def test_call_stack_collections(c, s, a, b):
     da = pytest.importorskip("dask.array")
     x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
     while not a.executing and not b.executing:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
     result = await c.call_stack(x)
     assert result
@@ -4972,7 +4971,7 @@ async def test_call_stack_collections_all(c, s, a, b):
     da = pytest.importorskip("dask.array")
     x = da.random.random(100, chunks=(10,)).map_blocks(slowinc, delay=0.5).persist()
     while not a.executing and not b.executing:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
     result = await c.call_stack()
     assert result
@@ -5048,7 +5047,7 @@ async def test_future_auto_inform(c, s, a, b):
 
     start = time()
     while future.status != "finished":
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1
 
     await client.close()
@@ -5095,7 +5094,7 @@ async def test_task_metadata(c, s, a, b):
 
     del future
     while key in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     with pytest.raises(KeyError):
         await c.get_metadata(key)
@@ -5196,7 +5195,7 @@ async def test_scatter_direct(s, a, b):
 
     start = time()
     while s.clients[c.id].last_seen == last:
-        await gen.sleep(0.10)
+        await asyncio.sleep(0.10)
     assert time() < start + 5
 
     await c.close()
@@ -5405,7 +5404,7 @@ def bad_fn(x):
     await wait(y)
     assert y.status == "error"
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert y.status == "error"  # not cancelled
@@ -5471,7 +5470,7 @@ async def test_map_large_kwargs_in_graph(c, s, a, b):
     x = np.random.random(100000)
     futures = c.map(lambda a, b: a + b, range(100), b=x)
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert len(s.tasks) == 101
     assert any(k.startswith("ndarray") for k in s.tasks)
@@ -5613,7 +5612,7 @@ async def test_instances(c, s, a, b):
 @gen_cluster(client=True)
 async def test_wait_for_workers(c, s, a, b):
     future = asyncio.ensure_future(c.wait_for_workers(n_workers=3))
-    await gen.sleep(0.22)  # 2 chances
+    await asyncio.sleep(0.22)  # 2 chances
     assert not future.done()
 
     w = await Worker(s.address)
@@ -5780,7 +5779,7 @@ async def test_as_completed_async_for_cancel(c, s, a, b):
     ac = as_completed([x, y])
 
     async def _():
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         await y.cancel(asynchronous=True)
 
     c.loop.add_callback(_)
@@ -5818,7 +5817,7 @@ async def f():
 def test_client_sync_with_async_def(loop):
     async def ff():
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         return 1
 
     with cluster() as (s, [a, b]):
@@ -5861,13 +5860,13 @@ async def test_dont_hold_on_to_large_messages(c, s, a, b):
             )
             pytest.fail("array should have been destroyed")
 
-        await gen.sleep(0.200)
+        await asyncio.sleep(0.200)
 
 
 @gen_cluster(client=True)
 async def test_run_scheduler_async_def(c, s, a, b):
     async def f(dask_scheduler):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         dask_scheduler.foo = "bar"
 
     await c.run_on_scheduler(f)
@@ -5875,7 +5874,7 @@ async def f(dask_scheduler):
     assert s.foo == "bar"
 
     async def f(dask_worker):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         dask_worker.foo = "bar"
 
     await c.run(f)
@@ -5886,23 +5885,23 @@ async def f(dask_worker):
 @gen_cluster(client=True)
 async def test_run_scheduler_async_def_wait(c, s, a, b):
     async def f(dask_scheduler):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         dask_scheduler.foo = "bar"
 
     await c.run_on_scheduler(f, wait=False)
 
     while not hasattr(s, "foo"):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.foo == "bar"
 
     async def f(dask_worker):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         dask_worker.foo = "bar"
 
     await c.run(f, wait=False)
 
     while not hasattr(a, "foo") or not hasattr(b, "foo"):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert a.foo == "bar"
     assert b.foo == "bar"
diff --git a/distributed/tests/test_failed_workers.py b/distributed/tests/test_failed_workers.py
index df93c2c497e..e1556494fe2 100644
--- a/distributed/tests/test_failed_workers.py
+++ b/distributed/tests/test_failed_workers.py
@@ -1,10 +1,10 @@
+import asyncio
 import os
 import random
 from time import sleep
 
 import pytest
 from tlz import partition_all, first
-from tornado import gen
 
 from dask import delayed
 from distributed import Client, Nanny, wait
@@ -38,7 +38,7 @@ def test_submit_after_failed_worker_sync(loop):
 async def test_submit_after_failed_worker_async(c, s, a, b):
     n = await Nanny(s.address, nthreads=2, loop=s.loop)
     while len(s.workers) < 3:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
 
     L = c.map(inc, range(10))
     await wait(L)
@@ -107,14 +107,14 @@ async def test_failed_worker_without_warning(c, s, a, b):
     await c._run(os._exit, 1, workers=[a.worker_address])
 
     start = time()
     while a.pid == original_pid:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 10
 
-    await gen.sleep(0.5)
+    await asyncio.sleep(0.5)
 
     start = time()
     while len(s.nthreads) < 2:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 10
 
     await wait(L)
@@ -278,7 +278,7 @@ async def test_multiple_clients_restart(s, a, b):
     assert x.cancelled()
 
     start = time()
     while not y.cancelled():
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     await c1.close()
@@ -307,7 +307,7 @@ async def test_forgotten_futures_dont_clean_up_new_futures(c, s, a, b):
     import gc
 
     gc.collect()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     await y
@@ -318,7 +318,7 @@ async def test_broken_worker_during_computation(c, s, a, b):
 
     start = time()
     while len(s.nthreads) < 3:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     N = 256
@@ -333,13 +333,13 @@ async def test_broken_worker_during_computation(c, s, a, b):
             key=["add-%d-%d" % (i, j) for j in range(len(L) // 2)]
         )
 
-    await gen.sleep(random.random() / 20)
+    await asyncio.sleep(random.random() / 20)
     with ignoring(CommClosedError):  # comm will be closed abrupty
         await c._run(os._exit, 1, workers=[n.worker_address])
 
-    await gen.sleep(random.random() / 20)
+    await asyncio.sleep(random.random() / 20)
     while len(s.workers) < 3:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     with ignoring(
         CommClosedError, EnvironmentError
@@ -361,7 +361,7 @@ async def test_restart_during_computation(c, s, a, b):
     total = delayed(sum)(zs)
     result = c.compute(total)
 
-    await gen.sleep(0.5)
+    await asyncio.sleep(0.5)
     assert s.rprocessing
     await c.restart()
     assert not s.rprocessing
@@ -376,7 +376,7 @@ async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
 
     start = time()
     while len(s.nthreads) < 3:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     futures = c.map(slowinc, range(20), delay=0.01, key=["f%d" % i for i in range(20)])
@@ -391,7 +391,7 @@ async def test_worker_who_has_clears_after_failed_connection(c, s, a, b):
        await c._run(os._exit, 1, workers=[n_worker_address])
 
     while len(s.workers) > 2:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     total = c.submit(sum, futures, workers=a.address)
     await total
@@ -407,7 +407,7 @@ async def test_restart_timeout_on_long_running_task(c, s, a):
     with captured_logger("distributed.scheduler") as sio:
         future = c.submit(sleep, 3600)
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         await c.restart(timeout=20)
 
     text = sio.getvalue()
@@ -418,12 +418,12 @@ async def test_worker_time_to_live(c, s, a, b):
     assert set(s.workers) == {a.address, b.address}
 
     a.periodic_callbacks["heartbeat"].stop()
-    await gen.sleep(0.010)
+    await asyncio.sleep(0.010)
     assert set(s.workers) == {a.address, b.address}
 
     start = time()
     while set(s.workers) == {a.address, b.address}:
-        await gen.sleep(0.050)
+        await asyncio.sleep(0.050)
     assert time() < start + 2
 
     set(s.workers) == {b.address}
diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py
index 41dd8ea91cd..078a6dfa5cf 100644
--- a/distributed/tests/test_nanny.py
+++ b/distributed/tests/test_nanny.py
@@ -10,7 +10,6 @@
 
 import pytest
 from tlz import valmap, first
-from tornado import gen
 from tornado.ioloop import IOLoop
 
 import dask
@@ -93,20 +92,20 @@ async def test_nanny_process_failure(c, s):
 
     start = time()
     while n.pid == pid:  # wait while process dies and comes back
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 5
 
     start = time()
-    await gen.sleep(1)
+    await asyncio.sleep(1)
     while not n.is_alive():  # wait while process comes back
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 5
 
     # assert n.worker_address != original_address  # most likely
 
     start = time()
     while n.worker_address not in s.nthreads or n.worker_dir is None:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 5
 
     second_dir = n.worker_dir
@@ -155,7 +154,7 @@ async def test_close_on_disconnect(s, w):
 
     start = time()
     while w.status != "closed":
-        await gen.sleep(0.05)
+        await asyncio.sleep(0.05)
     assert time() < start + 9
@@ -221,13 +220,13 @@ async def test_num_fds(s):
 
     for i in range(3):
         w = await Nanny(s.address)
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         await w.close()
 
     start = time()
     while proc.num_fds() > before:
         print("fds:", before, proc.num_fds())
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 10
@@ -270,7 +269,7 @@ async def test_nanny_timeout(c, s, a):
 
     start = time()
     while x.status != "cancelled":
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 7
@@ -296,7 +295,7 @@ def leak():
     future = c.submit(leak)
     start = time()
     while a.process.pid == proc:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 10
     out = logger.getvalue()
     assert "restart" in out.lower()
@@ -348,7 +347,7 @@ async def test_avoid_memory_monitor_if_zero_limit(c, s):
 
     future = c.submit(inc, 1)
     assert await future == 2
-    await gen.sleep(0.02)
+    await asyncio.sleep(0.02)
     await c.submit(inc, 2)  # worker doesn't pause
@@ -363,7 +362,7 @@ async def test_scheduler_address_config(c, s):
 
         start = time()
         while not s.workers:
-            await gen.sleep(0.1)
+            await asyncio.sleep(0.1)
         assert time() < start + 10
 
     await nanny.close()
@@ -375,7 +374,7 @@ async def test_wait_for_scheduler():
     with captured_logger("distributed") as log:
         w = Nanny("127.0.0.1:44737")
         IOLoop.current().add_callback(w.start)
-        await gen.sleep(6)
+        await asyncio.sleep(6)
         await w.close()
 
     log = log.getvalue()
@@ -489,7 +488,7 @@ async def test_nanny_closes_cleanly(cleanup):
             IOLoop.current().add_callback(w.terminate)
             start = time()
             while n.status != "closed":
-                await gen.sleep(0.01)
+                await asyncio.sleep(0.01)
             assert time() < start + 5
 
     assert n.status == "closed"
diff --git a/distributed/tests/test_priorities.py b/distributed/tests/test_priorities.py
index cc30279155c..cd4344da840 100644
--- a/distributed/tests/test_priorities.py
+++ b/distributed/tests/test_priorities.py
@@ -1,5 +1,6 @@
+import asyncio
+
 import pytest
-from tornado import gen
 
 from dask.core import flatten
 import dask
@@ -105,7 +106,7 @@ async def test_repeated_persists_same_priority(c, s, w):
     while (
         sum(t.state == "memory" for t in s.tasks.values()) < 5
     ):  # TODO: reduce this number
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert any(s.tasks[y.key].state == "memory" for y in ys)
     assert any(s.tasks[z.key].state == "memory" for z in zs)
@@ -118,6 +119,6 @@ async def test_last_in_first_out(c, s, w):
     zs = [c.submit(slowinc, y, delay=0.05) for y in ys]
 
     while len(s.tasks) < 15 or not any(s.tasks[z.key].state == "memory" for z in zs):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert not all(s.tasks[x.key].state == "memory" for x in xs)
diff --git a/distributed/tests/test_publish.py b/distributed/tests/test_publish.py
index f5197098591..ab32d52a112 100644
--- a/distributed/tests/test_publish.py
+++ b/distributed/tests/test_publish.py
@@ -8,7 +8,6 @@
 from distributed.utils_test import gen_cluster, inc
 from distributed.utils_test import client, cluster_fixture, loop  # noqa F401
 from distributed.protocol import Serialized
-from tornado import gen
 
 
 @gen_cluster(client=False)
@@ -91,7 +90,7 @@ async def test_unpublish(c, s, a, b):
 
     start = time()
     while key in s.who_wants:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
 
     with pytest.raises(KeyError) as exc_info:
diff --git a/distributed/tests/test_pubsub.py b/distributed/tests/test_pubsub.py
index 049f41ac35a..212d29d4802 100644
--- a/distributed/tests/test_pubsub.py
+++ b/distributed/tests/test_pubsub.py
@@ -2,7 +2,6 @@
 from time import sleep
 
 import pytest
-from tornado import gen
 import tlz as toolz
 
 from distributed import Pub, Sub, wait, get_worker, TimeoutError
@@ -62,7 +61,7 @@ async def test_client(c, s):
 
     start = time()
     while not set(sps.client_subscribers["a"]) == {c.id}:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
 
     pub.put(123)
@@ -101,7 +100,7 @@ def f(x):
         or bps.publishers["a"]
         or len(sps.client_subscribers["a"]) != 1
     ):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
 
     del sub
@@ -112,7 +111,7 @@ def f(x):
         or any(aps.publish_to_scheduler.values())
         or any(bps.publish_to_scheduler.values())
    ):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
@@ -146,7 +145,7 @@ async def publish():
 
         i = 0
         while True:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
             pub._put(i)
             i += 1
diff --git a/distributed/tests/test_queues.py b/distributed/tests/test_queues.py
index 3df7743f222..34009602a15 100644
--- a/distributed/tests/test_queues.py
+++ b/distributed/tests/test_queues.py
@@ -1,8 +1,7 @@
-from time import sleep
 import asyncio
+from time import sleep
 
 import pytest
-from tornado import gen
 
 from distributed import Client, Queue, Nanny, worker_client, wait, TimeoutError
 from distributed.metrics import time
@@ -29,13 +28,13 @@ async def test_queue(c, s, a, b):
 
     del future, future2
 
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert s.tasks  # future still present in y's queue
     await y.get()  # burn future
 
     start = time()
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -75,7 +74,7 @@ async def test_hold_futures(s, a, b):
     del q1
     await c1.close()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     c2 = await Client(s.address, asynchronous=True)
     q2 = await Queue("q")
@@ -151,14 +150,14 @@ async def test_same_futures(c, s, a, b):
     for i in range(4):
         future2 = await q.get()
         assert s.wants_what["queue-x"] == {future.key}
 
-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
     assert s.wants_what["queue-x"] == {future.key}
 
     await q.get()
 
     start = time()
     while s.wants_what["queue-x"]:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 2
@@ -213,7 +212,7 @@ async def test_Future_knows_status_immediately(c, s, a, b):
                 break
             except Exception:
                 assert time() < start + 5
-                await gen.sleep(0.05)
+                await asyncio.sleep(0.05)
 
     await c2.close()
@@ -223,7 +222,7 @@ async def test_erred_future(c, s, a, b):
     future = c.submit(div, 1, 0)
     q = await Queue()
     await q.put(future)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
 
     future2 = await q.get()
     with pytest.raises(ZeroDivisionError):
         await future2.result()
@@ -240,7 +239,7 @@ async def test_close(c, s, a, b):
     q.close()
 
     while q.name in s.extensions["queues"].queues:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
diff --git a/distributed/tests/test_resources.py b/distributed/tests/test_resources.py
index f870b51ed31..870b930fdae 100644
--- a/distributed/tests/test_resources.py
+++ b/distributed/tests/test_resources.py
@@ -3,7 +3,6 @@
 
 from dask import delayed
 import pytest
-from tornado import gen
 
 from distributed import Worker
 from distributed.client import wait
@@ -229,7 +228,7 @@ async def test_submit_many_non_overlapping(c, s, a, b):
     futures = c.map(slowinc, range(100), resources={"A": 1}, delay=0.02)
 
     while len(a.data) + len(b.data) < 100:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert len(a.executing) <= 2
         assert len(b.executing) <= 1
@@ -243,7 +242,7 @@ async def test_minimum_resource(c, s, a):
     futures = c.map(slowinc, range(30), resources={"A": 1, "B": 1}, delay=0.02)
 
     while len(a.data) < 30:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert len(a.executing) <= 1
 
     await wait(futures)
@@ -291,7 +290,7 @@ async def test_set_resources(c, s, a):
 
     future = c.submit(slowinc, 1, delay=1, resources={"A": 1})
     while a.available_resources["A"] == 2:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     await a.set_resources(A=3)
     assert a.total_resources["A"] == 3
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index c516144d7b7..25855546ec0 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -1,18 +1,17 @@
 import asyncio
-import cloudpickle
-import pickle
-from collections import defaultdict
 import json
+import logging
+import pickle
 import operator
 import re
 import sys
+from collections import defaultdict
 from time import sleep
-import logging
 
+import cloudpickle
 import dask
 from dask import delayed
 from tlz import merge, concat, valmap, first, frequencies
-from tornado import gen
 
 import pytest
@@ -70,7 +69,7 @@ async def test_respect_data_in_memory(c, s, a):
     f2 = c.persist(z)
 
     while f2.key not in s.tasks or not s.tasks[f2.key]:
         assert s.tasks[y.key].who_has
-        await gen.sleep(0.0001)
+        await asyncio.sleep(0.0001)
@@ -82,7 +81,7 @@ async def test_recompute_released_results(c, s, a, b):
     await wait(yy)
 
     while s.tasks[x.key].who_has or x.key in a.data or x.key in b.data:  # let x go away
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     z = delayed(dec)(x)
     zz = c.compute(z)
@@ -140,7 +139,7 @@ async def test_balance_with_restrictions(client, s, a, b, c):
 async def test_no_valid_workers(client, s, a, b, c):
     x = client.submit(inc, 1, workers="127.0.0.5:9999")
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.tasks[x.key] in s.unrunnable
@@ -159,7 +158,7 @@ async def test_no_valid_workers_loose_restrictions(client, s, a, b, c):
 async def test_no_workers(client, s):
     x = client.submit(inc, 1)
     while not s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert s.tasks[x.key] in s.unrunnable
@@ -238,7 +237,7 @@ async def test_clear_events_worker_removal(s, a, b):
 
     start = time()
     while a.address in s.events:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert time() < start + 2
 
     assert b.address in s.events
@@ -258,7 +257,7 @@ async def test_clear_events_client_removal(c, s, a, b):
     # If it doesn't reconnect after a given time, the events log should be cleared
     start = time()
     while c.id in s.events:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert time() < start + 2
@@ -355,7 +354,7 @@ def teardown(scheduler, state):
     await comm.close()
     start = time()
     while not hasattr(s, "flag"):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         assert time() - start < 5
@@ -392,7 +391,7 @@ async def test_delete_data(c, s, a, b):
 
     start = time()
     while set(a.data) | set(b.data) != {"z"}:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -406,7 +405,7 @@ async def test_delete(c, s, a):
 
     start = time()
     while x.key in a.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -623,7 +622,7 @@ async def test_file_descriptors_dont_leak(s):
 
     start = time()
     while proc.num_fds() > before:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
@@ -725,7 +724,7 @@ async def test_retire_workers_n(c, s, a, b):
     assert len(s.workers) == 0
 
     while not (a.status.startswith("clos") and b.status.startswith("clos")):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
@@ -735,7 +734,7 @@ async def test_workers_to_close(cl, s, *workers):
     ):
         futures = cl.map(slowinc, [1, 1, 1], key=["a-4", "b-4", "c-1"])
         while sum(len(w.processing) for w in s.workers.values()) < 3:
-            await gen.sleep(0.001)
+            await asyncio.sleep(0.001)
 
         wtc = s.workers_to_close()
         assert all(not s.workers[w].processing for w in wtc)
@@ -759,14 +758,14 @@ def key(ws):
     # Assert that job in one worker blocks closure of group
     future = c.submit(slowinc, 1, delay=0.2, workers=workers[0].address)
     while len(s.rprocessing) < 1:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
 
     assert set(s.workers_to_close(key=key)) == {workers[2].address, workers[3].address}
 
     del future
 
     while len(s.rprocessing) > 0:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
 
     # Assert that *total* byte count in group determines group priority
     av = await c.scatter("a" * 100, workers=workers[0].address)
@@ -781,7 +780,7 @@ async def test_retire_workers_no_suspicious_tasks(c, s, a, b):
     future = c.submit(
         slowinc, 100, delay=0.5, workers=a.address, allow_other_workers=True
     )
-    await gen.sleep(0.2)
+    await asyncio.sleep(0.2)
     await s.retire_workers(workers=[a.address])
 
     assert all(ts.suspicious == 0 for ts in s.tasks.values())
@@ -794,7 +793,7 @@
 )
 @gen_cluster(client=True, nthreads=[], timeout=240)
 async def test_file_descriptors(c, s):
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     psutil = pytest.importorskip("psutil")
     da = pytest.importorskip("dask.array")
     proc = psutil.Process()
@@ -804,11 +803,11 @@ async def test_file_descriptors(c, s):
     nannies = await asyncio.gather(*[Nanny(s.address, loop=s.loop) for _ in range(N)])
 
     while len(s.nthreads) < N:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
 
     num_fds_2 = proc.num_fds()
 
-    await gen.sleep(0.2)
+    await asyncio.sleep(0.2)
 
     num_fds_3 = proc.num_fds()
     assert num_fds_3 <= num_fds_2 + N  # add some heartbeats
@@ -826,7 +825,7 @@ async def test_file_descriptors(c, s):
     num_fds_5 = proc.num_fds()
     assert num_fds_5 < num_fds_4 + N
 
-    await gen.sleep(1)
+    await asyncio.sleep(1)
 
     num_fds_6 = proc.num_fds()
     assert num_fds_6 < num_fds_5 + N
@@ -842,7 +841,7 @@ async def test_file_descriptors(c, s):
 
     start = time()
     while proc.num_fds() > num_fds_1 + N:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3
@@ -852,7 +851,7 @@ async def test_learn_occupancy(c, s, a, b):
     futures = c.map(slowinc, range(1000), delay=0.2)
     while sum(len(ts.who_has) for ts in s.tasks.values()) < 10:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert 100 < s.total_occupancy < 1000
     for w in [a, b]:
@@ -865,7 +864,7 @@ async def test_learn_occupancy_2(c, s, a, b):
     future = c.map(slowinc, range(1000), delay=0.2)
     while not any(ts.who_has for ts in s.tasks.values()):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
 
     assert 100 < s.total_occupancy < 1000
test_balance_many_workers_2(c, s, *workers): @gen_cluster(client=True) async def test_learn_occupancy_multiple_workers(c, s, a, b): x = c.submit(slowinc, 1, delay=0.2, workers=a.address) - await gen.sleep(0.05) + await asyncio.sleep(0.05) futures = c.map(slowinc, range(100), delay=0.2) await wait(x) @@ -940,7 +939,7 @@ async def test_worker_arrives_with_processing_data(c, s, a, b): yy, zz = c.persist([y, z]) while not any(w.processing for w in s.workers.values()): - await gen.sleep(0.01) + await asyncio.sleep(0.01) w = Worker(s.address, nthreads=1) w.put_key_in_memory(y.key, 3) @@ -950,7 +949,7 @@ async def test_worker_arrives_with_processing_data(c, s, a, b): start = time() while len(s.workers) < 3: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert s.get_task_status(keys={x.key, y.key, z.key}) == { x.key: "released", @@ -972,7 +971,7 @@ async def test_worker_breaks_and_returns(c, s, a): await a.batched_stream.comm.close() - await gen.sleep(0.1) + await asyncio.sleep(0.1) start = time() await wait(future, timeout=10) end = time() @@ -992,7 +991,7 @@ async def test_no_workers_to_memory(c, s): yy, zz = c.persist([y, z]) while not s.tasks: - await gen.sleep(0.01) + await asyncio.sleep(0.01) w = Worker(s.address, nthreads=1) w.put_key_in_memory(y.key, 3) @@ -1002,7 +1001,7 @@ async def test_no_workers_to_memory(c, s): start = time() while not s.workers: - await gen.sleep(0.01) + await asyncio.sleep(0.01) assert s.get_task_status(keys={x.key, y.key, z.key}) == { x.key: "released", @@ -1022,7 +1021,7 @@ async def test_no_worker_to_memory_restrictions(c, s, a, b): yy, zz = c.persist([y, z], workers={(x, y, z): "alice"}) while not s.tasks: - await gen.sleep(0.01) + await asyncio.sleep(0.01) w = Worker(s.address, nthreads=1, name="alice") w.put_key_in_memory(y.key, 3) @@ -1030,8 +1029,8 @@ async def test_no_worker_to_memory_restrictions(c, s, a, b): await w while len(s.workers) < 3: - await gen.sleep(0.01) - await gen.sleep(0.3) + await asyncio.sleep(0.01) + await asyncio.sleep(0.3) assert s.get_task_status(keys={x.key, y.key, z.key}) == { x.key: "released", @@ -1073,7 +1072,7 @@ async def test_close_worker(c, s, a, b): assert len(s.workers) == 1 assert a.address not in s.workers - await gen.sleep(0.5) + await asyncio.sleep(0.5) assert len(s.workers) == 1 @@ -1093,20 +1092,20 @@ async def test_close_nanny(c, s, a, b): start = time() while a.is_alive(): - await gen.sleep(0.1) + await asyncio.sleep(0.1) assert time() < start + 5 assert not a.is_alive() assert a.pid is None for i in range(10): - await gen.sleep(0.1) + await asyncio.sleep(0.1) assert len(s.workers) == 1 assert not a.is_alive() assert a.pid is None while a.status != "closed": - await gen.sleep(0.05) + await asyncio.sleep(0.05) assert time() < start + 10 @@ -1115,7 +1114,7 @@ async def test_retire_workers_close(c, s, a, b): await s.retire_workers(close_workers=True) assert not s.workers while a.status != "closed" and b.status != "closed": - await gen.sleep(0.01) + await asyncio.sleep(0.01) @gen_cluster(client=True, timeout=20, Worker=Nanny) @@ -1127,7 +1126,7 @@ async def test_retire_nannies_close(c, s, a, b): start = time() while any(n.status != "closed" for n in nannies): - await gen.sleep(0.05) + await asyncio.sleep(0.05) assert time() < start + 10 assert not any(n.is_alive() for n in nannies) @@ -1140,7 +1139,7 @@ async def test_fifo_submission(c, s, w): for i in range(20): future = c.submit(slowinc, i, delay=0.1, key="inc-%02d" % i, fifo_timeout=0.01) futures.append(future) - await gen.sleep(0.02) + await 
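Note: test_file_descriptors above already mixes `asyncio.gather` with tornado-based servers. That works because tornado 5+ runs its IOLoop on top of the asyncio event loop, so asyncio and tornado awaitables can be awaited interchangeably; the same fact is what makes this wholesale `gen.sleep` to `asyncio.sleep` substitution safe. A small sketch of the fan-out startup pattern, assuming a running scheduler as in the test:

```python
import asyncio

from distributed import Nanny

async def start_nannies(scheduler_address, n):
    # Start n nannies concurrently and await them as a group,
    # mirroring the asyncio.gather call in test_file_descriptors.
    # Awaiting a Nanny instance starts it and returns the started object.
    return await asyncio.gather(
        *[Nanny(scheduler_address, nthreads=1) for _ in range(n)]
    )
```
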
@@ -1140,7 +1139,7 @@ async def test_fifo_submission(c, s, w):
     for i in range(20):
         future = c.submit(slowinc, i, delay=0.1, key="inc-%02d" % i, fifo_timeout=0.01)
         futures.append(future)
-        await gen.sleep(0.02)
+        await asyncio.sleep(0.02)

     await wait(futures[-1])
     assert futures[10].status == "finished"
@@ -1166,7 +1165,7 @@ async def test_non_existent_worker(c, s):
         address="127.0.0.1:5738", nthreads=2, nbytes={}, host_info={}
     )
     futures = c.map(inc, range(10))
-    await gen.sleep(0.300)
+    await asyncio.sleep(0.300)

     assert not s.workers
     assert all(ts.state == "no-worker" for ts in s.tasks.values())
@@ -1178,7 +1177,7 @@ async def test_correct_bad_time_estimate(c, s, *workers):

     futures = [c.submit(slowinc, future, delay=0.1, pure=False) for i in range(20)]

-    await gen.sleep(0.5)
+    await asyncio.sleep(0.5)

     await wait(futures)

@@ -1212,7 +1211,7 @@ async def test_profile_metadata(c, s, a, b):
     start = time() - 1
     futures = c.map(slowinc, range(10), delay=0.05, workers=a.address)
     await wait(futures)
-    await gen.sleep(0.200)
+    await asyncio.sleep(0.200)

     meta = await s.get_profile_metadata(profile_cycle_interval=0.100)
     now = time() + 1
@@ -1244,7 +1243,7 @@ async def test_cancel_fire_and_forget(c, s, a, b):
     future = c.compute(w)
     fire_and_forget(future)

-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
     await future.cancel(force=True)
     assert future.status == "cancelled"
     assert not s.tasks
@@ -1268,7 +1267,7 @@ async def test_reschedule(c, s, a, b):

     futures = c.map(slowinc, range(10, 20), delay=0.1, workers=a.address)
     while len(s.tasks) < len(x) + len(futures):
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)

     for future in x:
         s.reschedule(key=future.key)
@@ -1348,7 +1347,7 @@ async def test_mising_data_errant_worker(c, s, w1, w2, w3):

     y = c.submit(len, x, workers=w3.address)
     while not w3.tasks:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)

     await w1.close()
     await wait(y)
@@ -1366,7 +1365,7 @@ async def test_dont_recompute_if_persisted(c, s, a, b):
     yyy = y.persist()
     await wait(yyy)

-    await gen.sleep(0.100)
+    await asyncio.sleep(0.100)
     assert list(s.transition_log) == old

@@ -1384,7 +1383,7 @@ async def test_dont_recompute_if_persisted_2(c, s, a, b):
     zz = z.persist()
     await wait(zz)

-    await gen.sleep(0.100)
+    await asyncio.sleep(0.100)
     assert s.story("x", "y") == old

@@ -1402,7 +1401,7 @@ async def test_dont_recompute_if_persisted_3(c, s, a, b):
     www = w.persist()
     await wait(www)

-    await gen.sleep(0.100)
+    await asyncio.sleep(0.100)
     assert list(s.transition_log) == old

@@ -1418,7 +1417,7 @@ async def test_dont_recompute_if_persisted_4(c, s, a, b):

     old = s.story("x")
     while s.tasks["x"].state == "memory":
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     yyy, zzz = dask.persist(y, z)
     await wait([yyy, zzz])
@@ -1437,7 +1436,7 @@ async def test_dont_forget_released_keys(c, s, a, b):
     del z

     while "z" in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert "x" in s.tasks

@@ -1455,7 +1454,7 @@ async def test_dont_recompute_if_erred(c, s, a, b):
     yyy = y.persist()
     await wait(yyy)

-    await gen.sleep(0.100)
+    await asyncio.sleep(0.100)
     assert list(s.transition_log) == old

@@ -1465,7 +1464,7 @@ async def test_closing_scheduler_closes_workers(s, a, b):

     start = time()
     while a.status != "closed" or b.status != "closed":
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2

@@ -1476,12 +1475,12 @@ async def test_resources_reset_after_cancelled_task(c, s, w):
     future = c.submit(sleep, 0.2, resources={"A": 1})

     while not w.executing:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     await future.cancel()

     while w.executing:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert not s.workers[w.address].used_resources["A"]
     assert w.available_resources == {"A": 1}
@@ -1510,7 +1509,7 @@ def qux(x):
         await y

     z = c.submit(qux, y, key="z")
     del y
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     f = c.submit(bar, x, key="y")
     await f
@@ -1536,12 +1535,12 @@ async def test_idle_timeout(c, s, a, b):
     with captured_logger("distributed.scheduler") as logs:
         start = time()
         while s.status != "closed":
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 3

         start = time()
         while not (a.status == "closed" and b.status == "closed"):
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         assert time() < start + 1

     assert "idle" in logs.getvalue()
@@ -1642,16 +1641,16 @@ async def test_adaptive_target(c, s, a, b):
     # Long task
     x = c.submit(slowinc, 1, delay=0.5)
     while x.key not in s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert s.adaptive_target(target_duration=".1s") == 1  # still one

     L = c.map(slowinc, range(100), delay=0.5)
     while len(s.tasks) < 100:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert 10 < s.adaptive_target(target_duration=".1s") <= 100

     del x, L

     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert s.adaptive_target(target_duration=".1s") == 0
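Note: the test_steal.py hunks that follow repeatedly pair a `balance()` call on the work-stealing extension with a spin on `steal.in_flight`. A hedged convenience wrapper for that repeated pattern might look like this (`settle_stealing` is an illustrative name, not an API in the codebase):

```python
import asyncio

async def settle_stealing(scheduler):
    # Trigger one balance pass of the work-stealing extension, then
    # yield to the event loop until no steal requests remain in
    # flight, as test_steal.py does around each steal.balance() call.
    steal = scheduler.extensions["stealing"]
    steal.balance()
    while steal.in_flight:
        await asyncio.sleep(0.001)
```
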
diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py
index a508894cf35..24b4aeefdf8 100644
--- a/distributed/tests/test_steal.py
+++ b/distributed/tests/test_steal.py
@@ -24,7 +24,6 @@
     slowinc,
 )
 from tlz import concat, sliding_window
-from tornado import gen

 # Most tests here are timing-dependent
 setup_module = nodebug_setup_module
@@ -205,7 +204,7 @@ async def test_new_worker_steals(c, s, a):
     futures = c.map(slowinc, range(100), delay=0.05)
     total = c.submit(sum, futures)
     while len(a.task_state) < 10:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     b = await Worker(s.address, loop=s.loop, nthreads=1, memory_limit=MEMORY_LIMIT)

@@ -247,14 +246,14 @@ async def test_dont_steal_worker_restrictions(c, s, a, b):
     futures = c.map(slowinc, range(100), delay=0.1, workers=a.address)

     while len(a.task_state) + len(b.task_state) < 100:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0

     result = s.extensions["stealing"].balance()

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0
@@ -271,7 +270,7 @@ async def test_steal_worker_restrictions(c, s, wa, wb, wc):
     futures = c.map(slowinc, range(ntasks), delay=0.1, workers={wa.address, wb.address})

     while sum(len(w.task_state) for w in [wa, wb, wc]) < ntasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert 0 < len(wa.task_state) < ntasks
     assert 0 < len(wb.task_state) < ntasks
@@ -279,7 +278,7 @@ async def test_steal_worker_restrictions(c, s, wa, wb, wc):

     s.extensions["stealing"].balance()

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert 0 < len(wa.task_state) < ntasks
     assert 0 < len(wb.task_state) < ntasks
@@ -296,13 +295,13 @@ async def test_dont_steal_host_restrictions(c, s, a, b):
     futures = c.map(slowinc, range(100), delay=0.1, workers="127.0.0.1")
     while len(a.task_state) + len(b.task_state) < 100:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0

     result = s.extensions["stealing"].balance()

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0
@@ -318,7 +317,7 @@ async def test_steal_host_restrictions(c, s, wa, wb):
     ntasks = 100
     futures = c.map(slowinc, range(ntasks), delay=0.1, workers="127.0.0.1")
     while len(wa.task_state) < ntasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert len(wa.task_state) == ntasks
     assert len(wb.task_state) == 0
@@ -326,10 +325,10 @@ async def test_steal_host_restrictions(c, s, wa, wb):

     start = time()
     while not wc.task_state or len(wa.task_state) == ntasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert 0 < len(wa.task_state) < ntasks
     assert len(wb.task_state) == 0
     assert 0 < len(wc.task_state) < ntasks
@@ -344,13 +343,13 @@ async def test_dont_steal_resource_restrictions(c, s, a, b):
     futures = c.map(slowinc, range(100), delay=0.1, resources={"A": 1})
     while len(a.task_state) + len(b.task_state) < 100:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0

     result = s.extensions["stealing"].balance()

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert len(a.task_state) == 100
     assert len(b.task_state) == 0
@@ -364,14 +363,14 @@ async def test_steal_resource_restrictions(c, s, a):
     futures = c.map(slowinc, range(100), delay=0.2, resources={"A": 1})
     while len(a.task_state) < 101:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert len(a.task_state) == 101

     b = await Worker(s.address, loop=s.loop, nthreads=1, resources={"A": 4})

     start = time()
     while not b.task_state or len(a.task_state) == 101:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 3

     assert len(b.task_state) > 0
@@ -440,7 +439,7 @@ async def test_steal_when_more_tasks(c, s, a, *rest):

     start = time()
     while not any(w.task_state for w in rest):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1

@@ -467,7 +466,7 @@ def slow2(x):
     future = c.submit(slow2, x, priority=-1)

     while not any(w.task_state for w in rest):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     # good future moves first
     assert any(future.key in w.task_state for w in rest)
@@ -511,13 +510,13 @@ async def assert_balanced(inp, expected, c, s, *workers):
             futures.append(f)

     while len(s.rprocessing) < len(futures):
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)

     for i in range(10):
         steal.balance()

         while steal.in_flight:
-            await gen.sleep(0.001)
+            await asyncio.sleep(0.001)

         result = [
             sorted([int(key_split(k)) for k in s.processing[w.address]], reverse=True)
@@ -589,7 +588,7 @@ async def test_restart(c, s, a, b):
         slowinc, range(100), delay=0.1, workers=a.address, allow_other_workers=True
     )
     while not s.processing[b.worker_address]:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     steal = s.extensions["stealing"]
     assert any(st for st in steal.stealable_all)
@@ -624,11 +623,11 @@ async def test_steal_communication_heavy_tasks(c, s, a, b):
     ]

     while not any(f.key in s.rprocessing for f in futures):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     steal.balance()
     while steal.in_flight:
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)

     assert s.processing[b.address]

@@ -641,7 +640,7 @@ async def test_steal_twice(c, s, a, b):
     futures = [c.submit(slowadd, x, i, delay=0.2) for i in range(100)]

     while len(s.tasks) < 100:  # tasks are all allocated
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     # Army of new workers arrives to help
     workers = await asyncio.gather(*[Worker(s.address, loop=s.loop) for _ in range(20)])
@@ -667,12 +666,12 @@ async def test_dont_steal_executing_tasks(c, s, a, b):

     future = c.submit(slowinc, 1, delay=0.5, workers=a.address)
     while not a.executing:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     steal.move_task_request(
         s.tasks[future.key], s.workers[a.address], s.workers[b.address]
     )

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert future.key in a.executing
     assert not b.executing
@@ -688,11 +687,11 @@ def long(delay):

     long_tasks = c.map(long, [0.5, 0.6], workers=a.address, allow_other_workers=True)
     while sum(map(len, s.processing.values())) < 2:  # let them start
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     start = time()
     while any(t.key in s.extensions["stealing"].key_stealable for t in long_tasks):
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1

     na = len(a.executing)
@@ -700,7 +699,7 @@ def long(delay):

     incs = c.map(inc, range(100), workers=a.address, allow_other_workers=True)

-    await gen.sleep(0.2)
+    await asyncio.sleep(0.2)

     await wait(long_tasks)

@@ -739,7 +738,7 @@ class Foo:

     start = time()
     while a.data or b.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1

     assert not s.who_has
@@ -761,7 +760,7 @@ async def test_lose_task(c, s, a, b):
             workers=a.address,
             allow_other_workers=True,
         )
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
         del futures

     out = log.getvalue()
diff --git a/distributed/tests/test_stress.py b/distributed/tests/test_stress.py
index a22df828e2d..707b93c03cf 100644
--- a/distributed/tests/test_stress.py
+++ b/distributed/tests/test_stress.py
@@ -1,8 +1,8 @@
-from operator import add
+import asyncio
 import random
 import sys
+from operator import add
 from time import sleep
-import asyncio

 from dask import delayed
 import pytest
@@ -27,7 +26,6 @@
     nodebug_teardown_module,
 )
 from distributed.client import wait
-from tornado import gen


 # All tests here are slow in some way
@@ -41,7 +40,7 @@ async def test_stress_1(c, s, a, b):

     seq = c.map(inc, range(n))
     while len(seq) > 1:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         seq = [c.submit(add, seq[i], seq[i + 1]) for i in range(0, len(seq), 2)]
     result = await seq[0]
    assert result == sum(map(inc, range(n)))
@@ -72,7 +71,7 @@ async def test_cancel_stress(c, s, *workers):
     for i in range(5):
         f = c.compute(y)
         while len(s.waiting) > (random.random() + 1) * 0.5 * n_todo:
-            await gen.sleep(0.01)
+            await asyncio.sleep(0.01)
         await c._cancel(f)

@@ -105,7 +104,7 @@ async def create_and_destroy_worker(delay):
         start = time()
         while time() < start + 5:
             n = await Nanny(s.address, nthreads=2, loop=s.loop)
-            await gen.sleep(delay)
+            await asyncio.sleep(delay)
             await n.close()
             print("Killed nanny")
@@ -146,7 +145,7 @@ async def test_stress_scatter_death(c, s, *workers):
     from distributed.scheduler import logger

     for i in range(7):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         try:
             s.validate_state()
         except Exception as c:
@@ -208,7 +207,7 @@ async def test_stress_steal(c, s, *workers):
     future = c.compute(total)

     while future.status != "finished":
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
         for i in range(3):
             a = random.choice(workers)
             b = random.choice(workers)
@@ -229,7 +228,7 @@ async def test_close_connections(c, s, *workers):

     future = c.compute(x.sum())
     while any(s.processing.values()):
-        await gen.sleep(0.5)
+        await asyncio.sleep(0.5)
         worker = random.choice(list(workers))
         for comm in worker._comms:
             comm.abort()
@@ -263,7 +262,7 @@ async def test_no_delay_during_large_transfer(c, s, w):
     with ResourceProfiler(dt=0.01) as rprof:
         future = await c.scatter(x, direct=True, hash=False)
-        await gen.sleep(0.5)
+        await asyncio.sleep(0.5)

     rprof.close()
     x = None  # lose ref
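Note: the test_stress.py hunks above include the nanny-churn loop from create_and_destroy_worker, where the sleep duration is a parameter rather than a constant. A hedged sketch of that loop under the same assumptions as the test (a reachable scheduler address; `churn_workers` is an illustrative name):

```python
import asyncio
from time import time

from distributed import Nanny

async def churn_workers(scheduler_address, delay, duration=5):
    # Start a Nanny, let it live for `delay` seconds, tear it down,
    # and repeat until `duration` seconds have elapsed, mirroring
    # create_and_destroy_worker in test_stress.py.
    start = time()
    while time() < start + duration:
        n = await Nanny(scheduler_address, nthreads=2)
        await asyncio.sleep(delay)
        await n.close()
```
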
diff --git a/distributed/tests/test_tls_functional.py b/distributed/tests/test_tls_functional.py
index a249bba9278..267b0689933 100644
--- a/distributed/tests/test_tls_functional.py
+++ b/distributed/tests/test_tls_functional.py
@@ -2,7 +2,7 @@
 Various functional tests for TLS networking.
 Most are taken from other test files and adapted.
 """
-from tornado import gen
+import asyncio

 from distributed import Nanny, worker_client, Queue
 from distributed.client import wait
@@ -106,7 +106,7 @@ async def test_rebalance(c, s, a, b):
 async def test_work_stealing(c, s, a, b):
     [x] = await c._scatter([1], workers=a.address)
     futures = c.map(slowadd, range(50), [x] * 50, delay=0.1)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     await wait(futures)
     assert len(a.data) > 10
     assert len(b.data) > 10
@@ -170,5 +170,5 @@ async def test_retire_workers(c, s, a, b):

     start = time()
     while a.status != "closed":
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5
diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py
index 526e7f7e178..86f1ca0c208 100644
--- a/distributed/tests/test_utils.py
+++ b/distributed/tests/test_utils.py
@@ -1,3 +1,4 @@
+import asyncio
 import array
 import datetime
 from functools import partial
@@ -11,7 +12,6 @@
 import numpy as np
 import pytest
-from tornado import gen
 from tornado.ioloop import IOLoop

 import dask
@@ -55,7 +55,7 @@ async def throws():
         1 / 0

     async def slow():
-        await gen.sleep(10)
+        await asyncio.sleep(10)

     async def inc(x):
         return x + 1
@@ -107,7 +107,7 @@ def function2(x):
 def test_sync_timeout(loop_in_thread):
     loop = loop_in_thread
     with pytest.raises(TimeoutError):
-        sync(loop_in_thread, gen.sleep, 0.5, callback_timeout=0.05)
+        sync(loop_in_thread, asyncio.sleep, 0.5, callback_timeout=0.05)


 def test_sync_closed_loop():
@@ -483,13 +483,13 @@ async def test_loop_runner_gen():
     runner = LoopRunner(asynchronous=True)
     assert runner.loop is IOLoop.current()
     assert not runner.is_started()
-    await gen.sleep(0.01)
+    await asyncio.sleep(0.01)
     runner.start()
     assert runner.is_started()
-    await gen.sleep(0.01)
+    await asyncio.sleep(0.01)
     runner.stop()
     assert not runner.is_started()
-    await gen.sleep(0.01)
+    await asyncio.sleep(0.01)


 def test_parse_bytes():
@@ -545,7 +545,7 @@ async def throws():
     import gc

     gc.collect()
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert "foo1234" not in sio.getvalue()
diff --git a/distributed/tests/test_utils_test.py b/distributed/tests/test_utils_test.py
index fff28fab31f..fbec4a204bd 100755
--- a/distributed/tests/test_utils_test.py
+++ b/distributed/tests/test_utils_test.py
@@ -1,10 +1,10 @@
+import asyncio
 from contextlib import contextmanager
 import socket
 import threading
 from time import sleep

 import pytest
-from tornado import gen

 from distributed import Scheduler, Worker, Client, config, default_client
 from distributed.core import rpc
@@ -93,7 +93,7 @@ def test_gen_cluster_tls(e, s, a, b):
 @gen_test()
 async def test_gen_test():
-    await gen.sleep(0.01)
+    await asyncio.sleep(0.01)


 @contextmanager
@@ -189,4 +189,4 @@ async def test_gen_cluster_async(s, a, b):  # flake8: noqa
 @gen_test()
 async def test_gen_test_async():  # flake8: noqa
-    await gen.sleep(0.001)
+    await asyncio.sleep(0.001)
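Note: the test_utils.py hunk above exercises `distributed.utils.sync`, which submits a coroutine function to an IOLoop running in another thread and blocks until it finishes or until `callback_timeout` expires. Any coroutine function works, which is why `asyncio.sleep` drops in directly for `gen.sleep`. A usage sketch under those assumptions (`block_for` is an illustrative name):

```python
import asyncio

from distributed.utils import sync

def block_for(loop, seconds):
    # Run asyncio.sleep on `loop` (a tornado IOLoop driven by another
    # thread) and block the calling thread until it completes, the
    # same way test_sync_timeout drives it with a short timeout.
    sync(loop, asyncio.sleep, seconds)
```
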
diff --git a/distributed/tests/test_variable.py b/distributed/tests/test_variable.py
index 0fb21160751..a60345d0abb 100644
--- a/distributed/tests/test_variable.py
+++ b/distributed/tests/test_variable.py
@@ -4,7 +4,6 @@
 import logging

 import pytest
-from tornado import gen
 from tornado.ioloop import IOLoop

 from distributed import Client, Variable, worker_client, Nanny, wait, TimeoutError
@@ -29,14 +28,14 @@ async def test_variable(c, s, a, b):

     del future, future2

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert s.tasks  # future still present

     x.delete()

     start = time()
     while s.tasks:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5

@@ -82,7 +81,7 @@ async def test_hold_futures(s, a, b):
     del x1
     await c1.close()

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     c2 = await Client(s.address, asynchronous=True)
     x2 = Variable("x")
@@ -138,10 +137,10 @@ async def test_cleanup(c, s, a, b):

     await v.set(x)
     del x
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     t_future = xx = asyncio.ensure_future(vv._get())
-    await gen.sleep(0)
+    await asyncio.sleep(0)
     asyncio.ensure_future(v.set(y))

     future = await t_future
@@ -201,7 +200,7 @@ def f(i):

     start = time()
     while len(s.wants_what["variable-x"]) != 1:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() - start < 2

@@ -233,7 +232,7 @@ async def test_Future_knows_status_immediately(c, s, a, b):
             break
         except Exception:
             assert time() < start + 5
-            await gen.sleep(0.05)
+            await asyncio.sleep(0.05)

     await c2.close()
@@ -243,7 +242,7 @@ async def test_erred_future(c, s, a, b):
     future = c.submit(div, 1, 0)
     var = Variable()
     await var.set(future)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     future2 = await var.get()
     with pytest.raises(ZeroDivisionError):
         await future2.result()
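Note: one hunk in test_variable.py above swaps `gen.sleep(0)` for `asyncio.sleep(0)`. A zero-duration sleep is not a no-op but a bare yield: it suspends the current coroutine for one event-loop iteration so already-scheduled callbacks (such as the `ensure_future`'d `vv._get()` in that test) get a chance to run. A small self-contained illustration of that behavior:

```python
import asyncio

async def main():
    task = asyncio.ensure_future(asyncio.sleep(0.1))
    # sleep(0) yields control once, letting the loop actually start
    # `task`; it is then running but, naturally, not yet finished.
    await asyncio.sleep(0)
    assert not task.done()
    await task

asyncio.get_event_loop().run_until_complete(main())
```
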
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index 1f6d1944c86..cc5cbeb1aa8 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -16,7 +16,6 @@
 from dask.system import CPU_COUNT
 import pytest
 from tlz import pluck, sliding_window, first
-from tornado import gen

 from distributed import (
     Client,
@@ -290,10 +289,10 @@ def g(x):
 @gen_cluster(client=True)
 async def test_upload_large_file(c, s, a, b):
     pytest.importorskip("crick")
-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
     with rpc(a.address) as aa:
         await aa.upload_file(filename="myfile.dat", data=b"0" * 100000000)
-        await gen.sleep(0.05)
+        await asyncio.sleep(0.05)
         assert a.digests["tick-duration"].components[0].max() < 0.050
@@ -469,7 +468,7 @@ def f(dask_worker=None):
 @gen_cluster(client=True)
 async def test_run_coroutine_dask_worker(c, s, a, b):
     async def f(dask_worker=None):
-        await gen.sleep(0.001)
+        await asyncio.sleep(0.001)
         return dask_worker.id

     response = await c.run(f)
@@ -516,7 +515,7 @@ async def test_close_on_disconnect(s, w):

     start = time()
     while w.status != "closed":
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5

@@ -571,7 +570,7 @@ async def test_clean(c, s, a, b):
     y.release()

     while x.key in a.task_state:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     for c in collections:
         assert not c
@@ -609,7 +608,7 @@ async def test_types(c, s, a, b):

     start = time()
     while y.key in b.data:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 5

     assert y.key not in b.types
@@ -632,7 +631,7 @@ async def test_restrictions(c, s, a, b):
     await c._cancel(x)

     while x.key in a.task_state:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert a.resource_restrictions == {}

@@ -648,7 +647,7 @@ async def test_clean_nbytes(c, s, a, b):
     future = c.compute(total)
     await wait(future)

-    await gen.sleep(1)
+    await asyncio.sleep(1)
     assert len(a.nbytes) + len(b.nbytes) == 1

@@ -725,7 +724,7 @@ async def test_log_exception_on_failed_task(c, s, a, b):
     future = c.submit(div, 1, 0)
     await wait(future)

-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     fh.flush()
     with open(fn) as f:
         text = f.read()
@@ -749,7 +748,7 @@ async def test_clean_up_dependencies(c, s, a, b):

     start = time()
     while len(a.data) + len(b.data) > 1:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2

     assert set(a.data) | set(b.data) == {zz.key}
@@ -764,7 +763,7 @@ async def test_hold_onto_dependents(c, s, a, b):
     assert x.key in b.data

     await c._cancel(y)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     assert x.key in b.data
@@ -788,13 +787,13 @@ async def test_worker_death_timeout(s):

 @gen_cluster(client=True)
 async def test_stop_doing_unnecessary_work(c, s, a, b):
     futures = c.map(slowinc, range(1000), delay=0.01)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)

     del futures

     start = time()
     while a.executing:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)

     assert time() - start < 0.5
@@ -826,9 +825,9 @@ async def test_priorities(c, s, w):
 async def test_heartbeats(c, s, a, b):
     x = s.workers[a.address].last_seen
     start = time()
-    await gen.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1)
+    await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.1)
     while s.workers[a.address].last_seen == x:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 2

     assert a.periodic_callbacks["heartbeat"].callback_time < 1000
@@ -888,7 +887,7 @@ def __sizeof__(self):
 )
 async def test_fail_write_many_to_disk(c, s, a):
     a.validate = False
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     assert not a.paused

     class Bad:
@@ -990,22 +989,22 @@ async def test_global_workers(s, a, b):
 @gen_cluster(nthreads=[])
 async def test_worker_fds(s):
     psutil = pytest.importorskip("psutil")
-    await gen.sleep(0.05)
+    await asyncio.sleep(0.05)
     start = psutil.Process().num_fds()

     worker = await Worker(s.address, loop=s.loop)
-    await gen.sleep(0.1)
+    await asyncio.sleep(0.1)
     middle = psutil.Process().num_fds()

     start = time()
     while middle > start:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 1

     await worker.close()

     start = time()
     while psutil.Process().num_fds() > start:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 0.5

@@ -1047,7 +1046,7 @@ async def test_scheduler_delay(c, s, a, b):
     old = a.scheduler_delay
     assert abs(a.scheduler_delay) < 0.3
     assert abs(b.scheduler_delay) < 0.3
-    await gen.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.3)
+    await asyncio.sleep(a.periodic_callbacks["heartbeat"].callback_time / 1000 + 0.3)

     assert a.scheduler_delay != old
@@ -1109,7 +1108,7 @@ def f(n):

     start = time()
     while not a.data.disk:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 5

@@ -1145,7 +1144,7 @@ def f():

     start = time()
     while not a.paused:
-        await gen.sleep(0.01)
+        await asyncio.sleep(0.01)
     assert time() < start + 4, (
         format_bytes(psutil.Process().memory_info().rss),
         format_bytes(a.memory_limit),
@@ -1164,7 +1163,7 @@ def f():
 async def test_statistical_profiling_cycle(c, s, a, b):
     futures = c.map(slowinc, range(20), delay=0.05)
     await wait(futures)
-    await gen.sleep(0.01)
+    await asyncio.sleep(0.01)
     end = time()
     assert len(a.profile_history) > 3

@@ -1231,7 +1230,7 @@ async def test_avoid_memory_monitor_if_zero_limit(c, s):
     future = c.submit(inc, 1)
     assert (await future) == 2

-    await gen.sleep(worker.memory_monitor_interval / 1000)
+    await asyncio.sleep(worker.memory_monitor_interval / 1000)
     await c.submit(inc, 2)  # worker doesn't pause
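Note: in test_heartbeats, test_scheduler_delay, and test_avoid_memory_monitor_if_zero_limit above, the sleep duration is derived from a periodic-callback interval, which tornado's `PeriodicCallback` stores in milliseconds, while both `gen.sleep` and `asyncio.sleep` take seconds -- hence the `/ 1000`. A sketch of that conversion, assuming a worker with the standard "heartbeat" periodic callback (`wait_one_heartbeat` is an illustrative name):

```python
import asyncio

async def wait_one_heartbeat(worker, slack=0.1):
    # callback_time is in milliseconds; asyncio.sleep wants seconds.
    interval = worker.periodic_callbacks["heartbeat"].callback_time / 1000
    await asyncio.sleep(interval + slack)
```
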
@@ -1525,7 +1524,7 @@ async def test_worker_listens_on_same_interface_by_default(Worker):
 async def test_close_gracefully(c, s, a, b):
     futures = c.map(slowinc, range(200), delay=0.1)
     while not b.data:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)

     mem = set(b.data)
     proc = set(b.executing)
@@ -1546,7 +1545,7 @@ async def test_lifetime(cleanup):
         async with Worker(s.address) as a, Worker(s.address, lifetime="1 seconds") as b:
             async with Client(s.address, asynchronous=True) as c:
                 futures = c.map(slowinc, range(200), delay=0.1)
-                await gen.sleep(1.5)
+                await asyncio.sleep(1.5)
                 assert b.status != "running"
                 await b.finished()
diff --git a/distributed/tests/test_worker_client.py b/distributed/tests/test_worker_client.py
index a134819ad34..09ae20e8f20 100644
--- a/distributed/tests/test_worker_client.py
+++ b/distributed/tests/test_worker_client.py
@@ -1,3 +1,4 @@
+import asyncio
 import random
 import threading
 from time import sleep
@@ -6,7 +7,6 @@
 import dask
 from dask import delayed
 import pytest
-from tornado import gen

 from distributed import (
     worker_client,
@@ -77,7 +77,7 @@ def func():

     start = time()
     while not all(v == 1 for v in s.nthreads.values()):
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 5

@@ -156,7 +156,7 @@ def mysum():

     start = time()
     while len(a.data) + len(b.data) > 1:
-        await gen.sleep(0.1)
+        await asyncio.sleep(0.1)
     assert time() < start + 3
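Note: taken together, these hunks remove every `from tornado import gen` that existed only for `gen.sleep`. Because `@gen_cluster` and `@gen_test` drive the tests on a tornado IOLoop, which since tornado 5 is a thin wrapper around the asyncio event loop, awaiting `asyncio.sleep` inside them behaves identically. A standalone check of that premise:

```python
import asyncio

from tornado.ioloop import IOLoop

async def main():
    # Awaiting an asyncio coroutine from a tornado-driven coroutine
    # works because the IOLoop delegates to the asyncio event loop.
    await asyncio.sleep(0.01)

IOLoop.current().run_sync(main)
```
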