Skip to content

Commit 1363a00

Browse files
committed
support non-async {stream,future}.cancel-{read,write}
During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well. This also adds `{Source,Destination}::reborrow` functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again. Signed-off-by: Joel Dice <[email protected]>
1 parent 192f2fc commit 1363a00

File tree

10 files changed

+1380
-558
lines changed

10 files changed

+1380
-558
lines changed

crates/misc/component-async-tests/src/util.rs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::{
99
use wasmtime::{
1010
StoreContextMut,
1111
component::{
12-
Accessor, Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
12+
Destination, FutureConsumer, FutureProducer, Lift, Lower, Source, StreamConsumer,
1313
StreamProducer, StreamResult,
1414
},
1515
};
@@ -139,24 +139,41 @@ impl<T> OneshotProducer<T> {
139139
impl<D, T: Send + 'static> FutureProducer<D> for OneshotProducer<T> {
140140
type Item = T;
141141

142-
async fn produce(self, _: &Accessor<D>) -> Result<T> {
143-
Ok(self.0.await?)
142+
fn poll_produce(
143+
self: Pin<&mut Self>,
144+
cx: &mut Context<'_>,
145+
_: StoreContextMut<D>,
146+
finish: bool,
147+
) -> Poll<Result<Option<T>>> {
148+
match Pin::new(&mut self.get_mut().0).poll(cx) {
149+
Poll::Pending if finish => Poll::Ready(Ok(None)),
150+
Poll::Pending => Poll::Pending,
151+
Poll::Ready(result) => Poll::Ready(Ok(Some(result?))),
152+
}
144153
}
145154
}
146155

147-
pub struct OneshotConsumer<T>(oneshot::Sender<T>);
156+
pub struct OneshotConsumer<T>(Option<oneshot::Sender<T>>);
148157

149158
impl<T> OneshotConsumer<T> {
150159
pub fn new(tx: oneshot::Sender<T>) -> Self {
151-
Self(tx)
160+
Self(Some(tx))
152161
}
153162
}
154163

155-
impl<D, T: Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
164+
impl<D, T: Lift + Send + 'static> FutureConsumer<D> for OneshotConsumer<T> {
156165
type Item = T;
157166

158-
async fn consume(self, _: &Accessor<D>, value: T) -> Result<()> {
159-
_ = self.0.send(value);
160-
Ok(())
167+
fn poll_consume(
168+
self: Pin<&mut Self>,
169+
_: &mut Context<'_>,
170+
store: StoreContextMut<D>,
171+
mut source: Source<'_, T>,
172+
_: bool,
173+
) -> Poll<Result<()>> {
174+
let value = &mut None;
175+
source.read(store, value)?;
176+
_ = self.get_mut().0.take().unwrap().send(value.take().unwrap());
177+
Poll::Ready(Ok(()))
161178
}
162179
}

0 commit comments

Comments
 (0)