diff --git a/kopf/toolkits/runner.py b/kopf/toolkits/runner.py index 2ce28b42..06044720 100644 --- a/kopf/toolkits/runner.py +++ b/kopf/toolkits/runner.py @@ -67,7 +67,6 @@ def __init__( self.kwargs = kwargs self.reraise = reraise self.timeout = timeout - self._loop = asyncio.new_event_loop() self._stop = threading.Event() self._ready = threading.Event() # NB: not asyncio.Event! self._thread = threading.Thread(target=self._target) @@ -90,8 +89,6 @@ def __exit__( # but instead wait for the thread+loop (CLI command) to finish. self._stop.set() self._thread.join(timeout=self.timeout) - self._loop.run_until_complete(self._loop.shutdown_asyncgens()) - self._loop.close() # If the thread is not finished, it is a bigger problem than exceptions. if self._thread.is_alive(): @@ -115,7 +112,8 @@ def _target(self) -> None: # Every thread must have its own loop. The parent thread (pytest) # needs to know when the loop is set up, to be able to shut it down. - asyncio.set_event_loop(self._loop) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) self._ready.set() # Execute the requested CLI command in the thread & thread's loop. @@ -128,6 +126,17 @@ def _target(self) -> None: self._future.set_exception(e) else: self._future.set_result(result) + finally: + + # Shut down the API-watching streams. + loop.run_until_complete(loop.shutdown_asyncgens()) + + # Shut down the transports and prevent ResourceWarning: unclosed transport. + # See: https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown + # TODO: Try a hack: https://github.com/aio-libs/aiohttp/issues/1925#issuecomment-575754386 + loop.run_until_complete(asyncio.sleep(1.0)) + + loop.close() @property def future(self) -> ResultFuture: