Skip to content

Commit

Permalink
fix progress_stream teardown
Browse files Browse the repository at this point in the history
  • Loading branch information
graingert committed Feb 17, 2022
1 parent 60d82c2 commit 2ae4e38
Showing 1 changed file with 3 additions and 5 deletions.
8 changes: 3 additions & 5 deletions distributed/diagnostics/progress_stream.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import logging
from functools import partial

from tlz import merge, valmap

from ..core import coerce_to_address, connect
from ..scheduler import Scheduler
from ..utils import color_of, key_split
from ..worker import dumps_function
from .progress import AllProgress
Expand All @@ -22,10 +20,10 @@ def counts(scheduler, allprogress):
)


def remove_plugin(**kwargs):
def _remove_all_progress_plugin(self, *args, **kwargs):
# Wrapper function around `Scheduler.remove_plugin` to avoid raising a
# `PicklingError` when using a cythonized scheduler
return Scheduler.remove_plugin(**kwargs)
self.remove_plugin(name=AllProgress.name)


async def progress_stream(address, interval):
Expand Down Expand Up @@ -54,7 +52,7 @@ async def progress_stream(address, interval):
"setup": dumps_function(AllProgress),
"function": dumps_function(counts),
"interval": interval,
"teardown": dumps_function(partial(remove_plugin, name=AllProgress.name)),
"teardown": dumps_function(_remove_all_progress_plugin),
}
)
return comm
Expand Down

0 comments on commit 2ae4e38

Please sign in to comment.