diff --git a/futures-executor/src/lib.rs b/futures-executor/src/lib.rs index a0dc49d7b..3acfe8549 100644 --- a/futures-executor/src/lib.rs +++ b/futures-executor/src/lib.rs @@ -53,7 +53,10 @@ extern crate std; #[cfg(feature = "std")] mod local_pool; #[cfg(feature = "std")] -pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawner}; +pub use crate::local_pool::{ + block_on, block_on_stream, block_on_timeout, BlockingStream, LocalPool, LocalSpawner, + TimeoutError, +}; #[cfg(feature = "thread-pool")] #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))] diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 90c2a4152..95ff84a92 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -2,19 +2,19 @@ use crate::enter; use futures_core::future::Future; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; -use futures_task::{waker_ref, ArcWake}; -use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; +use futures_task::{waker_ref, ArcWake, FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; use futures_util::pin_mut; -use futures_util::stream::FuturesUnordered; -use futures_util::stream::StreamExt; +use futures_util::stream::{FuturesUnordered, StreamExt}; use std::cell::RefCell; +use std::error::Error; +use std::fmt; use std::ops::{Deref, DerefMut}; +use std::pin::Pin; use std::rc::{Rc, Weak}; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use std::thread::{self, Thread}; +use std::time::{Duration, Instant}; use std::vec::Vec; /// A single-threaded task pool for polling futures to completion. @@ -75,9 +75,29 @@ impl ArcWake for ThreadNotify { } } +/// An error returned when a blocking execution times out. +#[non_exhaustive] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub struct TimeoutError; + +impl fmt::Display for TimeoutError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "blocking execution has timed out") + } +} + +impl Error for TimeoutError {} + // Set up and run a basic single-threaded spawner loop, invoking `f` on each // turn. -fn run_executor) -> Poll>(mut f: F) -> T { +fn run_executor_impl< + T, + F: FnMut(&mut Context<'_>) -> Poll, + P: FnMut() -> Result<(), TimeoutError>, +>( + mut f: F, + mut park: P, +) -> Result { let _enter = enter().expect( "cannot execute `LocalPool` executor from within \ another executor", @@ -88,7 +108,7 @@ fn run_executor) -> Poll>(mut f: F) -> T { let mut cx = Context::from_waker(&waker); loop { if let Poll::Ready(t) = f(&mut cx) { - return t; + return Ok(t); } // Wait for a wakeup. @@ -96,12 +116,37 @@ fn run_executor) -> Poll>(mut f: F) -> T { // No wakeup occurred. It may occur now, right before parking, // but in that case the token made available by `unpark()` // is guaranteed to still be available and `park()` is a no-op. - thread::park(); + park()?; } } }) } +fn run_executor) -> Poll>(f: F) -> T { + let park = || { + thread::park(); + Ok(()) + }; + run_executor_impl(f, park).unwrap() +} + +fn run_executor_timeout) -> Poll>( + f: F, + mut timeout: Duration, +) -> Result { + let start = Instant::now(); + let park = || { + thread::park_timeout(timeout); + let elapsed = start.elapsed(); + if elapsed >= timeout { + return Err(TimeoutError); + } + timeout -= elapsed; + Ok(()) + }; + run_executor_impl(f, park) +} + /// Check for a wakeup, but don't consume it. fn woken() -> bool { CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::Acquire)) @@ -137,6 +182,32 @@ impl LocalPool { run_executor(|cx| self.poll_pool(cx)) } + /// Run all tasks in the pool to completion, or return an error if the execution + /// times out. + /// ``` + /// use std::time::Duration; + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// + /// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()` + /// + /// // run *all* tasks in the pool to completion, including any newly-spawned ones. + /// if let Err(err) = pool.run_timeout(Duration::from_secs(10)) { + /// println!("{}", err); + /// } + /// ``` + /// + /// The function will block the calling thread until *all* tasks in the pool + /// are complete, including any spawned while running existing tasks, or until + /// the given timeout is reached. + /// In this case, there may still be incomplete tasks in the pool, which will + /// be inert after the call completes, but can continue with further use of + /// one of the pool's run or poll methods. + pub fn run_timeout(&mut self, timeout: Duration) -> Result<(), TimeoutError> { + run_executor_timeout(|cx| self.poll_pool(cx), timeout) + } + /// Runs all the tasks in the pool until the given future completes. /// /// ``` @@ -156,19 +227,39 @@ impl LocalPool { /// however, all tasks in the pool will try to make progress. pub fn run_until(&mut self, future: F) -> F::Output { pin_mut!(future); + run_executor(|cx| self.poll_pool_until(cx, &mut future)) + } - run_executor(|cx| { - { - // if our main task is done, so are we - let result = future.as_mut().poll(cx); - if let Poll::Ready(output) = result { - return Poll::Ready(output); - } - } - - let _ = self.poll_pool(cx); - Poll::Pending - }) + /// Runs all the tasks in the pool until the given future completes, or return + /// an error if the execution times out. + /// + /// ``` + /// use std::time::Duration; + /// use futures::executor::LocalPool; + /// + /// let mut pool = LocalPool::new(); + /// # let my_app = async {}; + /// + /// // run tasks in the pool until `my_app` completes or times out + /// match pool.run_until_timeout(my_app, Duration::from_secs(10)) { + /// Ok(res) => { /*...*/ } + /// Err(err) => println!("{}", err) + /// } + /// ``` + /// + /// The function will block the calling thread *only* until the future `f` + /// completes, or until the given timeout is reached; there may still be + /// incomplete tasks in the pool, which will be inert after the call + /// completes, but can continue with further use of one of the pool's run + /// or poll methods. While the function is running, however, all tasks in + /// the pool will try to make progress. + pub fn run_until_timeout( + &mut self, + future: F, + timeout: Duration, + ) -> Result { + pin_mut!(future); + run_executor_timeout(|cx| self.poll_pool_until(cx, &mut future), timeout) } /// Runs all tasks and returns after completing one future or until no more progress @@ -291,6 +382,20 @@ impl LocalPool { } } + /// Try to poll the given future, and poll the pool in case it's pending. + fn poll_pool_until( + &mut self, + cx: &mut Context<'_>, + future: &mut Pin<&mut F>, + ) -> Poll { + // if our main task is done, so are we + if let Poll::Ready(output) = future.as_mut().poll(cx) { + return Poll::Ready(output); + } + let _ = self.poll_pool(cx); + Poll::Pending + } + /// Empty the incoming queue of newly-spawned tasks. fn drain_incoming(&mut self) { let mut incoming = self.incoming.borrow_mut(); @@ -316,6 +421,18 @@ pub fn block_on(f: F) -> F::Output { run_executor(|cx| f.as_mut().poll(cx)) } +/// Run a future to completion on the current thread, or return an error +/// if the execution times out. +/// +/// This function will block the caller until the given future has completed, +/// or until the given timeout is reached. +/// +/// Use a [`LocalPool`] if you need finer-grained control over spawned tasks. +pub fn block_on_timeout(f: F, timeout: Duration) -> Result { + pin_mut!(f); + run_executor_timeout(|cx| f.as_mut().poll(cx), timeout) +} + /// Turn a stream into a blocking iterator. /// /// When `next` is called on the resulting `BlockingStream`, the caller diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 131f37934..4973805e3 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -64,6 +64,19 @@ fn run_until_executes_spawned() { pool.run_until(rx).unwrap(); } +#[test] +fn run_until_timeout() { + let mut pool = LocalPool::new(); + assert!(pool.run_until_timeout(pending(), Duration::from_millis(1)).is_err()) +} + +#[test] +fn run_timeout() { + let mut pool = LocalPool::new(); + pool.spawner().spawn_local(pending()).unwrap(); + assert!(pool.run_timeout(Duration::from_millis(1)).is_err()) +} + #[test] fn run_returns_if_empty() { let mut pool = LocalPool::new(); diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 839a0a1f0..a0aaa5dcc 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -189,8 +189,8 @@ pub mod executor { //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj pub use futures_executor::{ - block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool, - LocalSpawner, + block_on, block_on_stream, block_on_timeout, enter, BlockingStream, Enter, EnterError, + LocalPool, LocalSpawner, TimeoutError, }; #[cfg(feature = "thread-pool")]