diff --git a/futures-util/src/abortable.rs b/futures-util/src/abortable.rs index 9dbcfc2b5..f978eeb23 100644 --- a/futures-util/src/abortable.rs +++ b/futures-util/src/abortable.rs @@ -1,6 +1,8 @@ use crate::task::AtomicWaker; use alloc::sync::Arc; +use core::borrow::BorrowMut; use core::fmt; +use core::marker::PhantomData; use core::pin::Pin; use core::sync::atomic::{AtomicBool, Ordering}; use futures_core::future::Future; @@ -12,14 +14,15 @@ pin_project! { /// A future/stream which can be remotely short-circuited using an `AbortHandle`. #[derive(Debug, Clone)] #[must_use = "futures/streams do nothing unless you poll them"] - pub struct Abortable { + pub struct Abortable { #[pin] task: T, inner: Arc, + phantom: PhantomData, } } -impl Abortable { +impl Abortable { /// Creates a new `Abortable` future/stream using an existing `AbortRegistration`. /// `AbortRegistration`s can be acquired through `AbortHandle::new`. /// @@ -55,8 +58,11 @@ impl Abortable { /// assert_eq!(stream.next().await, None); /// # }); /// ``` - pub fn new(task: T, reg: AbortRegistration) -> Self { - Self { task, inner: reg.inner } + pub fn new(task: T, mut reg: R) -> Self + where + R: BorrowMut, + { + Self { task, inner: reg.borrow_mut().inner.clone(), phantom: PhantomData } } /// Checks whether the task has been aborted. Note that all this @@ -129,7 +135,7 @@ impl fmt::Display for Aborted { #[cfg(feature = "std")] impl std::error::Error for Aborted {} -impl Abortable { +impl Abortable { fn try_poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -160,7 +166,7 @@ impl Abortable { } } -impl Future for Abortable +impl Future for Abortable where Fut: Future, { @@ -171,7 +177,7 @@ where } } -impl Stream for Abortable +impl Stream for Abortable where St: Stream, {