From e184e95cc04052934fc27524ab54e2c4fa815ee2 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Sun, 20 Mar 2022 11:43:44 -0400 Subject: [PATCH 01/15] Retry recvfrom when ERROR_PORT_UNREACHABLE received. --- Lib/asyncio/windows_events.py | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 90b259cbafead2..492d4303631751 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -45,6 +45,15 @@ CONNECT_PIPE_MAX_DELAY = 0.100 +class _Retry(RuntimeError): + + def __init__(self, ov, obj, callback): + super().__init__() + self.ov = ov + self.obj = obj + self.callback = callback + + class _OverlappedFuture(futures.Future): """Subclass of Future which represents an overlapped operation. @@ -493,6 +502,9 @@ def finish_recv(trans, key, ov): return self._register(ov, conn, finish_recv) def recvfrom(self, conn, nbytes, flags=0): + return self._register(*self._recvfrom(conn, nbytes, flags)) + + def _recvfrom(self, conn, nbytes, flags): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) try: @@ -504,13 +516,15 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + if exc.winerror == 1234: # ERROR_PORT_UNREACHABLE + raise _Retry(*self._recvfrom(conn, nbytes, flags)) + elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise - return self._register(ov, conn, finish_recv) + return ov, conn, finish_recv def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -835,6 +849,14 @@ def _poll(self, timeout=None): elif not f.done(): try: value = callback(transferred, key, ov) + except _Retry as retry: + self._cache[retry.ov.address] = ( + f, + retry.ov, + retry.obj, + retry.callback + ) + continue except OSError as e: f.set_exception(e) self._results.append(f) From 3456205835627d098d03743f5783ed3bb63f7a28 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Sun, 20 Mar 2022 11:55:05 -0400 Subject: [PATCH 02/15] Forgot something. --- Lib/asyncio/windows_events.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 492d4303631751..898fcb57d182ef 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -768,6 +768,14 @@ def _register(self, ov, obj, callback): f = _OverlappedFuture(ov, loop=self._loop) if f._source_traceback: del f._source_traceback[-1] + self._register_early_out(f, ov, callback) + # Register the overlapped operation for later. Note that + # we only store obj to prevent it from being garbage + # collected too early. + self._cache[ov.address] = (f, ov, obj, callback) + return f + + def _register_early_out(self, f, ov, callback): if not ov.pending: # The operation has completed, so no need to postpone the # work. We cannot take this short cut if we need the @@ -775,6 +783,8 @@ def _register(self, ov, obj, callback): # PostQueuedCompletionStatus(). try: value = callback(None, None, ov) + except _Retry as retry: + return self._register_early_out(f, retry.ov, retry.callback) except OSError as e: f.set_exception(e) else: @@ -785,12 +795,6 @@ def _register(self, ov, obj, callback): # OVERLAPPED object, otherwise the memory is freed and Windows may # read uninitialized memory. - # Register the overlapped operation for later. Note that - # we only store obj to prevent it from being garbage - # collected too early. - self._cache[ov.address] = (f, ov, obj, callback) - return f - def _unregister(self, ov): """Unregister an overlapped object. From d5fb49e9ee23cfe561474133a321f95ec9c8be0e Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Thu, 28 Jul 2022 16:42:18 -0400 Subject: [PATCH 03/15] Revert "Forgot something." This reverts commit 3456205835627d098d03743f5783ed3bb63f7a28. --- Lib/asyncio/windows_events.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 898fcb57d182ef..492d4303631751 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -768,14 +768,6 @@ def _register(self, ov, obj, callback): f = _OverlappedFuture(ov, loop=self._loop) if f._source_traceback: del f._source_traceback[-1] - self._register_early_out(f, ov, callback) - # Register the overlapped operation for later. Note that - # we only store obj to prevent it from being garbage - # collected too early. - self._cache[ov.address] = (f, ov, obj, callback) - return f - - def _register_early_out(self, f, ov, callback): if not ov.pending: # The operation has completed, so no need to postpone the # work. We cannot take this short cut if we need the @@ -783,8 +775,6 @@ def _register_early_out(self, f, ov, callback): # PostQueuedCompletionStatus(). try: value = callback(None, None, ov) - except _Retry as retry: - return self._register_early_out(f, retry.ov, retry.callback) except OSError as e: f.set_exception(e) else: @@ -795,6 +785,12 @@ def _register_early_out(self, f, ov, callback): # OVERLAPPED object, otherwise the memory is freed and Windows may # read uninitialized memory. + # Register the overlapped operation for later. Note that + # we only store obj to prevent it from being garbage + # collected too early. + self._cache[ov.address] = (f, ov, obj, callback) + return f + def _unregister(self, ov): """Unregister an overlapped object. From 03b0e08afb826166c43f7ef1f6d47bd67bf3b11d Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Thu, 28 Jul 2022 16:42:20 -0400 Subject: [PATCH 04/15] Revert "Retry recvfrom when ERROR_PORT_UNREACHABLE received." This reverts commit e184e95cc04052934fc27524ab54e2c4fa815ee2. --- Lib/asyncio/windows_events.py | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 492d4303631751..90b259cbafead2 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -45,15 +45,6 @@ CONNECT_PIPE_MAX_DELAY = 0.100 -class _Retry(RuntimeError): - - def __init__(self, ov, obj, callback): - super().__init__() - self.ov = ov - self.obj = obj - self.callback = callback - - class _OverlappedFuture(futures.Future): """Subclass of Future which represents an overlapped operation. @@ -502,9 +493,6 @@ def finish_recv(trans, key, ov): return self._register(ov, conn, finish_recv) def recvfrom(self, conn, nbytes, flags=0): - return self._register(*self._recvfrom(conn, nbytes, flags)) - - def _recvfrom(self, conn, nbytes, flags): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) try: @@ -516,15 +504,13 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == 1234: # ERROR_PORT_UNREACHABLE - raise _Retry(*self._recvfrom(conn, nbytes, flags)) - elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise - return ov, conn, finish_recv + return self._register(ov, conn, finish_recv) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -849,14 +835,6 @@ def _poll(self, timeout=None): elif not f.done(): try: value = callback(transferred, key, ov) - except _Retry as retry: - self._cache[retry.ov.address] = ( - f, - retry.ov, - retry.obj, - retry.callback - ) - continue except OSError as e: f.set_exception(e) self._results.append(f) From 27731e1dfc81943f15ca6176a365f627954e5b60 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Thu, 28 Jul 2022 17:20:30 -0400 Subject: [PATCH 05/15] Ignore ERROR_PORT_UNREACHABLE errors in connectionless receives in asyncio proactor. --- Lib/asyncio/windows_events.py | 8 ++++++-- Modules/overlapped.c | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 90b259cbafead2..d3df0a6fd728ec 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -504,7 +504,9 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: + return b'', None + elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: @@ -524,7 +526,9 @@ def finish_recv(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: + return b'', None + elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: diff --git a/Modules/overlapped.c b/Modules/overlapped.c index 19d25a53af7bb5..2b8962cf14c16b 100644 --- a/Modules/overlapped.c +++ b/Modules/overlapped.c @@ -2028,6 +2028,7 @@ overlapped_exec(PyObject *module) WINAPI_CONSTANT(F_DWORD, ERROR_OPERATION_ABORTED); WINAPI_CONSTANT(F_DWORD, ERROR_SEM_TIMEOUT); WINAPI_CONSTANT(F_DWORD, ERROR_PIPE_BUSY); + WINAPI_CONSTANT(F_DWORD, ERROR_PORT_UNREACHABLE); WINAPI_CONSTANT(F_DWORD, INFINITE); WINAPI_CONSTANT(F_HANDLE, INVALID_HANDLE_VALUE); WINAPI_CONSTANT(F_HANDLE, NULL); From ce32e7b4c486a551f04cd7dfc57fb0ac25d2348e Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Sat, 24 Feb 2024 15:35:36 -0500 Subject: [PATCH 06/15] Add test for issue 91227. --- Lib/test/test_asyncio/test_events.py | 74 ++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index c92c88bd5b2429..68aed981b4abb6 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1373,6 +1373,80 @@ def test_create_datagram_endpoint_sock(self): self.assertIsInstance(pr, MyDatagramProto) tr.close() self.loop.run_until_complete(pr.done) + + def test_datagram_send_to_non_listening_address(self): + # see: + # https://github.com/python/cpython/issues/91227 + # https://github.com/python/cpython/issues/88906 + # https://bugs.python.org/issue47071 + # https://bugs.python.org/issue44743 + # The Proactor event loop would fail to receive datagram messages after + # sending a message to an address that wasn't listening. + loop = self.loop + + class Protocol(asyncio.DatagramProtocol): + + _received_datagram = None + + def datagram_received(self, data, addr): + self._received_datagram.set_result(data) + + async def wait_for_datagram_received(self): + self._received_datagram = loop.create_future() + result = await asyncio.wait_for(self._recevied_datagram, 10) + self._received_datagram = None + return result + + transport_1, protocol_1 = loop.run_until_complete( + loop.create_datagram_endpoint( + Protocol, + local_addr=('127.0.0.1', 0) + ) + ) + addr_1 = transport_1.sockets[0].getsockname() + + transport_2, protocol_2 = loop.run_until_complete( + loop.create_datagram_endpoint( + Protocol, + local_addr=('127.0.0.1', 0) + ) + )[0] + addr_2 = transport_1.sockets[0].getsockname() + + # creating and immediately closing this to try to get an address that + # is not listening + transport_3, protocol_3 = loop.run_until_complete( + loop.create_datagram_endpoint( + Protocol, + local_addr=('127.0.0.1', 0) + ) + )[0] + addr_3 = transport_1.sockets[0].getsockname() + loop.run_until_complete(transport_3.wait_closed()) + + transport_1.sendto(b'a', addr=addr_2) + assert loop.run_until_complete( + protocol_2.wait_for_datagram_received() + ) == b'a' + + transport_2.sendto(b'b', addr=addr_1) + assert loop.run_until_complete( + protocol_1.wait_for_datagram_received() + ) == b'b' + + # this should send to an address that isn't listening + transport_1.sendto(b'c', addr=addr_3) + loop.run_until_complete(asyncio.sleep(0)) + + # transport 1 should still be able to receive messages after sending to + # an address that wasn't listening + transport_2.sendto(b'd', addr=addr_1) + assert loop.run_until_complete( + protocol_1.wait_for_datagram_received() + ) == b'd' + + loop.run_until_complete(transport_1.wait_closed()) + loop.run_until_complete(transport_2.wait_closed()) def test_internal_fds(self): loop = self.create_event_loop() From decfa464c34a0adb9af1861c3554d390a2ee22bf Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Sat, 24 Feb 2024 17:38:13 -0500 Subject: [PATCH 07/15] Fix test for issue 91227. --- Lib/asyncio/windows_events.py | 2 +- Lib/test/test_asyncio/test_events.py | 43 +++++++++++++--------------- 2 files changed, 21 insertions(+), 24 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 3b8630137be67b..dcf765332c8b4f 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -463,7 +463,7 @@ def finish_socket_func(trans, key, ov): except OSError as exc: if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: return b'', None - if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 68aed981b4abb6..88956bec39ba57 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1393,37 +1393,37 @@ def datagram_received(self, data, addr): async def wait_for_datagram_received(self): self._received_datagram = loop.create_future() - result = await asyncio.wait_for(self._recevied_datagram, 10) + result = await asyncio.wait_for(self._received_datagram, 10) self._received_datagram = None return result + def create_socket(): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(False) + sock.bind(('127.0.0.1', 0)) + return sock + + socket_1 = create_socket() transport_1, protocol_1 = loop.run_until_complete( - loop.create_datagram_endpoint( - Protocol, - local_addr=('127.0.0.1', 0) - ) + loop.create_datagram_endpoint(Protocol, sock=socket_1) ) - addr_1 = transport_1.sockets[0].getsockname() + addr_1 = socket_1.getsockname() + socket_2 = create_socket() transport_2, protocol_2 = loop.run_until_complete( - loop.create_datagram_endpoint( - Protocol, - local_addr=('127.0.0.1', 0) - ) - )[0] - addr_2 = transport_1.sockets[0].getsockname() + loop.create_datagram_endpoint(Protocol, sock=socket_2) + ) + addr_2 = socket_2.getsockname() # creating and immediately closing this to try to get an address that # is not listening + socket_3 = create_socket() transport_3, protocol_3 = loop.run_until_complete( - loop.create_datagram_endpoint( - Protocol, - local_addr=('127.0.0.1', 0) - ) - )[0] - addr_3 = transport_1.sockets[0].getsockname() - loop.run_until_complete(transport_3.wait_closed()) - + loop.create_datagram_endpoint(Protocol, sock=socket_3) + ) + addr_3 = socket_3.getsockname() + transport_3.abort() + transport_1.sendto(b'a', addr=addr_2) assert loop.run_until_complete( protocol_2.wait_for_datagram_received() @@ -1444,9 +1444,6 @@ async def wait_for_datagram_received(self): assert loop.run_until_complete( protocol_1.wait_for_datagram_received() ) == b'd' - - loop.run_until_complete(transport_1.wait_closed()) - loop.run_until_complete(transport_2.wait_closed()) def test_internal_fds(self): loop = self.create_event_loop() From 7709e05b3d0c59f2e625c6c26fd868451eec16ce Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 24 Feb 2024 23:03:44 +0000 Subject: [PATCH 08/15] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst diff --git a/Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst b/Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst new file mode 100644 index 00000000000000..8e53afdd619001 --- /dev/null +++ b/Misc/NEWS.d/next/Windows/2024-02-24-23-03-43.gh-issue-91227.sL4zWC.rst @@ -0,0 +1 @@ +Fix the asyncio ProactorEventLoop implementation so that sending a datagram to an address that is not listening does not prevent receiving any more datagrams. From 47c5e13bc4ade8e8218cc90ac379c5180b1f8299 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Sat, 24 Feb 2024 18:05:31 -0500 Subject: [PATCH 09/15] Fix linter errors. --- Lib/test/test_asyncio/test_events.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 88956bec39ba57..5e6980e490b7a6 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1373,7 +1373,7 @@ def test_create_datagram_endpoint_sock(self): self.assertIsInstance(pr, MyDatagramProto) tr.close() self.loop.run_until_complete(pr.done) - + def test_datagram_send_to_non_listening_address(self): # see: # https://github.com/python/cpython/issues/91227 @@ -1383,38 +1383,38 @@ def test_datagram_send_to_non_listening_address(self): # The Proactor event loop would fail to receive datagram messages after # sending a message to an address that wasn't listening. loop = self.loop - + class Protocol(asyncio.DatagramProtocol): - + _received_datagram = None - + def datagram_received(self, data, addr): self._received_datagram.set_result(data) - + async def wait_for_datagram_received(self): self._received_datagram = loop.create_future() result = await asyncio.wait_for(self._received_datagram, 10) self._received_datagram = None return result - + def create_socket(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setblocking(False) sock.bind(('127.0.0.1', 0)) return sock - + socket_1 = create_socket() transport_1, protocol_1 = loop.run_until_complete( loop.create_datagram_endpoint(Protocol, sock=socket_1) ) addr_1 = socket_1.getsockname() - + socket_2 = create_socket() transport_2, protocol_2 = loop.run_until_complete( loop.create_datagram_endpoint(Protocol, sock=socket_2) ) addr_2 = socket_2.getsockname() - + # creating and immediately closing this to try to get an address that # is not listening socket_3 = create_socket() @@ -1428,16 +1428,16 @@ def create_socket(): assert loop.run_until_complete( protocol_2.wait_for_datagram_received() ) == b'a' - + transport_2.sendto(b'b', addr=addr_1) assert loop.run_until_complete( protocol_1.wait_for_datagram_received() ) == b'b' - + # this should send to an address that isn't listening transport_1.sendto(b'c', addr=addr_3) loop.run_until_complete(asyncio.sleep(0)) - + # transport 1 should still be able to receive messages after sending to # an address that wasn't listening transport_2.sendto(b'd', addr=addr_1) From 0878d992f2e26313a58f1108ed9852eee5ee5be9 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Thu, 21 Mar 2024 19:15:30 -0400 Subject: [PATCH 10/15] Fix test_datagram_send_to_non_listening_address test to use unittest assertion methods instead of assert. --- Lib/test/test_asyncio/test_events.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 8b31d83fa0f553..2331fb8ea36af2 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1425,14 +1425,14 @@ def create_socket(): transport_3.abort() transport_1.sendto(b'a', addr=addr_2) - assert loop.run_until_complete( + self.assertEqual(loop.run_until_complete( protocol_2.wait_for_datagram_received() - ) == b'a' + ), b'a') transport_2.sendto(b'b', addr=addr_1) - assert loop.run_until_complete( + self.assertEqual(loop.run_until_complete( protocol_1.wait_for_datagram_received() - ) == b'b' + ), b'b') # this should send to an address that isn't listening transport_1.sendto(b'c', addr=addr_3) @@ -1441,9 +1441,9 @@ def create_socket(): # transport 1 should still be able to receive messages after sending to # an address that wasn't listening transport_2.sendto(b'd', addr=addr_1) - assert loop.run_until_complete( + self.assertEqual(loop.run_until_complete( protocol_1.wait_for_datagram_received() - ) == b'd' + ), b'd') def test_internal_fds(self): loop = self.create_event_loop() From 5559f4b16d5518b48e6d8992583f8063a52d7332 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Thu, 21 Mar 2024 19:52:31 -0400 Subject: [PATCH 11/15] Close transports in test_datagram_send_to_non_listening_address. --- Lib/test/test_asyncio/test_events.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_asyncio/test_events.py b/Lib/test/test_asyncio/test_events.py index 2331fb8ea36af2..73fb66095c4b21 100644 --- a/Lib/test/test_asyncio/test_events.py +++ b/Lib/test/test_asyncio/test_events.py @@ -1445,6 +1445,9 @@ def create_socket(): protocol_1.wait_for_datagram_received() ), b'd') + transport_1.close() + transport_2.close() + def test_internal_fds(self): loop = self.create_event_loop() if not isinstance(loop, selector_events.BaseSelectorEventLoop): From ed8e383062ec45e3fb8a2b7f1e8b08d664d5a8d7 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Fri, 22 Mar 2024 18:43:20 -0400 Subject: [PATCH 12/15] X --- Lib/asyncio/windows_events.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 0f5dd18fbe7426..58a2ca07461d77 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -461,14 +461,26 @@ def finish_socket_func(trans, key, ov): try: return ov.getresult() except OSError as exc: - if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: - return b'', None - elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, + if exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, _overlapped.ERROR_OPERATION_ABORTED): raise ConnectionResetError(*exc.args) else: raise + @classmethod + def _finish_recvfrom(cls, trans, key, ov): + try: + return cls.finish_socket_func(trans, key, ov) + except OSError as exc: + raise + # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the + # same socket is used to send to an address that is not + # listening. + if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: + return b'', None + else: + raise + def recv(self, conn, nbytes, flags=0): self._register_with_iocp(conn) ov = _overlapped.Overlapped(NULL) @@ -503,7 +515,7 @@ def recvfrom(self, conn, nbytes, flags=0): except BrokenPipeError: return self._result((b'', None)) - return self._register(ov, conn, self.finish_socket_func) + return self._register(ov, conn, self._finish_recvfrom) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -513,19 +525,7 @@ def recvfrom_into(self, conn, buf, flags=0): except BrokenPipeError: return self._result((0, None)) - def finish_recv(trans, key, ov): - try: - return ov.getresult() - except OSError as exc: - if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: - return b'', None - elif exc.winerror in (_overlapped.ERROR_NETNAME_DELETED, - _overlapped.ERROR_OPERATION_ABORTED): - raise ConnectionResetError(*exc.args) - else: - raise - - return self._register(ov, conn, finish_recv) + return self._register(ov, conn, self._finish_recvfrom) def sendto(self, conn, buf, flags=0, addr=None): self._register_with_iocp(conn) From b3b5626794c9ba3a171e12ffb4aee2206867b83f Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Fri, 22 Mar 2024 22:14:38 -0400 Subject: [PATCH 13/15] Add low level socket tests for ERROR_PORT_UNREACHABLE error in ProactorEventLoop. --- Lib/asyncio/windows_events.py | 10 +-- Lib/test/test_asyncio/test_sock_lowlevel.py | 78 +++++++++++++++++++++ 2 files changed, 83 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index 58a2ca07461d77..f174e76df8711f 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -8,6 +8,7 @@ import _overlapped import _winapi import errno +from functools import partial import math import msvcrt import socket @@ -468,16 +469,15 @@ def finish_socket_func(trans, key, ov): raise @classmethod - def _finish_recvfrom(cls, trans, key, ov): + def _finish_recvfrom(cls, trans, key, ov, *, empty_result): try: return cls.finish_socket_func(trans, key, ov) except OSError as exc: - raise # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the # same socket is used to send to an address that is not # listening. if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: - return b'', None + return empty_result, None else: raise @@ -515,7 +515,7 @@ def recvfrom(self, conn, nbytes, flags=0): except BrokenPipeError: return self._result((b'', None)) - return self._register(ov, conn, self._finish_recvfrom) + return self._register(ov, conn, partial(self._finish_recvfrom, empty_result=b'')) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -525,7 +525,7 @@ def recvfrom_into(self, conn, buf, flags=0): except BrokenPipeError: return self._result((0, None)) - return self._register(ov, conn, self._finish_recvfrom) + return self._register(ov, conn, partial(self._finish_recvfrom, empty_result=0)) def sendto(self, conn, buf, flags=0, addr=None): self._register_with_iocp(conn) diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index 075113cbe8e4a6..c762a053178f91 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -555,12 +555,90 @@ class SelectEventLoopTests(BaseSockTestsMixin, def create_event_loop(self): return asyncio.SelectorEventLoop() + class ProactorEventLoopTests(BaseSockTestsMixin, test_utils.TestCase): def create_event_loop(self): return asyncio.ProactorEventLoop() + + async def _basetest_datagram_send_to_non_listening_address(self, recvfrom): + await asyncio.sleep(0) + # see: + # https://github.com/python/cpython/issues/91227 + # https://github.com/python/cpython/issues/88906 + # https://bugs.python.org/issue47071 + # https://bugs.python.org/issue44743 + # The Proactor event loop would fail to receive datagram messages after + # sending a message to an address that wasn't listening. + + def create_socket(): + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.setblocking(False) + sock.bind(('127.0.0.1', 0)) + return sock + + socket_1 = create_socket() + addr_1 = socket_1.getsockname() + + socket_2 = create_socket() + addr_2 = socket_2.getsockname() + + # creating and immediately closing this to try to get an address that + # is not listening + socket_3 = create_socket() + addr_3 = socket_3.getsockname() + socket_3.shutdown(socket.SHUT_RDWR) + socket_3.close() + + socket_1_recv_task = self.loop.create_task(recvfrom(socket_1)) + socket_2_recv_task = self.loop.create_task(recvfrom(socket_2)) + await asyncio.sleep(0) + + await self.loop.sock_sendto(socket_1, b'a', addr_2) + self.assertEqual(await socket_2_recv_task, b'a') + + await self.loop.sock_sendto(socket_2, b'b', addr_1) + self.assertEqual(await socket_1_recv_task, b'b') + socket_1_recv_task = self.loop.create_task(recvfrom(socket_1)) + await asyncio.sleep(0) + + # this should send to an address that isn't listening + await self.loop.sock_sendto(socket_1, b'c', addr_3) + self.assertEqual(await socket_1_recv_task, b'') + socket_1_recv_task = self.loop.create_task(recvfrom(socket_1)) + await asyncio.sleep(0) + + # socket 1 should still be able to receive messages after sending to + # an address that wasn't listening + socket_2.sendto(b'd', addr_1) + self.assertEqual(await socket_1_recv_task, b'd') + + socket_1.shutdown(socket.SHUT_RDWR) + socket_1.close() + socket_2.shutdown(socket.SHUT_RDWR) + socket_2.close() + + + def test_datagram_send_to_non_listening_address_recvfrom(self): + async def recvfrom(socket): + data, _ = await self.loop.sock_recvfrom(socket, 4096) + return data + + self.loop.run_until_complete( + self._basetest_datagram_send_to_non_listening_address(recvfrom)) + + + def test_datagram_send_to_non_listening_address_recvfrom_into(self): + async def recvfrom_into(socket): + buf = bytearray(4096) + length, _ = await self.loop.sock_recvfrom_into(socket, buf, 4096) + return buf[:length] + + self.loop.run_until_complete( + self._basetest_datagram_send_to_non_listening_address(recvfrom_into)) + else: import selectors From a6915cd53d44f71359f0642c129a24f5bd4cee7a Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Fri, 22 Mar 2024 22:29:21 -0400 Subject: [PATCH 14/15] Formatting. --- Lib/asyncio/windows_events.py | 11 +++++----- Lib/test/test_asyncio/test_sock_lowlevel.py | 24 ++++++++++++--------- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py index f174e76df8711f..bf99bc271c7acd 100644 --- a/Lib/asyncio/windows_events.py +++ b/Lib/asyncio/windows_events.py @@ -473,9 +473,8 @@ def _finish_recvfrom(cls, trans, key, ov, *, empty_result): try: return cls.finish_socket_func(trans, key, ov) except OSError as exc: - # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the - # same socket is used to send to an address that is not - # listening. + # WSARecvFrom will report ERROR_PORT_UNREACHABLE when the same + # socket is used to send to an address that is not listening. if exc.winerror == _overlapped.ERROR_PORT_UNREACHABLE: return empty_result, None else: @@ -515,7 +514,8 @@ def recvfrom(self, conn, nbytes, flags=0): except BrokenPipeError: return self._result((b'', None)) - return self._register(ov, conn, partial(self._finish_recvfrom, empty_result=b'')) + return self._register(ov, conn, partial(self._finish_recvfrom, + empty_result=b'')) def recvfrom_into(self, conn, buf, flags=0): self._register_with_iocp(conn) @@ -525,7 +525,8 @@ def recvfrom_into(self, conn, buf, flags=0): except BrokenPipeError: return self._result((0, None)) - return self._register(ov, conn, partial(self._finish_recvfrom, empty_result=0)) + return self._register(ov, conn, partial(self._finish_recvfrom, + empty_result=0)) def sendto(self, conn, buf, flags=0, addr=None): self._register_with_iocp(conn) diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index c762a053178f91..ac2470d44b29ff 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -563,15 +563,16 @@ def create_event_loop(self): return asyncio.ProactorEventLoop() - async def _basetest_datagram_send_to_non_listening_address(self, recvfrom): + async def _basetest_datagram_send_to_non_listening_address(self, + recvfrom): await asyncio.sleep(0) # see: # https://github.com/python/cpython/issues/91227 # https://github.com/python/cpython/issues/88906 # https://bugs.python.org/issue47071 # https://bugs.python.org/issue44743 - # The Proactor event loop would fail to receive datagram messages after - # sending a message to an address that wasn't listening. + # The Proactor event loop would fail to receive datagram messages + # after sending a message to an address that wasn't listening. def create_socket(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) @@ -585,8 +586,8 @@ def create_socket(): socket_2 = create_socket() addr_2 = socket_2.getsockname() - # creating and immediately closing this to try to get an address that - # is not listening + # creating and immediately closing this to try to get an address + # that is not listening socket_3 = create_socket() addr_3 = socket_3.getsockname() socket_3.shutdown(socket.SHUT_RDWR) @@ -610,8 +611,8 @@ def create_socket(): socket_1_recv_task = self.loop.create_task(recvfrom(socket_1)) await asyncio.sleep(0) - # socket 1 should still be able to receive messages after sending to - # an address that wasn't listening + # socket 1 should still be able to receive messages after sending + # to an address that wasn't listening socket_2.sendto(b'd', addr_1) self.assertEqual(await socket_1_recv_task, b'd') @@ -627,17 +628,20 @@ async def recvfrom(socket): return data self.loop.run_until_complete( - self._basetest_datagram_send_to_non_listening_address(recvfrom)) + self._basetest_datagram_send_to_non_listening_address( + recvfrom)) def test_datagram_send_to_non_listening_address_recvfrom_into(self): async def recvfrom_into(socket): buf = bytearray(4096) - length, _ = await self.loop.sock_recvfrom_into(socket, buf, 4096) + length, _ = await self.loop.sock_recvfrom_into(socket, buf, + 4096) return buf[:length] self.loop.run_until_complete( - self._basetest_datagram_send_to_non_listening_address(recvfrom_into)) + self._basetest_datagram_send_to_non_listening_address( + recvfrom_into)) else: import selectors From 69bf497513c487870973dadadd88a8518c7c6cf6 Mon Sep 17 00:00:00 2001 From: Erik Soma Date: Fri, 22 Mar 2024 22:34:39 -0400 Subject: [PATCH 15/15] Remove extra asyncio.sleep in _basetest_datagram_send_to_non_listening_address. --- Lib/test/test_asyncio/test_sock_lowlevel.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_sock_lowlevel.py b/Lib/test/test_asyncio/test_sock_lowlevel.py index ac2470d44b29ff..acef24a703ba38 100644 --- a/Lib/test/test_asyncio/test_sock_lowlevel.py +++ b/Lib/test/test_asyncio/test_sock_lowlevel.py @@ -565,7 +565,6 @@ def create_event_loop(self): async def _basetest_datagram_send_to_non_listening_address(self, recvfrom): - await asyncio.sleep(0) # see: # https://github.com/python/cpython/issues/91227 # https://github.com/python/cpython/issues/88906