Skip to content

Commit

Permalink
Another attempt at abstracting Instant::now (#381)
Browse files Browse the repository at this point in the history
Currently, the timer uses a `Now` trait to abstract the source of time.
This allows time to be mocked out. However, the current implementation
has a number of limitations as represented by #288 and #296.

The main issues are that `Now` requires `&mut self` which prevents a
value from being easily used in a concurrent environment. Also, when
wanting to write code that is abstract over the source of time, generics
get out of hand.

This patch provides an alternate solution. A new type, `Clock` is
provided which defaults to `Instant::now` as the source of time, but
allows configuring the actual source using a new iteration of the `Now`
trait. This time, `Now` is `Send + Sync + 'static`. Internally, `Clock`
stores the now value in an `Arc<Now>` value, which introduces dynamism
and allows `Clock` values to be cloned and be `Sync`.

Also, the current clock can be set for the current execution context
using the `with_default` pattern.

Because using the `Instant::now` will be the most common case by far, it
is special cased in order to avoid the need to allocate an `Arc` and use
dynamic dispatch.
  • Loading branch information
carllerche authored Jun 6, 2018
1 parent 9013ed9 commit db620b4
Show file tree
Hide file tree
Showing 15 changed files with 468 additions and 53 deletions.
15 changes: 15 additions & 0 deletions src/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
//! A configurable source of time.
//!
//! This module provides the [`now`][n] function, which returns an `Instant`
//! representing "now". The source of time used by this function is configurable
//! (via the [`tokio-timer`] crate) and allows mocking out the source of time in
//! tests or performing caching operations to reduce the number of syscalls.
//!
//! Note that, because the source of time is configurable, it is possible to
//! observe non-monotonic behavior when calling [`now`] from different
//! executors.
//!
//! [n]: fn.now.html
//! [`tokio-timer`]: https://docs.rs/tokio-timer/0.2/tokio_timer/clock/index.html
pub use tokio_timer::clock::now;
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ extern crate tokio_udp;
#[cfg(feature = "unstable-futures")]
extern crate futures2;

pub mod clock;
pub mod executor;
pub mod fs;
pub mod net;
Expand Down
29 changes: 24 additions & 5 deletions src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::io;
use tokio_reactor;
use tokio_threadpool::Builder as ThreadPoolBuilder;
use tokio_threadpool::park::DefaultPark;
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};

/// Builds Tokio Runtime with custom configuration values.
Expand Down Expand Up @@ -48,6 +49,9 @@ use tokio_timer::timer::{self, Timer};
pub struct Builder {
/// Thread pool specific builder
threadpool_builder: ThreadPoolBuilder,

/// The clock to use
clock: Clock,
}

impl Builder {
Expand All @@ -59,7 +63,16 @@ impl Builder {
let mut threadpool_builder = ThreadPoolBuilder::new();
threadpool_builder.name_prefix("tokio-runtime-worker-");

Builder { threadpool_builder }
Builder {
threadpool_builder,
clock: Clock::new(),
}
}

/// Set the `Clock` instance that will be used by the runtime.
pub fn clock(&mut self, clock: Clock) -> &mut Self {
self.clock = clock;
self
}

/// Set builder to set up the thread pool instance.
Expand Down Expand Up @@ -87,6 +100,10 @@ impl Builder {
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Get a handle to the clock for the runtime.
let clock1 = self.clock.clone();
let clock2 = clock1.clone();

let timers = Arc::new(Mutex::new(HashMap::<_, timer::Handle>::new()));
let t1 = timers.clone();

Expand All @@ -103,14 +120,16 @@ impl Builder {
.clone();

tokio_reactor::with_default(&reactor_handle, enter, |enter| {
timer::with_default(&timer_handle, enter, |_| {
w.run();
});
clock::with_default(&clock1, enter, |enter| {
timer::with_default(&timer_handle, enter, |_| {
w.run();
});
})
});
})
.custom_park(move |worker_id| {
// Create a new timer
let timer = Timer::new(DefaultPark::new());
let timer = Timer::new_with_now(DefaultPark::new(), clock2.clone());

timers.lock().unwrap()
.insert(worker_id.clone(), timer.handle());
Expand Down
88 changes: 88 additions & 0 deletions src/runtime/current_thread/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use executor::current_thread::CurrentThread;
use runtime::current_thread::Runtime;

use tokio_reactor::Reactor;
use tokio_timer::clock::Clock;
use tokio_timer::timer::Timer;

use std::io;

/// Builds a Single-threaded runtime with custom configuration values.
///
/// Methods can be chained in order to set the configuration values. The
/// Runtime is constructed by calling [`build`].
///
/// New instances of `Builder` are obtained via [`Builder::new`].
///
/// See function level documentation for details on the various configuration
/// settings.
///
/// [`build`]: #method.build
/// [`Builder::new`]: #method.new
///
/// # Examples
///
/// ```
/// extern crate tokio;
/// extern crate tokio_timer;
///
/// use tokio::runtime::current_thread::Builder;
/// use tokio_timer::clock::Clock;
///
/// # pub fn main() {
/// // build Runtime
/// let runtime = Builder::new()
/// .clock(Clock::new())
/// .build();
/// // ... call runtime.run(...)
/// # let _ = runtime;
/// # }
/// ```
#[derive(Debug)]
pub struct Builder {
/// The clock to use
clock: Clock,
}

impl Builder {
/// Returns a new runtime builder initialized with default configuration
/// values.
///
/// Configuration methods can be chained on the return value.
pub fn new() -> Builder {
Builder {
clock: Clock::new(),
}
}

/// Set the `Clock` instance that will be used by the runtime.
pub fn clock(&mut self, clock: Clock) -> &mut Self {
self.clock = clock;
self
}

/// Create the configured `Runtime`.
pub fn build(&mut self) -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();

// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new_with_now(reactor, self.clock.clone());
let timer_handle = timer.handle();

// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);

let runtime = Runtime::new2(
reactor_handle,
timer_handle,
self.clock.clone(),
executor);

Ok(runtime)
}
}
2 changes: 2 additions & 0 deletions src/runtime/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
//! [concurrent-rt]: ../struct.Runtime.html
//! [chan]: https://docs.rs/futures/0.1/futures/sync/mpsc/fn.channel.html
mod builder;
mod runtime;

pub use self::builder::Builder;
pub use self::runtime::{Runtime, Handle};
64 changes: 37 additions & 27 deletions src/runtime/current_thread/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use executor::current_thread::{self, CurrentThread};
use executor::current_thread::Handle as ExecutorHandle;
use runtime::current_thread::Builder;

use tokio_reactor::{self, Reactor};
use tokio_timer::clock::{self, Clock};
use tokio_timer::timer::{self, Timer};
use tokio_executor;

Expand All @@ -19,6 +21,7 @@ use std::io;
pub struct Runtime {
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>,
}

Expand Down Expand Up @@ -48,22 +51,21 @@ pub struct RunError {
impl Runtime {
/// Returns a new runtime initialized with default configuration values.
pub fn new() -> io::Result<Runtime> {
// We need a reactor to receive events about IO objects from kernel
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle();

// Place a timer wheel on top of the reactor. If there are no timeouts to fire, it'll let the
// reactor pick up some new external events.
let timer = Timer::new(reactor);
let timer_handle = timer.handle();

// And now put a single-threaded executor on top of the timer. When there are no futures ready
// to do something, it'll let the timer or the reactor to generate some new stimuli for the
// futures to continue in their life.
let executor = CurrentThread::new_with_park(timer);

let runtime = Runtime { reactor_handle, timer_handle, executor };
Ok(runtime)
Builder::new().build()
}

pub(super) fn new2(
reactor_handle: tokio_reactor::Handle,
timer_handle: timer::Handle,
clock: Clock,
executor: CurrentThread<Timer<Reactor>>) -> Runtime
{
Runtime {
reactor_handle,
timer_handle,
clock,
executor,
}
}

/// Get a new handle to spawn futures on the single-threaded Tokio runtime
Expand Down Expand Up @@ -150,24 +152,32 @@ impl Runtime {
fn enter<F, R>(&mut self, f: F) -> R
where F: FnOnce(&mut current_thread::Entered<Timer<Reactor>>) -> R
{
let Runtime { ref reactor_handle, ref timer_handle, ref mut executor, .. } = *self;
let Runtime {
ref reactor_handle,
ref timer_handle,
ref clock,
ref mut executor,
..
} = *self;

// Binds an executor to this thread
let mut enter = tokio_executor::enter().expect("Multiple executors at once");

// This will set the default handle and timer to use inside the closure
// and run the future.
tokio_reactor::with_default(&reactor_handle, &mut enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
clock::with_default(clock, enter, |enter| {
timer::with_default(&timer_handle, enter, |enter| {
// The TaskExecutor is a fake executor that looks into the
// current single-threaded executor when used. This is a trick,
// because we need two mutable references to the executor (one
// to run the provided future, another to install as the default
// one). We use the fake one here as the default one.
let mut default_executor = current_thread::TaskExecutor::current();
tokio_executor::with_default(&mut default_executor, enter, |enter| {
let mut executor = executor.enter(enter);
f(&mut executor)
})
})
})
})
Expand Down
69 changes: 69 additions & 0 deletions tests/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
extern crate futures;
extern crate tokio;
extern crate tokio_timer;
extern crate env_logger;

use tokio::prelude::*;
use tokio::runtime::{self, current_thread};
use tokio::timer::*;
use tokio_timer::clock::Clock;

use std::sync::mpsc;
use std::time::{Duration, Instant};

struct MockNow(Instant);

impl tokio_timer::clock::Now for MockNow {
fn now(&self) -> Instant {
self.0
}
}

#[test]
fn clock_and_timer_concurrent() {
let _ = env_logger::init();

let when = Instant::now() + Duration::from_millis(5_000);
let clock = Clock::new_with_now(MockNow(when));

let mut rt = runtime::Builder::new()
.clock(clock)
.build()
.unwrap();

let (tx, rx) = mpsc::channel();

rt.spawn({
Delay::new(when)
.map_err(|e| panic!("unexpected error; err={:?}", e))
.and_then(move |_| {
assert!(Instant::now() < when);
tx.send(()).unwrap();
Ok(())
})
});

rx.recv().unwrap();
}

#[test]
fn clock_and_timer_single_threaded() {
let _ = env_logger::init();

let when = Instant::now() + Duration::from_millis(5_000);
let clock = Clock::new_with_now(MockNow(when));

let mut rt = current_thread::Builder::new()
.clock(clock)
.build()
.unwrap();

rt.block_on({
Delay::new(when)
.map_err(|e| panic!("unexpected error; err={:?}", e))
.and_then(move |_| {
assert!(Instant::now() < when);
Ok(())
})
}).unwrap();
}
Loading

0 comments on commit db620b4

Please sign in to comment.