From f43733b419bde9cf685a64e3aeba3d65697cadef Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 28 Mar 2022 14:01:58 +0100 Subject: [PATCH] Better error message on misspelled executor annotation --- distributed/tests/test_worker.py | 9 +++++++++ distributed/worker.py | 17 +++++++++++------ 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 317ccad635f..646717fec03 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1897,6 +1897,15 @@ def get_thread_name(): assert "Dask-Foo-Threads" in gpu_result +@gen_cluster(client=True) +async def test_bad_executor_annotation(c, s, a, b): + with dask.annotate(executor="bad"): + future = c.submit(inc, 1) + with pytest.raises(ValueError, match="Invalid executor 'bad'; expected one of: "): + await future + assert future.status == "error" + + @gen_cluster(client=True) async def test_process_executor(c, s, a, b): with ProcessPoolExecutor() as e: diff --git a/distributed/worker.py b/distributed/worker.py index 7a6e040303a..434700e937b 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -3421,17 +3421,22 @@ async def execute(self, key: str, *, stimulus_id: str) -> None: args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs) - if ts.annotations is not None and "executor" in ts.annotations: - executor = ts.annotations["executor"] - else: + try: + executor = ts.annotations["executor"] # type: ignore + except (TypeError, KeyError): executor = "default" - assert executor in self.executors - assert key == ts.key + try: + e = self.executors[executor] + except KeyError: + raise ValueError( + f"Invalid executor {executor!r}; " + f"expected one of: {sorted(self.executors)}" + ) + self.active_keys.add(ts.key) result: dict try: - e = self.executors[executor] ts.start_time = time() if iscoroutinefunction(function): result = await apply_function_async(