Skip to content

Commit

Permalink
Fork out #6009
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Mar 28, 2022
1 parent 70a7ba6 commit 5dcee7e
Showing 1 changed file with 6 additions and 11 deletions.
17 changes: 6 additions & 11 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3438,22 +3438,17 @@ async def execute(self, key: str, *, stimulus_id: str) -> tuple[Recs, Instructio

args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)

try:
executor = ts.annotations["executor"] # type: ignore
except (TypeError, KeyError):
if ts.annotations is not None and "executor" in ts.annotations:
executor = ts.annotations["executor"]
else:
executor = "default"
try:
e = self.executors[executor]
except KeyError:
raise ValueError(
f"Invalid executor {executor!r}; "
f"expected one of: {sorted(self.executors)}"
)

assert executor in self.executors
assert key == ts.key
self.active_keys.add(ts.key)

result: dict
try:
e = self.executors[executor]
ts.start_time = time()
if iscoroutinefunction(function):
result = await apply_function_async(
Expand Down

0 comments on commit 5dcee7e

Please sign in to comment.