Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a Send'able Handle for the single-threaded Runtime to spawn… #340

Merged
merged 5 commits into from
Jun 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 77 additions & 3 deletions src/executor/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ use std::cell::Cell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::time::{Duration, Instant};
use std::sync::mpsc;

#[cfg(feature = "unstable-futures")]
use futures2;
Expand All @@ -132,6 +133,12 @@ pub struct CurrentThread<P: Park = ParkThread> {

/// Thread park handle
park: P,

/// Handle for spawning new futures from other threads
spawn_handle: Handle,

/// Receiver for futures spawned from other threads
spawn_receiver: mpsc::Receiver<Box<Future<Item = (), Error = ()> + Send + 'static>>,
}

/// Executes futures on the current thread.
Expand Down Expand Up @@ -304,10 +311,17 @@ impl<P: Park> CurrentThread<P> {
pub fn new_with_park(park: P) -> Self {
let unpark = park.unpark();

let (spawn_sender, spawn_receiver) = mpsc::channel();

let scheduler = Scheduler::new(unpark);
let notify = scheduler.notify();

CurrentThread {
scheduler: Scheduler::new(unpark),
scheduler: scheduler,
num_futures: 0,
park,
spawn_handle: Handle { sender: spawn_sender, notify: notify },
spawn_receiver: spawn_receiver,
}
}

Expand Down Expand Up @@ -399,6 +413,14 @@ impl<P: Park> CurrentThread<P> {
num_futures: &mut self.num_futures,
}
}

/// Get a new handle to spawn futures on the executor
///
/// Different to the executor itself, the handle can be sent to different
/// threads and can be used to spawn futures on the executor.
pub fn handle(&self) -> Handle {
self.spawn_handle.clone()
}
}

impl tokio_executor::Executor for CurrentThread {
Expand Down Expand Up @@ -569,9 +591,26 @@ impl<'a, P: Park> Entered<'a, P> {

/// Returns `true` if any futures were processed
fn tick(&mut self) -> bool {
self.executor.scheduler.tick(
// Spawn any futures that were spawned from other threads by manually
// looping over the receiver stream

// FIXME: Slightly ugly but needed to make the borrow checker happy
let (mut borrow, spawn_receiver) = (
Borrow {
scheduler: &mut self.executor.scheduler,
num_futures: &mut self.executor.num_futures,
},
&mut self.executor.spawn_receiver,
);

while let Ok(future) = spawn_receiver.try_recv() {
borrow.spawn_local(future);
}

// After any pending futures were scheduled, do the actual tick
borrow.scheduler.tick(
&mut *self.enter,
&mut self.executor.num_futures)
borrow.num_futures)
}
}

Expand All @@ -584,6 +623,41 @@ impl<'a, P: Park> fmt::Debug for Entered<'a, P> {
}
}

// ===== impl Handle =====

/// Handle to spawn a future on the corresponding `CurrentThread` instance
#[derive(Clone)]
pub struct Handle {
sender: mpsc::Sender<Box<Future<Item = (), Error = ()> + Send + 'static>>,
notify: executor::NotifyHandle,
}

// Manual implementation because the Sender does not implement Debug
impl fmt::Debug for Handle {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Handle")
.finish()
}
}

impl Handle {
/// Spawn a future onto the `CurrentThread` instance corresponding to this handle
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), SpawnError>
where F: Future<Item = (), Error = ()> + Send + 'static {
self.sender.send(Box::new(future))
.expect("CurrentThread does not exist anymore");
// use 0 for the id, CurrentThread does not make use of it
self.notify.notify(0);

Ok(())
}
}

// ===== impl TaskExecutor =====

#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")]
Expand Down
22 changes: 9 additions & 13 deletions src/runtime/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@
//!
//! # Spawning from other threads
//!
//! By default, [`current_thread::Runtime`][rt] does not provide a way to spawn
//! tasks from other threads. However, this can be accomplished by using a
//! [`mpsc::channel`][chan]. To do so, create a channel to send the task, then
//! spawn a task on [`current_thread::Runtime`][rt] that consumes the channel
//! messages and spawns new tasks for them.
//! While [`current_thread::Runtime`][rt] does not implement `Send` and cannot
//! safely be moved to other threads, it provides a `Handle` that can be sent
//! to other threads and allows to spawn new tasks from there.
//!
//! For example:
//!
Expand All @@ -30,17 +28,15 @@
//! # extern crate futures;
//! use tokio::runtime::current_thread::Runtime;
//! use tokio::prelude::*;
//! use futures::sync::mpsc;
//! use std::thread;
//!
//! # fn main() {
//! let mut runtime = Runtime::new().unwrap();
//! let (tx, rx) = mpsc::channel(128);
//! # tx.send(future::ok(()));
//! let handle = runtime.handle();
//!
//! runtime.spawn(rx.for_each(|task| {
//! tokio::spawn(task);
//! Ok(())
//! }).map_err(|e| panic!("channel error")));
//! thread::spawn(move || {
//! handle.spawn(future::ok(()));
//! }).join().unwrap();
//!
//! # /*
//! runtime.run().unwrap();
Expand Down Expand Up @@ -69,4 +65,4 @@

mod runtime;

pub use self::runtime::Runtime;
pub use self::runtime::{Runtime, Handle};
28 changes: 27 additions & 1 deletion src/runtime/current_thread/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use executor::current_thread::{self, CurrentThread};
use executor::current_thread::Handle as ExecutorHandle;

use tokio_reactor::{self, Reactor};
use tokio_timer::timer::{self, Timer};
Expand All @@ -21,6 +22,23 @@ pub struct Runtime {
executor: CurrentThread<Timer<Reactor>>,
}

/// Handle to spawn a future on the corresponding `CurrentThread` runtime instance
#[derive(Debug, Clone)]
pub struct Handle(ExecutorHandle);

impl Handle {
/// Spawn a future onto the `CurrentThread` runtime instance corresponding to this handle
///
/// # Panics
///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F) -> Result<(), tokio_executor::SpawnError>
where F: Future<Item = (), Error = ()> + Send + 'static {
self.0.spawn(future)
}
}

/// Error returned by the `run` function.
#[derive(Debug)]
pub struct RunError {
Expand Down Expand Up @@ -48,6 +66,14 @@ impl Runtime {
Ok(runtime)
}

/// Get a new handle to spawn futures on the single-threaded Tokio runtime
///
/// Different to the runtime itself, the handle can be sent to different
/// threads.
pub fn handle(&self) -> Handle {
Handle(self.executor.handle().clone())
}

/// Spawn a future onto the single-threaded Tokio runtime.
///
/// See [module level][mod] documentation for more details.
Expand Down Expand Up @@ -124,7 +150,7 @@ impl Runtime {
fn enter<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R
{
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor } = *self;
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor, .. } = *self;

// Binds an executor to this thread
let mut enter = tokio_executor::enter().expect("Multiple executors at once");
Expand Down
50 changes: 50 additions & 0 deletions tests/current_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,56 @@ fn turn_fair() {
assert!(!res.has_polled());
}

#[test]
fn spawn_from_other_thread() {
let mut current_thread = CurrentThread::new();

let handle = current_thread.handle();
let (sender, receiver) = oneshot::channel::<()>();

thread::spawn(move || {
handle.spawn(lazy(move || {
sender.send(()).unwrap();
Ok(())
})).unwrap();
});

let _ = current_thread.block_on(receiver).unwrap();
}

#[test]
fn spawn_from_other_thread_unpark() {
use std::sync::mpsc::channel as mpsc_channel;

let mut current_thread = CurrentThread::new();

let handle = current_thread.handle();
let (sender_1, receiver_1) = oneshot::channel::<()>();
let (sender_2, receiver_2) = mpsc_channel::<()>();

thread::spawn(move || {
let _ = receiver_2.recv().unwrap();

handle.spawn(lazy(move || {
sender_1.send(()).unwrap();
Ok(())
})).unwrap();
});

// Ensure that unparking the executor works correctly. It will first
// check if there are new futures (there are none), then execute the
// lazy future below which will cause the future to be spawned from
// the other thread. Then the executor will park but should be woken
// up because *now* we have a new future to schedule
let _ = current_thread.block_on(
lazy(move || {
sender_2.send(()).unwrap();
Ok(())
})
.and_then(|_| receiver_1)
).unwrap();
}

fn ok() -> future::FutureResult<(), ()> {
future::ok(())
}