diff --git a/src/future.rs b/src/future.rs deleted file mode 100644 index bff479e..0000000 --- a/src/future.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::cell::UnsafeCell; -use std::sync::Arc; - -use {Group, GroupGuard, Queue}; - -struct FutureCell(UnsafeCell>); - -impl FutureCell { - fn new() -> FutureCell { - FutureCell(UnsafeCell::new(None)) - } - - unsafe fn take(&self) -> Option { - (*self.0.get()).take() - } - - unsafe fn set(&self, value: T) { - *self.0.get() = Some(value); - } -} - -// This is a lie, but we'll ensure that & methods are never called concurrently -unsafe impl Sync for FutureCell { } - -struct Promise { - result: Arc>, - _guard: GroupGuard, -} - -impl Promise { - pub fn fulfill(self, value: T) { - unsafe { - // Since the group is entered, we're guaranteed that no one - // is trying to get the value so it's safe to set - self.result.set(value); - } - } -} - -pub struct Future { - value: Arc>, - group: Group, -} - -impl Future { - pub fn new(queue: &Queue, work: F) -> Future - where F: 'static + Send + FnOnce() -> T { - let (promise, future) = future(); - queue.async(move || { - promise.fulfill(work()); - }); - future - } - - pub fn wait(self) -> T { - self.group.wait(); - let value = unsafe { - // Since the group is empty, we're guaranteed that the value has - // finished being set - self.value.take() - }; - value.expect("Promise was not fulfilled") - } - - fn notify(self, queue: &Queue, work: F) - where F: 'static + Send + FnOnce(T) { - let Future { value, group } = self; - group.notify(queue, move || { - // Since the original group is being notified, the group is empty - // and we're guaranteed that the value has finished being set - let value = unsafe { value.take() }; - if let Some(input) = value { - work(input); - } - }); - } - - pub fn map(self, queue: &Queue, work: F) -> Future - where F: 'static + Send + FnOnce(T) -> U, U: 'static + Send { - let (promise, future) = future(); - self.notify(queue, move |input| { - promise.fulfill(work(input)); - }); - future - } -} - -fn future() -> (Promise, Future) { - let future = Future { - value: Arc::new(FutureCell::new()), - group: Group::create(), - }; - - let promise = Promise { - result: future.value.clone(), - _guard: future.group.enter(), - }; - - (promise, future) -} - -#[cfg(test)] -mod tests { - use {Queue, QueueAttribute}; - use super::*; - - #[test] - fn test_wait() { - let q = Queue::create("", QueueAttribute::Concurrent); - let future = Future::new(&q, || "Hello, world!".to_string()); - assert!(future.wait() == "Hello, world!"); - } - - #[test] - fn test_map() { - let q = Queue::create("", QueueAttribute::Concurrent); - let future = Future::new(&q, || "Hello, world!".to_string()); - let future = future.map(&q, |s| s.len()); - assert!(future.wait() == "Hello, world!".len()); - } -} diff --git a/src/lib.rs b/src/lib.rs index 6dc75cb..0007278 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,13 +53,8 @@ use libc::{c_long, c_void, size_t}; use ffi::*; -pub use future::Future; -pub use proxy::ProxyQueue; - /// Raw foreign function interface for libdispatch. pub mod ffi; -mod future; -mod proxy; /// The type of a dispatch queue. pub enum QueueAttribute { diff --git a/src/proxy.rs b/src/proxy.rs deleted file mode 100644 index 3199a39..0000000 --- a/src/proxy.rs +++ /dev/null @@ -1,80 +0,0 @@ -use std::cell::UnsafeCell; -use std::sync::Arc; - -use {Queue, QueueAttribute}; - -struct UnsafeSyncCell(UnsafeCell); - -impl UnsafeSyncCell { - fn new(value: T) -> UnsafeSyncCell { - UnsafeSyncCell(UnsafeCell::new(value)) - } - - unsafe fn get(&self) -> &T { - &*self.0.get() - } - - unsafe fn get_mut(&self) -> &mut T { - &mut *self.0.get() - } -} - -unsafe impl Sync for UnsafeSyncCell { } - -#[derive(Clone)] -pub struct ProxyQueue { - queue: Queue, - value: Arc>, -} - -impl ProxyQueue { - pub fn new(value: T) -> ProxyQueue { - ProxyQueue { - queue: Queue::create("", QueueAttribute::Concurrent), - value: Arc::new(UnsafeSyncCell::new(value)), - } - } - - pub fn exec_mut(&self, work: F) - where F: 'static + Send + FnOnce(&mut T) { - let value = self.value.clone(); - self.queue.barrier_async(move || unsafe { - // Safe to get mut since no other blocks will be executing - work(value.get_mut()); - }) - } -} - -impl ProxyQueue { - pub fn exec(&self, work: F) - where F: 'static + Send + FnOnce(&T) { - let value = self.value.clone(); - self.queue.async(move || unsafe { - // Safe to get since our value is Sync - work(value.get()); - }) - } -} - -#[cfg(test)] -mod tests { - use std::sync::mpsc::channel; - use super::*; - - #[test] - fn test_proxy() { - let proxy = ProxyQueue::new("Hello, world!".to_string()); - - proxy.exec_mut(|s| { - s.push_str(" Bye.") - }); - - let (send, recv) = channel(); - proxy.exec(move |s| { - send.send(s.clone()).unwrap(); - }); - - let result = recv.recv().unwrap(); - assert!(result == "Hello, world! Bye."); - } -}