From 521141b49daa51fb93b9c7bbb9cfb1e00327d766 Mon Sep 17 00:00:00 2001 From: zetanumbers Date: Fri, 27 Nov 2020 23:44:50 +0300 Subject: [PATCH] First try at fixing #215... 1 test fails --- src/runtime/runloop.rs | 103 ++++++++++++++++++++++++--- src/runtime/runloop/chained_waker.rs | 32 +++++++++ 2 files changed, 127 insertions(+), 8 deletions(-) create mode 100644 src/runtime/runloop/chained_waker.rs diff --git a/src/runtime/runloop.rs b/src/runtime/runloop.rs index b8e648d2d..56bcaa816 100644 --- a/src/runtime/runloop.rs +++ b/src/runtime/runloop.rs @@ -1,3 +1,6 @@ +mod chained_waker; +use chained_waker::*; + use super::{Revision, Runtime}; use futures::{ stream::{Stream, StreamExt}, @@ -5,6 +8,10 @@ use futures::{ }; use std::{ pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, task::{Context as FutContext, Poll, Waker}, }; @@ -16,6 +23,7 @@ use std::{ pub struct RunLoop { inner: Runtime, root: Root, + changed: Arc, } impl super::Runtime { @@ -23,9 +31,11 @@ impl super::Runtime { /// a loop. pub fn looped(self, root: Root) -> RunLoop where - Root: FnMut() -> Out, + Root: FnMut() -> Out + Unpin, { - RunLoop { inner: self, root } + let mut out = RunLoop { inner: self, root, changed: Arc::new(AtomicBool::new(true)) }; + out.set_state_change_waker(out.inner.wk.clone()); + out } } @@ -35,7 +45,12 @@ where { /// Creates a new `Runtime` attached to the provided root function. pub fn new(root: Root) -> RunLoop { - RunLoop { root, inner: Runtime::new() } + Runtime::new().looped(root) + } + + /// Indicates change occured since last [`RunLoop::run_once()`] call + pub fn has_changed(&self) -> bool { + self.changed.load(Ordering::Relaxed) } /// Returns the runtime's current Revision. @@ -46,7 +61,9 @@ where /// Sets the [`std::task::Waker`] which will be called when state variables /// change. pub fn set_state_change_waker(&mut self, wk: Waker) { - self.inner.set_state_change_waker(wk); + let changed = self.changed.clone(); + let waker = ChainedWaker::new(move || changed.store(true, Ordering::Relaxed), wk); + self.inner.set_state_change_waker(futures::task::waker(waker)); } /// Sets the executor that will be used to spawn normal priority tasks. @@ -57,7 +74,9 @@ where /// Run the root function once within this runtime's context, returning the /// result. pub fn run_once(&mut self) -> Out { - self.inner.run_once(&mut self.root) + let out = self.inner.run_once(&mut self.root); + self.changed.store(false, Ordering::Relaxed); + out } /// Poll this runtime without exiting. Discards any value returned from the @@ -85,8 +104,76 @@ where /// `poll_next`, always returning `Poll::Ready(Some(...))`. fn poll_next(self: Pin<&mut Self>, cx: &mut FutContext<'_>) -> Poll> { let this = self.get_mut(); - this.inner.set_state_change_waker(cx.waker().clone()); - let out = this.run_once(); - Poll::Ready(Some((this.inner.revision, out))) + this.set_state_change_waker(cx.waker().clone()); + if this.has_changed() { + let out = this.run_once(); + Poll::Ready(Some((this.inner.revision, out))) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn pending_without_change() { + use futures::{stream::StreamExt, task::LocalSpawnExt}; + let mut rl = RunLoop::new(|| ()); + rl.run_once(); + let mut pool = futures::executor::LocalPool::new(); + let spawner = pool.spawner(); + spawner + .spawn_local(async move { + rl.next().await.unwrap(); + unreachable!() + }) + .unwrap(); + assert!(!pool.try_run_one()); + } + + #[test] + fn has_changed() { + let mut rl = RunLoop::new(|| crate::state(|| 0).1); + assert!(rl.has_changed()); + let key = rl.run_once(); + assert!(!rl.has_changed()); + key.set(0); + assert!(!rl.has_changed()); + key.set(1); + assert!(rl.has_changed()); + rl.run_once(); + assert!(!rl.has_changed()); + } + + #[test] + fn run_loop_in_pool() { + use futures::task::LocalSpawnExt; + use std::cell::RefCell; + + let mut pool = futures::executor::LocalPool::new(); + let spawner = pool.spawner(); + + let rl = Arc::new(RefCell::new(RunLoop::new(|| crate::state(|| 0).1))); + let key = rl.borrow_mut().run_once(); + spawner + .spawn_local({ + let rl = rl.clone(); + async move { + rl.borrow_mut().next().await; + } + }) + .unwrap(); + + assert!(!rl.borrow().has_changed()); + pool.run_until_stalled(); + + key.set(1); + assert!(rl.borrow().has_changed()); + + pool.run_until_stalled(); + assert!(!rl.borrow().has_changed()); } } diff --git a/src/runtime/runloop/chained_waker.rs b/src/runtime/runloop/chained_waker.rs new file mode 100644 index 000000000..9f5f13a62 --- /dev/null +++ b/src/runtime/runloop/chained_waker.rs @@ -0,0 +1,32 @@ +use super::Waker; +use std::sync::Arc; + +pub use futures::task::ArcWake; + +pub struct ChainedWaker +where + F: Fn() + Send + Sync, +{ + callback: F, + wrapped: Waker, +} + +impl ChainedWaker +where + F: Fn() + Send + Sync, +{ + pub fn new(callback: F, wrapped: Waker) -> Arc { + Arc::new(Self { callback, wrapped }) + } +} + +impl ArcWake for ChainedWaker +where + F: Fn() + Send + Sync, +{ + fn wake_by_ref(arc_self: &Arc) { + let this = arc_self.as_ref(); + (this.callback)(); + this.wrapped.wake_by_ref(); + } +}