Skip to content

Commit

Permalink
Use the new HLG pack/unpack API in Dask (#4489)
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk authored Feb 26, 2021
1 parent 3e1fd2a commit 31119c4
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 176 deletions.
3 changes: 1 addition & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
from .metrics import time
from .protocol import to_serialize
from .protocol.pickle import dumps, loads
from .protocol.highlevelgraph import highlevelgraph_pack
from .publish import Datasets
from .pubsub import PubSubClientExtension
from .security import Security
Expand Down Expand Up @@ -2540,7 +2539,7 @@ def _graph_to_futures(
if not isinstance(dsk, HighLevelGraph):
dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())

dsk = highlevelgraph_pack(dsk, self, keyset)
dsk = dsk.__dask_distributed_pack__(self, keyset)

annotations = {}
if user_priority:
Expand Down
170 changes: 0 additions & 170 deletions distributed/protocol/highlevelgraph.py

This file was deleted.

8 changes: 5 additions & 3 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from tornado.ioloop import IOLoop, PeriodicCallback

import dask
from dask.highlevelgraph import HighLevelGraph

from . import profile
from .batched import BatchedSend
Expand Down Expand Up @@ -84,7 +85,6 @@
from .pubsub import PubSubSchedulerExtension
from .stealing import WorkStealing
from .variable import VariableExtension
from .protocol.highlevelgraph import highlevelgraph_unpack

try:
from cython import compiled
Expand Down Expand Up @@ -3856,8 +3856,10 @@ def update_graph_hlg(
fifo_timeout=0,
annotations=None,
):

dsk, dependencies, annotations = highlevelgraph_unpack(hlg, annotations)
unpacked_graph = HighLevelGraph.__dask_distributed_unpack__(hlg, annotations)
dsk = unpacked_graph["dsk"]
dependencies = unpacked_graph["deps"]
annotations = unpacked_graph["annotations"]

# Remove any self-dependencies (happens on test_publish_bag() and others)
for k, v in dependencies.items():
Expand Down
5 changes: 4 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
click >= 6.6
cloudpickle >= 1.5.0
contextvars;python_version<'3.7'
dask>=2021.02.0
# TODO: Update our dask version requirement to
# the latest dask release before releasing distributed
# xref https://github.com/dask/distributed/pull/4489
dask @ git+https://github.com/dask/dask.git@master
msgpack >= 0.6.0
psutil >= 5.0
sortedcontainers !=2.0.0, !=2.0.1
Expand Down

0 comments on commit 31119c4

Please sign in to comment.