diff --git a/kopf/reactor/queueing.py b/kopf/reactor/queueing.py index a927a2f0..ec00b08d 100644 --- a/kopf/reactor/queueing.py +++ b/kopf/reactor/queueing.py @@ -23,6 +23,7 @@ is done in the `kopf.reactor.handling` routines. """ import asyncio +import contextlib import enum import logging from typing import TYPE_CHECKING, MutableMapping, NamedTuple, NewType, Optional, Tuple, Union @@ -160,6 +161,7 @@ def exception_handler(scheduler: aiojobs.Scheduler, context: _aiojobs_Context) - # All per-object workers are handled as fire-and-forget jobs via the scheduler, # and communicated via the per-object event queues. + scheduler: aiojobs.Scheduler signaller = asyncio.Condition() scheduler = await aiojobs.create_scheduler(limit=settings.batching.worker_limit, exception_handler=exception_handler) @@ -197,15 +199,23 @@ def exception_handler(scheduler: aiojobs.Scheduler, context: _aiojobs_Context) - "The operator will stop to prevent damage.") from worker_error finally: # Allow the existing workers to finish gracefully before killing them. - await _wait_for_depletion( + # Ensure the depletion is done even if the watcher is double-cancelled (e.g. in tests). + depletion_task = asyncio.create_task(_wait_for_depletion( signaller=signaller, scheduler=scheduler, streams=streams, settings=settings, - ) + )) + while not depletion_task.done(): + with contextlib.suppress(asyncio.CancelledError): + await asyncio.shield(depletion_task) # Terminate all the fire-and-forget per-object jobs if they are still running. - await asyncio.shield(scheduler.close()) + # Ensure the scheduler is closed even if the watcher is double-cancelled (e.g. in tests). + closing_task = asyncio.create_task(scheduler.close()) + while not closing_task.done(): + with contextlib.suppress(asyncio.CancelledError): + await asyncio.shield(closing_task) async def worker(