-
Notifications
You must be signed in to change notification settings - Fork 1.5k
support non-async {stream,future}.cancel-{read,write}
#11625
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
1363a00
to
5b586a8
Compare
finish: bool, | ||
) -> Poll<Result<Option<T>>> { | ||
match Pin::new(&mut self.get_mut().0).poll(cx) { | ||
Poll::Pending if finish => Poll::Ready(Ok(None)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reminds me of something we may wish to talk more about later, but I keep being endlessly confused about the finish: bool
parameter here. I feel like the way this is handled practically everywhere is to do exactly what this is doing which is to just pretend it's done when finish
is flagged.
Initially reading this I thought "wait shouldn't this close the oneshot and then look at the result?" but that's not correct because the host could re-initiate a read later on. I don't know of a great answer to this, nor is this PR the place to necessarily work this all out, but I feel like we need to change something about this either API-wise or such.
let state = self.concurrent_state_mut(store); | ||
let caller = state.guest_task.unwrap(); | ||
let waitable = Waitable::Transmit(state.get_mut(transmit_id)?.read_handle); | ||
let old_set = waitable.common(state)?.set; | ||
let set = state.get_mut(caller)?.sync_call_set; | ||
waitable.join(state, Some(set))?; | ||
|
||
self.suspend(store, SuspendReason::Waiting { set, task: caller })?; | ||
|
||
let state = self.concurrent_state_mut(store); | ||
waitable.join(state, old_set)?; | ||
|
||
let event = waitable.take_event(state)?; | ||
if let Some( | ||
event @ (Event::StreamRead { code, .. } | Event::FutureRead { code, .. }), | ||
) = event | ||
{ | ||
waitable.on_delivery(self.id().get_mut(store), event); | ||
code | ||
} else { | ||
unreachable!() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it's mostly a copy/paste of the above, and my suspicion is that it's likely pretty close to what's elsewhere too. Would it be possible to refactor this all to a common "block waiting for a result" function?
During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well. This also adds `{Source,Destination}::reborrow` functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again. Note that I had to move some functions from `impl ConcurrentState` to `impl Instance` in order to access the store and suspend the current fiber when synchronously cancelling. Signed-off-by: Joel Dice <[email protected]>
5b586a8
to
ca53149
Compare
Signed-off-by: Joel Dice <[email protected]>
…ance#11625) * support non-async `{stream,future}.cancel-{read,write}` During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well. This also adds `{Source,Destination}::reborrow` functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again. Note that I had to move some functions from `impl ConcurrentState` to `impl Instance` in order to access the store and suspend the current fiber when synchronously cancelling. Signed-off-by: Joel Dice <[email protected]> * reduce code duplication Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]>
* support non-async `{stream,future}.cancel-{read,write}` (#11625) * support non-async `{stream,future}.cancel-{read,write}` During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well. This also adds `{Source,Destination}::reborrow` functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again. Note that I had to move some functions from `impl ConcurrentState` to `impl Instance` in order to access the store and suspend the current fiber when synchronously cancelling. Signed-off-by: Joel Dice <[email protected]> * reduce code duplication Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]> * support and test synchronous `{stream,future}.cancel-{read,write}` (#11645) * support and test synchronous `{stream,future}.cancel-{read,write}` Previously, we only supported async calls to those intrinsics; now we support blocking, synchronous calls as well. Signed-off-by: Joel Dice <[email protected]> * update future-read.wast test Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]> * p3-http: finish `wasi:[email protected]` implementation (#11636) * refactor(p3-http): use trappable errors Signed-off-by: Roman Volosatovs <[email protected]> * feat(p3-http): implement `content-length` handling Signed-off-by: Roman Volosatovs <[email protected]> * refactor(p3-http): remove a few resource utilities Signed-off-by: Roman Volosatovs <[email protected]> * remove unused test import Signed-off-by: Roman Volosatovs <[email protected]> * fix(p3-http): close stream handles on drop Signed-off-by: Roman Volosatovs <[email protected]> * test(p3-http): stream responses back This is something we've been doing in wasip3, but I forgot to port this over Signed-off-by: Roman Volosatovs <[email protected]> * doc(p3-http): add missing docs, internalize more, simplify Signed-off-by: Roman Volosatovs <[email protected]> * refactor(p3-http): extract `Body::consume` Signed-off-by: Roman Volosatovs <[email protected]> * refactor(p3-http): clean-up `content-length` error reporting Signed-off-by: Roman Volosatovs <[email protected]> * refactor(p3-http): drop elided lifetime Signed-off-by: Roman Volosatovs <[email protected]> * fix(p3-http): avoid guest body deadlock hazard Signed-off-by: Roman Volosatovs <[email protected]> * refactor(p3-http): add more docs, clean-up Signed-off-by: Roman Volosatovs <[email protected]> * doc(p3-http): add more docs Signed-off-by: Roman Volosatovs <[email protected]> * fix(p3-http): rework result future handling Most importantly this avoids a race condition between `content-length` error observed by `GuestBody` and hyper I/O driver Signed-off-by: Roman Volosatovs <[email protected]> * add new imports after rebase Signed-off-by: Roman Volosatovs <[email protected]> * clean-up `poll_consume` Signed-off-by: Roman Volosatovs <[email protected]> * assert content-length `handle` results Signed-off-by: Roman Volosatovs <[email protected]> * relax `content_length` test `handle` assert Signed-off-by: Roman Volosatovs <[email protected]> --------- Signed-off-by: Roman Volosatovs <[email protected]> * p3-http: implementation follow-up (#11649) * p3: refactor future producers/consumers Signed-off-by: Roman Volosatovs <[email protected]> * p3-http: tie lifetime of the spawned task to the bodies Signed-off-by: Roman Volosatovs <[email protected]> * p3-http: improve docs Signed-off-by: Roman Volosatovs <[email protected]> --------- Signed-off-by: Roman Volosatovs <[email protected]> * Ignore a wasip3 http test temporarily (#11657) Filed #11656 to track the eventual resolution. * don't delete sync-lowered subtasks unless they've exited (#11655) Previously, we were unconditionally deleting the callee subtask once it returned a value to a sync-lowered call, but that's only appropriate if the subtask has exited. Otherwise, it needs to keep running and only be deleted once it actually exits. Thanks to Luke for the `sync-streams.wast` test that uncovered this, which I've copied from the `component-model` repo. This also makes a couple of debug logging tweaks that proved useful while investigating the above issue. Signed-off-by: Joel Dice <[email protected]> --------- Signed-off-by: Joel Dice <[email protected]> Signed-off-by: Roman Volosatovs <[email protected]> Co-authored-by: Joel Dice <[email protected]> Co-authored-by: Roman Volosatovs <[email protected]>
During my earlier stream API refactoring, I had forgotten to support or test synchronous cancellation; this commit does both. In the process, I realized the future API ought to be updated to support blocking cancellation just like the stream API, so I made that change as well.
This also adds
{Source,Destination}::reborrow
functions, allowing instances of those types to be reborrowed, such that they may be passed as parameters but also used again.Note that I had to move some functions from
impl ConcurrentState
toimpl Instance
in order to access the store and suspend the current fiber when synchronously cancelling.