From 3a53e6d7c50edd7d86aa22e507e3adfd7564c8ca Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 30 Nov 2023 14:53:55 +0100 Subject: [PATCH] feat: support anyio with a Cargo feature Asyncio is the standard and de facto main Python async runtime. Among non-standard runtime, only trio seems to have substantial traction, especially thanks to the anyio project. There is indeed a strong trend for anyio (e.g. FastApi), which can justify a dedicated support. --- Cargo.toml | 3 + guide/src/async-await.md | 9 ++- guide/src/building_and_distribution.md | 1 + newsfragments/3612.added.md | 1 + pytests/Cargo.toml | 3 +- pytests/pyproject.toml | 1 + pytests/src/anyio.rs | 34 ++++++++++ pytests/src/lib.rs | 3 + pytests/tests/test_anyio.py | 14 ++++ src/coroutine.rs | 4 ++ src/coroutine/anyio.rs | 69 ++++++++++++++++++++ src/coroutine/cancel.rs | 14 ++-- src/coroutine/trio.rs | 88 ++++++++++++++++++++++++++ src/coroutine/waker.rs | 18 ++++-- 14 files changed, 249 insertions(+), 13 deletions(-) create mode 100644 newsfragments/3612.added.md create mode 100644 pytests/src/anyio.rs create mode 100644 pytests/tests/test_anyio.py create mode 100644 src/coroutine/anyio.rs create mode 100644 src/coroutine/trio.rs diff --git a/Cargo.toml b/Cargo.toml index 3d25f578d76..38c7ac46f19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,9 @@ pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", featur [features] default = ["macros"] +# Switch coroutine implementation to anyio instead of asyncio +anyio = ["macros"] + # Enables pyo3::inspect module and additional type information on FromPyObject # and IntoPy traits experimental-inspect = [] diff --git a/guide/src/async-await.md b/guide/src/async-await.md index 847fed2f47d..30d3e812796 100644 --- a/guide/src/async-await.md +++ b/guide/src/async-await.md @@ -22,8 +22,6 @@ async fn sleep(seconds: f64, result: Option) -> Option { } ``` -*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.* - ## `Send + 'static` constraint Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object. @@ -85,6 +83,13 @@ async fn cancellable(#[pyo3(cancel_handle)] mut cancel: CancelHandle) { } ``` +## *asyncio* vs. *anyio* + +By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context. + +PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context. +However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed. + ## The `Coroutine` type To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). diff --git a/guide/src/building_and_distribution.md b/guide/src/building_and_distribution.md index 8cb675f4303..7c6cf30cce9 100644 --- a/guide/src/building_and_distribution.md +++ b/guide/src/building_and_distribution.md @@ -62,6 +62,7 @@ There are many ways to go about this: it is possible to use `cargo` to build the PyO3 has some Cargo features to configure projects for building Python extension modules: - The `extension-module` feature, which must be enabled when building Python extension modules. - The `abi3` feature and its version-specific `abi3-pyXY` companions, which are used to opt-in to the limited Python API in order to support multiple Python versions in a single wheel. +- The `anyio` feature, making PyO3 coroutines target [*anyio*](https://github.com/agronholm/anyio) instead of *asyncio*; either [*sniffio*](https://github.com/python-trio/sniffio) or *anyio* should be added as dependency of the Python extension. This section describes each of these packaging tools before describing how to build manually without them. It then proceeds with an explanation of the `extension-module` feature. Finally, there is a section describing PyO3's `abi3` features. diff --git a/newsfragments/3612.added.md b/newsfragments/3612.added.md new file mode 100644 index 00000000000..4f5f2f24014 --- /dev/null +++ b/newsfragments/3612.added.md @@ -0,0 +1 @@ +Support anyio with a Cargo feature \ No newline at end of file diff --git a/pytests/Cargo.toml b/pytests/Cargo.toml index 255094a6c40..d5865573dcd 100644 --- a/pytests/Cargo.toml +++ b/pytests/Cargo.toml @@ -7,7 +7,8 @@ edition = "2021" publish = false [dependencies] -pyo3 = { path = "../", features = ["extension-module"] } +futures = "0.3.29" +pyo3 = { path = "../", features = ["extension-module", "anyio"] } [build-dependencies] pyo3-build-config = { path = "../pyo3-build-config" } diff --git a/pytests/pyproject.toml b/pytests/pyproject.toml index fb1ac3dbdff..d4340b2d311 100644 --- a/pytests/pyproject.toml +++ b/pytests/pyproject.toml @@ -20,6 +20,7 @@ classifiers = [ [project.optional-dependencies] dev = [ + "anyio[trio]>=4.0", "hypothesis>=3.55", "pytest-asyncio>=0.21", "pytest-benchmark>=3.4", diff --git a/pytests/src/anyio.rs b/pytests/src/anyio.rs new file mode 100644 index 00000000000..8fbe6cf685e --- /dev/null +++ b/pytests/src/anyio.rs @@ -0,0 +1,34 @@ +use std::{task::Poll, thread, time::Duration}; + +use futures::{channel::oneshot, future::poll_fn}; +use pyo3::prelude::*; + +#[pyfunction] +async fn sleep(seconds: f64, result: Option) -> Option { + if seconds <= 0.0 { + let mut ready = false; + poll_fn(|cx| { + if ready { + return Poll::Ready(()); + } + ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + }) + .await; + } else { + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_secs_f64(seconds)); + tx.send(()).unwrap(); + }); + rx.await.unwrap(); + } + result +} + +#[pymodule] +pub fn anyio(_py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_function(wrap_pyfunction!(sleep, m)?)?; + Ok(()) +} diff --git a/pytests/src/lib.rs b/pytests/src/lib.rs index dbcd3ca4113..c7bd00eefe1 100644 --- a/pytests/src/lib.rs +++ b/pytests/src/lib.rs @@ -2,6 +2,7 @@ use pyo3::prelude::*; use pyo3::types::PyDict; use pyo3::wrap_pymodule; +pub mod anyio; pub mod awaitable; pub mod buf_and_str; pub mod comparisons; @@ -18,6 +19,7 @@ pub mod subclassing; #[pymodule] fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_wrapped(wrap_pymodule!(anyio::anyio))?; m.add_wrapped(wrap_pymodule!(awaitable::awaitable))?; #[cfg(not(Py_LIMITED_API))] m.add_wrapped(wrap_pymodule!(buf_and_str::buf_and_str))?; @@ -39,6 +41,7 @@ fn pyo3_pytests(py: Python<'_>, m: &PyModule) -> PyResult<()> { let sys = PyModule::import(py, "sys")?; let sys_modules: &PyDict = sys.getattr("modules")?.downcast()?; + sys_modules.set_item("pyo3_pytests.anyio", m.getattr("anyio")?)?; sys_modules.set_item("pyo3_pytests.awaitable", m.getattr("awaitable")?)?; sys_modules.set_item("pyo3_pytests.buf_and_str", m.getattr("buf_and_str")?)?; sys_modules.set_item("pyo3_pytests.comparisons", m.getattr("comparisons")?)?; diff --git a/pytests/tests/test_anyio.py b/pytests/tests/test_anyio.py new file mode 100644 index 00000000000..c48435bd2fc --- /dev/null +++ b/pytests/tests/test_anyio.py @@ -0,0 +1,14 @@ +import asyncio + +from pyo3_pytests.anyio import sleep +import trio + + +def test_asyncio(): + assert asyncio.run(sleep(0)) is None + assert asyncio.run(sleep(0.1, 42)) == 42 + + +def test_trio(): + assert trio.run(sleep, 0) is None + assert trio.run(sleep, 0.1, 42) == 42 diff --git a/src/coroutine.rs b/src/coroutine.rs index f840828b4d4..494b4c27dd7 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -19,8 +19,12 @@ use crate::{ IntoPy, Py, PyErr, PyObject, PyResult, Python, }; +#[cfg(feature = "anyio")] +mod anyio; mod asyncio; pub(crate) mod cancel; +#[cfg(feature = "anyio")] +mod trio; pub(crate) mod waker; pub use cancel::CancelHandle; diff --git a/src/coroutine/anyio.rs b/src/coroutine/anyio.rs new file mode 100644 index 00000000000..198265b560f --- /dev/null +++ b/src/coroutine/anyio.rs @@ -0,0 +1,69 @@ +//! Coroutine implementation using sniffio to select the appropriate implementation, +//! compatible with anyio. +use crate::{ + coroutine::{asyncio::AsyncioWaker, trio::TrioWaker}, + exceptions::PyRuntimeError, + sync::GILOnceCell, + PyAny, PyErr, PyObject, PyResult, Python, +}; + +fn current_async_library(py: Python<'_>) -> PyResult<&PyAny> { + static CURRENT_ASYNC_LIBRARY: GILOnceCell = GILOnceCell::new(); + let import = || -> PyResult<_> { + let module = py.import("sniffio")?; + Ok(module.getattr("current_async_library")?.into()) + }; + CURRENT_ASYNC_LIBRARY + .get_or_try_init(py, import)? + .as_ref(py) + .call0() +} + +fn unsupported(runtime: &str) -> PyErr { + PyRuntimeError::new_err(format!("unsupported runtime {rt}", rt = runtime)) +} + +/// Sniffio/anyio-compatible coroutine waker. +/// +/// Polling a Rust future calls `sniffio.current_async_library` to select the appropriate +/// implementation, either asyncio or trio. +pub(super) enum AnyioWaker { + /// [`AsyncioWaker`] + Asyncio(AsyncioWaker), + /// [`TrioWaker`] + Trio(TrioWaker), +} + +impl AnyioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + let sniffed = current_async_library(py)?; + match sniffed.extract()? { + "asyncio" => Ok(Self::Asyncio(AsyncioWaker::new(py)?)), + "trio" => Ok(Self::Trio(TrioWaker::new(py)?)), + rt => Err(unsupported(rt)), + } + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + match self { + AnyioWaker::Asyncio(w) => w.yield_(py), + AnyioWaker::Trio(w) => w.yield_(py), + } + } + + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + let sniffed = current_async_library(py)?; + match sniffed.extract()? { + "asyncio" => AsyncioWaker::yield_waken(py), + "trio" => TrioWaker::yield_waken(py), + rt => Err(unsupported(rt)), + } + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + match self { + AnyioWaker::Asyncio(w) => w.wake(py), + AnyioWaker::Trio(w) => w.wake(py), + } + } +} diff --git a/src/coroutine/cancel.rs b/src/coroutine/cancel.rs index 7828986c11e..8ccb2e88faa 100644 --- a/src/coroutine/cancel.rs +++ b/src/coroutine/cancel.rs @@ -1,9 +1,13 @@ -use crate::{PyAny, PyObject}; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker}, +}; + use parking_lot::Mutex; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; -use std::task::{Context, Poll, Waker}; + +use crate::{PyAny, PyObject}; #[derive(Debug, Default)] struct Inner { diff --git a/src/coroutine/trio.rs b/src/coroutine/trio.rs new file mode 100644 index 00000000000..4292cbc7143 --- /dev/null +++ b/src/coroutine/trio.rs @@ -0,0 +1,88 @@ +//! Coroutine implementation compatible with trio. +use pyo3_macros::pyfunction; + +use crate::{ + intern, + sync::GILOnceCell, + types::{PyCFunction, PyIterator}, + wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python, +}; + +struct Trio { + cancel_shielded_checkpoint: PyObject, + current_task: PyObject, + current_trio_token: PyObject, + reschedule: PyObject, + succeeded: PyObject, + wait_task_rescheduled: PyObject, +} +impl Trio { + fn get(py: Python<'_>) -> PyResult<&Self> { + static TRIO: GILOnceCell = GILOnceCell::new(); + TRIO.get_or_try_init(py, || { + let module = py.import("trio.lowlevel")?; + Ok(Self { + cancel_shielded_checkpoint: module.getattr("cancel_shielded_checkpoint")?.into(), + current_task: module.getattr("current_task")?.into(), + current_trio_token: module.getattr("current_trio_token")?.into(), + reschedule: module.getattr("reschedule")?.into(), + succeeded: module.getattr("Abort")?.getattr("SUCCEEDED")?.into(), + wait_task_rescheduled: module.getattr("wait_task_rescheduled")?.into(), + }) + }) + } +} + +fn yield_from(coro_func: &PyAny) -> PyResult { + PyIterator::from_object(coro_func.call_method0("__await__")?)? + .next() + .expect("cancel_shielded_checkpoint didn't yield") + .map(Into::into) +} + +/// Asyncio-compatible coroutine waker. +/// +/// Polling a Rust future yields `trio.lowlevel.wait_task_rescheduled()`, while `Waker::wake` +/// reschedule the current task. +pub(super) struct TrioWaker { + task: PyObject, + token: PyObject, +} + +impl TrioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + let trio = Trio::get(py)?; + let task = trio.current_task.call0(py)?; + let token = trio.current_trio_token.call0(py)?; + Ok(Self { task, token }) + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + static ABORT_FUNC: GILOnceCell> = GILOnceCell::new(); + let abort_func = + ABORT_FUNC.get_or_try_init(py, || wrap_pyfunction!(abort_func, py).map(Into::into))?; + let wait_task_rescheduled = Trio::get(py)? + .wait_task_rescheduled + .call1(py, (abort_func,))?; + yield_from(wait_task_rescheduled.as_ref(py)) + } + + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + let checkpoint = Trio::get(py)?.cancel_shielded_checkpoint.call0(py)?; + yield_from(checkpoint.as_ref(py)) + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + self.token.call_method1( + py, + intern!(py, "run_sync_soon"), + (&Trio::get(py)?.reschedule, &self.task), + )?; + Ok(()) + } +} + +#[pyfunction(crate = "crate")] +fn abort_func(py: Python<'_>, _arg: PyObject) -> PyResult { + Ok(Trio::get(py)?.succeeded.clone()) +} diff --git a/src/coroutine/waker.rs b/src/coroutine/waker.rs index 64f63e756b1..66180434ec5 100644 --- a/src/coroutine/waker.rs +++ b/src/coroutine/waker.rs @@ -5,10 +5,18 @@ use std::{ }; use crate::{ - coroutine::asyncio::AsyncioWaker, exceptions::PyStopIteration, intern, pyclass::IterNextOutput, - sync::GILOnceCell, types::PyFuture, Py, PyNativeType, PyObject, PyResult, Python, + exceptions::PyStopIteration, intern, pyclass::IterNextOutput, sync::GILOnceCell, + types::PyFuture, Py, PyNativeType, PyObject, PyResult, Python, }; +cfg_if::cfg_if! { + if #[cfg(feature = "anyio")] { + type WakerImpl = crate::coroutine::anyio::AnyioWaker; + } else { + type WakerImpl = crate::coroutine::asyncio::AsyncioWaker; + } +} + const MIXED_AWAITABLE_AND_FUTURE_ERROR: &str = "Python awaitable mixed with Rust future"; pub(crate) enum FutureOrPoll { @@ -21,7 +29,7 @@ thread_local! { } enum State { - Pending(AsyncioWaker), + Pending(WakerImpl), Waken, Delegated(PyObject), } @@ -49,10 +57,10 @@ impl CoroutineWaker { } pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { - let init = || PyResult::Ok(State::Pending(AsyncioWaker::new(py)?)); + let init = || PyResult::Ok(State::Pending(WakerImpl::new(py)?)); let state = self.state.get_or_try_init(py, init)?; match state { - State::Waken => AsyncioWaker::yield_waken(py), + State::Waken => WakerImpl::yield_waken(py), State::Delegated(obj) => Ok(obj.clone_ref(py)), State::Pending(waker) => waker.yield_(py), }