From cfbeab0e9f325696964b103218820ed72f1a166a Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 3 Jul 2021 22:55:49 +0100 Subject: [PATCH 01/15] Don't cancel tasks when entering a timer context --- aiohttp/helpers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 418de0f6f9a..b44181bf7f1 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -672,7 +672,6 @@ def __enter__(self) -> BaseTimerContext: ) if self._cancelled: - task.cancel() raise asyncio.TimeoutError from None self._tasks.append(task) From 31f3b544f0e8fd2164b177e71c5355077ee2622d Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 3 Jul 2021 23:08:09 +0100 Subject: [PATCH 02/15] Update test_helpers.py --- tests/test_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index e9b99e12170..3f8b5120ec7 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -362,7 +362,7 @@ def test_timeout_handle_cb_exc(loop) -> None: assert not handle._callbacks -def test_timer_context_cancelled() -> None: +def test_timer_context_not_cancelled() -> None: with mock.patch("aiohttp.helpers.asyncio") as m_asyncio: m_asyncio.TimeoutError = asyncio.TimeoutError loop = mock.Mock() @@ -373,7 +373,7 @@ def test_timer_context_cancelled() -> None: with ctx: pass - assert m_asyncio.current_task.return_value.cancel.called + assert not m_asyncio.current_task.return_value.cancel.called def test_timer_context_no_task(loop) -> None: From 6835dab4cba1b2f4df47e205aeaa9ab0a5777f3e Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 4 Jul 2021 13:07:29 +0100 Subject: [PATCH 03/15] Create 5853.bugfix --- CHANGES/5853.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/5853.bugfix diff --git a/CHANGES/5853.bugfix b/CHANGES/5853.bugfix new file mode 100644 index 00000000000..3b3b03a374f --- /dev/null +++ b/CHANGES/5853.bugfix @@ -0,0 +1 @@ +Don't cancel current task when entering a cancelled timer. From 2cf68f203da339f8f7e3e703ec9039b11be699c4 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 4 Jul 2021 16:46:51 +0100 Subject: [PATCH 04/15] Yield to event loop while reading. --- aiohttp/streams.py | 21 ++++++++++++++------- tests/test_client_functional.py | 23 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index a077b81b82d..57fdb13daf6 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -376,7 +376,7 @@ async def read(self, n: int = -1) -> bytes: while not self._buffer and not self._eof: await self._wait("read") - return self._read_nowait(n) + return await self._read_nowait(n) async def readany(self) -> bytes: if self._exception is not None: @@ -388,7 +388,7 @@ async def readany(self) -> bytes: while not self._buffer and not self._eof: await self._wait("readany") - return self._read_nowait(-1) + return await self._read_nowait(-1) async def readchunk(self) -> Tuple[bytes, bool]: """Returns a tuple of (data, end_of_http_chunk). When chunked transfer @@ -405,7 +405,7 @@ async def readchunk(self) -> Tuple[bytes, bool]: if pos == self._cursor: return (b"", True) if pos > self._cursor: - return (self._read_nowait(pos - self._cursor), True) + return (await self._read_nowait(pos - self._cursor), True) internal_logger.warning( "Skipping HTTP chunk end due to data " "consumption beyond chunk boundary" @@ -413,7 +413,7 @@ async def readchunk(self) -> Tuple[bytes, bool]: if self._buffer: return (self._read_nowait_chunk(-1), False) - # return (self._read_nowait(-1), False) + # return (await self._read_nowait(-1), False) if self._eof: # Special case for signifying EOF. @@ -437,7 +437,7 @@ async def readexactly(self, n: int) -> bytes: return b"".join(blocks) - def read_nowait(self, n: int = -1) -> bytes: + async def read_nowait(self, n: int = -1) -> bytes: # default was changed to be consistent with .read(-1) # # I believe the most users don't know about the method and @@ -450,7 +450,7 @@ def read_nowait(self, n: int = -1) -> bytes: "Called while some coroutine is waiting for incoming data." ) - return self._read_nowait(n) + return await self._read_nowait(n) def _read_nowait_chunk(self, n: int) -> bytes: first_buffer = self._buffer[0] @@ -479,11 +479,18 @@ def _read_nowait_chunk(self, n: int) -> bytes: self._protocol.resume_reading() return data - def _read_nowait(self, n: int) -> bytes: + async def _read_nowait(self, n: int) -> bytes: + if self._timer: + with self._timer: + return await self._read_nowait_internal(n) + return await self._read_nowait_internal(n) + + async def _read_nowait_internal(self, n: int) -> bytes: """ Read not more than n bytes, or whole buffer if n == -1 """ chunks = [] while self._buffer: + await asyncio.sleep(0) # Yield to event loop. chunk = self._read_nowait_chunk(n) chunks.append(chunk) if n != -1: diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 52d74d98324..0c38529023e 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -2933,6 +2933,29 @@ async def handler(request): await resp.read() +async def test_timeout_with_full_buffer(aiohttp_client: Any) -> None: + async def handler(request): + """Server response that never ends and always has more data available.""" + resp = web.StreamResponse() + await resp.prepare(request) + while True: + await resp.write(b'1' * 100) + + async def request(client): + with pytest.raises(asyncio.TimeoutError): + async with await client.get("/", timeout=0.5) as resp: + async for data in resp.content.iter_chunked(1): + await asyncio.sleep(0.01) + + app = web.Application() + app.add_routes([web.get("/", handler)]) + + client = await aiohttp_client(app) + # wait_for() timeout should not be reached. + await asyncio.wait_for(request(client), 1) + + + async def test_read_bufsize_session_default(aiohttp_client: Any) -> None: async def handler(request): return web.Response(body=b"1234567") From 72a3bd8c37416286ceacd1621f6dae3b11612edd Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 13 May 2023 13:00:17 +0000 Subject: [PATCH 05/15] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/test_client_functional.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index ac0ef25244b..f9b0093c456 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -3028,7 +3028,7 @@ async def handler(request): resp = web.StreamResponse() await resp.prepare(request) while True: - await resp.write(b'1' * 100) + await resp.write(b"1" * 100) async def request(client): with pytest.raises(asyncio.TimeoutError): @@ -3044,7 +3044,6 @@ async def request(client): await asyncio.wait_for(request(client), 1) - async def test_read_bufsize_session_default(aiohttp_client: Any) -> None: async def handler(request): return web.Response(body=b"1234567") From 7542f7567e1116c9885b2f41c65672541043ad8c Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 14:46:58 +0100 Subject: [PATCH 06/15] Update --- aiohttp/streams.py | 6 ++++++ tests/test_client_functional.py | 6 ++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index af99b932379..979f986bd83 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -434,6 +434,12 @@ async def readexactly(self, n: int) -> bytes: return b"".join(blocks) async def read_nowait(self, n: int = -1) -> bytes: + """Read data from the buffer without waiting. + + While this function does not wait for incoming data, it will yield to the event + loop to avoid blocking the program in a busy loop when a large amount of + incoming data is available without waiting. + """ # default was changed to be consistent with .read(-1) # # I believe the most users don't know about the method and diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index f9b0093c456..a4116ba9f9b 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -3028,11 +3028,13 @@ async def handler(request): resp = web.StreamResponse() await resp.prepare(request) while True: - await resp.write(b"1" * 100) + await resp.write(b"1" * 1000) + await asyncio.sleep(0.01) async def request(client): + timeout = aiohttp.ClientTimeout(total=0.5) with pytest.raises(asyncio.TimeoutError): - async with await client.get("/", timeout=0.5) as resp: + async with await client.get("/", timeout=timeout) as resp: async for data in resp.content.iter_chunked(1): await asyncio.sleep(0.01) From 751603f467038fae24d05f2d2891762858b01bb6 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 14:48:57 +0100 Subject: [PATCH 07/15] Update and rename 5853.bugfix to 5854.feature --- CHANGES/5853.bugfix | 1 - CHANGES/5854.feature | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 CHANGES/5853.bugfix create mode 100644 CHANGES/5854.feature diff --git a/CHANGES/5853.bugfix b/CHANGES/5853.bugfix deleted file mode 100644 index 3b3b03a374f..00000000000 --- a/CHANGES/5853.bugfix +++ /dev/null @@ -1 +0,0 @@ -Don't cancel current task when entering a cancelled timer. diff --git a/CHANGES/5854.feature b/CHANGES/5854.feature new file mode 100644 index 00000000000..e406f5317a6 --- /dev/null +++ b/CHANGES/5854.feature @@ -0,0 +1 @@ +Made ``StreamReader.read_nowait()`` async to avoid blocking the program in a busy loop -- by :user:`Dreamsorcerer`. From 61688bdeac6444c1ce084b0ef6b87a021008f8f4 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 14:52:28 +0100 Subject: [PATCH 08/15] Update 5854.feature --- CHANGES/5854.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES/5854.feature b/CHANGES/5854.feature index e406f5317a6..9c86651126a 100644 --- a/CHANGES/5854.feature +++ b/CHANGES/5854.feature @@ -1 +1 @@ -Made ``StreamReader.read_nowait()`` async to avoid blocking the program in a busy loop -- by :user:`Dreamsorcerer`. +Made ``StreamReader.read_nowait()`` async to avoid blocking the program in a busy loop (and ensure the client timeout works in this situation) -- by :user:`Dreamsorcerer`. From dd39a06652cd125cc460aa7e6237693d93c09239 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 15:28:30 +0100 Subject: [PATCH 09/15] Revert to sync function --- aiohttp/helpers.py | 5 +++++ aiohttp/streams.py | 27 ++++++++------------------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 431bd67dd14..b7cff138bd7 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -700,6 +700,11 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._tasks: List[asyncio.Task[Any]] = [] self._cancelled = False + def check_timeout(self) -> None: + """Raise TimeoutError if timer has already been cancelled.""" + if self._cancelled: + raise asyncio.TimeoutError from None + def __enter__(self) -> BaseTimerContext: task = asyncio.current_task(loop=self._loop) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 979f986bd83..5d09a40f997 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -370,7 +370,7 @@ async def read(self, n: int = -1) -> bytes: while not self._buffer and not self._eof: await self._wait("read") - return await self._read_nowait(n) + return self._read_nowait(n) async def readany(self) -> bytes: if self._exception is not None: @@ -401,7 +401,7 @@ async def readchunk(self) -> Tuple[bytes, bool]: if pos == self._cursor: return (b"", True) if pos > self._cursor: - return (await self._read_nowait(pos - self._cursor), True) + return (self._read_nowait(pos - self._cursor), True) internal_logger.warning( "Skipping HTTP chunk end due to data " "consumption beyond chunk boundary" @@ -409,7 +409,7 @@ async def readchunk(self) -> Tuple[bytes, bool]: if self._buffer: return (self._read_nowait_chunk(-1), False) - # return (await self._read_nowait(-1), False) + # return (self._read_nowait(-1), False) if self._eof: # Special case for signifying EOF. @@ -433,13 +433,7 @@ async def readexactly(self, n: int) -> bytes: return b"".join(blocks) - async def read_nowait(self, n: int = -1) -> bytes: - """Read data from the buffer without waiting. - - While this function does not wait for incoming data, it will yield to the event - loop to avoid blocking the program in a busy loop when a large amount of - incoming data is available without waiting. - """ + def read_nowait(self, n: int = -1) -> bytes: # default was changed to be consistent with .read(-1) # # I believe the most users don't know about the method and @@ -452,7 +446,7 @@ async def read_nowait(self, n: int = -1) -> bytes: "Called while some coroutine is waiting for incoming data." ) - return await self._read_nowait(n) + return self._read_nowait(n) def _read_nowait_chunk(self, n: int) -> bytes: first_buffer = self._buffer[0] @@ -481,18 +475,13 @@ def _read_nowait_chunk(self, n: int) -> bytes: self._protocol.resume_reading() return data - async def _read_nowait(self, n: int) -> bytes: + def _read_nowait(self, n: int) -> bytes: + """Read not more than n bytes, or whole buffer if n == -1""" if self._timer: - with self._timer: - return await self._read_nowait_internal(n) - return await self._read_nowait_internal(n) + self._timer.check_timeout() - async def _read_nowait_internal(self, n: int) -> bytes: - """Read not more than n bytes, or whole buffer if n == -1""" chunks = [] - while self._buffer: - await asyncio.sleep(0) # Yield to event loop. chunk = self._read_nowait_chunk(n) chunks.append(chunk) if n != -1: From 58f7995ae7839d35951f6d21f31c4696612c2be8 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 15:30:12 +0100 Subject: [PATCH 10/15] Update changelog --- CHANGES/5854.bugfix | 1 + CHANGES/5854.feature | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 CHANGES/5854.bugfix delete mode 100644 CHANGES/5854.feature diff --git a/CHANGES/5854.bugfix b/CHANGES/5854.bugfix new file mode 100644 index 00000000000..b7de2f4d232 --- /dev/null +++ b/CHANGES/5854.bugfix @@ -0,0 +1 @@ +Fixed client timeout not working when incoming data is always available without waiting -- by :user:`Dreamsorcerer`. diff --git a/CHANGES/5854.feature b/CHANGES/5854.feature deleted file mode 100644 index 9c86651126a..00000000000 --- a/CHANGES/5854.feature +++ /dev/null @@ -1 +0,0 @@ -Made ``StreamReader.read_nowait()`` async to avoid blocking the program in a busy loop (and ensure the client timeout works in this situation) -- by :user:`Dreamsorcerer`. From 2e713b5f63778e5240eaf50393f06bdc98ff14cb Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 15:31:19 +0100 Subject: [PATCH 11/15] fix --- aiohttp/streams.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 5d09a40f997..5e16230b31b 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -382,7 +382,7 @@ async def readany(self) -> bytes: while not self._buffer and not self._eof: await self._wait("readany") - return await self._read_nowait(-1) + return self._read_nowait(-1) async def readchunk(self) -> Tuple[bytes, bool]: """Returns a tuple of (data, end_of_http_chunk). From c1e5961014406c9847e590a9eddba757f99e13cc Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sat, 13 May 2023 15:43:38 +0100 Subject: [PATCH 12/15] Update Noop timer --- aiohttp/helpers.py | 3 ++- aiohttp/streams.py | 12 ++++-------- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index b7cff138bd7..785a0da3beb 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -676,7 +676,8 @@ def __call__(self) -> None: class BaseTimerContext(ContextManager["BaseTimerContext"]): - pass + def check_timeout(self) -> None: + """Raise TimeoutError if timeout has been exceeded.""" class TimerNoop(BaseTimerContext): diff --git a/aiohttp/streams.py b/aiohttp/streams.py index 5e16230b31b..acc8591951b 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -6,7 +6,7 @@ from typing_extensions import Final from .base_protocol import BaseProtocol -from .helpers import BaseTimerContext, set_exception, set_result +from .helpers import BaseTimerContext, TimerNoop, set_exception, set_result from .log import internal_logger try: # pragma: no cover @@ -122,7 +122,7 @@ def __init__( self._waiter: Optional[asyncio.Future[None]] = None self._eof_waiter: Optional[asyncio.Future[None]] = None self._exception: Optional[BaseException] = None - self._timer = timer + self._timer = timer if timer else TimerNoop() self._eof_callbacks: List[Callable[[], None]] = [] def __repr__(self) -> str: @@ -297,10 +297,7 @@ async def _wait(self, func_name: str) -> None: waiter = self._waiter = self._loop.create_future() try: - if self._timer: - with self._timer: - await waiter - else: + with self._timer: await waiter finally: self._waiter = None @@ -477,8 +474,7 @@ def _read_nowait_chunk(self, n: int) -> bytes: def _read_nowait(self, n: int) -> bytes: """Read not more than n bytes, or whole buffer if n == -1""" - if self._timer: - self._timer.check_timeout() + self._timer.check_timeout() chunks = [] while self._buffer: From fb2cf85c8fe3093ec8af8629ceacc604e9f61976 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 14 May 2023 16:59:35 +0100 Subject: [PATCH 13/15] Update helpers.py --- aiohttp/helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiohttp/helpers.py b/aiohttp/helpers.py index 785a0da3beb..660f7cf900c 100644 --- a/aiohttp/helpers.py +++ b/aiohttp/helpers.py @@ -676,7 +676,7 @@ def __call__(self) -> None: class BaseTimerContext(ContextManager["BaseTimerContext"]): - def check_timeout(self) -> None: + def assert_timeout(self) -> None: """Raise TimeoutError if timeout has been exceeded.""" @@ -701,7 +701,7 @@ def __init__(self, loop: asyncio.AbstractEventLoop) -> None: self._tasks: List[asyncio.Task[Any]] = [] self._cancelled = False - def check_timeout(self) -> None: + def assert_timeout(self) -> None: """Raise TimeoutError if timer has already been cancelled.""" if self._cancelled: raise asyncio.TimeoutError from None From 42babe1cd3cbd2a0329c98ac945ee612911c3d11 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 14 May 2023 17:00:50 +0100 Subject: [PATCH 14/15] Update streams.py --- aiohttp/streams.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/aiohttp/streams.py b/aiohttp/streams.py index acc8591951b..1a2c5147fc3 100644 --- a/aiohttp/streams.py +++ b/aiohttp/streams.py @@ -122,7 +122,7 @@ def __init__( self._waiter: Optional[asyncio.Future[None]] = None self._eof_waiter: Optional[asyncio.Future[None]] = None self._exception: Optional[BaseException] = None - self._timer = timer if timer else TimerNoop() + self._timer = TimerNoop() if timer is None else timer self._eof_callbacks: List[Callable[[], None]] = [] def __repr__(self) -> str: @@ -474,7 +474,7 @@ def _read_nowait_chunk(self, n: int) -> bytes: def _read_nowait(self, n: int) -> bytes: """Read not more than n bytes, or whole buffer if n == -1""" - self._timer.check_timeout() + self._timer.assert_timeout() chunks = [] while self._buffer: From 6ad70c22d42761890542ef171b0cfc7fda365cc4 Mon Sep 17 00:00:00 2001 From: Sam Bull Date: Sun, 14 May 2023 17:03:23 +0100 Subject: [PATCH 15/15] Update test_client_functional.py --- tests/test_client_functional.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index a4116ba9f9b..4edc00483cb 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -3033,8 +3033,8 @@ async def handler(request): async def request(client): timeout = aiohttp.ClientTimeout(total=0.5) - with pytest.raises(asyncio.TimeoutError): - async with await client.get("/", timeout=timeout) as resp: + async with await client.get("/", timeout=timeout) as resp: + with pytest.raises(asyncio.TimeoutError): async for data in resp.content.iter_chunked(1): await asyncio.sleep(0.01) @@ -3042,7 +3042,7 @@ async def request(client): app.add_routes([web.get("/", handler)]) client = await aiohttp_client(app) - # wait_for() timeout should not be reached. + # wait_for() used just to ensure that a failing test doesn't hang. await asyncio.wait_for(request(client), 1)