Skip to content

Commit ee66f6c

Browse files
authored
Add support for user-supplied executors (#3091)
1 parent eba08ad commit ee66f6c

File tree

3 files changed

+131
-57
lines changed

3 files changed

+131
-57
lines changed

any_spawner/src/lib.rs

Lines changed: 38 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@
3232
use std::{future::Future, pin::Pin, sync::OnceLock};
3333
use thiserror::Error;
3434

35-
pub(crate) type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
36-
pub(crate) type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
35+
/// A future that has been pinned.
36+
pub type PinnedFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
37+
/// A future that has been pinned.
38+
pub type PinnedLocalFuture<T> = Pin<Box<dyn Future<Output = T>>>;
3739

3840
static SPAWN: OnceLock<fn(PinnedFuture<()>)> = OnceLock::new();
3941
static SPAWN_LOCAL: OnceLock<fn(PinnedLocalFuture<()>)> = OnceLock::new();
@@ -284,63 +286,42 @@ impl Executor {
284286
.map_err(|_| ExecutorError::AlreadySet)?;
285287
Ok(())
286288
}
287-
}
288289

289-
#[cfg(test)]
290-
mod tests {
291-
#[cfg(feature = "futures-executor")]
292-
#[test]
293-
fn can_spawn_local_future() {
294-
use crate::Executor;
295-
use std::rc::Rc;
296-
_ = Executor::init_futures_executor();
297-
let rc = Rc::new(());
298-
Executor::spawn_local(async {
299-
_ = rc;
300-
});
301-
Executor::spawn(async {});
302-
}
290+
/// Globally sets a custom executor as the executor used to spawn tasks.
291+
///
292+
/// Returns `Err(_)` if an executor has already been set.
293+
pub fn init_custom_executor(
294+
custom_executor: impl CustomExecutor + 'static,
295+
) -> Result<(), ExecutorError> {
296+
static EXECUTOR: OnceLock<Box<dyn CustomExecutor>> = OnceLock::new();
297+
EXECUTOR
298+
.set(Box::new(custom_executor))
299+
.map_err(|_| ExecutorError::AlreadySet)?;
303300

304-
#[cfg(feature = "futures-executor")]
305-
#[test]
306-
fn can_make_threaded_progress() {
307-
use crate::Executor;
308-
use std::sync::{atomic::AtomicUsize, Arc};
309-
_ = Executor::init_futures_executor();
310-
let counter = Arc::new(AtomicUsize::new(0));
311-
Executor::spawn({
312-
let counter = Arc::clone(&counter);
313-
async move {
314-
assert_eq!(
315-
counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
316-
0
317-
);
318-
}
319-
});
320-
futures::executor::block_on(Executor::tick());
321-
assert_eq!(counter.load(std::sync::atomic::Ordering::Acquire), 1);
301+
SPAWN
302+
.set(|fut| {
303+
EXECUTOR.get().unwrap().spawn(fut);
304+
})
305+
.map_err(|_| ExecutorError::AlreadySet)?;
306+
SPAWN_LOCAL
307+
.set(|fut| EXECUTOR.get().unwrap().spawn_local(fut))
308+
.map_err(|_| ExecutorError::AlreadySet)?;
309+
POLL_LOCAL
310+
.set(|| EXECUTOR.get().unwrap().poll_local())
311+
.map_err(|_| ExecutorError::AlreadySet)?;
312+
Ok(())
322313
}
314+
}
323315

324-
#[cfg(feature = "futures-executor")]
325-
#[test]
326-
fn can_make_local_progress() {
327-
use crate::Executor;
328-
use std::sync::{atomic::AtomicUsize, Arc};
329-
_ = Executor::init_futures_executor();
330-
let counter = Arc::new(AtomicUsize::new(0));
331-
Executor::spawn_local({
332-
let counter = Arc::clone(&counter);
333-
async move {
334-
assert_eq!(
335-
counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel),
336-
0
337-
);
338-
Executor::spawn_local(async {
339-
// Should not crash
340-
});
341-
}
342-
});
343-
Executor::poll_local();
344-
assert_eq!(counter.load(std::sync::atomic::Ordering::Acquire), 1);
345-
}
316+
/// A trait for custom executors.
317+
/// Custom executors can be used to integrate with any executor that supports spawning futures.
318+
///
319+
/// All methods can be called recursively.
320+
pub trait CustomExecutor: Send + Sync {
321+
/// Spawns a future, usually on a thread pool.
322+
fn spawn(&self, fut: PinnedFuture<()>);
323+
/// Spawns a local future. May require calling `poll_local` to make progress.
324+
fn spawn_local(&self, fut: PinnedLocalFuture<()>);
325+
/// Polls the executor, if it supports polling.
326+
fn poll_local(&self);
346327
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#[cfg(feature = "futures-executor")]
2+
use any_spawner::{CustomExecutor, Executor, PinnedFuture, PinnedLocalFuture};
3+
#[cfg(feature = "futures-executor")]
4+
#[test]
5+
fn can_create_custom_executor() {
6+
use futures::{
7+
executor::{LocalPool, LocalSpawner},
8+
task::LocalSpawnExt,
9+
};
10+
use std::{
11+
cell::RefCell,
12+
sync::{
13+
atomic::{AtomicUsize, Ordering},
14+
Arc,
15+
},
16+
};
17+
18+
thread_local! {
19+
static LOCAL_POOL: RefCell<LocalPool> = RefCell::new(LocalPool::new());
20+
static SPAWNER: LocalSpawner = LOCAL_POOL.with(|pool| pool.borrow().spawner());
21+
}
22+
23+
struct CustomFutureExecutor;
24+
impl CustomExecutor for CustomFutureExecutor {
25+
fn spawn(&self, _fut: PinnedFuture<()>) {
26+
panic!("not supported in this test");
27+
}
28+
29+
fn spawn_local(&self, fut: PinnedLocalFuture<()>) {
30+
SPAWNER.with(|spawner| {
31+
spawner.spawn_local(fut).expect("failed to spawn future");
32+
});
33+
}
34+
35+
fn poll_local(&self) {
36+
LOCAL_POOL.with(|pool| {
37+
if let Ok(mut pool) = pool.try_borrow_mut() {
38+
pool.run_until_stalled();
39+
}
40+
// If we couldn't borrow_mut, we're in a nested call to poll, so we don't need to do anything.
41+
});
42+
}
43+
}
44+
45+
Executor::init_custom_executor(CustomFutureExecutor)
46+
.expect("couldn't set executor");
47+
48+
let counter = Arc::new(AtomicUsize::new(0));
49+
let counter_clone = Arc::clone(&counter);
50+
Executor::spawn_local(async move {
51+
counter_clone.store(1, Ordering::Release);
52+
});
53+
Executor::poll_local();
54+
assert_eq!(counter.load(Ordering::Acquire), 1);
55+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#[cfg(feature = "futures-executor")]
2+
use any_spawner::Executor;
3+
// All tests in this file use the same executor.
4+
5+
#[cfg(feature = "futures-executor")]
6+
#[test]
7+
fn can_spawn_local_future() {
8+
use std::rc::Rc;
9+
10+
let _ = Executor::init_futures_executor();
11+
let rc = Rc::new(());
12+
Executor::spawn_local(async {
13+
_ = rc;
14+
});
15+
Executor::spawn(async {});
16+
}
17+
18+
#[cfg(feature = "futures-executor")]
19+
#[test]
20+
fn can_make_local_progress() {
21+
use std::sync::{
22+
atomic::{AtomicUsize, Ordering},
23+
Arc,
24+
};
25+
let _ = Executor::init_futures_executor();
26+
let counter = Arc::new(AtomicUsize::new(0));
27+
Executor::spawn_local({
28+
let counter = Arc::clone(&counter);
29+
async move {
30+
assert_eq!(counter.fetch_add(1, Ordering::AcqRel), 0);
31+
Executor::spawn_local(async {
32+
// Should not crash
33+
});
34+
}
35+
});
36+
Executor::poll_local();
37+
assert_eq!(counter.load(Ordering::Acquire), 1);
38+
}

0 commit comments

Comments
 (0)