Skip to content

Commit

Permalink
Cleaned up First* implementations:
Browse files Browse the repository at this point in the history
- Removed FusedIterator; .fuse() is equivalent.
- FirstOk is more explicit about its use of FusedIterator.
- This design will have to wait for rust-lang#2111 to be resolved; currently the
  contract of FusedFuture is not strict enough for our needs.
  • Loading branch information
Lucretiel committed Apr 2, 2020
1 parent d980367 commit 1753765
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 51 deletions.
15 changes: 7 additions & 8 deletions futures-util/src/future/first.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::future::Either;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use pin_utils::unsafe_pinned;

Expand All @@ -12,6 +12,8 @@ pub struct First<F1, F2> {
future2: F2,
}

impl<F1: Unpin, F2: Unpin> Unpin for First<F1, F2> {}

impl<F1, F2> First<F1, F2> {
unsafe_pinned!(future1: F1);
unsafe_pinned!(future2: F2);
Expand All @@ -32,12 +34,9 @@ impl<F1: Future, F2: Future> Future for First<F1, F2> {
}
}

impl<F1: FusedFuture, F2: FusedFuture> FusedFuture for First<F1, F2> {
#[inline]
fn is_terminated(&self) -> bool {
self.future1.is_terminated() || self.future2.is_terminated()
}
}
// We don't provide FusedFuture, because the overhead of implementing it (
// which requires a separate bool or Option field) is precisely the same as
// calling .fuse()

/// Waits for either one of two differently-typed futures to complete.
///
Expand All @@ -52,7 +51,7 @@ impl<F1: FusedFuture, F2: FusedFuture> FusedFuture for First<F1, F2> {
/// wrapped version of them.
///
/// Also note that if both this and the second future have the same
/// output type you can use the `Either::factor_first` method to
/// output type you can use the `Either::into_immer` method to
/// conveniently extract out the value at the end.
pub fn first<F1, F2>(future1: F1, future2: F2) -> First<F1, F2> {
First { future1, future2 }
Expand Down
21 changes: 5 additions & 16 deletions futures-util/src/future/first_all.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use core::iter::FromIterator;
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};

/// Future for the [`first_all()`] function.
Expand Down Expand Up @@ -32,26 +32,15 @@ impl<F: Future> Future for FirstAll<F> {
Poll::Pending => None,
}
}) {
Some(out) => {
// Safety: safe because vec clears in place
this.futures.clear();
Poll::Ready(out)
}
Some(out) => Poll::Ready(out),
None => Poll::Pending,
}
}
}

impl<F: FusedFuture> FusedFuture for FirstAll<F> {
#[inline]
fn is_terminated(&self) -> bool {
// Logic: it's possible for a future to independently become
// terminated, before it returns Ready, so we're not terminated unless
// *all* of our inner futures are terminated. When our own poll returns
// Ready, this vector is cleared, so the logic works correctly.
self.futures.iter().all(|fut| fut.is_terminated())
}
}
// We don't provide FusedFuture, because the overhead of implementing it (
// which requires clearing the vector after Ready is returned) is precisely
// the same as using .fuse()

impl<Fut: Future> FromIterator<Fut> for FirstAll<Fut> {
fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
Expand Down
61 changes: 34 additions & 27 deletions futures-util/src/future/first_ok.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,10 @@ impl<F: FusedFuture + TryFuture> Future for FirstOk<F> {
#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Basic logic diagram:
// - If all existing futures are terminated, return Pending. This means
// someone polled after this future returned ready, or that this
// future will never return ready because a future spuriously
// terminated itself.
// - If a future returns Ok, clear the vector (this is safe because
// vec drops in place), then return that value. We clear the vector
// so that our FusedFuture impl, which checks `are all futures
// terminated`, works correctly.
// - If all existing futures are terminated, return Pending.
// - If a future returns Ok, return that value.
// - If all existing futures BECOME terminated while polling them, and
// an error was returned, return the final error; otherwise return
// pending.
// an error was returned, return the final error.

/// Helper enum to track our state as we poll each future
enum State<E> {
Expand Down Expand Up @@ -64,19 +57,15 @@ impl<F: FusedFuture + TryFuture> Future for FirstOk<F> {
}

let mut state = State::NoErrors;
let this = self.get_mut();
for fut in this.futures.iter_mut() {

for fut in self.get_mut().futures.iter_mut() {
if !fut.is_terminated() {
// Safety: we promise that the future is never moved out of the vec,
// and that the vec never reallocates once FirstOk has been created
// (specifically after the first poll)
let pinned = unsafe { Pin::new_unchecked(fut) };
match pinned.try_poll(cx) {
Poll::Ready(Ok(out)) => {
// Safety: safe because vec clears in place
this.futures.clear();
return Poll::Ready(Ok(out));
}
Poll::Ready(Ok(out)) => return Poll::Ready(Ok(out)),
Poll::Ready(Err(err)) => state.apply_error(err),
Poll::Pending => state.apply_pending(),
}
Expand All @@ -85,17 +74,21 @@ impl<F: FusedFuture + TryFuture> Future for FirstOk<F> {

match state {
SeenError(err) => Poll::Ready(Err(err)),
NoErrors | SeenPending => Poll::Pending,
SeenPending => Poll::Pending,
// This is unreachable unless every future in the vec returned
// is_terminated, which means that we must have returned Ready on
// a previous poll, or the vec is empty, which we disallow in the
// first_ok constructor, or that we were initialized with futures
// that have already returned Ready, which is possibly unsound
// (given !Unpin futures) but certainly breaks first_ok contract.
NoErrors => panic!("All futures in the FirstOk terminated without a result being found. Did you re-poll after Ready?"),
}
}
}

impl<F: FusedFuture + TryFuture> FusedFuture for FirstOk<F> {
#[inline]
fn is_terminated(&self) -> bool {
self.futures.iter().all(|fut| fut.is_terminated())
}
}
// We don't provide FusedFuture, because the overhead of implementing it (
// which requires clearing the vector after Ready is returned) is precisely
// the same as using .fuse()

impl<Fut: FusedFuture + TryFuture> FromIterator<Fut> for FirstOk<Fut> {
fn from_iter<T: IntoIterator<Item = Fut>>(iter: T) -> Self {
Expand All @@ -121,13 +114,27 @@ impl<Fut: FusedFuture + TryFuture> FromIterator<Fut> for FirstOk<Fut> {
///
/// # Panics
///
/// This function will panic if the iterator specified contains no items.
/// This function will panic if the iterator specified contains no items, or
/// if any of the futures have already been terminated.
pub fn first_ok<I>(futures: I) -> FirstOk<I::Item>
where
I: IntoIterator,
I::Item: FusedFuture + TryFuture,
{
let futures = Vec::from_iter(futures);
assert!(!futures.is_empty(), "Need at least 1 future for first_ok");
let futures: Vec<_> = futures
.into_iter()
.inspect(|fut| {
assert!(
!fut.is_terminated(),
"Can't call first_ok with a terminated future"
)
})
.collect();

assert!(
!futures.is_empty(),
"Need at least 1 non-terminated future for first_ok"
);

FirstOk { futures }
}

0 comments on commit 1753765

Please sign in to comment.