From 278d4b63271e392c78cb3c120bb3a8c5085248fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 5 Apr 2024 17:48:42 +0300 Subject: [PATCH 01/22] WIP fix for #695 --- tests/test_taskgroups.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index bfc79c16..34e45db7 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -9,6 +9,7 @@ import pytest from exceptiongroup import catch +from pytest_mock import MockerFixture import anyio from anyio import ( @@ -257,6 +258,27 @@ async def taskfunc() -> None: await task +@pytest.mark.parametrize("anyio_backend", ["asyncio"]) +async def test_cancel_with_nested_shielded_scope(mocker: MockerFixture) -> None: + """Regression test for #695.""" + + async def shield_task() -> None: + with CancelScope(shield=True): + await sleep(0.5) + + async def middle_task() -> None: + async with create_task_group() as tg: + tg.start_soon(shield_task, name="shield task") + + async with create_task_group() as tg: + spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") + tg.start_soon(middle_task, name="middle task") + await wait_all_tasks_blocked() + tg.cancel_scope.cancel() + + assert len(spy.call_args_list) < 3 + + async def test_start_exception_delivery(anyio_backend_name: str) -> None: def task_fn(*, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: task_status.started("hello") From 437c5075896213c9fdf2d8dbfd3970855ea36d0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 9 May 2024 14:35:29 +0300 Subject: [PATCH 02/22] WIP: debugging issue #695 --- src/anyio/_backends/_asyncio.py | 47 ++++++++++++++++++++++++--------- tests/test_taskgroups.py | 7 +++-- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 43b7cb0e..0013d0dc 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -393,6 +393,7 @@ def __enter__(self) -> CancelScope: if self._cancel_called: self._deliver_cancellation(self) + print(f"entered cancel scope {id(self):x}") return self def __exit__( @@ -401,6 +402,7 @@ def __exit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: + print(f"exiting cancel scope {id(self):x} with {exc_type}") if not self._active: raise RuntimeError("This cancel scope is not active") if current_task() is not self._host_task: @@ -467,7 +469,7 @@ def _timeout(self) -> None: else: self._timeout_handle = loop.call_at(self._deadline, self._timeout) - def _deliver_cancellation(self, origin: CancelScope) -> bool: + def _deliver_cancellation(self, origin: CancelScope, indent: str = "") -> bool: """ Deliver cancellation to directly contained tasks and nested cancel scopes. 
@@ -478,10 +480,14 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool: :return: ``True`` if the delivery needs to be retried on the next cycle """ + print(f"{indent}scope {id(self):x}:") + indent += " " should_retry = False current = current_task() for task in self._tasks: + print(f"{indent}task {task.get_name()}: ", end="") if task._must_cancel: # type: ignore[attr-defined] + print("must_cancel flag already set") continue # The task is eligible for cancellation if it has started @@ -490,24 +496,37 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool: waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): origin._cancel_calls += 1 + print("cancelling") if sys.version_info >= (3, 9): task.cancel(f"Cancelled by cancel scope {id(origin):x}") else: task.cancel() + else: + print("waiter is not a future or waiter is done") + else: + if task is current: + print("is the current task") + else: + assert task is not self._host_task and not _task_started(task) + print("is not the host task, and has not started") # Deliver cancellation to child scopes that aren't shielded or running their own # cancellation callbacks for scope in self._child_scopes: if not scope._shield and not scope.cancel_called: - should_retry = scope._deliver_cancellation(origin) or should_retry + should_retry = ( + scope._deliver_cancellation(origin, indent) or should_retry + ) # Schedule another callback if there are still tasks left if origin is self: if should_retry: + print("scheduling a retry") self._cancel_handle = get_running_loop().call_soon( self._deliver_cancellation, origin ) else: + print("stopping cancellation") self._cancel_handle = None return should_retry @@ -658,24 +677,28 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: - ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) if exc_val is not None: self.cancel_scope.cancel() if not isinstance(exc_val, CancelledError): self._exceptions.append(exc_val) cancelled_exc_while_waiting_tasks: CancelledError | None = None - while self._tasks: - try: - await asyncio.wait(self._tasks) - except CancelledError as exc: - # This task was cancelled natively; reraise the CancelledError later - # unless this task was already interrupted by another exception - self.cancel_scope.cancel() - if cancelled_exc_while_waiting_tasks is None: - cancelled_exc_while_waiting_tasks = exc + with CancelScope(shield=True) as scope: + print( + f"task group {id(self):x} waiting for tasks to finish in cancel scope {id(scope):x}" + ) + while self._tasks: + try: + await asyncio.wait(self._tasks) + except CancelledError as exc: + # This task was cancelled natively; reraise the CancelledError later + # unless this task was already interrupted by another exception + self.cancel_scope.cancel() + if cancelled_exc_while_waiting_tasks is None: + cancelled_exc_while_waiting_tasks = exc self._active = False + ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) if self._exceptions: raise BaseExceptionGroup( "unhandled errors in a TaskGroup", self._exceptions diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 34e45db7..0973de81 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -797,12 +797,15 @@ async def child(fail: bool) -> None: async def test_cancel_cascade() -> None: async def do_something() -> NoReturn: async with create_task_group() as tg2: - tg2.start_soon(sleep, 1) + print(f"tg2 
({id(tg2):x}) cancel scope: {id(tg2.cancel_scope):x}\n") + tg2.start_soon(sleep, 1, name="sleep") + print("exited task group tg2") raise Exception("foo") async with create_task_group() as tg: - tg.start_soon(do_something) + print(f"tg ({id(tg):x}) cancel scope: {id(tg.cancel_scope):x}") + tg.start_soon(do_something, name="do_something") await wait_all_tasks_blocked() tg.cancel_scope.cancel() From d5cc818d3e08a56ef28933fd77add4dc22a507a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 24 Aug 2024 16:52:28 +0300 Subject: [PATCH 03/22] WIP --- src/anyio/_backends/_asyncio.py | 26 ++++++++++++------- tests/test_taskgroups.py | 46 +++++++++++++++++++++++++++++---- 2 files changed, 58 insertions(+), 14 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 0013d0dc..b56ac512 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -393,7 +393,7 @@ def __enter__(self) -> CancelScope: if self._cancel_called: self._deliver_cancellation(self) - print(f"entered cancel scope {id(self):x}") + # print(f"entered cancel scope {id(self):x}") return self def __exit__( @@ -469,7 +469,9 @@ def _timeout(self) -> None: else: self._timeout_handle = loop.call_at(self._deadline, self._timeout) - def _deliver_cancellation(self, origin: CancelScope, indent: str = "") -> bool: + def _deliver_cancellation( + self, origin: CancelScope, indent: str = "", attempt: int = 1 + ) -> bool: """ Deliver cancellation to directly contained tasks and nested cancel scopes. @@ -480,18 +482,20 @@ def _deliver_cancellation(self, origin: CancelScope, indent: str = "") -> bool: :return: ``True`` if the delivery needs to be retried on the next cycle """ - print(f"{indent}scope {id(self):x}:") + print( + f"{indent}delivering cancellation to scope {id(self):x} (attempt {attempt}):" + ) indent += " " should_retry = False current = current_task() for task in self._tasks: + should_retry = True print(f"{indent}task {task.get_name()}: ", end="") if task._must_cancel: # type: ignore[attr-defined] - print("must_cancel flag already set") + print(f"{indent}must_cancel flag already set") continue # The task is eligible for cancellation if it has started - should_retry = True if task is not current and (task is self._host_task or _task_started(task)): waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): @@ -515,20 +519,21 @@ def _deliver_cancellation(self, origin: CancelScope, indent: str = "") -> bool: for scope in self._child_scopes: if not scope._shield and not scope.cancel_called: should_retry = ( - scope._deliver_cancellation(origin, indent) or should_retry + scope._deliver_cancellation(origin, indent, attempt) or should_retry ) # Schedule another callback if there are still tasks left if origin is self: if should_retry: - print("scheduling a retry") + print(f"{indent}scheduling a retry") self._cancel_handle = get_running_loop().call_soon( - self._deliver_cancellation, origin + self._deliver_cancellation, origin, indent, attempt + 1 ) else: - print("stopping cancellation") + print(f"{indent}stopping cancellation") self._cancel_handle = None + print(f"{indent}{should_retry=}") return should_retry def _restart_cancellation_in_parent(self) -> None: @@ -540,6 +545,7 @@ def _restart_cancellation_in_parent(self) -> None: while scope is not None: if scope._cancel_called: if scope._cancel_handle is None: + print(f"restarting cancellation in parent scope ({id(scope):x})") 
scope._deliver_cancellation(scope) break @@ -693,6 +699,7 @@ async def __aexit__( except CancelledError as exc: # This task was cancelled natively; reraise the CancelledError later # unless this task was already interrupted by another exception + print("received native cancel") self.cancel_scope.cancel() if cancelled_exc_while_waiting_tasks is None: cancelled_exc_while_waiting_tasks = exc @@ -708,6 +715,7 @@ async def __aexit__( # unless the context manager itself was previously exited with another # exception, or if any of the child tasks raised an exception other than # CancelledError + print("cancelled_exc_while_waiting_tasks =", cancelled_exc_while_waiting_tasks) if cancelled_exc_while_waiting_tasks: if exc_val is None or ignore_exception: raise cancelled_exc_while_waiting_tasks diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 0973de81..2b82a404 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -263,20 +263,56 @@ async def test_cancel_with_nested_shielded_scope(mocker: MockerFixture) -> None: """Regression test for #695.""" async def shield_task() -> None: - with CancelScope(shield=True): + with CancelScope(shield=True) as scope: + shielded_cancel_spy = mocker.spy(scope, "_deliver_cancellation") + print(f"innermost cancel scope: {id(scope):x}") await sleep(0.5) + # At this point, the outermost cancel scope was delivered cancellation once + # (when tg.cancel_scope.cancel() was called), and the shielded scope wasn't + # since it's shielded + assert len(outer_cancel_spy.call_args_list) == 2 + shielded_cancel_spy.assert_not_called() + async def middle_task() -> None: - async with create_task_group() as tg: - tg.start_soon(shield_task, name="shield task") + try: + async with create_task_group() as tg: + middle_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") + print( + f"middle cancel scope: {id(tg.cancel_scope):x} task group: {id(tg):x}" + ) + tg.start_soon(shield_task, name="shield task") + finally: + # Cancellation is delivered to the the middle task groups's cancel scope: + # - When the outermost task group's.cancel_scope is cancelled + # - When the shielded innermost cancel scope is exited + # - When the middle task's task group's temporary shielded cancel scope is + # exited + assert len(middle_cancel_spy.call_args_list) == 6 + assert len(outer_cancel_spy.call_args_list) == 6 + print( + "exited middle task group, outer cancel scope now cancelled", + len(outer_cancel_spy.call_args_list), + "times", + ) async with create_task_group() as tg: - spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") + print( + f"outermost cancel scope: {id(tg.cancel_scope):x} task group: {id(tg):x}" + ) + outer_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") tg.start_soon(middle_task, name="middle task") await wait_all_tasks_blocked() + print("cancellation should start now") tg.cancel_scope.cancel() - assert len(spy.call_args_list) < 3 + # Cancellation is delivered to the outermost cancel scope: + # - When tg.cancel_scope.cancel() is called + # - When the shielded innermost cancel scope is exited + # - When the middle task's task group's temporary shielded cancel scope is exited + # - + # - + assert len(outer_cancel_spy.call_args_list) == 9 async def test_start_exception_delivery(anyio_backend_name: str) -> None: From d80af79d81a713140397e3a2dbdcfd28b4c9521b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 26 Aug 2024 21:54:31 +0300 Subject: [PATCH 04/22] Tests pass on 3.9 --- 
src/anyio/_backends/_asyncio.py | 156 ++++++++++++++++++++------------ tests/test_taskgroups.py | 14 +-- 2 files changed, 107 insertions(+), 63 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index b56ac512..c7c9d199 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -402,7 +402,7 @@ def __exit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: - print(f"exiting cancel scope {id(self):x} with {exc_type}") + print(f"exiting cancel scope {id(self):x} with {exc_val!r}") if not self._active: raise RuntimeError("This cancel scope is not active") if current_task() is not self._host_task: @@ -431,20 +431,29 @@ def __exit__( host_task_state.cancel_scope = self._parent_scope - # Restart the cancellation effort in the closest directly cancelled parent - # scope if this one was shielded - self._restart_cancellation_in_parent() - - if self._cancel_called and exc_val is not None: + # We allow immediately raising CancelledError in the following cases: + # * exc_val is None (no exceptions were raised within this scope) + # * we caught and swallowed a CancelledError meant for this scope, and there are + # no other exceptions being raised + not_swallowed_exceptions = 0 + if exc_val is not None: for exc in iterate_exceptions(exc_val): - if isinstance(exc, CancelledError): + if self._cancel_called and isinstance(exc, CancelledError): self._cancelled_caught = self._uncancel(exc) - if self._cancelled_caught: - break - - return self._cancelled_caught + if not self._cancelled_caught: + not_swallowed_exceptions += 1 + # print(f"cancelled_caught in {id(self):x} is {self._cancelled_caught}") + else: + not_swallowed_exceptions += 1 - return None + # Restart the cancellation effort in the closest directly cancelled parent + # scope if this one was shielded + # print(f"{not_swallowed_exceptions=}") + self._restart_cancellation_in_parent(immediate=not not_swallowed_exceptions) + print( + f"returning {self._cancelled_caught and not not_swallowed_exceptions} from CancelScope.__exit__; {exc_val=}" + ) + return self._cancelled_caught and not not_swallowed_exceptions def _uncancel(self, cancelled_exc: CancelledError) -> bool: if sys.version_info < (3, 9) or self._host_task is None: @@ -452,6 +461,7 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: return True # Undo all cancellations done by this scope + print(f"uncancelling scope {id(self):x} with exception {cancelled_exc!r}") if self._cancelling is not None: while self._cancel_calls: self._cancel_calls -= 1 @@ -482,17 +492,17 @@ def _deliver_cancellation( :return: ``True`` if the delivery needs to be retried on the next cycle """ - print( - f"{indent}delivering cancellation to scope {id(self):x} (attempt {attempt}):" - ) + # print( + # f"{indent}delivering cancellation to scope {id(self):x} (attempt {attempt}):" + # ) indent += " " should_retry = False current = current_task() for task in self._tasks: should_retry = True - print(f"{indent}task {task.get_name()}: ", end="") + # print(f"{indent}task {task.get_name()}: ", end="") if task._must_cancel: # type: ignore[attr-defined] - print(f"{indent}must_cancel flag already set") + # print(f"{indent}must_cancel flag already set") continue # The task is eligible for cancellation if it has started @@ -500,19 +510,19 @@ def _deliver_cancellation( waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): origin._cancel_calls += 1 - 
print("cancelling") + # print("cancelling") if sys.version_info >= (3, 9): task.cancel(f"Cancelled by cancel scope {id(origin):x}") else: task.cancel() - else: - print("waiter is not a future or waiter is done") - else: - if task is current: - print("is the current task") - else: - assert task is not self._host_task and not _task_started(task) - print("is not the host task, and has not started") + # else: + # print("waiter is not a future or waiter is done") + # else: + # if task is current: + # print("is the current task") + # else: + # assert task is not self._host_task and not _task_started(task) + # print("is not the host task, and has not started") # Deliver cancellation to child scopes that aren't shielded or running their own # cancellation callbacks @@ -525,18 +535,18 @@ def _deliver_cancellation( # Schedule another callback if there are still tasks left if origin is self: if should_retry: - print(f"{indent}scheduling a retry") + # print(f"{indent}scheduling a retry") self._cancel_handle = get_running_loop().call_soon( self._deliver_cancellation, origin, indent, attempt + 1 ) else: - print(f"{indent}stopping cancellation") + # print(f"{indent}stopping cancellation") self._cancel_handle = None - print(f"{indent}{should_retry=}") + # print(f"{indent}{should_retry=}") return should_retry - def _restart_cancellation_in_parent(self) -> None: + def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: """ Restart the cancellation effort in the closest directly cancelled parent scope. @@ -545,8 +555,15 @@ def _restart_cancellation_in_parent(self) -> None: while scope is not None: if scope._cancel_called: if scope._cancel_handle is None: - print(f"restarting cancellation in parent scope ({id(scope):x})") - scope._deliver_cancellation(scope) + # print(f"restarting cancellation in parent scope ({id(scope):x}, {immediate=})") + if scope._deliver_cancellation(scope) and immediate: + # print(" explicitly raising CancellationError") + if sys.version_info >= (3, 9): + raise CancelledError( + f"Cancelled by cancel scope {id(scope):x}" + ) + else: + raise CancelledError() break @@ -573,6 +590,7 @@ def cancel(self) -> None: self._timeout_handle.cancel() self._timeout_handle = None + print(f"cancel scope {id(self):x} was directly cancelled") self._cancel_called = True if self._host_task is not None: self._deliver_cancellation(self) @@ -683,44 +701,68 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: + print(f"exiting task group {id(self):x} with {exc_val!r}") if exc_val is not None: self.cancel_scope.cancel() if not isinstance(exc_val, CancelledError): self._exceptions.append(exc_val) - cancelled_exc_while_waiting_tasks: CancelledError | None = None - with CancelScope(shield=True) as scope: + try: + # Wait for child tasks to finish in a shielded scope, to prevent the + # enclosing cancel scope from hammering this task with cancellation attempts + # (#695) + with CancelScope(shield=True) as scope: + print( + f"task group {id(self):x} waiting for tasks to finish in cancel scope {id(scope):x}" + ) + while self._tasks: + try: + await asyncio.wait(self._tasks) + except CancelledError as exc: + # This task was cancelled natively; reraise the CancelledError later + # unless this task was already interrupted by another exception + print("received native cancel") + self.cancel_scope.cancel() + if exc_val is None: + exc_val = exc + + self._active = False + if self._exceptions: + print(f"raising exception group ({self._exceptions=})") + 
raise BaseExceptionGroup( + "unhandled errors in a TaskGroup", self._exceptions + ) + elif exc_val: + print("no exceptions, raising native CancelledError") + raise exc_val + except BaseException as exc: print( - f"task group {id(self):x} waiting for tasks to finish in cancel scope {id(scope):x}" + f"task group {id(self):x}: got exception while waiting for tasks to finish; {exc=}, {self._exceptions=}" ) - while self._tasks: - try: - await asyncio.wait(self._tasks) - except CancelledError as exc: - # This task was cancelled natively; reraise the CancelledError later - # unless this task was already interrupted by another exception - print("received native cancel") - self.cancel_scope.cancel() - if cancelled_exc_while_waiting_tasks is None: - cancelled_exc_while_waiting_tasks = exc + if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__): + print( + f" cancel scope {id(self.cancel_scope):x} swallowed the exception" + ) + return True - self._active = False - ignore_exception = self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) - if self._exceptions: - raise BaseExceptionGroup( - "unhandled errors in a TaskGroup", self._exceptions - ) + print(" re-raising exception") + raise + else: + print(f"passing {exc_type} to cancel_scope.__exit__") + return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) # Raise the CancelledError received while waiting for child tasks to exit, # unless the context manager itself was previously exited with another # exception, or if any of the child tasks raised an exception other than # CancelledError - print("cancelled_exc_while_waiting_tasks =", cancelled_exc_while_waiting_tasks) - if cancelled_exc_while_waiting_tasks: - if exc_val is None or ignore_exception: - raise cancelled_exc_while_waiting_tasks - - return ignore_exception + # print(f"exiting {id(self):x}") + # print(f"{cancelled_exc_while_waiting_tasks=}") + # print(f"{ignore_exception=}") + # if cancelled_exc_while_waiting_tasks: + # if exc_val is None or ignore_exception: + # raise cancelled_exc_while_waiting_tasks + # + # return ignore_exception def _spawn( self, diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 2b82a404..62a6393c 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -452,7 +452,7 @@ async def g() -> NoReturn: async with create_task_group(): await sleep(1) - assert False + pytest.fail("Execution should not reach this point") async with create_task_group() as tg: tg.start_soon(g) @@ -720,10 +720,8 @@ async def test_cancel_host_asyncgen() -> None: async def host_task() -> None: nonlocal done - async with create_task_group() as tg: - with CancelScope(shield=True) as inner_scope: - assert inner_scope.shield - tg.cancel_scope.cancel() + with CancelScope() as inner_scope: + inner_scope.cancel() with pytest.raises(get_cancelled_exc_class()): await sleep(0) @@ -1020,14 +1018,17 @@ async def fn() -> None: async def test_cancel_propagation_with_inner_spawn() -> None: async def g() -> NoReturn: async with anyio.create_task_group() as tg2: + print(f"tg2 = {id(tg2):x} cancel scope = {id(tg2.cancel_scope):x}") tg2.start_soon(anyio.sleep, 10) await anyio.sleep(1) - assert False + pytest.fail("Execution should not have reached this line") async with anyio.create_task_group() as tg: + print(f"tg = {id(tg):x}n cancel scope = {id(tg.cancel_scope):x}") tg.start_soon(g) await wait_all_tasks_blocked() + print("cancelling scope", hex(id(tg.cancel_scope))) tg.cancel_scope.cancel() @@ -1247,6 +1248,7 @@ async def nested() -> None: async def main() -> NoReturn: 
async with anyio.create_task_group() as tg: + print(f"tg cancel scope = {id(tg.cancel_scope):x}") tg.start_soon(nested) await wait_all_tasks_blocked() asyncio.get_running_loop().call_soon(crash) From 8e3eeb18eb9c83b7a213835cd344796356489213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 28 Aug 2024 00:11:57 +0300 Subject: [PATCH 05/22] Dropped Python 3.8 support --- .github/workflows/test.yml | 6 +- docs/versionhistory.rst | 1 + pyproject.toml | 5 +- src/anyio/_backends/_asyncio.py | 99 ++++++++------------------------- 4 files changed, 30 insertions(+), 81 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d2b3694b..9c54c39f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -61,14 +61,14 @@ jobs: fail-fast: false matrix: os: [ubuntu-latest] - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12", pypy-3.10] + python-version: ["3.9", "3.10", "3.11", "3.12", "3.13", pypy-3.10] include: - os: macos-latest - python-version: "3.8" + python-version: "3.9" - os: macos-latest python-version: "3.12" - os: windows-latest - python-version: "3.8" + python-version: "3.9" - os: windows-latest python-version: "3.12" runs-on: ${{ matrix.os }} diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 5a42d0b0..b4971451 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -5,6 +5,7 @@ This library adheres to `Semantic Versioning 2.0 `_. **UNRELEASED** +- Dropped support for Python 3.8 - Fixed ``__repr__()`` of ``MemoryObjectItemReceiver``, when ``item`` is not defined (`#767 `_; PR by @Danipulok) - Added support for the ``from_uri()``, ``full_match()``, ``parser`` methods/properties diff --git a/pyproject.toml b/pyproject.toml index c555cc9b..e0f3c3b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,14 +19,13 @@ classifiers = [ "Typing :: Typed", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", ] -requires-python = ">= 3.8" +requires-python = ">= 3.9" dependencies = [ "exceptiongroup >= 1.0.2; python_version < '3.11'", "idna >= 2.8", @@ -128,7 +127,7 @@ show_missing = true [tool.tox] legacy_tox_ini = """ [tox] -envlist = pre-commit, py38, py39, py310, py311, py312, pypy3 +envlist = pre-commit, py39, py310, py311, py312, py313, pypy3 skip_missing_interpreters = true minversion = 4.0.0 diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 005285ec..ad120d4d 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -19,9 +19,20 @@ ) from asyncio.base_events import _run_until_complete_cb # type: ignore[attr-defined] from collections import OrderedDict, deque -from collections.abc import AsyncIterator, Generator, Iterable +from collections.abc import ( + AsyncGenerator, + AsyncIterator, + Awaitable, + Callable, + Collection, + Coroutine, + Generator, + Iterable, + Mapping, + Sequence, +) from concurrent.futures import Future -from contextlib import suppress +from contextlib import AbstractContextManager, suppress from contextvars import Context, copy_context from dataclasses import dataclass from functools import partial, wraps @@ -41,16 +52,7 @@ from typing import ( IO, Any, - AsyncGenerator, - Awaitable, - Callable, - Collection, - ContextManager, - 
Coroutine, - Mapping, Optional, - Sequence, - Tuple, TypeVar, cast, ) @@ -402,7 +404,6 @@ def __exit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: - print(f"exiting cancel scope {id(self):x} with {exc_val!r}") if not self._active: raise RuntimeError("This cancel scope is not active") if current_task() is not self._host_task: @@ -450,18 +451,14 @@ def __exit__( # scope if this one was shielded # print(f"{not_swallowed_exceptions=}") self._restart_cancellation_in_parent(immediate=not not_swallowed_exceptions) - print( - f"returning {self._cancelled_caught and not not_swallowed_exceptions} from CancelScope.__exit__; {exc_val=}" - ) return self._cancelled_caught and not not_swallowed_exceptions def _uncancel(self, cancelled_exc: CancelledError) -> bool: - if sys.version_info < (3, 9) or self._host_task is None: + if self._host_task is None: self._cancel_calls = 0 return True # Undo all cancellations done by this scope - print(f"uncancelling scope {id(self):x} with exception {cancelled_exc!r}") if self._cancelling is not None: while self._cancel_calls: self._cancel_calls -= 1 @@ -479,9 +476,7 @@ def _timeout(self) -> None: else: self._timeout_handle = loop.call_at(self._deadline, self._timeout) - def _deliver_cancellation( - self, origin: CancelScope, indent: str = "", attempt: int = 1 - ) -> bool: + def _deliver_cancellation(self, origin: CancelScope) -> bool: """ Deliver cancellation to directly contained tasks and nested cancel scopes. @@ -492,17 +487,11 @@ def _deliver_cancellation( :return: ``True`` if the delivery needs to be retried on the next cycle """ - # print( - # f"{indent}delivering cancellation to scope {id(self):x} (attempt {attempt}):" - # ) - indent += " " should_retry = False current = current_task() for task in self._tasks: should_retry = True - # print(f"{indent}task {task.get_name()}: ", end="") if task._must_cancel: # type: ignore[attr-defined] - # print(f"{indent}must_cancel flag already set") continue # The task is eligible for cancellation if it has started @@ -510,40 +499,23 @@ def _deliver_cancellation( waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): origin._cancel_calls += 1 - # print("cancelling") - if sys.version_info >= (3, 9): - task.cancel(f"Cancelled by cancel scope {id(origin):x}") - else: - task.cancel() - # else: - # print("waiter is not a future or waiter is done") - # else: - # if task is current: - # print("is the current task") - # else: - # assert task is not self._host_task and not _task_started(task) - # print("is not the host task, and has not started") + task.cancel(f"Cancelled by cancel scope {id(origin):x}") # Deliver cancellation to child scopes that aren't shielded or running their own # cancellation callbacks for scope in self._child_scopes: if not scope._shield and not scope.cancel_called: - should_retry = ( - scope._deliver_cancellation(origin, indent, attempt) or should_retry - ) + should_retry = scope._deliver_cancellation(origin) or should_retry # Schedule another callback if there are still tasks left if origin is self: if should_retry: - # print(f"{indent}scheduling a retry") self._cancel_handle = get_running_loop().call_soon( - self._deliver_cancellation, origin, indent, attempt + 1 + self._deliver_cancellation, origin ) else: - # print(f"{indent}stopping cancellation") self._cancel_handle = None - # print(f"{indent}{should_retry=}") return should_retry def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: @@ 
-555,15 +527,8 @@ def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: while scope is not None: if scope._cancel_called: if scope._cancel_handle is None: - # print(f"restarting cancellation in parent scope ({id(scope):x}, {immediate=})") if scope._deliver_cancellation(scope) and immediate: - # print(" explicitly raising CancellationError") - if sys.version_info >= (3, 9): - raise CancelledError( - f"Cancelled by cancel scope {id(scope):x}" - ) - else: - raise CancelledError() + raise CancelledError(f"Cancelled by cancel scope {id(scope):x}") break @@ -590,7 +555,6 @@ def cancel(self) -> None: self._timeout_handle.cancel() self._timeout_handle = None - print(f"cancel scope {id(self):x} was directly cancelled") self._cancel_called = True if self._host_task is not None: self._deliver_cancellation(self) @@ -701,7 +665,6 @@ async def __aexit__( exc_val: BaseException | None, exc_tb: TracebackType | None, ) -> bool | None: - print(f"exiting task group {id(self):x} with {exc_val!r}") if exc_val is not None: self.cancel_scope.cancel() if not isinstance(exc_val, CancelledError): @@ -711,44 +674,30 @@ async def __aexit__( # Wait for child tasks to finish in a shielded scope, to prevent the # enclosing cancel scope from hammering this task with cancellation attempts # (#695) - with CancelScope(shield=True) as scope: - print( - f"task group {id(self):x} waiting for tasks to finish in cancel scope {id(scope):x}" - ) + with CancelScope(shield=True): while self._tasks: try: await asyncio.wait(self._tasks) except CancelledError as exc: # This task was cancelled natively; reraise the CancelledError later # unless this task was already interrupted by another exception - print("received native cancel") self.cancel_scope.cancel() if exc_val is None: exc_val = exc self._active = False if self._exceptions: - print(f"raising exception group ({self._exceptions=})") raise BaseExceptionGroup( "unhandled errors in a TaskGroup", self._exceptions ) elif exc_val: - print("no exceptions, raising native CancelledError") raise exc_val except BaseException as exc: - print( - f"task group {id(self):x}: got exception while waiting for tasks to finish; {exc=}, {self._exceptions=}" - ) if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__): - print( - f" cancel scope {id(self.cancel_scope):x} swallowed the exception" - ) return True - print(" re-raising exception") raise else: - print(f"passing {exc_type} to cancel_scope.__exit__") return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) # Raise the CancelledError received while waiting for child tasks to exit, @@ -874,7 +823,7 @@ async def start( # Threads # -_Retval_Queue_Type = Tuple[Optional[T_Retval], Optional[BaseException]] +_Retval_Queue_Type = tuple[Optional[T_Retval], Optional[BaseException]] class WorkerThread(Thread): @@ -2018,7 +1967,7 @@ async def _call_in_runner_task( ) -> T_Retval: if not self._runner_task: self._send_stream, receive_stream = create_memory_object_stream[ - Tuple[Awaitable[Any], asyncio.Future] + tuple[Awaitable[Any], asyncio.Future] ](1) self._runner_task = self.get_loop().create_task( self._run_tests_and_fixtures(receive_stream) @@ -2370,7 +2319,7 @@ async def connect_tcp( cls, host: str, port: int, local_address: IPSockAddrType | None = None ) -> abc.SocketStream: transport, protocol = cast( - Tuple[asyncio.Transport, StreamProtocol], + tuple[asyncio.Transport, StreamProtocol], await get_running_loop().create_connection( StreamProtocol, host, port, local_addr=local_address ), @@ -2549,7 +2498,7 @@ def 
current_default_thread_limiter(cls) -> CapacityLimiter: @classmethod def open_signal_receiver( cls, *signals: Signals - ) -> ContextManager[AsyncIterator[Signals]]: + ) -> AbstractContextManager[AsyncIterator[Signals]]: return _SignalReceiver(signals) @classmethod From 903bc71fc7dad40c37ed2454655d6f6be46f4795 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 28 Aug 2024 00:16:32 +0300 Subject: [PATCH 06/22] Dropped Python 3.8 support --- src/anyio/_backends/_trio.py | 23 +++++++++++++---------- src/anyio/_core/_fileio.py | 3 +-- src/anyio/_core/_signals.py | 6 ++++-- src/anyio/_core/_streams.py | 4 ++-- src/anyio/abc/_eventloop.py | 8 +++----- src/anyio/abc/_sockets.py | 8 ++++---- src/anyio/from_thread.py | 19 +++++++++++-------- src/anyio/pytest_plugin.py | 4 ++-- src/anyio/streams/tls.py | 6 +++--- tests/streams/test_stapled.py | 3 ++- tests/streams/test_tls.py | 6 +++--- tests/test_from_thread.py | 4 ++-- tests/test_signals.py | 2 +- tests/test_sockets.py | 10 ++++++---- tests/test_subprocesses.py | 8 +++++--- tests/test_taskgroups.py | 20 -------------------- tests/test_typedattr.py | 3 ++- 17 files changed, 64 insertions(+), 73 deletions(-) diff --git a/src/anyio/_backends/_trio.py b/src/anyio/_backends/_trio.py index cf6f3db7..5402574a 100644 --- a/src/anyio/_backends/_trio.py +++ b/src/anyio/_backends/_trio.py @@ -6,8 +6,19 @@ import sys import types import weakref -from collections.abc import AsyncIterator, Iterable +from collections.abc import ( + AsyncGenerator, + AsyncIterator, + Awaitable, + Callable, + Collection, + Coroutine, + Iterable, + Mapping, + Sequence, +) from concurrent.futures import Future +from contextlib import AbstractContextManager from dataclasses import dataclass from functools import partial from io import IOBase @@ -18,16 +29,8 @@ from typing import ( IO, Any, - AsyncGenerator, - Awaitable, - Callable, - Collection, - ContextManager, - Coroutine, Generic, - Mapping, NoReturn, - Sequence, TypeVar, cast, overload, @@ -1138,7 +1141,7 @@ def current_default_thread_limiter(cls) -> CapacityLimiter: @classmethod def open_signal_receiver( cls, *signals: Signals - ) -> ContextManager[AsyncIterator[Signals]]: + ) -> AbstractContextManager[AsyncIterator[Signals]]: return _SignalReceiver(signals) @classmethod diff --git a/src/anyio/_core/_fileio.py b/src/anyio/_core/_fileio.py index 9503d944..23ccb0d6 100644 --- a/src/anyio/_core/_fileio.py +++ b/src/anyio/_core/_fileio.py @@ -3,7 +3,7 @@ import os import pathlib import sys -from collections.abc import Callable, Iterable, Iterator, Sequence +from collections.abc import AsyncIterator, Callable, Iterable, Iterator, Sequence from dataclasses import dataclass from functools import partial from os import PathLike @@ -12,7 +12,6 @@ TYPE_CHECKING, Any, AnyStr, - AsyncIterator, Final, Generic, overload, diff --git a/src/anyio/_core/_signals.py b/src/anyio/_core/_signals.py index 115c749b..f3451d30 100644 --- a/src/anyio/_core/_signals.py +++ b/src/anyio/_core/_signals.py @@ -1,13 +1,15 @@ from __future__ import annotations from collections.abc import AsyncIterator +from contextlib import AbstractContextManager from signal import Signals -from typing import ContextManager from ._eventloop import get_async_backend -def open_signal_receiver(*signals: Signals) -> ContextManager[AsyncIterator[Signals]]: +def open_signal_receiver( + *signals: Signals, +) -> AbstractContextManager[AsyncIterator[Signals]]: """ Start receiving operating system signals. 
diff --git a/src/anyio/_core/_streams.py b/src/anyio/_core/_streams.py index aa6b0c22..6a9814e5 100644 --- a/src/anyio/_core/_streams.py +++ b/src/anyio/_core/_streams.py @@ -1,7 +1,7 @@ from __future__ import annotations import math -from typing import Tuple, TypeVar +from typing import TypeVar from warnings import warn from ..streams.memory import ( @@ -14,7 +14,7 @@ class create_memory_object_stream( - Tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], + tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], ): """ Create a memory object stream. diff --git a/src/anyio/abc/_eventloop.py b/src/anyio/abc/_eventloop.py index a50afefa..3d01d3ab 100644 --- a/src/anyio/abc/_eventloop.py +++ b/src/anyio/abc/_eventloop.py @@ -3,7 +3,8 @@ import math import sys from abc import ABCMeta, abstractmethod -from collections.abc import AsyncIterator, Awaitable, Mapping +from collections.abc import AsyncIterator, Awaitable, Callable, Mapping, Sequence +from contextlib import AbstractContextManager from os import PathLike from signal import Signals from socket import AddressFamily, SocketKind, socket @@ -11,9 +12,6 @@ IO, TYPE_CHECKING, Any, - Callable, - ContextManager, - Sequence, TypeVar, overload, ) @@ -366,7 +364,7 @@ def current_default_thread_limiter(cls) -> CapacityLimiter: @abstractmethod def open_signal_receiver( cls, *signals: Signals - ) -> ContextManager[AsyncIterator[Signals]]: + ) -> AbstractContextManager[AsyncIterator[Signals]]: pass @classmethod diff --git a/src/anyio/abc/_sockets.py b/src/anyio/abc/_sockets.py index b321225a..1c6a450c 100644 --- a/src/anyio/abc/_sockets.py +++ b/src/anyio/abc/_sockets.py @@ -8,7 +8,7 @@ from ipaddress import IPv4Address, IPv6Address from socket import AddressFamily from types import TracebackType -from typing import Any, Tuple, TypeVar, Union +from typing import Any, TypeVar, Union from .._core._typedattr import ( TypedAttributeProvider, @@ -19,10 +19,10 @@ from ._tasks import TaskGroup IPAddressType = Union[str, IPv4Address, IPv6Address] -IPSockAddrType = Tuple[str, int] +IPSockAddrType = tuple[str, int] SockAddrType = Union[IPSockAddrType, str] -UDPPacketType = Tuple[bytes, IPSockAddrType] -UNIXDatagramPacketType = Tuple[bytes, str] +UDPPacketType = tuple[bytes, IPSockAddrType] +UNIXDatagramPacketType = tuple[bytes, str] T_Retval = TypeVar("T_Retval") diff --git a/src/anyio/from_thread.py b/src/anyio/from_thread.py index 88a854bb..5f3dcfd0 100644 --- a/src/anyio/from_thread.py +++ b/src/anyio/from_thread.py @@ -2,18 +2,19 @@ import sys import threading -from collections.abc import Awaitable, Callable, Generator +from collections.abc import Awaitable, Callable, Generator, Iterable from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait -from contextlib import AbstractContextManager, contextmanager +from contextlib import ( + AbstractAsyncContextManager, + AbstractContextManager, + contextmanager, +) from dataclasses import dataclass, field from inspect import isawaitable from types import TracebackType from typing import ( Any, - AsyncContextManager, - ContextManager, Generic, - Iterable, TypeVar, cast, overload, @@ -88,7 +89,9 @@ class _BlockingAsyncContextManager(Generic[T_co], AbstractContextManager): type[BaseException] | None, BaseException | None, TracebackType | None ] = (None, None, None) - def __init__(self, async_cm: AsyncContextManager[T_co], portal: BlockingPortal): + def __init__( + self, async_cm: AbstractAsyncContextManager[T_co], portal: BlockingPortal + ): 
self._async_cm = async_cm self._portal = portal @@ -375,8 +378,8 @@ def task_done(future: Future[T_Retval]) -> None: return f, task_status_future.result() def wrap_async_context_manager( - self, cm: AsyncContextManager[T_co] - ) -> ContextManager[T_co]: + self, cm: AbstractAsyncContextManager[T_co] + ) -> AbstractContextManager[T_co]: """ Wrap an async context manager as a synchronous context manager via this portal. diff --git a/src/anyio/pytest_plugin.py b/src/anyio/pytest_plugin.py index a8dd6f3e..5c61c4d3 100644 --- a/src/anyio/pytest_plugin.py +++ b/src/anyio/pytest_plugin.py @@ -3,7 +3,7 @@ from collections.abc import Iterator from contextlib import ExitStack, contextmanager from inspect import isasyncgenfunction, iscoroutinefunction -from typing import Any, Dict, Tuple, cast +from typing import Any, cast import pytest import sniffio @@ -21,7 +21,7 @@ def extract_backend_and_options(backend: object) -> tuple[str, dict[str, Any]]: return backend, {} elif isinstance(backend, tuple) and len(backend) == 2: if isinstance(backend[0], str) and isinstance(backend[1], dict): - return cast(Tuple[str, Dict[str, Any]], backend) + return cast(tuple[str, dict[str, Any]], backend) raise TypeError("anyio_backend must be either a string or tuple of (string, dict)") diff --git a/src/anyio/streams/tls.py b/src/anyio/streams/tls.py index e913eedb..83240b4d 100644 --- a/src/anyio/streams/tls.py +++ b/src/anyio/streams/tls.py @@ -7,7 +7,7 @@ from collections.abc import Callable, Mapping from dataclasses import dataclass from functools import wraps -from typing import Any, Tuple, TypeVar +from typing import Any, TypeVar from .. import ( BrokenResourceError, @@ -25,8 +25,8 @@ T_Retval = TypeVar("T_Retval") PosArgsT = TypeVarTuple("PosArgsT") -_PCTRTT = Tuple[Tuple[str, str], ...] -_PCTRTTT = Tuple[_PCTRTT, ...] +_PCTRTT = tuple[tuple[str, str], ...] +_PCTRTTT = tuple[_PCTRTT, ...] 
class TLSAttribute(TypedAttributeSet): diff --git a/tests/streams/test_stapled.py b/tests/streams/test_stapled.py index d7614314..b032e215 100644 --- a/tests/streams/test_stapled.py +++ b/tests/streams/test_stapled.py @@ -1,8 +1,9 @@ from __future__ import annotations from collections import deque +from collections.abc import Iterable from dataclasses import InitVar, dataclass, field -from typing import Iterable, TypeVar +from typing import TypeVar import pytest diff --git a/tests/streams/test_tls.py b/tests/streams/test_tls.py index 9846e0c1..90307657 100644 --- a/tests/streams/test_tls.py +++ b/tests/streams/test_tls.py @@ -2,9 +2,9 @@ import socket import ssl -from contextlib import ExitStack +from contextlib import AbstractContextManager, ExitStack from threading import Thread -from typing import ContextManager, NoReturn +from typing import NoReturn import pytest from pytest_mock import MockerFixture @@ -210,7 +210,7 @@ def serve_sync() -> None: finally: conn.close() - client_cm: ContextManager = ExitStack() + client_cm: AbstractContextManager = ExitStack() if client_compatible and not server_compatible: client_cm = pytest.raises(BrokenResourceError) diff --git a/tests/test_from_thread.py b/tests/test_from_thread.py index 038b0aaf..3f740580 100644 --- a/tests/test_from_thread.py +++ b/tests/test_from_thread.py @@ -4,12 +4,12 @@ import sys import threading import time -from collections.abc import Awaitable, Callable +from collections.abc import AsyncGenerator, Awaitable, Callable from concurrent import futures from concurrent.futures import CancelledError, Future from contextlib import asynccontextmanager, suppress from contextvars import ContextVar -from typing import Any, AsyncGenerator, Literal, NoReturn, TypeVar +from typing import Any, Literal, NoReturn, TypeVar import pytest import sniffio diff --git a/tests/test_signals.py b/tests/test_signals.py index 16861b82..161633d2 100644 --- a/tests/test_signals.py +++ b/tests/test_signals.py @@ -3,7 +3,7 @@ import os import signal import sys -from typing import AsyncIterable +from collections.abc import AsyncIterable import pytest diff --git a/tests/test_sockets.py b/tests/test_sockets.py index 219aa9d0..41ccb849 100644 --- a/tests/test_sockets.py +++ b/tests/test_sockets.py @@ -10,12 +10,13 @@ import tempfile import threading import time +from collections.abc import Generator, Iterable, Iterator from contextlib import suppress from pathlib import Path from socket import AddressFamily from ssl import SSLContext, SSLError from threading import Thread -from typing import Any, Generator, Iterable, Iterator, NoReturn, TypeVar, cast +from typing import Any, NoReturn, TypeVar, cast import psutil import pytest @@ -1124,9 +1125,10 @@ async def handle(stream: SocketStream) -> None: async with stream: await stream.send(b"Hello\n") - async with await create_unix_listener( - socket_path - ) as listener, create_task_group() as tg: + async with ( + await create_unix_listener(socket_path) as listener, + create_task_group() as tg, + ): tg.start_soon(listener.serve, handle) await wait_all_tasks_blocked() diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py index 22bf882e..65dcd588 100644 --- a/tests/test_subprocesses.py +++ b/tests/test_subprocesses.py @@ -126,9 +126,11 @@ async def test_run_process_connect_to_file(tmp_path: Path) -> None: stdinfile.write_text("Hello, process!\n") stdoutfile = tmp_path / "stdout" stderrfile = tmp_path / "stderr" - with stdinfile.open("rb") as fin, stdoutfile.open("wb") as fout, stderrfile.open( - "wb" - ) 
as ferr: + with ( + stdinfile.open("rb") as fin, + stdoutfile.open("wb") as fout, + stderrfile.open("wb") as ferr, + ): async with await open_process( [ sys.executable, diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 9df86f2b..19ed02da 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -265,7 +265,6 @@ async def test_cancel_with_nested_shielded_scope(mocker: MockerFixture) -> None: async def shield_task() -> None: with CancelScope(shield=True) as scope: shielded_cancel_spy = mocker.spy(scope, "_deliver_cancellation") - print(f"innermost cancel scope: {id(scope):x}") await sleep(0.5) # At this point, the outermost cancel scope was delivered cancellation once @@ -278,9 +277,6 @@ async def middle_task() -> None: try: async with create_task_group() as tg: middle_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") - print( - f"middle cancel scope: {id(tg.cancel_scope):x} task group: {id(tg):x}" - ) tg.start_soon(shield_task, name="shield task") finally: # Cancellation is delivered to the the middle task groups's cancel scope: @@ -290,20 +286,11 @@ async def middle_task() -> None: # exited assert len(middle_cancel_spy.call_args_list) == 6 assert len(outer_cancel_spy.call_args_list) == 6 - print( - "exited middle task group, outer cancel scope now cancelled", - len(outer_cancel_spy.call_args_list), - "times", - ) async with create_task_group() as tg: - print( - f"outermost cancel scope: {id(tg.cancel_scope):x} task group: {id(tg):x}" - ) outer_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") tg.start_soon(middle_task, name="middle task") await wait_all_tasks_blocked() - print("cancellation should start now") tg.cancel_scope.cancel() # Cancellation is delivered to the outermost cancel scope: @@ -832,14 +819,11 @@ async def child(fail: bool) -> None: async def test_cancel_cascade() -> None: async def do_something() -> NoReturn: async with create_task_group() as tg2: - print(f"tg2 ({id(tg2):x}) cancel scope: {id(tg2.cancel_scope):x}\n") tg2.start_soon(sleep, 1, name="sleep") - print("exited task group tg2") raise Exception("foo") async with create_task_group() as tg: - print(f"tg ({id(tg):x}) cancel scope: {id(tg.cancel_scope):x}") tg.start_soon(do_something, name="do_something") await wait_all_tasks_blocked() tg.cancel_scope.cancel() @@ -1022,17 +1006,14 @@ async def fn() -> None: async def test_cancel_propagation_with_inner_spawn() -> None: async def g() -> NoReturn: async with anyio.create_task_group() as tg2: - print(f"tg2 = {id(tg2):x} cancel scope = {id(tg2.cancel_scope):x}") tg2.start_soon(anyio.sleep, 10) await anyio.sleep(1) pytest.fail("Execution should not have reached this line") async with anyio.create_task_group() as tg: - print(f"tg = {id(tg):x}n cancel scope = {id(tg.cancel_scope):x}") tg.start_soon(g) await wait_all_tasks_blocked() - print("cancelling scope", hex(id(tg.cancel_scope))) tg.cancel_scope.cancel() @@ -1252,7 +1233,6 @@ async def nested() -> None: async def main() -> NoReturn: async with anyio.create_task_group() as tg: - print(f"tg cancel scope = {id(tg.cancel_scope):x}") tg.start_soon(nested) await wait_all_tasks_blocked() asyncio.get_running_loop().call_soon(crash) diff --git a/tests/test_typedattr.py b/tests/test_typedattr.py index 9930996a..48e175d5 100644 --- a/tests/test_typedattr.py +++ b/tests/test_typedattr.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Any, Callable, Mapping +from collections.abc import Mapping +from typing import Any, Callable import pytest 
From c0a8222e1455171feb0bfc250a63e7567fe55da2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Wed, 28 Aug 2024 02:10:14 +0300 Subject: [PATCH 07/22] Fixed subprocess finalization on cancellation --- src/anyio/_backends/_asyncio.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index ad120d4d..4df88541 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -972,7 +972,7 @@ class Process(abc.Process): _stderr: StreamReaderWrapper | None async def aclose(self) -> None: - with CancelScope(shield=True): + with CancelScope(shield=True) as scope: if self._stdin: await self._stdin.aclose() if self._stdout: @@ -980,14 +980,14 @@ async def aclose(self) -> None: if self._stderr: await self._stderr.aclose() - try: - await self.wait() - except BaseException: - self.kill() - with CancelScope(shield=True): + scope.shield = False + try: await self.wait() - - raise + except BaseException: + scope.shield = True + self.kill() + await self.wait() + raise async def wait(self) -> int: return await self._process.wait() From 9bd3c5eae6096441cf5063e6810c61cd4b7fcb1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 29 Aug 2024 00:01:55 +0300 Subject: [PATCH 08/22] Simplified the uncancellation logic --- src/anyio/_backends/_asyncio.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 4df88541..364f29bc 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -364,7 +364,6 @@ def __init__(self, deadline: float = math.inf, shield: bool = False): self._tasks: set[asyncio.Task] = set() self._host_task: asyncio.Task | None = None self._cancel_calls: int = 0 - self._cancelling: int | None = None def __enter__(self) -> CancelScope: if self._active: @@ -388,8 +387,6 @@ def __enter__(self) -> CancelScope: self._timeout() self._active = True - if sys.version_info >= (3, 11): - self._cancelling = self._host_task.cancelling() # Start cancelling the host task if the scope was cancelled before entering if self._cancel_called: @@ -443,13 +440,11 @@ def __exit__( self._cancelled_caught = self._uncancel(exc) if not self._cancelled_caught: not_swallowed_exceptions += 1 - # print(f"cancelled_caught in {id(self):x} is {self._cancelled_caught}") else: not_swallowed_exceptions += 1 # Restart the cancellation effort in the closest directly cancelled parent # scope if this one was shielded - # print(f"{not_swallowed_exceptions=}") self._restart_cancellation_in_parent(immediate=not not_swallowed_exceptions) return self._cancelled_caught and not not_swallowed_exceptions @@ -459,13 +454,12 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: return True # Undo all cancellations done by this scope - if self._cancelling is not None: + if sys.version_info >= (3, 11): while self._cancel_calls: self._cancel_calls -= 1 - if self._host_task.uncancel() <= self._cancelling: + if not self._host_task.uncancel(): return True - self._cancel_calls = 0 return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args def _timeout(self) -> None: From 2cd17af21d9b9db44c48bcc876e2880f7c0a1fcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 29 Aug 2024 16:57:06 +0300 Subject: [PATCH 09/22] Fixed the last failing test --- src/anyio/_backends/_asyncio.py | 41 +++++++++++++++++---------------- 1 file changed, 21 
insertions(+), 20 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 364f29bc..ec90c1af 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -364,6 +364,7 @@ def __init__(self, deadline: float = math.inf, shield: bool = False): self._tasks: set[asyncio.Task] = set() self._host_task: asyncio.Task | None = None self._cancel_calls: int = 0 + self._cancelling: int | None = None def __enter__(self) -> CancelScope: if self._active: @@ -387,12 +388,13 @@ def __enter__(self) -> CancelScope: self._timeout() self._active = True + if sys.version_info >= (3, 11): + self._cancelling = self._host_task.cancelling() # Start cancelling the host task if the scope was cancelled before entering if self._cancel_called: self._deliver_cancellation(self) - # print(f"entered cancel scope {id(self):x}") return self def __exit__( @@ -454,13 +456,25 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: return True # Undo all cancellations done by this scope - if sys.version_info >= (3, 11): + if self._cancelling is not None: while self._cancel_calls: self._cancel_calls -= 1 - if not self._host_task.uncancel(): - return True + if self._host_task.uncancel() <= self._cancelling: + break - return f"Cancelled by cancel scope {id(self):x}" in cancelled_exc.args + # Sometimes third party frameworks catch a CancelledError and raise a new one, + # so as a workaround we have to look at the previous ones in __context__ too + # for a matching cancel message + expected_cancel_message = f"Cancelled by cancel scope {id(self):x}" + while True: + if expected_cancel_message in cancelled_exc.args: + return True + + if isinstance(cancelled_exc.__context__, CancelledError): + cancelled_exc = cancelled_exc.__context__ + continue + + return False def _timeout(self) -> None: if self._deadline != math.inf: @@ -691,21 +705,8 @@ async def __aexit__( return True raise - else: - return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) - - # Raise the CancelledError received while waiting for child tasks to exit, - # unless the context manager itself was previously exited with another - # exception, or if any of the child tasks raised an exception other than - # CancelledError - # print(f"exiting {id(self):x}") - # print(f"{cancelled_exc_while_waiting_tasks=}") - # print(f"{ignore_exception=}") - # if cancelled_exc_while_waiting_tasks: - # if exc_val is None or ignore_exception: - # raise cancelled_exc_while_waiting_tasks - # - # return ignore_exception + + return self.cancel_scope.__exit__(exc_type, exc_val, exc_tb) def _spawn( self, From 388af89bf82ec1dbe6b8dad8c20d346af17b58c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 29 Aug 2024 17:15:37 +0300 Subject: [PATCH 10/22] Added changelog note --- docs/versionhistory.rst | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index b4971451..2f762dd1 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -17,6 +17,12 @@ This library adheres to `Semantic Versioning 2.0 `_. 
- Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed to a file in a nonexistent directory (`#696 `_) +- Fixed 100% CPU use on asyncio while waiting for an exiting task group to finish while + said task group is within a cancelled cancel scope + (`#695 `_) +- Fixed cancel scopes on asyncio not reraising ``CancelledError`` on exit while the + enclosing cancel scope has been effectively cancelled + (`#698 `_) **4.4.0** From c03fc1d8e08635db9eb8ff9530cee6db46d597a5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 1 Sep 2024 12:08:25 +0000 Subject: [PATCH 11/22] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/anyio/_backends/_asyncio.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 7c9aa778..adb31de4 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -26,7 +26,6 @@ Callable, Collection, Coroutine, - Generator, Iterable, Mapping, Sequence, From 23e687c3be4b2abfdd53b6fde6c92178d942afe1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 2 Sep 2024 20:22:27 +0300 Subject: [PATCH 12/22] Added the test case from #698 --- tests/test_taskgroups.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 7a7ef422..c6960b7e 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -259,7 +259,7 @@ async def taskfunc() -> None: @pytest.mark.parametrize("anyio_backend", ["asyncio"]) -async def test_cancel_with_nested_shielded_scope(mocker: MockerFixture) -> None: +async def test_cancel_with_nested_task_groups(mocker: MockerFixture) -> None: """Regression test for #695.""" async def shield_task() -> None: @@ -302,6 +302,21 @@ async def middle_task() -> None: assert len(outer_cancel_spy.call_args_list) == 9 +async def test_cancel_with_nested_cancel_scopes() -> None: + with CancelScope() as outer_scope: + with CancelScope() as inner_scope: + await checkpoint() + inner_scope.cancel() + try: + await checkpoint() + finally: + outer_scope.cancel() + + pytest.fail("Execution should not reach this point") + + pytest.fail("Execution should not reach this point") + + async def test_start_exception_delivery(anyio_backend_name: str) -> None: def task_fn(*, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None: task_status.started("hello") From 3a90e746da2069af67c8b53f0e579a8da002c6e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Mon, 2 Sep 2024 20:26:13 +0300 Subject: [PATCH 13/22] Added new failing test case --- tests/test_taskgroups.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index c6960b7e..9e04d1d5 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -720,6 +720,26 @@ async def test_cancelled_not_caught() -> None: assert not scope.cancelled_caught +@pytest.mark.parametrize("shield_inner", [False, True]) +async def test_cancelled_raises_beyond_origin(shield_inner: bool) -> None: + """Regression test for #698.""" + with CancelScope() as outer_scope: + with CancelScope(shield=shield_inner) as inner_scope: + inner_scope.cancel() + try: + await checkpoint() + finally: + outer_scope.cancel() + + pytest.fail("checkpoint should have raised") + + if not shield_inner: + pytest.fail("inner_scope should not 
have caught cancelled") + + assert inner_scope.cancelled_caught == shield_inner + assert outer_scope.cancelled_caught != shield_inner + + @pytest.mark.parametrize("anyio_backend", ["asyncio"]) async def test_cancel_host_asyncgen() -> None: done = False From 947c56e84a53cd88603d1d96fb693a94033ec03d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 8 Sep 2024 12:32:28 +0300 Subject: [PATCH 14/22] All tests pass now --- docs/versionhistory.rst | 2 + src/anyio/_backends/_asyncio.py | 128 ++++++++++++++++++-------------- tests/test_taskgroups.py | 113 +++++++++++++++++----------- 3 files changed, 147 insertions(+), 96 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index ebf48183..ebeff274 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -49,6 +49,8 @@ This library adheres to `Semantic Versioning 2.0 `_. arrives in an exception group) - Fixed support for Linux abstract namespaces in UNIX sockets that was broken in v4.2 (#781 _; PR by @tapetersen) +- Fixed asyncio task groups not yielding control to the event loop at exit if there were + no child tasks to wait on **4.4.0** diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index d5c53697..5739553e 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -445,24 +445,45 @@ def __exit__( host_task_state.cancel_scope = self._parent_scope - # We allow immediately raising CancelledError in the following cases: - # * exc_val is None (no exceptions were raised within this scope) - # * we caught and swallowed a CancelledError meant for this scope, and there are - # no other exceptions being raised + # We only swallow the exception iff it was an AnyIO CancelledError, either + # directly as exc_val or inside an exception group and there are no cancelled + # parent cancel scopes visible to us here not_swallowed_exceptions = 0 + swallow_exception = False if exc_val is not None: for exc in iterate_exceptions(exc_val): if self._cancel_called and isinstance(exc, CancelledError): - self._cancelled_caught = self._uncancel(exc) - if not self._cancelled_caught: + if not (swallow_exception := self._uncancel(exc)): not_swallowed_exceptions += 1 else: not_swallowed_exceptions += 1 - # Restart the cancellation effort in the closest directly cancelled parent - # scope if this one was shielded - self._restart_cancellation_in_parent(immediate=not not_swallowed_exceptions) - return self._cancelled_caught and not not_swallowed_exceptions + # Restart the cancellation effort in the closest visible, cancelled parent + # scope if necessary + self._restart_cancellation_in_parent() + return swallow_exception and not not_swallowed_exceptions + + @property + def _effectively_cancelled(self) -> bool: + cancel_scope: CancelScope | None = self + while cancel_scope is not None: + if cancel_scope._cancel_called: + return True + + if cancel_scope.shield: + return False + + cancel_scope = cancel_scope._parent_scope + + return False + + @property + def _parent_cancellation_is_visible_to_us(self) -> bool: + return ( + self._parent_scope is not None + and not self.shield + and self._parent_scope._effectively_cancelled + ) def _uncancel(self, cancelled_exc: CancelledError) -> bool: if self._host_task is None: @@ -476,14 +497,24 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: if self._host_task.uncancel() <= self._cancelling: break - # Sometimes third party frameworks catch a CancelledError and raise a new one, - # so as a workaround we have to look 
at the previous ones in __context__ too - # for a matching cancel message - expected_cancel_message = f"Cancelled by cancel scope {id(self):x}" while True: - if expected_cancel_message in cancelled_exc.args: - return True + if ( + cancelled_exc.args + and isinstance(cancelled_exc.args[0], str) + and cancelled_exc.args[0].startswith("Cancelled by cancel scope ") + ): + # Only swallow the cancellation exception if it's an AnyIO cancel + # exception and there are no other cancel scopes down the line pending + # cancellation + self._cancelled_caught = ( + self._effectively_cancelled + and not self._parent_cancellation_is_visible_to_us + ) + return self._cancelled_caught + # Sometimes third party frameworks catch a CancelledError and raise a new + # one, so as a workaround we have to look at the previous ones in + # __context__ too for a matching cancel message if isinstance(cancelled_exc.__context__, CancelledError): cancelled_exc = cancelled_exc.__context__ continue @@ -540,7 +571,7 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool: return should_retry - def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: + def _restart_cancellation_in_parent(self) -> None: """ Restart the cancellation effort in the closest directly cancelled parent scope. @@ -549,8 +580,7 @@ def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: while scope is not None: if scope._cancel_called: if scope._cancel_handle is None: - if scope._deliver_cancellation(scope) and immediate: - raise CancelledError(f"Cancelled by cancel scope {id(scope):x}") + scope._deliver_cancellation(scope) break @@ -560,17 +590,6 @@ def _restart_cancellation_in_parent(self, immediate: bool = False) -> None: scope = scope._parent_scope - def _parent_cancelled(self) -> bool: - # Check whether any parent has been cancelled - cancel_scope = self._parent_scope - while cancel_scope is not None and not cancel_scope._shield: - if cancel_scope._cancel_called: - return True - else: - cancel_scope = cancel_scope._parent_scope - - return False - def cancel(self) -> None: if not self._cancel_called: if self._timeout_handle: @@ -683,27 +702,30 @@ async def __aexit__( self._exceptions.append(exc_val) try: - # Wait for child tasks to finish in a shielded scope, to prevent the - # enclosing cancel scope from hammering this task with cancellation attempts - # (#695) - with CancelScope(shield=True): - while self._tasks: - try: - await asyncio.wait(self._tasks) - except CancelledError as exc: - # This task was cancelled natively; reraise the CancelledError later - # unless this task was already interrupted by another exception - self.cancel_scope.cancel() - if exc_val is None: - exc_val = exc + if self._tasks: + with CancelScope() as wait_scope: + while self._tasks: + try: + await asyncio.wait(self._tasks) + except CancelledError as exc: + # Shield the scope against further cancellation attempts, + # as they're not productive (#695) + wait_scope.shield = True + self.cancel_scope.cancel() + if exc_val is None: + exc_val = exc + else: + # If there are no child tasks to wait on, run at least one checkpoint + # anyway + await AsyncIOBackend.cancel_shielded_checkpoint() - self._active = False - if self._exceptions: - raise BaseExceptionGroup( - "unhandled errors in a TaskGroup", self._exceptions - ) - elif exc_val: - raise exc_val + self._active = False + if self._exceptions: + raise BaseExceptionGroup( + "unhandled errors in a TaskGroup", self._exceptions + ) + elif exc_val: + raise exc_val except BaseException as 
exc: if self.cancel_scope.__exit__(type(exc), exc, exc.__traceback__): return True @@ -746,7 +768,7 @@ def task_done(_task: asyncio.Task) -> None: if not isinstance(exc, CancelledError): self._exceptions.append(exc) - if not self.cancel_scope._parent_cancelled(): + if not self.cancel_scope._effectively_cancelled: self.cancel_scope.cancel() else: task_status_future.set_exception(exc) @@ -2031,9 +2053,7 @@ def has_pending_cancellation(self) -> bool: if task_state := _task_states.get(task): if cancel_scope := task_state.cancel_scope: - return cancel_scope.cancel_called or ( - not cancel_scope.shield and cancel_scope._parent_cancelled() - ) + return cancel_scope._effectively_cancelled return False diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index 9e04d1d5..a83f7dcb 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -267,10 +267,7 @@ async def shield_task() -> None: shielded_cancel_spy = mocker.spy(scope, "_deliver_cancellation") await sleep(0.5) - # At this point, the outermost cancel scope was delivered cancellation once - # (when tg.cancel_scope.cancel() was called), and the shielded scope wasn't - # since it's shielded - assert len(outer_cancel_spy.call_args_list) == 2 + assert len(outer_cancel_spy.call_args_list) < 10 shielded_cancel_spy.assert_not_called() async def middle_task() -> None: @@ -279,13 +276,8 @@ async def middle_task() -> None: middle_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") tg.start_soon(shield_task, name="shield task") finally: - # Cancellation is delivered to the the middle task groups's cancel scope: - # - When the outermost task group's.cancel_scope is cancelled - # - When the shielded innermost cancel scope is exited - # - When the middle task's task group's temporary shielded cancel scope is - # exited - assert len(middle_cancel_spy.call_args_list) == 6 - assert len(outer_cancel_spy.call_args_list) == 6 + assert len(middle_cancel_spy.call_args_list) < 10 + assert len(outer_cancel_spy.call_args_list) < 10 async with create_task_group() as tg: outer_cancel_spy = mocker.spy(tg.cancel_scope, "_deliver_cancellation") @@ -293,28 +285,7 @@ async def middle_task() -> None: await wait_all_tasks_blocked() tg.cancel_scope.cancel() - # Cancellation is delivered to the outermost cancel scope: - # - When tg.cancel_scope.cancel() is called - # - When the shielded innermost cancel scope is exited - # - When the middle task's task group's temporary shielded cancel scope is exited - # - - # - - assert len(outer_cancel_spy.call_args_list) == 9 - - -async def test_cancel_with_nested_cancel_scopes() -> None: - with CancelScope() as outer_scope: - with CancelScope() as inner_scope: - await checkpoint() - inner_scope.cancel() - try: - await checkpoint() - finally: - outer_scope.cancel() - - pytest.fail("Execution should not reach this point") - - pytest.fail("Execution should not reach this point") + assert len(outer_cancel_spy.call_args_list) < 10 async def test_start_exception_delivery(anyio_backend_name: str) -> None: @@ -720,11 +691,26 @@ async def test_cancelled_not_caught() -> None: assert not scope.cancelled_caught -@pytest.mark.parametrize("shield_inner", [False, True]) -async def test_cancelled_raises_beyond_origin(shield_inner: bool) -> None: - """Regression test for #698.""" +async def test_cancelled_scope_based_checkpoint() -> None: + """Regression test closely related to #698.""" with CancelScope() as outer_scope: - with CancelScope(shield=shield_inner) as inner_scope: + outer_scope.cancel() + + # The following 
two lines are a way to implement a checkpoint function. + # See also https://github.com/python-trio/trio/issues/860. + with CancelScope() as inner_scope: + inner_scope.cancel() + await sleep_forever() + + pytest.fail("checkpoint should have raised") + + assert not inner_scope.cancelled_caught + assert outer_scope.cancelled_caught + + +async def test_cancelled_raises_beyond_origin_unshielded() -> None: + with CancelScope() as outer_scope: + with CancelScope() as inner_scope: inner_scope.cancel() try: await checkpoint() @@ -733,11 +719,54 @@ async def test_cancelled_raises_beyond_origin(shield_inner: bool) -> None: pytest.fail("checkpoint should have raised") - if not shield_inner: - pytest.fail("inner_scope should not have caught cancelled") + pytest.fail("exiting the inner scope should've raised a cancellation error") - assert inner_scope.cancelled_caught == shield_inner - assert outer_scope.cancelled_caught != shield_inner + # Here, the outer scope is responsible for the cancellation, so the inner scope + # won't catch the cancellation exception, but the outer scope will + assert not inner_scope.cancelled_caught + assert outer_scope.cancelled_caught + + +async def test_cancelled_raises_beyond_origin_shielded() -> None: + code_between_scopes_was_run = False + with CancelScope() as outer_scope: + with CancelScope(shield=True) as inner_scope: + inner_scope.cancel() + try: + await checkpoint() + finally: + outer_scope.cancel() + + pytest.fail("checkpoint should have raised") + + code_between_scopes_was_run = True + + # Here, the inner scope is the one responsible for cancellation, and given that the + # outer scope was also cancelled, it is not considered to have "caught" the + # cancellation, even though it swallows it, because the inner scope triggered it + assert code_between_scopes_was_run + assert inner_scope.cancelled_caught + assert not outer_scope.cancelled_caught + + +async def test_empty_taskgroup_contains_yield_point() -> None: + """ + Test that a task group yields at exit at least once, even with no child tasks to + wait on. + + """ + outer_task_ran = False + + async def outer_task() -> None: + nonlocal outer_task_ran + outer_task_ran = True + + async with create_task_group() as tg_outer: + for _ in range(2): # this is to make sure Trio actually schedules outer_task() + async with create_task_group(): + tg_outer.start_soon(outer_task) + + assert outer_task_ran @pytest.mark.parametrize("anyio_backend", ["asyncio"]) @@ -859,7 +888,7 @@ async def do_something() -> NoReturn: async with create_task_group() as tg2: tg2.start_soon(sleep, 1, name="sleep") - raise Exception("foo") + pytest.fail("Execution should not reach this point") async with create_task_group() as tg: tg.start_soon(do_something, name="do_something") From f1b2738f7f2694e60ca67f285c28d01881489382 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sun, 8 Sep 2024 14:24:47 +0300 Subject: [PATCH 15/22] Enabled uvloop to be used in the test suite on Python 3.13 The pre-release version now works properly on 3.13. 
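
Aside: ``test_cancelled_scope_based_checkpoint`` above exercises the "checkpoint via a
self-cancelled scope" idiom from the linked Trio issue. Below is a minimal sketch of that
idiom; ``scope_checkpoint`` is an illustrative helper, not an AnyIO API, and the behaviour
noted in the comments is the one the tests above assert for asyncio after this series:

    import anyio


    async def scope_checkpoint() -> None:
        # A fresh scope cancels itself and then sleeps: the sleep raises right away,
        # and the scope swallows its own cancellation unless an enclosing scope has
        # been (effectively) cancelled as well.
        with anyio.CancelScope() as scope:
            scope.cancel()
            await anyio.sleep_forever()


    async def main() -> None:
        with anyio.CancelScope() as outer:
            outer.cancel()
            await scope_checkpoint()  # the outer cancellation propagates from here
            raise AssertionError("not reached")
        assert outer.cancelled_caught


    anyio.run(main)

When no enclosing scope is cancelled, ``scope_checkpoint()`` simply yields to the event
loop and returns — which is exactly what a checkpoint is supposed to do.
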
--- pyproject.toml | 4 ++-- tests/test_subprocesses.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 948c3293..4e726f4e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,8 +52,8 @@ test = [ "pytest-mock >= 3.6.1", "trustme", """\ - uvloop >= 0.17; platform_python_implementation == 'CPython' \ - and platform_system != 'Windows' and python_version < '3.13'\ + uvloop >= 0.21.0b1; platform_python_implementation == 'CPython' \ + and platform_system != 'Windows'\ """ ] doc = [ diff --git a/tests/test_subprocesses.py b/tests/test_subprocesses.py index 91f024e9..adf029a3 100644 --- a/tests/test_subprocesses.py +++ b/tests/test_subprocesses.py @@ -277,9 +277,9 @@ async def test_py39_arguments( [sys.executable, "-c", "print('hello')"], **{argname: argvalue_factory()}, ) - except TypeError as exc: + except ValueError as exc: if ( - "unexpected keyword argument" in str(exc) + "unexpected kwargs" in str(exc) and anyio_backend_name == "asyncio" and anyio_backend_options["loop_factory"] and anyio_backend_options["loop_factory"].__module__ == "uvloop" From 3435c723b518a09c9b7998ac06de66bdf859e2a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 12 Sep 2024 19:25:33 +0300 Subject: [PATCH 16/22] Fixed inconsistent uncancellation by asyncio cancel scopes --- docs/versionhistory.rst | 2 ++ src/anyio/_backends/_asyncio.py | 3 ++- tests/test_taskgroups.py | 18 ++++++++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index ee122076..860c20cd 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -52,6 +52,8 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed ``KeyboardInterrupt`` (ctrl+c) hanging the asyncio pytest runner - Fixed asyncio task groups not yielding control to the event loop at exit if there were no child tasks to wait on +- Fixed inconsistent task uncancellation with asyncio cancel scopes belonging to a + task group when said task group has child tasks running **4.4.0** diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 42654514..7f45b285 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -551,8 +551,9 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool: if task is not current and (task is self._host_task or _task_started(task)): waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): - origin._cancel_calls += 1 task.cancel(f"Cancelled by cancel scope {id(origin):x}") + if task is self._host_task: + origin._cancel_calls += 1 # Deliver cancellation to child scopes that aren't shielded or running their own # cancellation callbacks diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index a83f7dcb..d178cdcb 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -4,6 +4,7 @@ import math import sys import time +from asyncio import CancelledError from collections.abc import AsyncGenerator, Coroutine, Generator from typing import Any, NoReturn, cast @@ -1427,6 +1428,23 @@ async def test_cancel_message_replaced(self) -> None: except asyncio.CancelledError: pytest.fail("Should have swallowed the CancelledError") + async def test_uncancel_after_taskgroup_cancelled(self) -> None: + """ + Test that a cancel scope only uncancels the host task as many times as it has + cancelled that specific task, and won't count child task cancellations towards + 
that amount. + + """ + task = asyncio.current_task() + assert task + with pytest.raises(CancelledError): + async with create_task_group() as tg: + tg.start_soon(sleep, 3) + await wait_all_tasks_blocked() + task.cancel() + + assert task.cancelling() == 1 + async def test_cancel_before_entering_task_group() -> None: with CancelScope() as scope: From 6eca8258710cfe43b4a17f1ca07552292761112a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 14 Sep 2024 12:14:11 +0300 Subject: [PATCH 17/22] Fixed another cancel scope issue --- src/anyio/_backends/_asyncio.py | 2 +- tests/test_taskgroups.py | 32 ++++++++++++++++---------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 7f45b285..40ddf959 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -552,7 +552,7 @@ def _deliver_cancellation(self, origin: CancelScope) -> bool: waiter = task._fut_waiter # type: ignore[attr-defined] if not isinstance(waiter, asyncio.Future) or not waiter.done(): task.cancel(f"Cancelled by cancel scope {id(origin):x}") - if task is self._host_task: + if task is origin._host_task: origin._cancel_calls += 1 # Deliver cancellation to child scopes that aren't shielded or running their own diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index d178cdcb..b147f4ab 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -487,19 +487,6 @@ async def test_cancel_before_entering_scope() -> None: pytest.fail("execution should not reach this point") -@pytest.mark.xfail( - sys.version_info < (3, 11), reason="Requires asyncio.Task.cancelling()" -) -@pytest.mark.parametrize("anyio_backend", ["asyncio"]) -async def test_cancel_counter_nested_scopes() -> None: - with CancelScope() as root_scope: - with CancelScope(): - root_scope.cancel() - await sleep(0.5) - - assert not cast(asyncio.Task, asyncio.current_task()).cancelling() - - async def test_exception_group_children() -> None: with pytest.raises(BaseExceptionGroup) as exc: async with create_task_group() as tg: @@ -1428,19 +1415,32 @@ async def test_cancel_message_replaced(self) -> None: except asyncio.CancelledError: pytest.fail("Should have swallowed the CancelledError") + async def test_cancel_counter_nested_scopes(self) -> None: + with CancelScope() as root_scope: + with CancelScope(): + root_scope.cancel() + await checkpoint() + + assert not cast(asyncio.Task, asyncio.current_task()).cancelling() + async def test_uncancel_after_taskgroup_cancelled(self) -> None: """ Test that a cancel scope only uncancels the host task as many times as it has cancelled that specific task, and won't count child task cancellations towards that amount. 
- """ + + async def child_task(task_status: TaskStatus[None]) -> None: + async with create_task_group() as tg: + tg.start_soon(sleep, 3) + await wait_all_tasks_blocked() + task_status.started() + task = asyncio.current_task() assert task with pytest.raises(CancelledError): async with create_task_group() as tg: - tg.start_soon(sleep, 3) - await wait_all_tasks_blocked() + await tg.start(child_task) task.cancel() assert task.cancelling() == 1 From ab5ebb8c58f69b99cb04dbc483a9242eda8159fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 14 Sep 2024 14:20:01 +0300 Subject: [PATCH 18/22] Fixed TaskGroup swallowing native cancellation exceptions --- src/anyio/_backends/_asyncio.py | 23 +++++++++++++++++------ tests/test_taskgroups.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 6 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 40ddf959..86096cac 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -359,6 +359,14 @@ def _task_started(task: asyncio.Task) -> bool: # +def is_anyio_cancellation(exc: CancelledError) -> bool: + return ( + bool(exc.args) + and isinstance(exc.args[0], str) + and exc.args[0].startswith("Cancelled by cancel scope ") + ) + + class CancelScope(BaseCancelScope): def __new__( cls, *, deadline: float = math.inf, shield: bool = False @@ -498,11 +506,7 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: break while True: - if ( - cancelled_exc.args - and isinstance(cancelled_exc.args[0], str) - and cancelled_exc.args[0].startswith("Cancelled by cancel scope ") - ): + if is_anyio_cancellation(cancelled_exc): # Only swallow the cancellation exception if it's an AnyIO cancel # exception and there are no other cancel scopes down the line pending # cancellation @@ -713,7 +717,14 @@ async def __aexit__( # as they're not productive (#695) wait_scope.shield = True self.cancel_scope.cancel() - if exc_val is None: + + # Set exc_val from the cancellation exception if it was + # previously unset. However, we should not replace a native + # cancellation exception with one raise by a cancel scope. + if exc_val is None or ( + isinstance(exc_val, CancelledError) + and not is_anyio_cancellation(exc) + ): exc_val = exc else: # If there are no child tasks to wait on, run at least one checkpoint diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index b147f4ab..ff8f7490 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -1445,6 +1445,35 @@ async def child_task(task_status: TaskStatus[None]) -> None: assert task.cancelling() == 1 + async def test_uncancel_after_group_aexit_native_cancel(self) -> None: + """Closely related to #695.""" + done = anyio.Event() + + async def shield_task() -> None: + with CancelScope(shield=True): + await done.wait() + + async def middle_task() -> None: + async with create_task_group() as tg: + tg.start_soon(shield_task) + + task = asyncio.get_running_loop().create_task(middle_task()) + try: + await wait_all_tasks_blocked() + task.cancel("native 1") + await sleep(0.1) + task.cancel("native 2") + finally: + done.set() + + with pytest.raises(asyncio.CancelledError) as exc: + await task + + # Neither native cancellation should have been uncancelled, and the latest + # cancellation message should be the one coming out of the task group. 
+ assert task.cancelling() == 2 + assert str(exc.value) == "native 2" + async def test_cancel_before_entering_task_group() -> None: with CancelScope() as scope: From c082056af84a04f4ed717d1c446317af1c2c0e95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 19 Sep 2024 14:39:53 +0300 Subject: [PATCH 19/22] Updated the changelog --- docs/versionhistory.rst | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 505bfd31..3e7b94b8 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -3,9 +3,24 @@ Version history This library adheres to `Semantic Versioning 2.0 `_. -**4.5.0** +**UNRELEASED** - Dropped support for Python 3.8 + (as `#698 `_) cannot be resolved + without cancel message support) +- Fixed 100% CPU use on asyncio while waiting for an exiting task group to finish while + said task group is within a cancelled cancel scope + (`#695 `_) +- Fixed cancel scopes on asyncio not reraising ``CancelledError`` on exit while the + enclosing cancel scope has been effectively cancelled + (`#698 `_) +- Fixed asyncio task groups not yielding control to the event loop at exit if there were + no child tasks to wait on +- Fixed inconsistent task uncancellation with asyncio cancel scopes belonging to a + task group when said task group has child tasks running + +**4.5.0** + - Improved the performance of ``anyio.Lock`` and ``anyio.Semaphore`` on asyncio (even up to 50 %) - Added the ``fast_acquire`` parameter to ``anyio.Lock`` and ``anyio.Semaphore`` to @@ -30,12 +45,6 @@ This library adheres to `Semantic Versioning 2.0 `_. - Fixed ``to_process.run_sync()`` failing to initialize if ``__main__.__file__`` pointed to a file in a nonexistent directory (`#696 `_) -- Fixed 100% CPU use on asyncio while waiting for an exiting task group to finish while - said task group is within a cancelled cancel scope - (`#695 `_) -- Fixed cancel scopes on asyncio not reraising ``CancelledError`` on exit while the - enclosing cancel scope has been effectively cancelled - (`#698 `_) - Fixed ``AssertionError: feed_data after feed_eof`` on asyncio when a subprocess is closed early, before its output has been read (`#490 `_) @@ -50,10 +59,6 @@ This library adheres to `Semantic Versioning 2.0 `_. 
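
Aside: ``is_anyio_cancellation()`` from PATCH 18 above distinguishes scope-issued
cancellations from native ones by the cancel message, and the uncancel path additionally
walks ``__context__`` because some frameworks catch a ``CancelledError`` and raise a fresh
one in its place. A standalone sketch of that detection logic (the function names are
illustrative; only the ``"Cancelled by cancel scope "`` prefix comes from the code above):

    import asyncio

    _MARKER = "Cancelled by cancel scope "


    def is_marked_cancellation(exc: asyncio.CancelledError) -> bool:
        """True if the cancel message identifies a scope-issued cancellation."""
        return (
            bool(exc.args)
            and isinstance(exc.args[0], str)
            and exc.args[0].startswith(_MARKER)
        )


    def find_marked_cancellation(exc: asyncio.CancelledError) -> bool:
        """Also search __context__ in case the original exception was re-raised."""
        current: BaseException | None = exc
        while isinstance(current, asyncio.CancelledError):
            if is_marked_cancellation(current):
                return True
            current = current.__context__
        return False


    # Example: a framework catches the original cancellation and raises its own.
    original = asyncio.CancelledError("Cancelled by cancel scope deadbeef")
    try:
        try:
            raise original
        except asyncio.CancelledError:
            raise asyncio.CancelledError("framework re-raise")
    except asyncio.CancelledError as replaced:
        assert not is_marked_cancellation(replaced)
        assert find_marked_cancellation(replaced)
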
- Fixed support for Linux abstract namespaces in UNIX sockets that was broken in v4.2 (#781 _; PR by @tapetersen) - Fixed ``KeyboardInterrupt`` (ctrl+c) hanging the asyncio pytest runner -- Fixed asyncio task groups not yielding control to the event loop at exit if there were - no child tasks to wait on -- Fixed inconsistent task uncancellation with asyncio cancel scopes belonging to a - task group when said task group has child tasks running **4.4.0** From 90812136f5032085de7f335cea0406d1b4e515ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Thu, 19 Sep 2024 15:51:31 +0300 Subject: [PATCH 20/22] Fixed CancelScope not uncancelling if it was exited with a real exception --- src/anyio/_backends/_asyncio.py | 14 +++++++------- tests/test_taskgroups.py | 12 ++++++++++++ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py index 86096cac..9342fab8 100644 --- a/src/anyio/_backends/_asyncio.py +++ b/src/anyio/_backends/_asyncio.py @@ -453,6 +453,13 @@ def __exit__( host_task_state.cancel_scope = self._parent_scope + # Undo all cancellations done by this scope + if self._cancelling is not None: + while self._cancel_calls: + self._cancel_calls -= 1 + if self._host_task.uncancel() <= self._cancelling: + break + # We only swallow the exception iff it was an AnyIO CancelledError, either # directly as exc_val or inside an exception group and there are no cancelled # parent cancel scopes visible to us here @@ -498,13 +505,6 @@ def _uncancel(self, cancelled_exc: CancelledError) -> bool: self._cancel_calls = 0 return True - # Undo all cancellations done by this scope - if self._cancelling is not None: - while self._cancel_calls: - self._cancel_calls -= 1 - if self._host_task.uncancel() <= self._cancelling: - break - while True: if is_anyio_cancellation(cancelled_exc): # Only swallow the cancellation exception if it's an AnyIO cancel diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index ff8f7490..ac6579ba 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -1474,6 +1474,18 @@ async def middle_task() -> None: assert task.cancelling() == 2 assert str(exc.value) == "native 2" + async def test_uncancel_after_child_task_failed(self) -> None: + async def taskfunc() -> None: + raise Exception("dummy error") + + with pytest.raises(ExceptionGroup) as exc_info: + async with create_task_group() as tg: + tg.start_soon(taskfunc) + + assert len(exc_info.value.exceptions) == 1 + assert str(exc_info.value.exceptions[0]) == "dummy error" + assert not cast(asyncio.Task, asyncio.current_task()).cancelling() + async def test_cancel_before_entering_task_group() -> None: with CancelScope() as scope: From 2f68895e23679da4f2acd446b5cb2736448a0e25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Fri, 20 Sep 2024 12:00:49 +0300 Subject: [PATCH 21/22] Fixed unbalanced parentheses --- docs/versionhistory.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst index 3e7b94b8..ef7ac30c 100644 --- a/docs/versionhistory.rst +++ b/docs/versionhistory.rst @@ -6,7 +6,7 @@ This library adheres to `Semantic Versioning 2.0 `_. 
**UNRELEASED** - Dropped support for Python 3.8 - (as `#698 `_) cannot be resolved + (as `#698 `_ cannot be resolved without cancel message support) - Fixed 100% CPU use on asyncio while waiting for an exiting task group to finish while said task group is within a cancelled cancel scope From 15a4bcbe7e296fc01376cf41514b4d8a9035ee20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alex=20Gr=C3=B6nholm?= Date: Sat, 21 Sep 2024 13:07:46 +0300 Subject: [PATCH 22/22] Update tests/test_taskgroups.py Co-authored-by: Ganden Schaffner --- tests/test_taskgroups.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_taskgroups.py b/tests/test_taskgroups.py index ac6579ba..31490572 100644 --- a/tests/test_taskgroups.py +++ b/tests/test_taskgroups.py @@ -684,7 +684,7 @@ async def test_cancelled_scope_based_checkpoint() -> None: with CancelScope() as outer_scope: outer_scope.cancel() - # The following two lines are a way to implement a checkpoint function. + # The following three lines are a way to implement a checkpoint function. # See also https://github.com/python-trio/trio/issues/860. with CancelScope() as inner_scope: inner_scope.cancel()
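
A closing aside on PATCH 07: rather than nesting a second shielded scope around the
post-``kill()`` wait, ``Process.aclose()`` now toggles a single scope's ``shield`` flag —
shielded while the pipes are closed, unshielded while waiting so cancellation can still
interrupt a long-running child, then re-shielded to reap the process after killing it.
Below is a self-contained sketch of the same pattern against a stand-in process object
(``FakeProcess``, ``aclose`` and the timings are illustrative, not AnyIO's implementation):

    import anyio


    class FakeProcess:
        """Stand-in for a subprocess: closing pipes is quick, waiting may not be."""

        def __init__(self, wait_time: float) -> None:
            self.wait_time = wait_time
            self.killed = False

        async def close_pipes(self) -> None:
            await anyio.sleep(0.01)

        def kill(self) -> None:
            self.killed = True
            self.wait_time = 0.0  # a killed process exits right away

        async def wait(self) -> None:
            await anyio.sleep(self.wait_time)


    async def aclose(proc: FakeProcess) -> None:
        with anyio.CancelScope(shield=True) as scope:
            await proc.close_pipes()  # protected: always runs to completion

            scope.shield = False  # let outer cancellation reach the wait below
            try:
                await proc.wait()
            except BaseException:
                scope.shield = True  # protect the forced teardown
                proc.kill()
                await proc.wait()
                raise


    async def main() -> None:
        proc = FakeProcess(wait_time=10.0)
        with anyio.move_on_after(0.1):
            await aclose(proc)  # cancelled during the wait, so the child is killed
        assert proc.killed


    anyio.run(main)

The effect is that the cleanup itself cannot be interrupted, yet a cancelled caller never
waits for the child's natural exit — the cancellation is turned into a prompt ``kill()``
followed by a shielded reap.
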