Skip to content

Commit

Permalink
First try at fixing anp#215... 1 test fails
Browse files Browse the repository at this point in the history
  • Loading branch information
zetanumbers committed Nov 27, 2020
1 parent 74fb3e7 commit 521141b
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 8 deletions.
103 changes: 95 additions & 8 deletions src/runtime/runloop.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
mod chained_waker;
use chained_waker::*;

use super::{Revision, Runtime};
use futures::{
stream::{Stream, StreamExt},
task::LocalSpawn,
};
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{Context as FutContext, Poll, Waker},
};

Expand All @@ -16,16 +23,19 @@ use std::{
pub struct RunLoop<Root> {
inner: Runtime,
root: Root,
changed: Arc<AtomicBool>,
}

impl super::Runtime {
/// Returns this runtime bound with a specific root function it will run in
/// a loop.
pub fn looped<Root, Out>(self, root: Root) -> RunLoop<Root>
where
Root: FnMut() -> Out,
Root: FnMut() -> Out + Unpin,
{
RunLoop { inner: self, root }
let mut out = RunLoop { inner: self, root, changed: Arc::new(AtomicBool::new(true)) };
out.set_state_change_waker(out.inner.wk.clone());
out
}
}

Expand All @@ -35,7 +45,12 @@ where
{
/// Creates a new `Runtime` attached to the provided root function.
pub fn new(root: Root) -> RunLoop<Root> {
RunLoop { root, inner: Runtime::new() }
Runtime::new().looped(root)
}

/// Indicates change occured since last [`RunLoop::run_once()`] call
pub fn has_changed(&self) -> bool {
self.changed.load(Ordering::Relaxed)
}

/// Returns the runtime's current Revision.
Expand All @@ -46,7 +61,9 @@ where
/// Sets the [`std::task::Waker`] which will be called when state variables
/// change.
pub fn set_state_change_waker(&mut self, wk: Waker) {
self.inner.set_state_change_waker(wk);
let changed = self.changed.clone();
let waker = ChainedWaker::new(move || changed.store(true, Ordering::Relaxed), wk);
self.inner.set_state_change_waker(futures::task::waker(waker));
}

/// Sets the executor that will be used to spawn normal priority tasks.
Expand All @@ -57,7 +74,9 @@ where
/// Run the root function once within this runtime's context, returning the
/// result.
pub fn run_once(&mut self) -> Out {
self.inner.run_once(&mut self.root)
let out = self.inner.run_once(&mut self.root);
self.changed.store(false, Ordering::Relaxed);
out
}

/// Poll this runtime without exiting. Discards any value returned from the
Expand Down Expand Up @@ -85,8 +104,76 @@ where
/// `poll_next`, always returning `Poll::Ready(Some(...))`.
fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
this.inner.set_state_change_waker(cx.waker().clone());
let out = this.run_once();
Poll::Ready(Some((this.inner.revision, out)))
this.set_state_change_waker(cx.waker().clone());
if this.has_changed() {
let out = this.run_once();
Poll::Ready(Some((this.inner.revision, out)))
} else {
Poll::Pending
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn pending_without_change() {
use futures::{stream::StreamExt, task::LocalSpawnExt};
let mut rl = RunLoop::new(|| ());
rl.run_once();
let mut pool = futures::executor::LocalPool::new();
let spawner = pool.spawner();
spawner
.spawn_local(async move {
rl.next().await.unwrap();
unreachable!()
})
.unwrap();
assert!(!pool.try_run_one());
}

#[test]
fn has_changed() {
let mut rl = RunLoop::new(|| crate::state(|| 0).1);
assert!(rl.has_changed());
let key = rl.run_once();
assert!(!rl.has_changed());
key.set(0);
assert!(!rl.has_changed());
key.set(1);
assert!(rl.has_changed());
rl.run_once();
assert!(!rl.has_changed());
}

#[test]
fn run_loop_in_pool() {
use futures::task::LocalSpawnExt;
use std::cell::RefCell;

let mut pool = futures::executor::LocalPool::new();
let spawner = pool.spawner();

let rl = Arc::new(RefCell::new(RunLoop::new(|| crate::state(|| 0).1)));
let key = rl.borrow_mut().run_once();
spawner
.spawn_local({
let rl = rl.clone();
async move {
rl.borrow_mut().next().await;
}
})
.unwrap();

assert!(!rl.borrow().has_changed());
pool.run_until_stalled();

key.set(1);
assert!(rl.borrow().has_changed());

pool.run_until_stalled();
assert!(!rl.borrow().has_changed());
}
}
32 changes: 32 additions & 0 deletions src/runtime/runloop/chained_waker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use super::Waker;
use std::sync::Arc;

pub use futures::task::ArcWake;

pub struct ChainedWaker<F>
where
F: Fn() + Send + Sync,
{
callback: F,
wrapped: Waker,
}

impl<F> ChainedWaker<F>
where
F: Fn() + Send + Sync,
{
pub fn new(callback: F, wrapped: Waker) -> Arc<Self> {
Arc::new(Self { callback, wrapped })
}
}

impl<F> ArcWake for ChainedWaker<F>
where
F: Fn() + Send + Sync,
{
fn wake_by_ref(arc_self: &Arc<Self>) {
let this = arc_self.as_ref();
(this.callback)();
this.wrapped.wake_by_ref();
}
}

0 comments on commit 521141b

Please sign in to comment.