Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Move the loop finalisation within the loop-owning thread
Browse files Browse the repository at this point in the history
  • Loading branch information
nolar committed Feb 20, 2020
1 parent 4664013 commit 941fa3d
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions kopf/toolkits/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ def __init__(
self.kwargs = kwargs
self.reraise = reraise
self.timeout = timeout
self._loop = asyncio.new_event_loop()
self._stop = threading.Event()
self._ready = threading.Event() # NB: not asyncio.Event!
self._thread = threading.Thread(target=self._target)
Expand All @@ -90,8 +89,6 @@ def __exit__(
# but instead wait for the thread+loop (CLI command) to finish.
self._stop.set()
self._thread.join(timeout=self.timeout)
self._loop.run_until_complete(self._loop.shutdown_asyncgens())
self._loop.close()

# If the thread is not finished, it is a bigger problem than exceptions.
if self._thread.is_alive():
Expand All @@ -115,7 +112,8 @@ def _target(self) -> None:

# Every thread must have its own loop. The parent thread (pytest)
# needs to know when the loop is set up, to be able to shut it down.
asyncio.set_event_loop(self._loop)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._ready.set()

# Execute the requested CLI command in the thread & thread's loop.
Expand All @@ -128,6 +126,17 @@ def _target(self) -> None:
self._future.set_exception(e)
else:
self._future.set_result(result)
finally:

# Shut down the API-watching streams.
loop.run_until_complete(loop.shutdown_asyncgens())

# Shut down the transports and prevent ResourceWarning: unclosed transport.
# See: https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
# TODO: Try a hack: https://github.com/aio-libs/aiohttp/issues/1925#issuecomment-575754386
loop.run_until_complete(asyncio.sleep(1.0))

loop.close()

@property
def future(self) -> ResultFuture:
Expand Down

0 comments on commit 941fa3d

Please sign in to comment.