diff --git a/Cargo.lock b/Cargo.lock index 9c09285..23e3ba4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,6 +269,26 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "pin-project" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bf43b791c5b9e34c3d182969b4abb522f9343702850a2e57f460d00d09b4b3" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.14" @@ -368,6 +388,7 @@ name = "statecs" version = "0.3.2" dependencies = [ "futures", + "pin-project", "procs", "tokio", "tokio-test", diff --git a/Cargo.toml b/Cargo.toml index 28855b9..4667ce3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,9 @@ edition = "2021" [dependencies] futures = "0.3.30" +pin-project = "1.1.5" procs = { path = "procs" } [dev-dependencies] tokio-test = "0.4.4" -tokio = { version = "1", features = ["full"] } \ No newline at end of file +tokio = { version = "1", features = ["full"] } diff --git a/src/lib.rs b/src/lib.rs index d6677a5..2551d39 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,7 +13,6 @@ pub mod prelude { pub use crate::take; pub use crate::tuple_proc::{IntoTupleProcessor, TupleProcessFn}; - pub use crate::{cascade, cascade_option}; } pub use prelude::*; diff --git a/src/tuple_proc.rs b/src/tuple_proc.rs index 95848ab..46c3325 100644 --- a/src/tuple_proc.rs +++ b/src/tuple_proc.rs @@ -1,14 +1,13 @@ use crate::apply_tuple; use crate::ComponentGet; use crate::TupleMerge; -use futures::future::FutureExt; use std::{future::Future, marker::PhantomData}; use procs::system_wrap; use crate::TupleExtend; -pub trait IntoTupleProcessor { +pub trait IntoTupleProcessor { type IType; type OType; type TupleProcessorType; @@ -24,92 +23,28 @@ macro_rules! cascade { #[macro_export] macro_rules! cascade_fn { - ($expr:expr) => { + (ref $expr:expr) => { |x| cascade!(x => $expr) }; -} - -#[macro_export] -macro_rules! cascade_option { - ($val:expr => $expr:expr) => { - $expr.into_tuple_processor().cascade_option($val) - }; -} - -#[macro_export] -macro_rules! cascade_option_fn { - ($expr:expr) => { - move |x| cascade_option!(x => $expr) - }; -} - -#[macro_export] -macro_rules! cascade_result { - ($val:expr => $expr:expr) => { - $expr.into_tuple_processor().cascade_result($val) - }; -} - -#[macro_export] -macro_rules! cascade_result_fn { - ($expr:expr) => { - move |x| cascade_result!(x => $expr) - }; -} - -#[macro_export] -macro_rules! cascade_async { - ($val:expr => $expr:expr) => { - $expr.into_tuple_processor().cascade_fut($val) - }; -} - -#[macro_export] -macro_rules! cascade_async_fn { ($expr:expr) => { - move |x| cascade_async!(x => $expr) + move |x| cascade!(x => $expr) }; } #[macro_export] -macro_rules! cascade_obj_async { +macro_rules! cascade_obj { (($obj:expr, $val:expr) => $expr:expr) => { - $expr.into_tuple_processor().cascade_obj_fut($obj, $val) - }; -} - -#[macro_export] -macro_rules! cascade_obj_async_fn { - ($expr:expr) => { - move |obj, x| cascade_obj_async!((obj, x) => $expr) + $expr.into_tuple_processor().cascade_obj($obj, $val) }; } #[macro_export] -macro_rules! cascade_option_async { - ($val:expr => $expr:expr) => { - $expr.into_tuple_processor().cascade_option_fut($val) +macro_rules! cascade_obj_fn { + (ref $expr:expr) => { + |obj, x| cascade_obj!((obj, x) => $expr) }; -} - -#[macro_export] -macro_rules! cascade_option_async_fn { ($expr:expr) => { - move |x| cascade_option_async!(x => $expr) - }; -} - -#[macro_export] -macro_rules! cascade_result_async { - ($val:expr => $expr:expr) => { - $expr.into_tuple_processor().cascade_result_fut($val) - }; -} - -#[macro_export] -macro_rules! cascade_result_async_fn { - ($expr:expr) => { - move |x| cascade_result_async!(x => $expr) + move |obj, x| cascade_obj!((obj, x) => $expr) }; } @@ -134,7 +69,7 @@ mod tests { } assert_eq!( Some((1u32, 0i8, 2i16)), - cascade_option!((0i8, 1i32, 2i16) => test_option) + cascade!((0i8, 1i32, 2i16) => test_option) ); // result fn test_result(a: i32, b: u32) -> Result<(i32,), u32> { @@ -146,11 +81,11 @@ mod tests { } assert_eq!( Err(666u32), - cascade_result!((666u32, 233i32, "string".to_string()) => test_result) + cascade!((666u32, 233i32, "string".to_string()) => test_result) ); assert_eq!( Ok((666i32, "string".to_string())), - cascade_result!((233u32, 666i32, "string".to_string()) => test_result) + cascade!((233u32, 666i32, "string".to_string()) => test_result) ); } #[test] @@ -159,7 +94,7 @@ mod tests { async fn test_cascade(a: i32) -> (i32,) { (a + 1,) } - assert_eq!((234i32,), cascade_async!((233i32,) => test_cascade).await); + assert_eq!((234i32,), cascade!((233i32,) => test_cascade).await); // option async fn test_option(a: i32) -> Option<(u32,)> { if a > 0 { @@ -170,7 +105,7 @@ mod tests { } assert_eq!( Some((1u32, 0i8, 2i16)), - cascade_option_async!((0i8, 1i32, 2i16) => test_option).await + cascade!((0i8, 1i32, 2i16) => test_option).await ); // result async fn test_result(a: i32, b: u32) -> Result<(i32,), u32> { @@ -182,21 +117,141 @@ mod tests { } assert_eq!( Err(666u32), - cascade_result_async!((666u32, 233i32, "string".to_string()) => test_result).await + cascade!((666u32, 233i32, "string".to_string()) => test_result).await ); assert_eq!( Ok((666i32, "string".to_string())), - cascade_result_async!((233u32, 666i32, "string".to_string()) => test_result).await + cascade!((233u32, 666i32, "string".to_string()) => test_result).await ); } } pub struct TupleProcessFn(F, PhantomData); +pub trait CascadeInto { + type Output + where + U: TupleMerge; + fn cascade_merge(self, u: U) -> Self::Output + where + U: TupleMerge; +} + +impl CascadeInto<0, 0> for T +where + T: TupleExtend, +{ + type Output = ::AfterMerge where U: TupleMerge; + fn cascade_merge(self, u: U) -> Self::Output + where + U: TupleMerge, + { + u.merge(self) + } +} + +impl CascadeInto<0, 1> for Option +where + T: TupleExtend, +{ + type Output = Option<::AfterMerge> where U: TupleMerge; + fn cascade_merge(self, u: U) -> Self::Output + where + U: TupleMerge, + { + self.map(|s| u.merge(s)) + } +} + +impl CascadeInto<0, 2> for Result +where + T: TupleExtend, +{ + type Output = Result<::AfterMerge, E> where U: TupleMerge; + fn cascade_merge(self, u: U) -> Self::Output + where + U: TupleMerge, + { + self.map(|s| u.merge(s)) + } +} +#[pin_project::pin_project] +pub struct CascadeFut { + #[pin] + fut: Fut, + u: Option, +} + +impl Future for CascadeFut +where + Fut: Future, + Fut::Output: TupleExtend, + U: TupleMerge, +{ + type Output = ::AfterMerge; + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + let r: Fut::Output = futures::ready!(this.fut.poll(cx)); + std::task::Poll::Ready(this.u.take().unwrap().merge(r)) + } +} +impl Future for CascadeFut +where + Fut: Future>, + Res: TupleExtend, + U: TupleMerge, +{ + type Output = Option<::AfterMerge>; + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + let r: Fut::Output = futures::ready!(this.fut.poll(cx)); + std::task::Poll::Ready(r.map(|r| this.u.take().unwrap().merge(r))) + } +} +impl Future for CascadeFut +where + Fut: Future>, + Res: TupleExtend, + U: TupleMerge, +{ + type Output = Result<::AfterMerge, E>; + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let this = self.project(); + let r: Fut::Output = futures::ready!(this.fut.poll(cx)); + std::task::Poll::Ready(r.map(|r| this.u.take().unwrap().merge(r))) + } +} + +impl CascadeInto<3, J> for F +where + F: Future, + T: CascadeInto<0, J>, +{ + type Output = CascadeFut where U: TupleMerge; + fn cascade_merge(self, u: U) -> Self::Output + where + U: TupleMerge, + { + CascadeFut { + fut: self, + u: Some(u), + } + } +} + macro_rules! impl_tuple_proc { (($($ids:ident),*$(,)?)) => { #[allow(non_snake_case)] - impl IntoTupleProcessor<($($ids,)*), OType> for F + impl IntoTupleProcessor<($($ids,)*), OType, OType> for F where F: FnOnce($($ids,)*) -> OType, { @@ -213,7 +268,6 @@ macro_rules! impl_tuple_proc { impl<_F, OType, $($ids,)*> TupleProcessFn<_F, (($($ids,)*), OType)> where _F: FnOnce($($ids,)*) -> OType, - OType: TupleExtend, { #[system_wrap( _E: ($($ids,)*) @@ -225,140 +279,36 @@ macro_rules! impl_tuple_proc { #[system_wrap( _E: ($($ids,)*) )] - pub fn cascade<_E>(self, e: _E) -> ::AfterMerge + pub fn cascade<_E, const I: usize, const J: usize>(self, e: _E) -> + >::Output where outof![_E]: TupleMerge, + OType: CascadeInto, { let (r, b) = _E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - b.merge(()).merge(r) - } - } - #[allow(non_snake_case)] - impl TupleProcessFn)> - where - F: FnOnce($($ids,)*) -> Option, - { - #[system_wrap( - E: ($($ids,)*) - )] - pub fn cascade_option(self, e: E) -> Option<::AfterMerge> - where - outof![E]: TupleMerge, - OType: TupleExtend, - { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - r.map(|r| b.merge(()).merge(r)) - } - } - #[allow(non_snake_case)] - impl TupleProcessFn)> - where - F: FnOnce($($ids,)*) -> Result, - { - #[system_wrap( - E: ($($ids,)*) - )] - pub fn cascade_result( - self, - e: E, - ) -> Result<::AfterMerge, ErrType> - where - outof![E]: TupleMerge, - OType: TupleExtend, - { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - r.map(|r| b.merge(()).merge(r)) - } - } - #[allow(non_snake_case)] - impl TupleProcessFn - where - F: FnOnce($($ids,)*) -> Fut, - Fut: Future, - OType: TupleExtend, - { - #[system_wrap( - E: ($($ids,)*) - )] - pub async fn operate_fut(self, e: E) -> (OType, outof![E]) { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - (r.await, b.merge(())) - } - #[system_wrap( - E: ($($ids,)*) - )] - pub async fn cascade_fut(self, e: E) -> ::AfterMerge - where - outof![E]: TupleMerge, - { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - b.merge(()).merge(r.await) - } - } - #[allow(non_snake_case)] - impl TupleProcessFn - where - F: FnOnce($($ids,)*) -> Fut, - Fut: Future>, - { - #[system_wrap( - E: ($($ids,)*) - )] - pub fn cascade_option_fut( - self, - e: E, - ) -> impl Future::AfterMerge>> - where - outof![E]: TupleMerge, - OType: TupleExtend, - { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - // r.await.map(|r| b.merge(()).merge(r)) - r.map(|x| x.map(|r| b.merge(()).merge(r))) - } - } - #[allow(non_snake_case)] - impl TupleProcessFn - where - F: FnOnce($($ids,)*) -> Fut, - Fut: Future>, - { - #[system_wrap( - E: ($($ids,)*) - )] - pub async fn cascade_result_fut( - self, - e: E, - ) -> Result<::AfterMerge, ErrType> - where - outof![E]: TupleMerge, - OType: TupleExtend, - { - let (r, b) = E!(e => |($($ids,)*)| self.0($($ids,)*) => NoMerge); - r.await.map(|r| b.merge(()).merge(r)) + r.cascade_merge(b.merge(())) } } #[allow(non_snake_case)] - impl TupleProcessFn + impl TupleProcessFn where - F: FnOnce(Obj, ($($ids,)*)) -> Fut, - Fut: Future)>, + F: FnOnce(Obj, ($($ids,)*)) -> OType, { #[system_wrap( E: ($($ids,)*) )] - pub async fn cascade_obj_fut( + pub fn cascade_obj( self, obj: Obj, e: E, - ) -> (Obj, Option<::AfterMerge>) + ) + -> >::Output where outof![E]: TupleMerge, - OType: TupleExtend, + OType: CascadeInto { let (r, b) = E!(e => |($($ids,)*)| self.0(obj, ($($ids,)*)) => NoMerge); - let (obj, r) = r.await; - (obj, r.map(|r| b.merge(()).merge(r))) + r.cascade_merge(b.merge(())) } } };