Skip to content

Commit

Permalink
Merge pull request #628 from nolar/reinforce-shields
Browse files Browse the repository at this point in the history
Reinforce the watchers's shields from cancellations
  • Loading branch information
nolar authored Jan 3, 2021
2 parents 436219a + 142c414 commit 42b8628
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
is done in the `kopf.reactor.handling` routines.
"""
import asyncio
import contextlib
import enum
import logging
from typing import TYPE_CHECKING, MutableMapping, NamedTuple, NewType, Optional, Tuple, Union
Expand Down Expand Up @@ -160,6 +161,7 @@ def exception_handler(scheduler: aiojobs.Scheduler, context: _aiojobs_Context) -

# All per-object workers are handled as fire-and-forget jobs via the scheduler,
# and communicated via the per-object event queues.
scheduler: aiojobs.Scheduler
signaller = asyncio.Condition()
scheduler = await aiojobs.create_scheduler(limit=settings.batching.worker_limit,
exception_handler=exception_handler)
Expand Down Expand Up @@ -197,15 +199,23 @@ def exception_handler(scheduler: aiojobs.Scheduler, context: _aiojobs_Context) -
"The operator will stop to prevent damage.") from worker_error
finally:
# Allow the existing workers to finish gracefully before killing them.
await _wait_for_depletion(
# Ensure the depletion is done even if the watcher is double-cancelled (e.g. in tests).
depletion_task = asyncio.create_task(_wait_for_depletion(
signaller=signaller,
scheduler=scheduler,
streams=streams,
settings=settings,
)
))
while not depletion_task.done():
with contextlib.suppress(asyncio.CancelledError):
await asyncio.shield(depletion_task)

# Terminate all the fire-and-forget per-object jobs if they are still running.
await asyncio.shield(scheduler.close())
# Ensure the scheduler is closed even if the watcher is double-cancelled (e.g. in tests).
closing_task = asyncio.create_task(scheduler.close())
while not closing_task.done():
with contextlib.suppress(asyncio.CancelledError):
await asyncio.shield(closing_task)


async def worker(
Expand Down

0 comments on commit 42b8628

Please sign in to comment.