Commit 49f6e5a

Merge pull request #179 from nolar/more-exit-logs
Log verbosely on the operator termination reasons
Sergey Vasilyev authored Aug 12, 2019
2 parents 8b9fbc1 + 8e2513e commit 49f6e5a
Showing 2 changed files with 30 additions and 13 deletions.
kopf/reactor/queueing.py: 3 changes (1 addition, 2 deletions)
@@ -87,11 +87,10 @@ async def watcher(
                 streams[key].replenished.set()  # interrupt current sleeps, if any.
                 await streams[key].watchevents.put(event)
                 await scheduler.spawn(worker(handler=handler, streams=streams, key=key))
-
+    finally:
         # Allow the existing workers to finish gracefully before killing them.
         await _wait_for_depletion(scheduler=scheduler, streams=streams)
 
-    finally:
         # Forcedly terminate all the fire-and-forget per-object jobs, if they are still running.
         await asyncio.shield(scheduler.close())
 
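In effect, the depletion wait moves from the end of the try-block into the finally-block, so the per-object workers are drained gracefully not only when the watch stream ends cleanly, but also when the watcher task is cancelled or fails. A standalone sketch of that try/finally plus asyncio.shield pattern (illustrative names only, not the kopf code itself):

import asyncio

async def watcher_like():
    try:
        await asyncio.sleep(3600)  # stands in for the infinite watching loop
    finally:
        # Runs on a clean end of watching AND on cancellation or errors,
        # just like _wait_for_depletion() after this change.
        print("draining the remaining per-object workers...")
        # shield() protects the last cleanup step from a second cancellation,
        # mirroring `await asyncio.shield(scheduler.close())` above.
        await asyncio.shield(asyncio.sleep(0.1))
        print("all workers are closed")

async def main():
    task = asyncio.create_task(watcher_like())
    await asyncio.sleep(0.1)
    task.cancel()  # e.g. the operator is being stopped
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
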
kopf/reactor/running.py: 40 changes (29 additions, 11 deletions)
@@ -92,7 +92,7 @@ async def spawn_tasks(
     registry = registry if registry is not None else registries.get_default_registry()
     event_queue = asyncio.Queue(loop=loop)
     freeze_flag = asyncio.Event(loop=loop)
-    should_stop = asyncio.Event(loop=loop)
+    should_stop = asyncio.Future(loop=loop)
     tasks = []
 
     # A top-level task for external stopping by setting a stop-flag. Once set,
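
This one-line change is what makes richer termination logging possible: an asyncio.Event can only be set, whereas an asyncio.Future also carries a value, so the stop-flag can now record why the operator is stopping (a signal, an explicit value, or nothing). A minimal standalone illustration of the difference, with made-up values:

import asyncio

async def demo():
    loop = asyncio.get_running_loop()
    should_stop = asyncio.Future()  # was: asyncio.Event()
    # Whoever decides to stop the operator also records the reason in the result:
    loop.call_later(0.1, should_stop.set_result, "embedding app shutdown")
    reason = await should_stop      # was: await should_stop.wait()
    print(f"Stop-flag is set to {reason!r}. Operator is stopping.")

asyncio.run(demo())
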
@@ -104,8 +104,8 @@
     # K8s-event posting. Events are queued in-memory and posted in the background.
     # NB: currently, it is a global task, but can be made per-resource or per-object.
     tasks.extend([
-        loop.create_task(posting.poster(
-            event_queue=event_queue)),
+        loop.create_task(_root_task_checker("poster of events", posting.poster(
+            event_queue=event_queue))),
     ])
 
     # Monitor the peers, unless explicitly disabled.
@@ -117,32 +117,32 @@
         tasks.extend([
             loop.create_task(peering.peers_keepalive(
                 ourselves=ourselves)),
-            loop.create_task(queueing.watcher(
+            loop.create_task(_root_task_checker("watcher of peering", queueing.watcher(
                 namespace=namespace,
                 resource=ourselves.resource,
                 handler=functools.partial(peering.peers_handler,
                                           ourselves=ourselves,
-                                          freeze=freeze_flag))),  # freeze is set/cleared
+                                          freeze=freeze_flag)))),  # freeze is set/cleared
         ])
 
     # Resource event handling, only once for every known resource (de-duplicated).
     for resource in registry.resources:
         tasks.extend([
-            loop.create_task(queueing.watcher(
+            loop.create_task(_root_task_checker(f"watcher of {resource.name}", queueing.watcher(
                 namespace=namespace,
                 resource=resource,
                 handler=functools.partial(handling.custom_object_handler,
                                           lifecycle=lifecycle,
                                           registry=registry,
                                           resource=resource,
                                           event_queue=event_queue,
-                                          freeze=freeze_flag))),  # freeze is only checked
+                                          freeze=freeze_flag)))),  # freeze is only checked
         ])
 
     # On Ctrl+C or pod termination, cancel all tasks gracefully.
     if threading.current_thread() is threading.main_thread():
-        loop.add_signal_handler(signal.SIGINT, should_stop.set)
-        loop.add_signal_handler(signal.SIGTERM, should_stop.set)
+        loop.add_signal_handler(signal.SIGINT, should_stop.set_result, signal.SIGINT)
+        loop.add_signal_handler(signal.SIGTERM, should_stop.set_result, signal.SIGTERM)
     else:
         logger.warning("OS signals are ignored: running not in the main thread.")
 
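The signal handlers above rely on loop.add_signal_handler(sig, callback, *args) forwarding the extra arguments to the callback, so the future's result is the received signal itself, which _stop_flag_checker then logs by name. A reduced standalone sketch of that wiring (POSIX only, main thread only):

import asyncio
import signal

async def main():
    loop = asyncio.get_running_loop()
    should_stop = asyncio.Future()
    # The trailing argument is passed through to set_result(), so the stop
    # reason is the signal that actually arrived.
    loop.add_signal_handler(signal.SIGINT, should_stop.set_result, signal.SIGINT)
    loop.add_signal_handler(signal.SIGTERM, should_stop.set_result, signal.SIGTERM)
    received = await should_stop  # resolves on Ctrl+C or `kill -TERM <pid>`
    print(f"Signal {received.name} is received. Operator is stopping.")

asyncio.run(main())
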
@@ -252,13 +252,31 @@ async def _reraise(tasks):
             pass
 
 
+async def _root_task_checker(name, coro):
+    try:
+        await coro
+    except asyncio.CancelledError:
+        logger.debug(f"Root task {name!r} is cancelled.")
+        raise
+    except Exception as e:
+        logger.error(f"Root task {name!r} is failed: %r", e)
+        raise  # fail the process and its exit status
+    else:
+        logger.warning(f"Root task {name!r} is finished unexpectedly.")
+
+
 async def _stop_flag_checker(should_stop):
     try:
-        await should_stop.wait()
+        result = await should_stop
     except asyncio.CancelledError:
         pass  # operator is stopping for any other reason
     else:
-        logger.debug("Stop-flag is raised. Operator is stopping.")
+        if result is None:
+            logger.info("Stop-flag is raised. Operator is stopping.")
+        elif isinstance(result, signal.Signals):
+            logger.info("Signal %s is received. Operator is stopping.", result.name)
+        else:
+            logger.info("Stop-flag is set to %r. Operator is stopping.", result)
 
 
 def create_tasks(loop: asyncio.AbstractEventLoop, *arg, **kwargs):
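
Taken together, the new helpers make every way out of the operator visible in the logs: a cancelled root task is logged at DEBUG, a crashed one at ERROR (and re-raised so the process exits with a failure), an unexpectedly finished one at WARNING, and the stop-flag reports whether it was raised plainly, by a signal, or with an arbitrary value. A condensed standalone sketch of the wrapping pattern (hypothetical names, not the kopf API):

import asyncio
import logging

logger = logging.getLogger("demo-operator")

async def root_task_checker(name, coro):
    # Same shape as the helper added above: log how the root task ended, then propagate.
    try:
        await coro
    except asyncio.CancelledError:
        logger.debug("Root task %r is cancelled.", name)
        raise
    except Exception as e:
        logger.error("Root task %r is failed: %r", name, e)
        raise  # fail the process and its exit status
    else:
        logger.warning("Root task %r is finished unexpectedly.", name)

async def broken_watcher():
    await asyncio.sleep(0.1)
    raise RuntimeError("watch stream is lost")

async def main():
    logging.basicConfig(level=logging.DEBUG)
    task = asyncio.create_task(root_task_checker("watcher of demo", broken_watcher()))
    try:
        await task
    except RuntimeError:
        pass  # already logged; a real operator would now stop its remaining tasks

asyncio.run(main())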
