diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index e3838b7ad2..97a8c8ff74 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -83,7 +83,7 @@ mod join_all; pub use self::join_all::{join_all, JoinAll}; mod select; -pub use self::select::{select, Select}; +pub use self::select::{select, select_biased, Select}; #[cfg(feature = "alloc")] mod select_all; @@ -99,12 +99,12 @@ mod try_join_all; pub use self::try_join_all::{try_join_all, TryJoinAll}; mod try_select; -pub use self::try_select::{try_select, TrySelect}; +pub use self::try_select::{try_select, try_select_biased, TrySelect}; #[cfg(feature = "alloc")] mod select_ok; #[cfg(feature = "alloc")] -pub use self::select_ok::{select_ok, SelectOk}; +pub use self::select_ok::{select_ok, select_ok_biased, SelectOk}; mod either; pub use self::either::Either; diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index 6bb9b80fc1..f9b3e0e3ad 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -9,6 +9,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] pub struct Select { inner: Option<(A, B)>, + biased: bool, } impl Unpin for Select {} @@ -23,7 +24,8 @@ impl Unpin for Select {} /// wrapped version of them. /// /// If both futures are ready when this is polled, the winner will be pseudo-randomly -/// selected. +/// selected, unless the std feature is not enabled. If std is enabled, the first +/// argument will always win. /// /// Also note that if both this and the second future have the same /// output type you can use the `Either::factor_first` method to @@ -91,6 +93,88 @@ where { assert_future::, _>(Select { inner: Some((future1, future2)), + biased: false, + }) +} + +/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// If both futures are ready when this is polled, the winner will always be the first argument. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// A simple example +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::{ +/// pin_mut, +/// future::Either, +/// future::self, +/// }; +/// +/// // These two futures have different types even though their outputs have the same type. +/// let future1 = async { +/// future::pending::<()>().await; // will never finish +/// 1 +/// }; +/// let future2 = async { +/// future::ready(2).await +/// }; +/// +/// // 'select_biased' requires Future + Unpin bounds +/// pin_mut!(future1); +/// pin_mut!(future2); +/// +/// let value = match future::select_biased(future1, future2).await { +/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1` +/// // `_` represents `future2` +/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2` +/// // `_` represents `future1` +/// }; +/// +/// assert!(value == 2); +/// # }); +/// ``` +/// +/// A more complex example +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt}; +/// +/// // A poor-man's join implemented on top of select +/// +/// fn join(a: A, b: B) -> impl Future +/// where A: Future + Unpin, +/// B: Future + Unpin, +/// { +/// future::select_biased(a, b).then(|either| { +/// match either { +/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(), +/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(), +/// } +/// }) +/// } +/// ``` +pub fn select_biased(future1: A, future2: B) -> Select +where + A: Future + Unpin, + B: Future + Unpin, +{ + assert_future::, _>(Select { + inner: Some((future1, future2)), + biased: true, }) } @@ -111,6 +195,7 @@ where Some(value) => value, } } + let biased = self.biased; let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); @@ -123,7 +208,7 @@ where } #[cfg(feature = "std")] - if crate::gen_index(2) == 0 { + if biased || crate::gen_index(2) == 0 { poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left); poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right); } else { diff --git a/futures-util/src/future/select_ok.rs b/futures-util/src/future/select_ok.rs index 3d6fa67cc1..3f349423f2 100644 --- a/futures-util/src/future/select_ok.rs +++ b/futures-util/src/future/select_ok.rs @@ -12,6 +12,7 @@ use futures_core::task::{Context, Poll}; #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct SelectOk { inner: Vec, + biased: bool, } impl Unpin for SelectOk {} @@ -35,7 +36,7 @@ impl Unpin for SelectOk {} /// Some futures that would have been polled and had errors get dropped, may now instead /// remain in the collection without being polled. /// -/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro. +/// If you were relying on this biased behavior, consider switching to the [`select_ok_biased`] function. /// /// # Panics /// @@ -45,7 +46,36 @@ where I: IntoIterator, I::Item: TryFuture + Unpin, { - let ret = SelectOk { inner: iter.into_iter().collect() }; + let ret = SelectOk { inner: iter.into_iter().collect(), biased: false }; + assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); + assert_future::< + Result<(::Ok, Vec), ::Error>, + _, + >(ret) +} + +/// Creates a new future which will select the first successful future over a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike +/// `select_all`, this will only return the first successful completion, or the last +/// failure. This is useful in contexts where any success is desired and failures +/// are ignored, unless all the futures fail. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// If multiple futures are ready at the same time this function is biased towards +/// entries that are earlier in the list. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn select_ok_biased(iter: I) -> SelectOk +where + I: IntoIterator, + I::Item: TryFuture + Unpin, +{ + let ret = SelectOk { inner: iter.into_iter().collect(), biased: true }; assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty"); assert_future::< Result<(::Ok, Vec), ::Error>, @@ -57,10 +87,12 @@ impl Future for SelectOk { type Output = Result<(Fut::Ok, Vec), Fut::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { inner } = &mut *self; + let Self { inner, biased } = &mut *self; #[cfg(feature = "std")] { - crate::shuffle(inner); + if !*biased { + crate::shuffle(inner); + } } // loop until we've either exhausted all errors, a success was hit, or nothing is ready loop { diff --git a/futures-util/src/future/try_select.rs b/futures-util/src/future/try_select.rs index a24525bbdc..40185965e3 100644 --- a/futures-util/src/future/try_select.rs +++ b/futures-util/src/future/try_select.rs @@ -8,6 +8,7 @@ use futures_core::task::{Context, Poll}; #[derive(Debug)] pub struct TrySelect { inner: Option<(A, B)>, + biased: bool, } impl Unpin for TrySelect {} @@ -25,7 +26,8 @@ type EitherErr = Either<(::Error, B), (::E /// wrapped version of them. /// /// If both futures are ready when this is polled, the winner will be pseudo-randomly -/// selected. +/// selected, unless the `std` feature is disabled. If the std feature is disabled, +/// the first argument will always win. /// /// Also note that if both this and the second future have the same /// success/error type you can use the `Either::factor_first` method to @@ -60,6 +62,55 @@ where { super::assert_future::, EitherErr>, _>(TrySelect { inner: Some((future1, future2)), + biased: false, + }) +} + +/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with both the value +/// resolved and a future representing the completion of the other work. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// If both futures are ready when this is polled, the winner will always be the first one. +/// +/// Also note that if both this and the second future have the same +/// success/error type you can use the `Either::factor_first` method to +/// conveniently extract out the value at the end. +/// +/// # Examples +/// +/// ``` +/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt}; +/// +/// // A poor-man's try_join implemented on top of select +/// +/// fn try_join(a: A, b: B) -> impl TryFuture +/// where A: TryFuture + Unpin + 'static, +/// B: TryFuture + Unpin + 'static, +/// E: 'static, +/// { +/// future::try_select(a, b).then(|res| -> Box> + Unpin> { +/// match res { +/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))), +/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))), +/// Err(Either::Left((e, _))) => Box::new(future::err(e)), +/// Err(Either::Right((e, _))) => Box::new(future::err(e)), +/// } +/// }) +/// } +/// ``` +pub fn try_select_biased(future1: A, future2: B) -> TrySelect +where + A: TryFuture + Unpin, + B: TryFuture + Unpin, +{ + super::assert_future::, EitherErr>, _>(TrySelect { + inner: Some((future1, future2)), + biased: true, }) } @@ -91,7 +142,7 @@ where #[cfg(feature = "std")] { - if crate::gen_index(2) == 0 { + if self.biased || crate::gen_index(2) == 0 { poll_wrap!(a, b, Either::Left, Either::Right) } else { poll_wrap!(b, a, Either::Right, Either::Left) diff --git a/futures/tests/future_select.rs b/futures/tests/future_select.rs index da30a49271..5e2dffa94d 100644 --- a/futures/tests/future_select.rs +++ b/futures/tests/future_select.rs @@ -1,6 +1,6 @@ use std::future::ready; -use futures::future::select; +use futures::future::{select, select_biased}; use futures_executor::block_on; #[test] @@ -14,3 +14,13 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = block_on(select_biased(ready(0), ready(1))).factor_first(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 0)); +} diff --git a/futures/tests/future_select_ok.rs b/futures/tests/future_select_ok.rs index a031f05fc5..fa4c48720b 100644 --- a/futures/tests/future_select_ok.rs +++ b/futures/tests/future_select_ok.rs @@ -2,7 +2,7 @@ use std::fmt::Debug; use std::time::Duration; use futures::executor::block_on; -use futures::future::{err, ok, select_ok, Future}; +use futures::future::{err, ok, select_ok, select_ok_biased, Future}; use futures_channel::oneshot; use std::thread; @@ -65,3 +65,15 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD); } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let v = vec![err(1), err(2), ok(3), ok(4)]; + + let (i, _v) = block_on(select_ok_biased(v)).ok().unwrap(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 3)); +} diff --git a/futures/tests/future_try_select.rs b/futures/tests/future_try_select.rs index 382466fc7c..40dcd6da7f 100644 --- a/futures/tests/future_try_select.rs +++ b/futures/tests/future_try_select.rs @@ -1,4 +1,4 @@ -use futures::future::{ok, try_select}; +use futures::future::{ok, try_select, try_select_biased}; use futures_executor::block_on; #[test] @@ -12,3 +12,14 @@ fn is_fair() { assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD); assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD) } + +#[test] +fn is_biased() { + let mut results = Vec::with_capacity(100); + for _ in 0..100 { + let (i, _) = + block_on(try_select_biased(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first(); + results.push(i); + } + assert!(results.iter().all(|i| *i == 0)); +}