Emit warning when assigning/comparing string with Status Enum. #3875

Merged · 6 commits · Aug 4, 2020
24 changes: 23 additions & 1 deletion distributed/core.py
@@ -9,6 +9,7 @@
import traceback
import uuid
import weakref
import warnings

import dask
import tblib
@@ -52,11 +53,13 @@ class Status(Enum):
closing = "closing"
closing_gracefully = "closing-gracefully"
init = "init"
created = "created"
running = "running"
starting = "starting"
stopped = "stopped"
stopping = "stopping"
undefined = None
dont_reply = "dont-reply"

def __eq__(self, other):
"""
@@ -69,6 +72,11 @@ def __eq__(self, other):
if isinstance(other, type(self)):
return self.value == other.value
elif isinstance(other, str) or (other is None):
warnings.warn(
f"Since distributed 2.19 `.status` is now an Enum, please compare with `Status.{other}`",
PendingDeprecationWarning,
stacklevel=1,
)
assert other in [
s.value for s in type(self)
], f"comparison with non-existing states {other}"
@@ -261,9 +269,16 @@ def status(self, new_status):
if isinstance(new_status, Status):
self._status = new_status
elif isinstance(new_status, str) or new_status is None:
warnings.warn(
f"Since distributed 2.19 `.status` is now an Enum, please assign `Status.{new_status}`",
PendingDeprecationWarning,
stacklevel=1,
)
corresponding_enum_variants = [s for s in Status if s.value == new_status]
assert len(corresponding_enum_variants) == 1
self._status = corresponding_enum_variants[0]
else:
raise TypeError(f"expected Status or str, got {new_status}")
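The setter resolves a legacy string (or None) to its enum member by value. A minimal, self-contained sketch of that lookup:

```python
from distributed.core import Status

# The same value-based lookup the setter performs:
matches = [s for s in Status if s.value == "closing-gracefully"]
assert matches == [Status.closing_gracefully]

# None is also a valid value, mapping to the `undefined` member:
assert [s for s in Status if s.value is None] == [Status.undefined]
```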

async def finished(self):
""" Wait until the server has finished """
@@ -519,7 +534,14 @@ async def handle_comm(self, comm, shutting_down=shutting_down):
logger.exception(e)
result = error_message(e, status="uncaught-error")

if reply and result != "dont-reply":
# result is not type stable:
# when the LHS is not a Status, the RHS must not be a Status or it raises;
# when the LHS is a Status, the RHS must be a Status or it raises in tests.
is_dont_reply = False
if isinstance(result, Status) and (result == Status.dont_reply):
is_dont_reply = True

if reply and not is_dont_reply:
try:
await comm.write(result, serializers=serializers)
except (EnvironmentError, TypeError) as e:
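The `isinstance` guard matters because of the type-stability issue noted in the comment above: letting an arbitrary handler result reach `Status.__eq__` hands it an operand it rejects. A hedged sketch of the safe pattern, reusing names from the hunk:

```python
from distributed.core import Status

result = {"op": "pong"}  # a typical handler return value, not a Status
# Short-circuiting on isinstance() means Status.__eq__ is never invoked
# with an incompatible operand:
is_dont_reply = isinstance(result, Status) and result == Status.dont_reply
assert not is_dont_reply
```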
11 changes: 6 additions & 5 deletions distributed/deploy/cluster.py
@@ -10,6 +10,7 @@

from .adaptive import Adaptive

from ..core import Status
from ..utils import (
log_errors,
sync,
@@ -57,7 +58,7 @@ def __init__(self, asynchronous):
self._watch_worker_status_task = None
self.scheduler_comm = None

self.status = "created"
self.status = Status.created

async def _start(self):
comm = await self.scheduler_comm.live_comm()
@@ -67,10 +68,10 @@ async def _start(self):
self._watch_worker_status_task = asyncio.ensure_future(
self._watch_worker_status(comm)
)
self.status = "running"
self.status = Status.running

async def _close(self):
if self.status == "closed":
if self.status == Status.closed:
return

if self._watch_worker_status_comm:
@@ -84,14 +85,14 @@ async def _close(self):
if self.scheduler_comm:
await self.scheduler_comm.close_rpc()

self.status = "closed"
self.status = Status.closed

def close(self, timeout=None):
with suppress(RuntimeError): # loop closed during process shutdown
return self.sync(self._close, callback_timeout=timeout)

def __del__(self):
if self.status != "closed":
if self.status != Status.closed:
with suppress(AttributeError, RuntimeError): # during closing
self.loop.add_callback(self.close)

56 changes: 38 additions & 18 deletions distributed/deploy/spec.py
@@ -5,6 +5,7 @@
import logging
import math
import weakref
import warnings

import dask
from tornado import gen
@@ -36,19 +37,39 @@ class ProcessInterface:
It should implement the methods below, like ``start`` and ``close``
"""

[Review thread on this property]

Member: Maybe instead we should implement this on the Cluster superclass in distributed/deploy/cluster.py.

Member: That would also affect most of the downstream projects automatically, which would be useful.

Contributor (author): We could, but then ssh.Worker and ssh.Scheduler would not inherit this, and I believe they were the reason I did this originally. I might end up pulling this into a decorator that can be applied to classes (a sketch of that idea follows this property definition).

Member: Oh, my apologies. I saw the spec.py filename and assumed that this was the SpecCluster class. I see now that it is the ProcessInterface class instead. Happy to retract the comment. I'd prefer avoiding the iterator and being explicit for now if that's ok, even given the duplication.

@property
def status(self):
return self._status

@status.setter
def status(self, new_status):
if isinstance(new_status, Status):
self._status = new_status
elif isinstance(new_status, str) or new_status is None:
warnings.warn(
f"Since distributed 2.19 `.status` is now an Enum, please assign `Status.{new_status}`",
PendingDeprecationWarning,
stacklevel=1,
)
corresponding_enum_variants = [s for s in Status if s.value == new_status]
assert len(corresponding_enum_variants) == 1
self._status = corresponding_enum_variants[0]
else:
raise TypeError(f"expected Status or str, got {new_status}")
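The decorator idea floated in the review thread was not implemented in this PR; purely as a hypothetical sketch (the name `with_status_property` is invented here, and the body just mirrors the duplicated property above):

```python
import warnings

from distributed.core import Status


def with_status_property(cls):
    """Hypothetical class decorator attaching the warning `.status` property."""

    def _get(self):
        return self._status

    def _set(self, new_status):
        if isinstance(new_status, Status):
            self._status = new_status
        elif isinstance(new_status, str) or new_status is None:
            warnings.warn(
                f"Since distributed 2.19 `.status` is now an Enum, "
                f"please assign `Status.{new_status}`",
                PendingDeprecationWarning,
                stacklevel=2,
            )
            variants = [s for s in Status if s.value == new_status]
            assert len(variants) == 1
            self._status = variants[0]
        else:
            raise TypeError(f"expected Status or str, got {new_status}")

    cls.status = property(_get, _set)
    return cls
```

Applied as `@with_status_property` on a class, this would deduplicate the property across core.py and spec.py, at the cost of a less explicit class body, which is the trade-off the thread settles against for now.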

def __init__(self, scheduler=None, name=None):
self.address = getattr(self, "address", None)
self.external_address = None
self.lock = asyncio.Lock()
self.status = "created"
self.status = Status.created
self._event_finished = asyncio.Event()

def __await__(self):
async def _():
async with self.lock:
if self.status == "created":
if self.status == Status.created:
await self.start()
assert self.status == "running"
assert self.status == Status.running
return self

return _().__await__()
@@ -63,7 +84,7 @@ async def start(self):
For the scheduler we will expect the scheduler's ``.address`` attribute
to be available after this completes.
"""
self.status = "running"
self.status = Status.running

async def close(self):
""" Close the process
@@ -73,7 +94,7 @@ async def close(self):
This method should kill the process a bit more forcefully and does not
need to worry about shutting down gracefully
"""
self.status = "closed"
self.status = Status.closed
self._event_finished.set()

async def finished(self):
@@ -256,11 +277,11 @@ def __init__(
self.sync(self._correct_state)

async def _start(self):
while self.status == "starting":
while self.status == Status.starting:
await asyncio.sleep(0.01)
if self.status == "running":
if self.status == Status.running:
return
if self.status == "closed":
if self.status == Status.closed:
raise ValueError("Cluster is closed")

self._lock = asyncio.Lock()
@@ -279,7 +300,7 @@ async def _start(self):
cls = import_term(cls)
self.scheduler = cls(**self.scheduler_spec.get("options", {}))

self.status = "starting"
self.status = Status.starting
self.scheduler = await self.scheduler
self.scheduler_comm = rpc(
getattr(self.scheduler, "external_address", None) or self.scheduler.address,
@@ -359,7 +380,7 @@ def f():

def __await__(self):
async def _():
if self.status == "created":
if self.status == Status.created:
await self._start()
await self.scheduler
await self._correct_state()
@@ -370,13 +391,12 @@ async def _():
return _().__await__()

async def _close(self):
while self.status == "closing":
while self.status == Status.closing:
await asyncio.sleep(0.1)
if self.status == "closed":
if self.status == Status.closed:
return
if self.status == "running":
self.status = "closing"

if self.status == Status.running:
self.status = Status.closing
self.scale(0)
await self._correct_state()
for future in self._futures:
@@ -402,7 +422,7 @@ async def _close(self):
async def __aenter__(self):
await self
await self._correct_state()
assert self.status == "running"
assert self.status == Status.running
return self

def __exit__(self, typ, value, traceback):
@@ -453,7 +473,7 @@ def scale(self, n=0, memory=None, cores=None):
while len(self.worker_spec) > n:
self.worker_spec.popitem()

if self.status not in ("closing", "closed"):
if self.status not in (Status.closing, Status.closed):
while len(self.worker_spec) < n:
self.worker_spec.update(self.new_worker_spec())

@@ -617,5 +637,5 @@ async def run_spec(spec: dict, *args):
def close_clusters():
for cluster in list(SpecCluster._instances):
with suppress(gen.TimeoutError, TimeoutError):
if cluster.status != "closed":
if cluster.status != Status.closed:
cluster.close(timeout=10)
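One detail from the `scale` hunk above: tuple membership keeps working unchanged with enum members, since `in` falls back to `__eq__`. A minimal check:

```python
from distributed.core import Status

assert Status.running not in (Status.closing, Status.closed)
assert Status.closed in (Status.closing, Status.closed)
```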
3 changes: 2 additions & 1 deletion distributed/deploy/ssh.py
@@ -7,6 +7,7 @@
import dask

from .spec import SpecCluster, ProcessInterface
from ..core import Status
from ..utils import cli_keywords
from ..scheduler import Scheduler as _Scheduler
from ..worker import Worker as _Worker
@@ -130,7 +131,7 @@ async def start(self):
logger.info(line.strip())
if "worker at" in line:
self.address = line.split("worker at:")[1].strip()
self.status = "running"
self.status = Status.running
break
logger.debug("%s", line)
await super().start()
5 changes: 3 additions & 2 deletions distributed/deploy/tests/test_local.py
@@ -16,6 +16,7 @@

from dask.system import CPU_COUNT
from distributed import Client, Worker, Nanny, get_client
from distributed.core import Status
from distributed.deploy.local import LocalCluster, nprocesses_nthreads
from distributed.metrics import time
from distributed.system import MEMORY_LIMIT
@@ -188,7 +189,7 @@ def test_Client_with_local(loop):
def test_Client_solo(loop):
with Client(loop=loop, silence_logs=False) as c:
pass
assert c.cluster.status == "closed"
assert c.cluster.status == Status.closed


@gen_test()
@@ -223,7 +224,7 @@ def test_Client_kwargs(loop):
with Client(loop=loop, processes=False, n_workers=2, silence_logs=False) as c:
assert len(c.cluster.workers) == 2
assert all(isinstance(w, Worker) for w in c.cluster.workers.values())
assert c.cluster.status == "closed"
assert c.cluster.status == Status.closed


def test_Client_unused_kwargs_with_cluster(loop):
9 changes: 5 additions & 4 deletions distributed/deploy/tests/test_spec_cluster.py
@@ -5,6 +5,7 @@

import dask
from dask.distributed import SpecCluster, Worker, Client, Scheduler, Nanny
from distributed.core import Status
from distributed.compatibility import WINDOWS
from distributed.deploy.spec import close_clusters, ProcessInterface, run_spec
from distributed.metrics import time
@@ -237,7 +238,7 @@ def test_spec_close_clusters(loop):
cluster = SpecCluster(workers=workers, scheduler=scheduler, loop=loop)
assert cluster in SpecCluster._instances
close_clusters()
assert cluster.status == "closed"
assert cluster.status == Status.closed


@pytest.mark.asyncio
@@ -267,11 +268,11 @@ async def test_nanny_port():
@pytest.mark.asyncio
async def test_spec_process():
proc = ProcessInterface()
assert proc.status == "created"
assert proc.status == Status.created
await proc
assert proc.status == "running"
assert proc.status == Status.running
await proc.close()
assert proc.status == "closed"
assert proc.status == Status.closed


@pytest.mark.asyncio
8 changes: 4 additions & 4 deletions distributed/nanny.py
@@ -293,7 +293,7 @@ async def start(self):

logger.info(" Start Nanny at: %r", self.address)
response = await self.instantiate()
if response == "running":
if response == Status.running:
assert self.worker_address
self.status = Status.running
else:
@@ -316,7 +316,7 @@ async def kill(self, comm=None, timeout=2):
deadline = self.loop.time() + timeout
await self.process.kill(timeout=0.8 * (deadline - self.loop.time()))

async def instantiate(self, comm=None):
async def instantiate(self, comm=None) -> Status:
""" Start a local worker process

Blocks until the process is up and the scheduler is properly informed
@@ -535,7 +535,7 @@ def __init__(
self.worker_dir = None
self.worker_address = None

async def start(self):
async def start(self) -> Status:
"""
Ensure the worker process is started.
"""
@@ -584,7 +584,7 @@ async def start(self):
self.worker_address = msg["address"]
self.worker_dir = msg["dir"]
assert self.worker_address
self.status = "running"
self.status = Status.running
self.running.set()

init_q.close()
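With `instantiate()` now annotated to return `Status`, the check in `Nanny.start` becomes an enum-to-enum comparison and never hits the deprecation path. A sketch of the pattern (the `start_sketch` wrapper is illustrative only, not part of the diff):

```python
from distributed.core import Status

async def start_sketch(nanny):
    response = await nanny.instantiate()
    if response == Status.running:  # enum == enum: no warning, no assert
        nanny.status = Status.running
```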