diff --git a/futures-util/src/future/either.rs b/futures-util/src/future/either.rs index 24fbbe79d8..84814de1ca 100644 --- a/futures-util/src/future/either.rs +++ b/futures-util/src/future/either.rs @@ -7,7 +7,7 @@ use futures_sink::Sink; /// Combines two different futures, streams, or sinks having the same associated types into a single /// type. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub enum Either { /// First branch of the type Left(A), @@ -280,10 +280,7 @@ mod if_std { A: AsyncBufRead, B: AsyncBufRead, { - fn poll_fill_buf( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { unsafe { match self.get_unchecked_mut() { Either::Left(x) => Pin::new_unchecked(x).poll_fill_buf(cx), diff --git a/futures-util/src/future/first.rs b/futures-util/src/future/first.rs new file mode 100644 index 0000000000..9fa14e3008 --- /dev/null +++ b/futures-util/src/future/first.rs @@ -0,0 +1,83 @@ +use crate::future::Either; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use pin_utils::unsafe_pinned; + +/// Future for the [`first()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug, Clone, Default)] +pub struct First { + future1: F1, + future2: F2, +} + +impl Unpin for First {} + +impl First { + unsafe_pinned!(future1: F1); + unsafe_pinned!(future2: F2); +} + +impl Future for First { + type Output = Either; + + #[inline] + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().future1().poll(cx) { + Poll::Ready(out) => Poll::Ready(Either::Left(out)), + Poll::Pending => match self.future2().poll(cx) { + Poll::Ready(out) => Poll::Ready(Either::Right(out)), + Poll::Pending => Poll::Pending, + }, + } + } +} + +// We don't provide FusedFuture, because the overhead of implementing it ( +// which requires a separate bool or Option field) is precisely the same as +// calling .fuse() + +/// Waits for either one of two differently-typed futures to complete. +/// +/// This function will return a new future which awaits for either one of both +/// futures to complete. The returned future will finish with the value of +/// whichever future finishes first. +/// +/// The future will discard the future that didn't complete; see `select` for +/// a future that will instead return the incomplete future. +/// +/// Note that this function consumes the receiving futures and returns a +/// wrapped version of them. +/// +/// Also note that if both this and the second future have the same +/// output type you can use the `Either::into_immer` method to +/// conveniently extract out the value at the end. +pub fn first(future1: F1, future2: F2) -> First { + First { future1, future2 } +} + +#[test] +fn test_first() { + use crate::future::{pending, ready, FutureExt}; + use crate::task::noop_waker_ref; + + let mut context = Context::from_waker(noop_waker_ref()); + + assert_eq!( + first(ready(10), ready(20)).poll_unpin(&mut context), + Poll::Ready(Either::Left(10)) + ); + assert_eq!( + first(ready(10), pending::<()>()).poll_unpin(&mut context), + Poll::Ready(Either::Left(10)) + ); + assert_eq!( + first(pending::<()>(), ready(20)).poll_unpin(&mut context), + Poll::Ready(Either::Right(20)) + ); + assert_eq!( + first(pending::<()>(), pending::<()>()).poll_unpin(&mut context), + Poll::Pending + ); +} diff --git a/futures-util/src/future/first_all.rs b/futures-util/src/future/first_all.rs new file mode 100644 index 0000000000..c30c62dcf9 --- /dev/null +++ b/futures-util/src/future/first_all.rs @@ -0,0 +1,111 @@ +use alloc::vec::Vec; +use core::iter::FromIterator; +use core::pin::Pin; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; + +/// Future for the [`first_all()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug, Clone)] +pub struct FirstAll { + // Critical safety invariant: after FirstAll is created, this vector can + // never be reallocated, in order to ensure that Pin is upheld. + futures: Vec, +} + +// Safety: once created, the contents of the vector don't change, and they'll +// remain in place permanently. +impl Unpin for FirstAll {} + +impl Future for FirstAll { + type Output = F::Output; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + match this.futures.iter_mut().find_map(move |fut| { + // Safety: we promise that the future is never moved out of the vec, + // and that the vec never reallocates once FirstAll has been created + // (specifically after the first poll) + let pinned = unsafe { Pin::new_unchecked(fut) }; + match pinned.poll(cx) { + Poll::Ready(out) => Some(out), + Poll::Pending => None, + } + }) { + Some(out) => Poll::Ready(out), + None => Poll::Pending, + } + } +} + +// We don't provide FusedFuture, because the overhead of implementing it ( +// which requires clearing the vector after Ready is returned) is precisely +// the same as using .fuse() + +impl FromIterator for FirstAll { + fn from_iter>(iter: T) -> Self { + first_all(iter) + } +} + +/// Creates a new future which will return the result of the first completed +/// future out of a list. +/// +/// The returned future will wait for any future within `futures` to be ready. +/// Upon completion the item resolved will be returned. +/// +/// The remaining futures will be discarded when the returned future is +/// dropped; see `select_all` for a version that returns the incomplete +/// futures if you need to poll over them further. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no items. +pub fn first_all(futures: I) -> FirstAll +where + I: IntoIterator, + I::Item: Future, +{ + let futures = Vec::from_iter(futures); + assert!(!futures.is_empty(), "Need at least 1 future for first_any"); + FirstAll { futures } +} + +#[test] +fn test_first_all() { + use crate::future::FutureExt; + use crate::task::noop_waker_ref; + use futures_channel::oneshot::channel; + + let mut futures = vec![]; + let mut senders = vec![]; + + for _ in 0..10 { + let (send, recv) = channel(); + futures.push(recv); + senders.push(send); + } + + let (send, recv) = channel(); + futures.push(recv); + + for _ in 0..10 { + let (send, recv) = channel(); + futures.push(recv); + senders.push(send); + } + + let mut fut = first_all(futures); + let mut context = Context::from_waker(noop_waker_ref()); + + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Pending); + + send.send(10).unwrap(); + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Ready(Ok(10))); +} diff --git a/futures-util/src/future/first_ok.rs b/futures-util/src/future/first_ok.rs new file mode 100644 index 0000000000..3be259f1f8 --- /dev/null +++ b/futures-util/src/future/first_ok.rs @@ -0,0 +1,220 @@ +use crate::future::{Fuse, FutureExt}; +use alloc::vec::Vec; +use core::iter::FromIterator; +use core::pin::Pin; +use futures_core::future::{FusedFuture, Future, TryFuture}; +use futures_core::task::{Context, Poll}; + +/// Future for the [`first_ok()`] function. +#[must_use = "futures do nothing unless you `.await` or poll them"] +#[derive(Debug, Clone)] +pub struct FirstOk { + // Critical safety invariant: after FirstAll is created, this vector can + // never be reallocated, nor can its contents be moved, in order to ensure + // that Pin is upheld. + futures: Vec, +} + +// Safety: once created, the contents of the vector don't change, and they'll +// remain in place permanently. +impl Unpin for FirstOk {} + +impl Future for FirstOk +where + F: Future> + FusedFuture, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + /// Helper enum to track our state as we poll each future + enum State { + /// Haven't seen any errors + NoErrors, + + /// The last error we've seen + SeenError(E), + + /// At least 1 future is still pending; there's no need to + /// track errors + SeenPending, + } + + use State::*; + + impl State { + fn apply_error(&mut self, err: E) { + match self { + SeenError(..) | NoErrors => *self = SeenError(err), + SeenPending => {} + } + } + + fn apply_pending(&mut self) { + *self = SeenPending; + } + } + + let mut state = State::NoErrors; + + for fut in self.get_mut().futures.iter_mut() { + if !fut.is_terminated() { + // Safety: we promise that the future is never moved out of the vec, + // and that the vec never reallocates once FirstOk has been created + // (specifically after the first poll) + let pinned = unsafe { Pin::new_unchecked(fut) }; + match pinned.try_poll(cx) { + Poll::Ready(Ok(out)) => return Poll::Ready(Ok(out)), + Poll::Ready(Err(err)) => state.apply_error(err), + Poll::Pending => state.apply_pending(), + } + } + } + + match state { + SeenError(err) => Poll::Ready(Err(err)), + SeenPending | NoErrors => Poll::Pending, + } + } +} + +// We don't provide FusedFuture, because the overhead of implementing it ( +// which requires clearing the vector after Ready is returned) is precisely +// the same as using .fuse() + +impl FromIterator for FirstOk +where + F: FusedFuture + Future>, +{ + fn from_iter>(iter: I) -> Self { + first_ok_fused(iter) + } +} + +/// Creates a new future which will return the result of the first successful +/// future in a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready +/// and `Ok`. Unlike `first_all`, this will only return the first successful +/// completion, or the last error if none complete with `Ok`. This is useful +/// in contexts where any success is desired and failures are ignored, unless +/// all the futures fail. +/// +/// `first_ok_fused` requires [`FusedFuture`], in order to track which futures have +/// completed with errors and which are still pending. Many futures already +/// implement this trait. Use [`first_ok`] if you have futures which do not +/// implement [`FusedFuture`]. +/// +/// Any futures in the list that have already terminated will be ignored. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no unterminated +/// items. +pub fn first_ok_fused(futures: I) -> FirstOk +where + I: IntoIterator, + I::Item: FusedFuture + Future>, +{ + let futures = Vec::from_iter(futures); + + assert!( + !futures.is_empty(), + "Need at least 1 non-terminated future for first_ok" + ); + + FirstOk { futures } +} + +/// Creates a new future which will return the result of the first successful +/// future in a list of futures. +/// +/// The returned future will wait for any future within `iter` to be ready +/// and `Ok`. Unlike `first_all`, this will only return the first successful +/// completion, or the last error if none complete with `Ok`. This is useful +/// in contexts where any success is desired and failures are ignored, unless +/// all the futures fail. +/// +/// If your future implements [`FusedFuture`], prefer [`first_ok_fused`], +/// which will have less overhead. +/// +/// This function is only available when the `std` or `alloc` feature of this +/// library is activated, and it is activated by default. +/// +/// # Panics +/// +/// This function will panic if the iterator specified contains no unterminated +/// items. +pub fn first_ok(futures: I) -> FirstOk> +where + I: IntoIterator, + I::Item: Future>, +{ + first_ok_fused(futures.into_iter().map(|fut| fut.fuse())) +} + +#[test] +fn test_first_ok_ok() { + use crate::task::noop_waker_ref; + use futures_channel::oneshot::channel; + + let mut futures = vec![]; + let mut senders = vec![]; + + for _ in 0..10 { + let (send, recv) = channel(); + futures.push(recv); + senders.push(send); + } + + let (send, recv) = channel(); + futures.push(recv); + + for _ in 0..10 { + let (send, recv) = channel(); + futures.push(recv); + senders.push(send); + } + + let mut fut = first_ok(futures); + let mut context = Context::from_waker(noop_waker_ref()); + + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Pending); + + send.send(10).unwrap(); + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Ready(Ok(10))); +} + +#[test] +fn test_first_ok_err() { + use crate::task::noop_waker_ref; + use futures_channel::oneshot::{channel, Canceled}; + + let mut futures = vec![]; + let mut senders = vec![]; + + for _ in 0..10 { + let (send, recv) = channel::(); + futures.push(recv); + senders.push(send); + } + + let mut fut = first_ok(futures); + let mut context = Context::from_waker(noop_waker_ref()); + + // Dropping a sender causes an error in the receiver. + for sender in senders.into_iter() { + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Pending); + + drop(sender); + } + + let poll = fut.poll_unpin(&mut context); + assert_eq!(poll, Poll::Ready(Err(Canceled))); +} diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 3f4bb01436..e34133c995 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -90,6 +90,19 @@ mod select_ok; #[cfg(feature = "alloc")] pub use self::select_ok::{select_ok, SelectOk}; +mod first; +pub use self::first::{first, First}; + +#[cfg(feature = "alloc")] +mod first_all; +#[cfg(feature = "alloc")] +pub use first_all::{first_all, FirstAll}; + +#[cfg(feature = "alloc")] +mod first_ok; +#[cfg(feature = "alloc")] +pub use first_ok::{first_ok, first_ok_fused, FirstOk}; + mod either; pub use self::either::Either;