From 0d41ba7a08e3e2224d1972b78f12e9f510bea433 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 6 Jun 2018 02:56:15 +0300 Subject: [PATCH] Implement a Send Handle for the single-threaded Runtime (#340) Implement a Send'able Handle for the single-threaded `Runtime` and `CurrentThread` executor to spawn new tasks from other threads. --- src/executor/current_thread/mod.rs | 80 ++++++++++++++++++++++++++- src/runtime/current_thread/mod.rs | 22 +++----- src/runtime/current_thread/runtime.rs | 28 +++++++++- tests/current_thread.rs | 50 +++++++++++++++++ 4 files changed, 163 insertions(+), 17 deletions(-) diff --git a/src/executor/current_thread/mod.rs b/src/executor/current_thread/mod.rs index 81c3b8a86d4..4b25d7eb3bd 100644 --- a/src/executor/current_thread/mod.rs +++ b/src/executor/current_thread/mod.rs @@ -118,6 +118,7 @@ use std::cell::Cell; use std::marker::PhantomData; use std::rc::Rc; use std::time::{Duration, Instant}; +use std::sync::mpsc; #[cfg(feature = "unstable-futures")] use futures2; @@ -132,6 +133,12 @@ pub struct CurrentThread { /// Thread park handle park: P, + + /// Handle for spawning new futures from other threads + spawn_handle: Handle, + + /// Receiver for futures spawned from other threads + spawn_receiver: mpsc::Receiver + Send + 'static>>, } /// Executes futures on the current thread. @@ -304,10 +311,17 @@ impl CurrentThread

{ pub fn new_with_park(park: P) -> Self { let unpark = park.unpark(); + let (spawn_sender, spawn_receiver) = mpsc::channel(); + + let scheduler = Scheduler::new(unpark); + let notify = scheduler.notify(); + CurrentThread { - scheduler: Scheduler::new(unpark), + scheduler: scheduler, num_futures: 0, park, + spawn_handle: Handle { sender: spawn_sender, notify: notify }, + spawn_receiver: spawn_receiver, } } @@ -399,6 +413,14 @@ impl CurrentThread

{ num_futures: &mut self.num_futures, } } + + /// Get a new handle to spawn futures on the executor + /// + /// Different to the executor itself, the handle can be sent to different + /// threads and can be used to spawn futures on the executor. + pub fn handle(&self) -> Handle { + self.spawn_handle.clone() + } } impl tokio_executor::Executor for CurrentThread { @@ -569,9 +591,26 @@ impl<'a, P: Park> Entered<'a, P> { /// Returns `true` if any futures were processed fn tick(&mut self) -> bool { - self.executor.scheduler.tick( + // Spawn any futures that were spawned from other threads by manually + // looping over the receiver stream + + // FIXME: Slightly ugly but needed to make the borrow checker happy + let (mut borrow, spawn_receiver) = ( + Borrow { + scheduler: &mut self.executor.scheduler, + num_futures: &mut self.executor.num_futures, + }, + &mut self.executor.spawn_receiver, + ); + + while let Ok(future) = spawn_receiver.try_recv() { + borrow.spawn_local(future); + } + + // After any pending futures were scheduled, do the actual tick + borrow.scheduler.tick( &mut *self.enter, - &mut self.executor.num_futures) + borrow.num_futures) } } @@ -584,6 +623,41 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> { } } +// ===== impl Handle ===== + +/// Handle to spawn a future on the corresponding `CurrentThread` instance +#[derive(Clone)] +pub struct Handle { + sender: mpsc::Sender + Send + 'static>>, + notify: executor::NotifyHandle, +} + +// Manual implementation because the Sender does not implement Debug +impl fmt::Debug for Handle { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("Handle") + .finish() + } +} + +impl Handle { + /// Spawn a future onto the `CurrentThread` instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn(&self, future: F) -> Result<(), SpawnError> + where F: Future + Send + 'static { + self.sender.send(Box::new(future)) + .expect("CurrentThread does not exist anymore"); + // use 0 for the id, CurrentThread does not make use of it + self.notify.notify(0); + + Ok(()) + } +} + // ===== impl TaskExecutor ===== #[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")] diff --git a/src/runtime/current_thread/mod.rs b/src/runtime/current_thread/mod.rs index 191533eeeeb..90a90672c9a 100644 --- a/src/runtime/current_thread/mod.rs +++ b/src/runtime/current_thread/mod.rs @@ -17,11 +17,9 @@ //! //! # Spawning from other threads //! -//! By default, [`current_thread::Runtime`][rt] does not provide a way to spawn -//! tasks from other threads. However, this can be accomplished by using a -//! [`mpsc::channel`][chan]. To do so, create a channel to send the task, then -//! spawn a task on [`current_thread::Runtime`][rt] that consumes the channel -//! messages and spawns new tasks for them. +//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot +//! safely be moved to other threads, it provides a `Handle` that can be sent +//! to other threads and allows to spawn new tasks from there. //! //! For example: //! @@ -30,17 +28,15 @@ //! # extern crate futures; //! use tokio::runtime::current_thread::Runtime; //! use tokio::prelude::*; -//! use futures::sync::mpsc; +//! use std::thread; //! //! # fn main() { //! let mut runtime = Runtime::new().unwrap(); -//! let (tx, rx) = mpsc::channel(128); -//! # tx.send(future::ok(())); +//! let handle = runtime.handle(); //! -//! runtime.spawn(rx.for_each(|task| { -//! tokio::spawn(task); -//! Ok(()) -//! }).map_err(|e| panic!("channel error"))); +//! thread::spawn(move || { +//! handle.spawn(future::ok(())); +//! }).join().unwrap(); //! //! # /* //! runtime.run().unwrap(); @@ -69,4 +65,4 @@ mod runtime; -pub use self::runtime::Runtime; +pub use self::runtime::{Runtime, Handle}; diff --git a/src/runtime/current_thread/runtime.rs b/src/runtime/current_thread/runtime.rs index 0d0ff210645..3e31f10067c 100644 --- a/src/runtime/current_thread/runtime.rs +++ b/src/runtime/current_thread/runtime.rs @@ -1,4 +1,5 @@ use executor::current_thread::{self, CurrentThread}; +use executor::current_thread::Handle as ExecutorHandle; use tokio_reactor::{self, Reactor}; use tokio_timer::timer::{self, Timer}; @@ -21,6 +22,23 @@ pub struct Runtime { executor: CurrentThread>, } +/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance +#[derive(Debug, Clone)] +pub struct Handle(ExecutorHandle); + +impl Handle { + /// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle + /// + /// # Panics + /// + /// This function panics if the spawn fails. Failure occurs if the `CurrentThread` + /// instance of the `Handle` does not exist anymore. + pub fn spawn(&self, future: F) -> Result<(), tokio_executor::SpawnError> + where F: Future + Send + 'static { + self.0.spawn(future) + } +} + /// Error returned by the `run` function. #[derive(Debug)] pub struct RunError { @@ -48,6 +66,14 @@ impl Runtime { Ok(runtime) } + /// Get a new handle to spawn futures on the single-threaded Tokio runtime + /// + /// Different to the runtime itself, the handle can be sent to different + /// threads. + pub fn handle(&self) -> Handle { + Handle(self.executor.handle().clone()) + } + /// Spawn a future onto the single-threaded Tokio runtime. /// /// See [module level][mod] documentation for more details. @@ -124,7 +150,7 @@ impl Runtime { fn enter(&mut self, f: F) -> R where F: FnOnce(&mut current_thread::Entered>) -> R { - let Runtime { ref reactor_handle, ref timer_handle, ref mut executor } = *self; + let Runtime { ref reactor_handle, ref timer_handle, ref mut executor, .. } = *self; // Binds an executor to this thread let mut enter = tokio_executor::enter().expect("Multiple executors at once"); diff --git a/tests/current_thread.rs b/tests/current_thread.rs index 1a811cc37fe..79e6785e938 100644 --- a/tests/current_thread.rs +++ b/tests/current_thread.rs @@ -567,6 +567,56 @@ fn turn_fair() { assert!(!res.has_polled()); } +#[test] +fn spawn_from_other_thread() { + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender, receiver) = oneshot::channel::<()>(); + + thread::spawn(move || { + handle.spawn(lazy(move || { + sender.send(()).unwrap(); + Ok(()) + })).unwrap(); + }); + + let _ = current_thread.block_on(receiver).unwrap(); +} + +#[test] +fn spawn_from_other_thread_unpark() { + use std::sync::mpsc::channel as mpsc_channel; + + let mut current_thread = CurrentThread::new(); + + let handle = current_thread.handle(); + let (sender_1, receiver_1) = oneshot::channel::<()>(); + let (sender_2, receiver_2) = mpsc_channel::<()>(); + + thread::spawn(move || { + let _ = receiver_2.recv().unwrap(); + + handle.spawn(lazy(move || { + sender_1.send(()).unwrap(); + Ok(()) + })).unwrap(); + }); + + // Ensure that unparking the executor works correctly. It will first + // check if there are new futures (there are none), then execute the + // lazy future below which will cause the future to be spawned from + // the other thread. Then the executor will park but should be woken + // up because *now* we have a new future to schedule + let _ = current_thread.block_on( + lazy(move || { + sender_2.send(()).unwrap(); + Ok(()) + }) + .and_then(|_| receiver_1) + ).unwrap(); +} + fn ok() -> future::FutureResult<(), ()> { future::ok(()) }