Skip to content

Commit

Permalink
add tests, and some types
Browse files Browse the repository at this point in the history
  • Loading branch information
jakkdl committed Jan 31, 2025
1 parent 1a7714e commit b0b8b02
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/trio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MemoryChannelStatistics as MemoryChannelStatistics,
MemoryReceiveChannel as MemoryReceiveChannel,
MemorySendChannel as MemorySendChannel,
background_with_channel as background_with_channel,
open_memory_channel as open_memory_channel,
)
from ._core import (
Expand Down
33 changes: 21 additions & 12 deletions src/trio/_channel.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

import sys
from collections import OrderedDict, deque
from collections.abc import AsyncGenerator, Callable
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from functools import wraps
from math import inf
from typing import (
TYPE_CHECKING,
Generic,
Protocol,
TypeVar,
)

import attrs
Expand All @@ -20,24 +22,30 @@
from ._util import NoPublicConstructor, final, generic_function

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable, Callable
from types import TracebackType

from typing_extensions import ParamSpec, Self

P = ParamSpec("P")

try:
if sys.version_info >= (3, 10):
from contextlib import aclosing # new in Python 3.10
except ImportError:
else:

class _SupportsAclose(Protocol):
def aclose(self) -> Awaitable[object]: ...

_SupportsAcloseT = TypeVar("_SupportsAcloseT", bound=_SupportsAclose)

class aclosing:
def __init__(self, aiter):
self._aiter = aiter
class aclosing(AbstractAsyncContextManager[_SupportsAcloseT, None]):
def __init__(self, thing: _SupportsAcloseT) -> None:
self._aiter = thing

async def __aenter__(self):
async def __aenter__(self) -> _SupportsAcloseT:
return self._aiter

async def __aexit__(self, *args):
async def __aexit__(self, *exc_info: object) -> None:
await self._aiter.aclose()


Expand Down Expand Up @@ -471,14 +479,14 @@ def background_with_channel(max_buffer_size: float = 0) -> Callable[
The `yield` keyword offers a very convenient way to write iterators...
which makes it really unfortunate that async generators are so difficult
to call correctly. Yielding from the inside of a cancel scope or a nursery
to call correctly. Yielding from the inside of a cancel scope or a nursery
to the outside `violates structured concurrency <https://xkcd.com/292/>`_
with consequences explainined in :pep:`789`. Even then, resource cleanup
with consequences explained in :pep:`789`. Even then, resource cleanup
errors remain common (:pep:`533`) unless you wrap every call in
:func:`~contextlib.aclosing`.
This decorator gives you the best of both worlds: with careful exception
handling and a background task we preserve structured concurrency by
handling and a background task we preserve structured concurrency by
offering only the safe interface, and you can still write your iterables
with the convenience of `yield`. For example:
Expand All @@ -493,7 +501,7 @@ async def my_async_iterable(arg, *, kwarg=True):
...
While the combined async-with-async-for can be inconvenient at first,
the context manager is indispensible for both correctness and for prompt
the context manager is indispensable for both correctness and for prompt
cleanup of resources.
"""
# Perhaps a future PEP will adopt `async with for` syntax, like
Expand Down Expand Up @@ -545,6 +553,7 @@ async def _move_elems_to_channel(
except trio.Cancelled:
raise
except BaseException as error_from_send:
# TODO: add test case ... but idk how
# Forward any other errors to the generator. Exit cleanly
# if exhausted; otherwise it was handled in there and we
# can continue the inner loop with this value.
Expand Down
84 changes: 81 additions & 3 deletions src/trio/_tests/test_channel.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from __future__ import annotations

from typing import Union
from typing import TYPE_CHECKING, Union

import pytest

import trio
from trio import EndOfChannel, open_memory_channel
from trio import EndOfChannel, background_with_channel, open_memory_channel

from ..testing import assert_checkpoints, wait_all_tasks_blocked
from ..testing import RaisesGroup, assert_checkpoints, wait_all_tasks_blocked

if TYPE_CHECKING:
from collections.abc import AsyncGenerator


async def test_channel() -> None:
Expand Down Expand Up @@ -411,3 +414,78 @@ async def do_send(s: trio.MemorySendChannel[int], v: int) -> None:
assert await r.receive() == 1
with pytest.raises(trio.WouldBlock):
r.receive_nowait()


async def test_background_with_channel() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
await trio.sleep_forever() # simulate deadlock
yield 2

async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1
break # exit, cleanup should be quick
# comment `nursery.cancel_scope.cancel()` and it hangs


async def test_background_with_channel_exhaust() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1

async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1


async def test_background_with_channel_broken_resource() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
yield 2

async with agen() as recv_chan:
assert await recv_chan.__anext__() == 1

# close the receiving channel
await recv_chan.aclose()

# trying to get the next element errors
with pytest.raises(trio.ClosedResourceError):
await recv_chan.__anext__()

# but we don't get an error on exit of the cm


async def test_background_with_channel_cancelled() -> None:
with trio.CancelScope() as cs:

@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
yield 1

async with agen():
cs.cancel()


async def test_background_with_channel_waitwhat() -> None:
@background_with_channel()
async def agen() -> AsyncGenerator[int]:
yield 1
# this exception sometimes disappear, and I don't know why
# gc? trio randomness?
# idk if it's gonna show up in CI, but I have like a 50% shot of failing
# when running the test case by itself
raise ValueError("oae")

with RaisesGroup(ValueError):
async with agen() as recv_chan:
async for x in recv_chan:
assert x == 1


# TODO: I'm also failing to figure out how to test max_buffer_size
# and/or what changing it even achieves

0 comments on commit b0b8b02

Please sign in to comment.