Skip to content

Commit 7a17c5e

Browse files
committed
feat: expose coroutine constructor
1 parent 3feb111 commit 7a17c5e

File tree

8 files changed

+189
-121
lines changed

8 files changed

+189
-121
lines changed

guide/src/async-await.md

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`.
66

77
```rust
8-
# #![allow(dead_code)]
8+
# # ![allow(dead_code)]
99
# #[cfg(feature = "experimental-async")] {
1010
use std::{thread, time::Duration};
1111
use futures::channel::oneshot;
@@ -26,21 +26,33 @@ async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
2626

2727
## `Send + 'static` constraint
2828

29-
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.
29+
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python
30+
object.
3031

31-
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile<'py>(arg: Bound<'py, PyAny>) -> Bound<'py, PyAny>`.
32+
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a
33+
signature like `async fn does_not_compile<'py>(arg: Bound<'py, PyAny>) -> Bound<'py, PyAny>`.
3234

33-
However, there is an exception for method receivers, so async methods can accept `&self`/`&mut self`. Note that this means that the class instance is borrowed for as long as the returned future is not completed, even across yield points and while waiting for I/O operations to complete. Hence, other methods cannot obtain exclusive borrows while the future is still being polled. This is the same as how async methods in Rust generally work but it is more problematic for Rust code interfacing with Python code due to pervasive shared mutability. This strongly suggests to prefer shared borrows `&self` over exclusive ones `&mut self` to avoid racy borrow check failures at runtime.
35+
However, there is an exception for method receivers, so async methods can accept `&self`/`&mut self`. Note that this
36+
means that the class instance is borrowed for as long as the returned future is not completed, even across yield points
37+
and while waiting for I/O operations to complete. Hence, other methods cannot obtain exclusive borrows while the future
38+
is still being polled. This is the same as how async methods in Rust generally work but it is more problematic for Rust
39+
code interfacing with Python code due to pervasive shared mutability. This strongly suggests to prefer shared
40+
borrows `&self` over exclusive ones `&mut self` to avoid racy borrow check failures at runtime.
3441

3542
## Implicit GIL holding
3643

37-
Even if it is not possible to pass a `py: Python<'py>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'py>`/`Bound<'py, PyAny>` parameter, yet the GIL is held.
44+
Even if it is not possible to pass a `py: Python<'py>` parameter to `async fn`, the GIL is still held during the
45+
execution of the future – it's also the case for regular `fn` without `Python<'py>`/`Bound<'py, PyAny>` parameter, yet
46+
the GIL is held.
3847

39-
It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/marker/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible.
48+
It is still possible to get a `Python` marker
49+
using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/marker/struct.Python.html#method.with_gil); because `with_gil` is
50+
reentrant and optimized, the cost will be negligible.
4051

4152
## Release the GIL across `.await`
4253

43-
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*.
54+
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in
55+
development*.
4456

4557
Here is the advised workaround for now:
4658

@@ -72,10 +84,12 @@ where
7284

7385
## Cancellation
7486

75-
Cancellation on the Python side can be caught using [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) type, by annotating a function parameter with `#[pyo3(cancel_handle)]`.
87+
Cancellation on the Python side can be caught
88+
using [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) type, by annotating a function
89+
parameter with `#[pyo3(cancel_handle)]`.
7690

7791
```rust
78-
# #![allow(dead_code)]
92+
# # ![allow(dead_code)]
7993
# #[cfg(feature = "experimental-async")] {
8094
use futures::FutureExt;
8195
use pyo3::prelude::*;
@@ -93,15 +107,42 @@ async fn cancellable(#[pyo3(cancel_handle)] mut cancel: CancelHandle) {
93107

94108
## *asyncio* vs. *anyio*
95109

96-
By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context.
110+
By default, Python awaitables instantiated with `async fn` can only be awaited in *asyncio* context.
97111

98-
PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context.
112+
PyO3 can also target [*anyio*](https://github.com/agronholm/anyio) with the dedicated `anyio` Cargo feature. With it
113+
enabled, `async fn` become awaitable both in *asyncio* or [*trio*](https://github.com/python-trio/trio) context.
99114
However, it requires to have the [*sniffio*](https://github.com/python-trio/sniffio) (or *anyio*) library installed.
100115

101116
## The `Coroutine` type
102117

103-
To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine).
118+
To make a Rust future awaitable in Python, PyO3 defines
119+
a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the
120+
Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine).
104121

105-
Each `coroutine.send` call is translated to a `Future::poll` call. If a [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) parameter is declared, the exception passed to `coroutine.throw` call is stored in it and can be retrieved with [`CancelHandle::cancelled`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html#method.cancelled); otherwise, it cancels the Rust future, and the exception is reraised;
122+
Each `coroutine.send` call is translated to a `Future::poll` call. If
123+
a [`CancelHandle`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html) parameter is declared, the exception
124+
passed to `coroutine.throw` call is stored in it and can be retrieved
125+
with [`CancelHandle::cancelled`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.CancelHandle.html#method.cancelled);
126+
otherwise, it cancels the Rust future, and the exception is reraised;
106127

107-
*The type does not yet have a public constructor until the design is finalized.*
128+
Coroutine can also be instantiated directly
129+
130+
```rust
131+
# # ![allow(dead_code)]
132+
use pyo3::prelude::*;
133+
use pyo3::coroutine::{CancelHandle, Coroutine};
134+
135+
#[pyfunction]
136+
fn new_coroutine(py: Python<'_>) -> Coroutine {
137+
let mut cancel = CancelHandle::new();
138+
let throw_callback = cancel.throw_callback();
139+
let future = async move {
140+
cancel.cancelled().await;
141+
PyResult::Ok(())
142+
};
143+
Coroutine::new("my_coro", future)
144+
.with_qualname_prefix("MyClass")
145+
.with_throw_callback(throw_callback)
146+
.with_allow_threads(true)
147+
}
148+
```

newsfragments/3613.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Expose `Coroutine` constructor

pyo3-macros-backend/src/method.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -679,13 +679,13 @@ impl<'a> FnSpec<'a> {
679679
};
680680
let mut call = quote! {{
681681
let future = #future;
682-
#pyo3_path::impl_::coroutine::new_coroutine(
683-
#pyo3_path::intern!(py, stringify!(#python_name)),
684-
#qualname_prefix,
685-
#throw_callback,
686-
#allow_threads,
682+
#pyo3_path::coroutine::Coroutine::new(
683+
stringify!(#python_name),
687684
async move { #pyo3_path::impl_::wrap::OkWrap::wrap(future.await) },
688685
)
686+
.with_qualname_prefix(#qualname_prefix)
687+
.with_throw_callback(#throw_callback)
688+
.with_allow_threads(#allow_threads)
689689
}};
690690
if cancel_handle.is_some() {
691691
call = quote! {{

src/coroutine.rs

Lines changed: 62 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! Python coroutine implementation, used notably when wrapping `async fn`
22
//! with `#[pyfunction]`/`#[pymethods]`.
3+
use std::borrow::Cow;
34
use std::{
45
future::Future,
56
panic,
@@ -12,11 +13,11 @@ use pyo3_macros::{pyclass, pymethods};
1213

1314
use crate::{
1415
coroutine::waker::CoroutineWaker,
15-
exceptions::{PyAttributeError, PyGeneratorExit, PyRuntimeError, PyStopIteration},
16+
exceptions::{PyGeneratorExit, PyRuntimeError, PyStopIteration},
1617
marker::Ungil,
1718
panic::PanicException,
18-
types::{string::PyStringMethods, PyString},
19-
IntoPy, Py, PyErr, PyObject, PyResult, Python,
19+
types::PyString,
20+
Bound, IntoPy, Py, PyErr, PyObject, PyResult, Python,
2021
};
2122

2223
#[cfg(feature = "anyio")]
@@ -40,48 +41,44 @@ pub(crate) enum CoroOp {
4041
}
4142

4243
trait CoroutineFuture: Send {
43-
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>>;
44+
fn poll(
45+
self: Pin<&mut Self>,
46+
py: Python<'_>,
47+
waker: &Waker,
48+
allow_threads: bool,
49+
) -> Poll<PyResult<PyObject>>;
4450
}
4551

4652
impl<F, T, E> CoroutineFuture for F
47-
where
48-
F: Future<Output = Result<T, E>> + Send,
49-
T: IntoPy<PyObject> + Send,
50-
E: Into<PyErr> + Send,
51-
{
52-
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>> {
53-
self.poll(&mut Context::from_waker(waker))
54-
.map_ok(|obj| obj.into_py(py))
55-
.map_err(Into::into)
56-
}
57-
}
58-
59-
struct AllowThreads<F> {
60-
future: F,
61-
}
62-
63-
impl<F, T, E> CoroutineFuture for AllowThreads<F>
6453
where
6554
F: Future<Output = Result<T, E>> + Send + Ungil,
6655
T: IntoPy<PyObject> + Send + Ungil,
6756
E: Into<PyErr> + Send + Ungil,
6857
{
69-
fn poll(self: Pin<&mut Self>, py: Python<'_>, waker: &Waker) -> Poll<PyResult<PyObject>> {
70-
// SAFETY: future field is pinned when self is
71-
let future = unsafe { self.map_unchecked_mut(|a| &mut a.future) };
72-
py.allow_threads(|| future.poll(&mut Context::from_waker(waker)))
73-
.map_ok(|obj| obj.into_py(py))
74-
.map_err(Into::into)
58+
fn poll(
59+
self: Pin<&mut Self>,
60+
py: Python<'_>,
61+
waker: &Waker,
62+
allow_threads: bool,
63+
) -> Poll<PyResult<PyObject>> {
64+
if allow_threads {
65+
py.allow_threads(|| self.poll(&mut Context::from_waker(waker)))
66+
} else {
67+
self.poll(&mut Context::from_waker(waker))
68+
}
69+
.map_ok(|obj| obj.into_py(py))
70+
.map_err(Into::into)
7571
}
7672
}
7773

7874
/// Python coroutine wrapping a [`Future`].
7975
#[pyclass(crate = "crate")]
8076
pub struct Coroutine {
81-
name: Option<Py<PyString>>,
77+
future: Option<Pin<Box<dyn CoroutineFuture>>>,
78+
name: Cow<'static, str>,
8279
qualname_prefix: Option<&'static str>,
8380
throw_callback: Option<ThrowCallback>,
84-
future: Option<Pin<Box<dyn CoroutineFuture>>>,
81+
allow_threads: bool,
8582
waker: Option<Arc<CoroutineWaker>>,
8683
}
8784

@@ -91,32 +88,44 @@ impl Coroutine {
9188
/// Coroutine `send` polls the wrapped future, ignoring the value passed
9289
/// (should always be `None` anyway).
9390
///
94-
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed
95-
pub(crate) fn new<F, T, E>(
96-
name: Option<Py<PyString>>,
97-
qualname_prefix: Option<&'static str>,
98-
throw_callback: Option<ThrowCallback>,
99-
allow_threads: bool,
100-
future: F,
101-
) -> Self
91+
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed.
92+
pub fn new<F, T, E>(name: impl Into<Cow<'static, str>>, future: F) -> Self
10293
where
10394
F: Future<Output = Result<T, E>> + Send + Ungil + 'static,
10495
T: IntoPy<PyObject> + Send + Ungil,
10596
E: Into<PyErr> + Send + Ungil,
10697
{
10798
Self {
108-
name,
109-
qualname_prefix,
110-
throw_callback,
111-
future: Some(if allow_threads {
112-
Box::pin(AllowThreads { future })
113-
} else {
114-
Box::pin(future)
115-
}),
99+
future: Some(Box::pin(future)),
100+
name: name.into(),
101+
qualname_prefix: None,
102+
throw_callback: None,
103+
allow_threads: false,
116104
waker: None,
117105
}
118106
}
119107

108+
/// Set a prefix for `__qualname__`, which will be joined with a "."
109+
pub fn with_qualname_prefix(mut self, prefix: impl Into<Option<&'static str>>) -> Self {
110+
self.qualname_prefix = prefix.into();
111+
self
112+
}
113+
114+
/// Register a callback for coroutine `throw` method.
115+
///
116+
/// The exception passed to `throw` is then redirected to this callback, notifying the
117+
/// associated [`CancelHandle`], without being reraised.
118+
pub fn with_throw_callback(mut self, callback: impl Into<Option<ThrowCallback>>) -> Self {
119+
self.throw_callback = callback.into();
120+
self
121+
}
122+
123+
/// Release the GIL while polling the future wrapped.
124+
pub fn with_allow_threads(mut self, allow_threads: bool) -> Self {
125+
self.allow_threads = allow_threads;
126+
self
127+
}
128+
120129
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
121130
// raise if the coroutine has already been run to completion
122131
let future_rs = match self.future {
@@ -147,7 +156,7 @@ impl Coroutine {
147156
// poll the future and forward its results if ready; otherwise, yield from waker
148157
// polling is UnwindSafe because the future is dropped in case of panic
149158
let waker = Waker::from(self.waker.clone().unwrap());
150-
let poll = || future_rs.as_mut().poll(py, &waker);
159+
let poll = || future_rs.as_mut().poll(py, &waker, self.allow_threads);
151160
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
152161
Err(err) => Err(PanicException::from_panic_payload(err)),
153162
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err(res?)),
@@ -172,22 +181,16 @@ impl Coroutine {
172181
#[pymethods(crate = "crate")]
173182
impl Coroutine {
174183
#[getter]
175-
fn __name__(&self, py: Python<'_>) -> PyResult<Py<PyString>> {
176-
match &self.name {
177-
Some(name) => Ok(name.clone_ref(py)),
178-
None => Err(PyAttributeError::new_err("__name__")),
179-
}
184+
fn __name__<'py>(&self, py: Python<'py>) -> Bound<'py, PyString> {
185+
PyString::new_bound(py, &self.name)
180186
}
181187

182188
#[getter]
183-
fn __qualname__(&self, py: Python<'_>) -> PyResult<Py<PyString>> {
184-
match (&self.name, &self.qualname_prefix) {
185-
(Some(name), Some(prefix)) => Ok(format!("{}.{}", prefix, name.bind(py).to_cow()?)
186-
.as_str()
187-
.into_py(py)),
188-
(Some(name), None) => Ok(name.clone_ref(py)),
189-
(None, _) => Err(PyAttributeError::new_err("__qualname__")),
190-
}
189+
fn __qualname__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyString>> {
190+
Ok(match &self.qualname_prefix {
191+
Some(prefix) => PyString::new_bound(py, &format!("{}.{}", prefix, self.name)),
192+
None => self.__name__(py),
193+
})
191194
}
192195

193196
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {

src/coroutine/cancel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,13 @@ impl CancelHandle {
6161
Cancelled(self).await
6262
}
6363

64-
#[doc(hidden)]
64+
/// Instantiate a [`ThrowCallback`] associated to this cancel handle.
6565
pub fn throw_callback(&self) -> ThrowCallback {
6666
ThrowCallback(self.0.clone())
6767
}
6868
}
6969

70-
#[doc(hidden)]
70+
/// Callback for coroutine `throw` method, notifying the associated [`CancelHandle`]
7171
pub struct ThrowCallback(Arc<Mutex<Inner>>);
7272

7373
impl ThrowCallback {

src/impl_/coroutine.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,10 @@
1-
use std::{
2-
future::Future,
3-
ops::{Deref, DerefMut},
4-
};
1+
use std::ops::{Deref, DerefMut};
52

63
use crate::{
7-
coroutine::{Coroutine, ThrowCallback},
8-
instance::Bound,
9-
marker::Ungil,
10-
pycell::impl_::PyClassBorrowChecker,
11-
pyclass::boolean_struct::False,
12-
types::{PyAnyMethods, PyString},
13-
IntoPy, Py, PyAny, PyClass, PyErr, PyObject, PyResult, Python,
4+
instance::Bound, pycell::impl_::PyClassBorrowChecker, pyclass::boolean_struct::False,
5+
types::PyAnyMethods, Py, PyAny, PyClass, PyResult, Python,
146
};
157

16-
pub fn new_coroutine<F, T, E>(
17-
name: &Bound<'_, PyString>,
18-
qualname_prefix: Option<&'static str>,
19-
throw_callback: Option<ThrowCallback>,
20-
allow_threads: bool,
21-
future: F,
22-
) -> Coroutine
23-
where
24-
F: Future<Output = Result<T, E>> + Send + Ungil + 'static,
25-
T: IntoPy<PyObject> + Send + Ungil,
26-
E: Into<PyErr> + Send + Ungil,
27-
{
28-
Coroutine::new(
29-
Some(name.clone().into()),
30-
qualname_prefix,
31-
throw_callback,
32-
allow_threads,
33-
future,
34-
)
35-
}
36-
378
fn get_ptr<T: PyClass>(obj: &Py<T>) -> *mut T {
389
obj.get_class_object().get_ptr()
3910
}

0 commit comments

Comments
 (0)