Skip to content

Commit

Permalink
Implement a Send Handle for the single-threaded Runtime (#340)
Browse files Browse the repository at this point in the history
Implement a Send'able Handle for the single-threaded `Runtime` and
`CurrentThread` executor to spawn new tasks from other threads.
  • Loading branch information
sdroege authored and carllerche committed Jun 5, 2018
1 parent c07a7b2 commit 0d41ba7
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 17 deletions.
80 changes: 77 additions & 3 deletions src/executor/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ use std::cell::Cell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::{Duration, Instant};
use std::sync::mpsc;

#[cfg(feature = "unstable-futures")]
use futures2;
Expand All @@ -132,6 +133,12 @@ pub struct CurrentThread<P: Park = ParkThread> {

/// Thread park handle
park: P,

/// Handle for spawning new futures from other threads
spawn_handle: Handle,

/// Receiver for futures spawned from other threads
spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>,
}

/// Executes futures on the current thread.
Expand Down Expand Up @@ -304,10 +311,17 @@ impl<P: Park> CurrentThread<P> {
pub fn new_with_park(park: P) -> Self {
let unpark = park.unpark();

let (spawn_sender, spawn_receiver) = mpsc::channel();

let scheduler = Scheduler::new(unpark);
let notify = scheduler.notify();

CurrentThread {
scheduler: Scheduler::new(unpark),
scheduler: scheduler,
num_futures: 0,
park,
spawn_handle: Handle { sender: spawn_sender, notify: notify },
spawn_receiver: spawn_receiver,
}
}

Expand Down Expand Up @@ -399,6 +413,14 @@ impl<P: Park> CurrentThread<P> {
num_futures: &mut self.num_futures,
}
}

/// Get a new handle to spawn futures on the executor
///
/// Different to the executor itself, the handle can be sent to different
/// threads and can be used to spawn futures on the executor.
pub fn handle(&self) -> Handle {
self.spawn_handle.clone()
}
}

impl tokio_executor::Executor for CurrentThread {
Expand Down Expand Up @@ -569,9 +591,26 @@ impl<'a, P: Park> Entered<'a, P> {

/// Returns `true` if any futures were processed
fn tick(&mut self) -> bool {
self.executor.scheduler.tick(
// Spawn any futures that were spawned from other threads by manually
// looping over the receiver stream

// FIXME: Slightly ugly but needed to make the borrow checker happy
let (mut borrow, spawn_receiver) = (
Borrow {
scheduler: &mut self.executor.scheduler,
num_futures: &mut self.executor.num_futures,
},
&mut self.executor.spawn_receiver,
);

while let Ok(future) = spawn_receiver.try_recv() {
borrow.spawn_local(future);
}

// After any pending futures were scheduled, do the actual tick
borrow.scheduler.tick(
&mut *self.enter,
&mut self.executor.num_futures)
borrow.num_futures)
}
}

Expand All @@ -584,6 +623,41 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
}
}

// ===== impl Handle =====

/// Handle to spawn a future on the corresponding `CurrentThread` instance
#[derive(Clone)]
pub struct Handle {
sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
notify: executor::NotifyHandle,
}

// Manual implementation because the Sender does not implement Debug
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Handle")
.finish()
}
}

impl Handle {
/// Spawn a future onto the `CurrentThread` instance corresponding to this handle
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
where F: Future<Item = (), Error = ()> + Send + 'static {
self.sender.send(Box::new(future))
.expect("CurrentThread does not exist anymore");
// use 0 for the id, CurrentThread does not make use of it
self.notify.notify(0);

Ok(())
}
}

// ===== impl TaskExecutor =====

#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")]
Expand Down
22 changes: 9 additions & 13 deletions src/runtime/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
//!
//! # Spawning from other threads
//!
//! By default, [`current_thread::Runtime`][rt] does not provide a way to spawn
//! tasks from other threads. However, this can be accomplished by using a
//! [`mpsc::channel`][chan]. To do so, create a channel to send the task, then
//! spawn a task on [`current_thread::Runtime`][rt] that consumes the channel
//! messages and spawns new tasks for them.
//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot
//! safely be moved to other threads, it provides a `Handle` that can be sent
//! to other threads and allows to spawn new tasks from there.
//!
//! For example:
//!
Expand All @@ -30,17 +28,15 @@
//! # extern crate futures;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! use futures::sync::mpsc;
//! use std::thread;
//!
//! # fn main() {
//! let mut runtime = Runtime::new().unwrap();
//! let (tx, rx) = mpsc::channel(128);
//! # tx.send(future::ok(()));
//! let handle = runtime.handle();
//!
//! runtime.spawn(rx.for_each(|task| {
//! tokio::spawn(task);
//! Ok(())
//! }).map_err(|e| panic!("channel error")));
//! thread::spawn(move || {
//! handle.spawn(future::ok(()));
//! }).join().unwrap();
//!
//! # /*
//! runtime.run().unwrap();
Expand Down Expand Up @@ -69,4 +65,4 @@
mod runtime;

pub use self::runtime::Runtime;
pub use self::runtime::{Runtime, Handle};
28 changes: 27 additions & 1 deletion src/runtime/current_thread/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use executor::current_thread::{self, CurrentThread};
use executor::current_thread::Handle as ExecutorHandle;

use tokio_reactor::{self, Reactor};
use tokio_timer::timer::{self, Timer};
Expand All @@ -21,6 +22,23 @@ pub struct Runtime {
executor: CurrentThread<Timer<Reactor>>,
}

/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance
#[derive(Debug, Clone)]
pub struct Handle(ExecutorHandle);

impl Handle {
/// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
where F: Future<Item = (), Error = ()> + Send + 'static {
self.0.spawn(future)
}
}

/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
Expand Down Expand Up @@ -48,6 +66,14 @@ impl Runtime {
Ok(runtime)
}

/// Get a new handle to spawn futures on the single-threaded Tokio runtime
///
/// Different to the runtime itself, the handle can be sent to different
/// threads.
pub fn handle(&self) -> Handle {
Handle(self.executor.handle().clone())
}

/// Spawn a future onto the single-threaded Tokio runtime.
///
/// See [module level][mod] documentation for more details.
Expand Down Expand Up @@ -124,7 +150,7 @@ impl Runtime {
fn enter<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R
{
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor } = *self;
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor, .. } = *self;

// Binds an executor to this thread
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
Expand Down
50 changes: 50 additions & 0 deletions tests/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,56 @@ fn turn_fair() {
assert!(!res.has_polled());
}

#[test]
fn spawn_from_other_thread() {
let mut current_thread = CurrentThread::new();

let handle = current_thread.handle();
let (sender, receiver) = oneshot::channel::<()>();

thread::spawn(move || {
handle.spawn(lazy(move || {
sender.send(()).unwrap();
Ok(())
})).unwrap();
});

let _ = current_thread.block_on(receiver).unwrap();
}

#[test]
fn spawn_from_other_thread_unpark() {
use std::sync::mpsc::channel as mpsc_channel;

let mut current_thread = CurrentThread::new();

let handle = current_thread.handle();
let (sender_1, receiver_1) = oneshot::channel::<()>();
let (sender_2, receiver_2) = mpsc_channel::<()>();

thread::spawn(move || {
let _ = receiver_2.recv().unwrap();

handle.spawn(lazy(move || {
sender_1.send(()).unwrap();
Ok(())
})).unwrap();
});

// Ensure that unparking the executor works correctly. It will first
// check if there are new futures (there are none), then execute the
// lazy future below which will cause the future to be spawned from
// the other thread. Then the executor will park but should be woken
// up because *now* we have a new future to schedule
let _ = current_thread.block_on(
lazy(move || {
sender_2.send(()).unwrap();
Ok(())
})
.and_then(|_| receiver_1)
).unwrap();
}

fn ok() -> future::FutureResult<(), ()> {
future::ok(())
}

0 comments on commit 0d41ba7

Please sign in to comment.