executor independent spawning #912
Comments
It would actually be rather trivial, the only thing required would be to convince
use core::future::Future;
use core::pin::Pin;
use parking_lot::{const_rwlock, RwLock};

type Spawner = fn(Pin<Box<dyn Future<Output = ()> + Send>>);

static SPAWNER: RwLock<Option<Spawner>> = const_rwlock(None);

pub fn register_spawner(spawner: Spawner) {
    let mut lock = SPAWNER.write();
    if lock.is_some() {
        panic!("spawner already registered");
    }
    *lock = Some(spawner);
}

pub fn spawn<F>(future: F)
where
    F: Future<Output = ()> + Send + 'static,
{
    SPAWNER.read().expect("no spawner registered")(Box::pin(future));
}

#[cfg(test)]
mod tests {
    use super::*;

    #[async_std::test]
    async fn test_async_std() {
        fn async_std_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            async_std::task::spawn(future);
        }
        register_spawner(async_std_spawn);
        spawn(async {
            println!("spawned on async-std");
        });
    }

    #[tokio::test]
    async fn test_tokio() {
        fn tokio_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
            tokio::spawn(future);
        }
        register_spawner(tokio_spawn);
        spawn(async {
            println!("spawned on tokio");
        });
    }
} |
Here are the corresponding PRs |
fn async_std_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
    async_std::task::spawn(future);
}
fn tokio_spawn(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
    tokio::spawn(future);
}
Note that both are not enough. Both |
Also, this seems to only work with a global runtime (and probably only if there is exactly one). |
I'm not sure this is a problem, the idea is that for tests you depend on an executor and use that. But I see that it needs to be fallible in that case like
This likely involves boxing the join handle. I guess it's not something you want to do by default? I can look into this this evening or tomorrow. |
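For reference, a fallible registration could look roughly like the sketch below. It uses only std (`OnceLock`) instead of parking_lot, and the names `try_register_spawner`, `registered_spawner`, `AlreadyRegistered` and `drop_spawner` are made up for illustration; they are not taken from any of the PRs.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::OnceLock;

type Spawner = fn(Pin<Box<dyn Future<Output = ()> + Send>>);

// Set at most once per process.
static SPAWNER: OnceLock<Spawner> = OnceLock::new();

/// Hypothetical error type: a spawner was already registered.
#[derive(Debug, PartialEq, Eq)]
pub struct AlreadyRegistered;

/// Registers `spawner`, returning an error instead of panicking on a second call.
pub fn try_register_spawner(spawner: Spawner) -> Result<(), AlreadyRegistered> {
    SPAWNER.set(spawner).map_err(|_| AlreadyRegistered)
}

/// Returns the registered spawner, if any.
pub fn registered_spawner() -> Option<Spawner> {
    SPAWNER.get().copied()
}

/// Stand-in spawner for the example: drops the future without running it.
pub fn drop_spawner(future: Pin<Box<dyn Future<Output = ()> + Send>>) {
    drop(future);
}
```

With this shape, two tests in the same binary can both attempt to register and simply ignore the `Err` from whichever one loses the race.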
This proposal is meant to address an important problem: crate authors need a way to spawn without having to take on Tokio and async-std as a dependency that may clash with their users' preferences. The way people are currently forced to choose or compromise introduces both technical friction and social uncertainty, and slows the adoption of Rust async. But this specific approach is not ideal. People sometimes use both Tokio and async-std in the same application (this is why If the two executors are so similar that a dynamically dispatched, non-deterministic In the long run, I think |
Actually I think there is precedent. Wouldn't it be similar to the allocator api? |
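To make the allocator comparison concrete, here is a minimal sketch of how the global allocator hook works in std. `GlobalAlloc`, `System` and `#[global_allocator]` are the real std pieces; `CountingAlloc` and `ALLOCATIONS` are names invented for this example.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::sync::atomic::{AtomicUsize, Ordering};

/// Number of allocations made through the global allocator so far.
pub static ALLOCATIONS: AtomicUsize = AtomicUsize::new(0);

/// Illustrative allocator: forwards to the system allocator and counts calls.
pub struct CountingAlloc;

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        ALLOCATIONS.fetch_add(1, Ordering::Relaxed);
        unsafe { System.alloc(layout) }
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) }
    }
}

// Only the final binary declares this; libraries keep calling `Box::new`
// and friends without naming an allocator.
#[global_allocator]
static GLOBAL: CountingAlloc = CountingAlloc;
```

The property that seems relevant here is that library code never names the allocator and the binary picks exactly one. A spawner hook would presumably follow the same shape, with the same limitation that only one can be registered per program.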
So based on the feedback received so far I made some adaptions
I'd like to note that it is not intended to change the public api of
I need some more help understanding in what cases registering more than one executor is a requirement. The suggested api |
@dvc94ch I think you're doing some interesting research here. The sequence I hope spawn integration in the stdlib will take is:
However the current signature of
pub fn spawn<F>(future: F) where
    F: Future<Output = ()> + Send + 'static;
For integration in
pub fn spawn<F, T>(future: F) -> JoinHandle<T> where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
With an option to later add hooks
The source of my experiment is here: yoshuawuyts/global-executor-prototype. I would love for someone to be able to finish this up as I think this is a required step in making async Rust more accessible to people. But I'm not sure how feasible the approach in the repo is, or how feasible it'll be to implement in userland. |
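One way to bridge the two signatures in userland is to layer the typed `spawn` on top of an untyped spawner, with a shared slot carrying the output back to the handle. The sketch below is std-only and makes several assumptions: `spawn_with`, `Slot`, `thread_spawner` and the toy `block_on` are hypothetical names, and a thread per task stands in for a real executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Type-erased task, as accepted by the untyped `spawn` signature.
pub type BoxedTask = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Slot shared between the task and its handle (hypothetical helper type).
struct Slot<T> {
    value: Option<T>,
    waker: Option<Waker>,
}

/// Handle that resolves to the task's output.
pub struct JoinHandle<T>(Arc<Mutex<Slot<T>>>);

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not finished yet: remember whom to wake once the value arrives.
                slot.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Typed spawn layered over any untyped spawner function.
pub fn spawn_with<F, T>(spawner: fn(BoxedTask), future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let slot = Arc::new(Mutex::new(Slot { value: None, waker: None }));
    let task_slot = Arc::clone(&slot);
    spawner(Box::pin(async move {
        let value = future.await;
        let mut slot = task_slot.lock().unwrap();
        slot.value = Some(value);
        if let Some(waker) = slot.waker.take() {
            waker.wake();
        }
    }));
    JoinHandle(slot)
}

/// Wakes a parked thread; used by the toy `block_on` below.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy `block_on`: polls on the current thread, parking while pending.
pub fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = pin!(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

/// Stand-in for a real executor's spawn: one OS thread per task.
pub fn thread_spawner(task: BoxedTask) {
    thread::spawn(move || block_on(task));
}
```

The cost is visible in the code: one allocation for the boxed task and one for the shared slot, which is the kind of overhead a hook inside the executor could avoid.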
|
Reopening. This requires the user to add |
I think you realized it in your follow-up comment, but unfortunately the approach of I'm not sure to which degree this is possible -- but that seems like something we should explore further. |
Well, it will require some boxing.
//! Executor agnostic task spawning
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
use once_cell::sync::OnceCell;
use futures::channel::oneshot;
use std::sync::{Arc, Mutex};

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Trait abstracting over an executor.
pub trait Executor: Send + Sync {
    /// Blocks until the future has finished.
    fn block_on(&self, future: BoxedFuture);

    /// Spawns an asynchronous task using the underlying executor.
    fn spawn(&self, future: BoxedFuture) -> BoxedFuture;

    /// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
    fn spawn_blocking(&self, task: Box<dyn FnOnce() + Send>) -> BoxedFuture;

    /// Spawns a future that doesn't implement [Send].
    ///
    /// The spawned future will be executed on the same thread that called `spawn_local`.
    ///
    /// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
    fn spawn_local(&self, future: Pin<Box<dyn Future<Output = ()> + 'static>>) -> BoxedFuture;
}

static EXECUTOR: OnceCell<Box<dyn Executor>> = OnceCell::new();

/// Error returned by `try_register_executor` indicating that an executor was registered.
#[derive(Debug)]
pub struct ExecutorRegistered;

impl core::fmt::Display for ExecutorRegistered {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "async_spawner: executor already registered")
    }
}

impl std::error::Error for ExecutorRegistered {}

pub fn try_register_executor(executor: Box<dyn Executor>) -> Result<(), ExecutorRegistered> {
    EXECUTOR.set(executor).map_err(|_| ExecutorRegistered)
}

pub fn register_executor(executor: Box<dyn Executor>) {
    try_register_executor(executor).unwrap();
}

pub fn executor() -> &'static Box<dyn Executor> {
    EXECUTOR.get().expect("async_spawner: no executor registered")
}

/// Blocks until the future has finished.
pub fn block_on<F, T>(future: F) -> T
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let lock = Arc::new(Mutex::new(None));
    let lock2 = lock.clone();
    executor().block_on(Box::pin(async move {
        let res = future.await;
        let mut lock = lock2.lock().unwrap();
        *lock = Some(res);
    }));
    let mut res = lock.lock().unwrap();
    res.take().unwrap()
}

pub struct JoinHandle<T> {
    handle: BoxedFuture,
    rx: oneshot::Receiver<T>,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
            if let Poll::Ready(Ok(res)) = Pin::new(&mut self.rx).poll(cx) {
                Poll::Ready(res)
            } else {
                panic!("task panicked");
            }
        } else {
            Poll::Pending
        }
    }
}

/// Spawns an asynchronous task using the underlying executor.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn(Box::pin(async move {
        let res = future.await;
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
}

/// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
pub fn spawn_blocking<F, T>(task: F) -> JoinHandle<T>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn_blocking(Box::new(move || {
        let res = task();
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
}

/// Spawns a future that doesn't implement [Send].
///
/// The spawned future will be executed on the same thread that called `spawn_local`.
///
/// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
pub fn spawn_local<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + 'static,
    T: Send + 'static,
{
    let (tx, rx) = oneshot::channel();
    let handle = executor().spawn_local(Box::pin(async move {
        let res = future.await;
        tx.send(res).ok();
    }));
    JoinHandle { handle, rx }
} |
Released as async-spawner 2.0.0 with improvements to docs and tests. @yoshuawuyts, does this meet the bar for inclusion? If yes, I can update the async-attributes PR. https://docs.rs/async-spawner/2.0.0/async_spawner/ |
Having the It's possible that executors could avoid this by taking advantage of the known size of a Another way around this would be to make the client do all the allocation, with the executor providing a |
Actually it would require at least three since we're also boxing the
So what would the api for that look like? I'm not very familiar with the internals of executors. |
I am not sure, honestly. It just seemed to me that having a fixed size for the futures was a 'handhold' on the problem that could be useful. I think the only thing I have to contribute is that we need to be conscious of how changes to the interface affect the allocation count. |
I think it's feasible to reduce the two allocations (oneshot and joinhandle) to one, I'll try that tomorrow. However I expect your suggestion will require exposing some executor internals. It would be great if we could reduce it to two allocations. |
@jimblandy using a bump allocator was a good solution, never used one before. Managed to get all except
/// Trait abstracting over an executor.
pub trait Executor: Send + Sync {
    /// Blocks until the future has finished.
    fn block_on(&self, future: Pin<&mut (dyn Future<Output = ()> + Send + 'static)>);

    /// Spawns an asynchronous task using the underlying executor.
    fn spawn(&self, bump: &Bump, future: BumpFuture) -> BumpFuture;

    /// Runs the provided closure on a thread, which can execute blocking tasks asynchronously.
    fn spawn_blocking(&self, bump: &Bump, task: Box<dyn FnOnce() + Send>) -> BumpFuture;

    /// Spawns a future that doesn't implement [Send].
    ///
    /// The spawned future will be executed on the same thread that called `spawn_local`.
    ///
    /// [Send]: https://doc.rust-lang.org/std/marker/trait.Send.html
    fn spawn_local(
        &self,
        bump: &Bump,
        future: Pin<BumpBox<'static, dyn Future<Output = ()>>>,
    ) -> BumpFuture;
}

/// Executor agnostic join handle.
pub struct JoinHandle<T: 'static> {
    handle: BumpFuture,
    res: BumpBox<'static, Mutex<Option<T>>>,
    #[allow(unused)]
    bump: Bump,
}

impl<T> Future for JoinHandle<T> {
    type Output = T;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        if let Poll::Ready(()) = Pin::new(&mut self.handle).poll(cx) {
            let mut res = self.res.lock().unwrap();
            Poll::Ready(res.take().unwrap())
        } else {
            Poll::Pending
        }
    }
}

/// Spawns an asynchronous task using the underlying executor.
pub fn spawn<F, T>(future: F) -> JoinHandle<T>
where
    F: Future<Output = T> + Send + 'static,
    T: Send + 'static,
{
    let bump = Bump::new();
    // Caveat: the `'static` references forged below point into `bump`, which the
    // `JoinHandle` owns, so they dangle if the handle is dropped before the task ends.
    let bump_ref = unsafe { &*(&bump as *const _) };
    let res = BumpBox::new_in(Mutex::new(None), bump_ref);
    let res_ref: &'static Mutex<Option<T>> = unsafe { &*(&*res as *const _) };
    let future = BumpBox::new_in(
        async move {
            let res = future.await;
            let mut cell = res_ref.lock().unwrap();
            *cell = Some(res);
        },
        &bump,
    );
    let handle = executor().spawn(&bump, coerce_bump_box_pin!(future));
    JoinHandle { bump, handle, res }
}
The one thing missing would be to figure out the size beforehand and use |
Oops, that will only work if the join handle doesn't get dropped. |
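The drop problem goes away if the result slot is co-owned by the task and the handle instead of being borrowed from memory the handle owns. Below is a minimal std-only sketch of that idea, at the price of one extra heap allocation for the `Arc`. A plain thread stands in for the spawned task, and `Slot` and `run_with_slot` are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Result slot co-owned by the task and its handle.
pub type Slot<T> = Arc<Mutex<Option<T>>>;

/// Runs `work` on a new thread (standing in for a spawned task) and returns
/// the handle-side owner of the slot together with the thread's join handle.
pub fn run_with_slot<T, W>(work: W) -> (Slot<T>, thread::JoinHandle<()>)
where
    T: Send + 'static,
    W: FnOnce() -> T + Send + 'static,
{
    let slot: Slot<T> = Arc::new(Mutex::new(None));
    let task_slot = Arc::clone(&slot);
    let thread = thread::spawn(move || {
        // Always valid: `task_slot` keeps the allocation alive on its own,
        // even if the other owner was dropped long ago.
        *task_slot.lock().unwrap() = Some(work());
    });
    (slot, thread)
}
```

Dropping the returned `Slot` before the thread finishes is fine here, because the memory is freed only when the last owner goes away.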
Yeah, bump allocators are only simple because they impose restrictions on the order in which things can be freed. I don't think we know anything about the order in which these tasks will exit, or when the join handles will be dropped. I think you want an arena allocator instead. The Is there a way to avoid baking the choice of allocator into the (I'm sure I'm coming across as negative, but that's not my intent—I'd like to see this succeed. There are just a lot of design constraints to take into account.) |
I'm completely fine with four allocations, this is really exploding the scope. It was intended to be a simple solution. If you're planning on spawning hundreds of tasks per second, you should consider a different one. |
Actually it's impossible to do it in one allocation. It requires at least two. Even with the bump allocation scheme mentioned earlier, it's impossible to determine the size beforehand as when allocating something like:
struct TaskAlloc {
    layout: Layout,
    ptr: *const AtomicUsize,
    task: *const (),
    output: *const (),
    join_handle: *const (),
}
task needs to be the equivalent of
The only way to do it so that it's equally efficient would be for |
It looks like originally the Context was intended to have an executor attached to it so that you can spawn tasks in an executor independent way. With async-io we no longer need to rely on a specific runtime and wrapping the std types, which is a huge gain for writing async code that works everywhere. However it is very useful sometimes to be able to spawn a background task. There doesn't seem to be a way to do this without either passing around a "spawner" or deciding on an executor. Is there likely going to be a std::task::spawn that executors can hook in to?
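For comparison, the "passing around a spawner" option might look like the sketch below. The `Spawn` trait, `start_background_job` and `ThreadPerTask` are hypothetical names for illustration and not an API of any crate mentioned here; a thread per task stands in for a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::task::{Context, Wake, Waker};
use std::thread::{self, Thread};

pub type BoxedFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// Hypothetical spawner interface that library code accepts explicitly.
pub trait Spawn: Send + Sync {
    fn spawn_boxed(&self, future: BoxedFuture);
}

/// Library code: starts a background task without naming an executor.
pub fn start_background_job(spawner: &dyn Spawn, done: Sender<&'static str>) {
    spawner.spawn_boxed(Box::pin(async move {
        // Real work would go here; the channel only reports completion.
        let _ = done.send("background job finished");
    }));
}

/// Wakes the thread that is driving a task.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor for the demo: one OS thread per task.
pub struct ThreadPerTask;

impl Spawn for ThreadPerTask {
    fn spawn_boxed(&self, future: BoxedFuture) {
        thread::spawn(move || {
            let mut future = future;
            let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
            let mut cx = Context::from_waker(&waker);
            // Poll until done, parking this thread while the task is pending.
            while future.as_mut().poll(&mut cx).is_pending() {
                thread::park();
            }
        });
    }
}
```

This works with any number of executors in one application, but every function on the path down to the spawn site has to carry the `spawner` argument, which is the friction the question is about.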