From 2ae4e3834b3a32da2a1c43fed5adc4e359223d4c Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 16 Feb 2022 16:42:18 +0000 Subject: [PATCH] fix progress_stream teardown --- distributed/diagnostics/progress_stream.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/distributed/diagnostics/progress_stream.py b/distributed/diagnostics/progress_stream.py index a9c02846e14..440f7a59043 100644 --- a/distributed/diagnostics/progress_stream.py +++ b/distributed/diagnostics/progress_stream.py @@ -1,10 +1,8 @@ import logging -from functools import partial from tlz import merge, valmap from ..core import coerce_to_address, connect -from ..scheduler import Scheduler from ..utils import color_of, key_split from ..worker import dumps_function from .progress import AllProgress @@ -22,10 +20,10 @@ def counts(scheduler, allprogress): ) -def remove_plugin(**kwargs): +def _remove_all_progress_plugin(self, *args, **kwargs): # Wrapper function around `Scheduler.remove_plugin` to avoid raising a # `PicklingError` when using a cythonized scheduler - return Scheduler.remove_plugin(**kwargs) + self.remove_plugin(name=AllProgress.name) async def progress_stream(address, interval): @@ -54,7 +52,7 @@ async def progress_stream(address, interval): "setup": dumps_function(AllProgress), "function": dumps_function(counts), "interval": interval, - "teardown": dumps_function(partial(remove_plugin, name=AllProgress.name)), + "teardown": dumps_function(_remove_all_progress_plugin), } ) return comm