Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bpo-46309: Added reference to task created by StreamReaderProtocol #30505

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Lib/asyncio/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def __init__(self, stream_reader, client_connected_cb=None, loop=None):
self._strong_reader = stream_reader
self._reject_connection = False
self._stream_writer = None
self._connection_tasks = set()
self._transport = None
self._client_connected_cb = client_connected_cb
self._over_ssl = False
Expand Down Expand Up @@ -241,7 +242,9 @@ def connection_made(self, transport):
res = self._client_connected_cb(reader,
self._stream_writer)
if coroutines.iscoroutine(res):
self._loop.create_task(res)
task = self._loop.create_task(res)
task.add_done_callback(self._connection_tasks.remove)
self._connection_tasks.add(task)
self._strong_reader = None

def connection_lost(self, exc):
Expand Down
19 changes: 19 additions & 0 deletions Lib/test/test_asyncio/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,25 @@ def test_streamreaderprotocol_constructor_use_global_loop(self):
self.assertEqual(cm.warnings[0].filename, __file__)
self.assertIs(protocol._loop, self.loop)

def test_streamreaderprotocol_task_reference(self):
# See http://bugs.python.org/issue46309
messages = []
self.loop.set_exception_handler(lambda loop, ctx: messages.append(ctx))

async def callback(*args):
await self.loop.create_future()

This comment was marked as resolved.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking at the changes! I signed the CLA ~3 hours before this PR, so hopefully it will be OK'd sometime today. Your requested changes makes sense. However from my testing, creating the future outside of the callback will not allow the task to be garbage collected. That is, the test will pass even without keeping a reference to the task in StreamReaderProtocol.

One solution could be to still create the future inside the callback, but cancel the task via the newly created reference in StreamReaderProtocol. Perhaps you have a better suggestion. I can spend some more time on this later today.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are completely correct. I've spent quite a few hours triaging the source of the original issue on stackoverflow and realized the bug is caused when the garbage collector breaks a cyclic reference between the writer, the reader, the task and a future created inside the reader. If any of them are referenced outside of the function, the bug will not appear.

I don't suggest accessing the internals from the test. You're welcome to decline my changes in the tests as long as asyncio doesn't throw any warning about the leak. The garbage collector will take care of it when the loop is closed, or it will be cancelled accordingly.

Copy link
Author

@simwr872 simwr872 Jan 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've accepted your suggested changes in streams.py. As for the leaking test callback, a different solution could be to use asyncio.all_tasks(self.loop).pop().cancel() right after the assertion.

Copy link
Contributor

@bharel bharel Jan 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it's unnecessary - It will cause more confusion, and will get bugged if the internals will change and a new task will be created in the background.

I think we're better off letting the cleanup take care of it.


async def test():
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader, callback)
transport = mock.Mock()
protocol.connection_made(transport)
await asyncio.sleep(0)
gc.collect()

This comment was marked as resolved.

self.loop.run_until_complete(test())
self.assertEqual(messages, [])

def test_drain_raises(self):
# See http://bugs.python.org/issue25441

Expand Down
1 change: 1 addition & 0 deletions Misc/ACKS
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,7 @@ Gordon Worley
Darren Worrall
Thomas Wouters
Daniel Wozniak
Simon Wrede
Marcin Niemira
Wei Wu
Heiko Wundram
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Added reference to task created in
:meth:`asyncio.streams.StreamReaderProtocol.connection_made`.