Skip to content

Commit

Permalink
Merge pull request #1198 from Nemo157/new-nightly
Browse files Browse the repository at this point in the history
Update for new nightly
  • Loading branch information
MajorBreakfast authored Aug 14, 2018
2 parents dc159c9 + 8124b82 commit c02ec75
Show file tree
Hide file tree
Showing 54 changed files with 259 additions and 280 deletions.
4 changes: 2 additions & 2 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ fn notify_noop() -> LocalWaker {

fn noop_cx(f: impl FnOnce(&mut task::Context)) {
let pool = LocalPool::new();
let mut exec = pool.executor();
let mut spawn = pool.spawner();
let waker = notify_noop();
let cx = &mut task::Context::new(&waker, &mut exec);
let cx = &mut task::Context::new(&waker, &mut spawn);
f(cx)
}

Expand Down
2 changes: 1 addition & 1 deletion futures-core/src/task.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Task notification.
pub use core::task::{
Context, Poll, Executor,
Context, Poll, Spawn,
Waker, LocalWaker, UnsafeWake,
SpawnErrorKind, SpawnObjError, SpawnLocalObjError,
};
Expand Down
4 changes: 2 additions & 2 deletions futures-executor/benches/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ fn task_init(b: &mut Bencher) {
};

let pool = LocalPool::new();
let mut exec = pool.executor();
let mut spawn = pool.spawner();
let waker = notify_noop();
let mut cx = task::Context::new(&waker, &mut exec);
let mut cx = task::Context::new(&waker, &mut spawn);

b.iter(|| {
fut.num = 0;
Expand Down
2 changes: 1 addition & 1 deletion futures-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ macro_rules! if_std {

if_std! {
mod local_pool;
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalExecutor};
pub use crate::local_pool::{block_on, block_on_stream, BlockingStream, LocalPool, LocalSpawn};

mod unpark_mutex;
mod thread_pool;
Expand Down
64 changes: 32 additions & 32 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use futures_core::future::{Future, FutureObj, LocalFutureObj};
use futures_core::stream::{Stream};
use futures_core::task::{
self, Context, Poll, LocalWaker, Wake,
Executor, SpawnObjError, SpawnLocalObjError, SpawnErrorKind
Spawn, SpawnObjError, SpawnLocalObjError, SpawnErrorKind
};
use futures_util::stream::FuturesUnordered;
use futures_util::stream::StreamExt;
Expand All @@ -24,20 +24,20 @@ use std::thread::{self, Thread};
/// little work in between I/O actions.
///
/// To get a handle to the pool that implements
/// [`Executor`](futures_core::task::Executor), use the
/// [`executor()`](LocalPool::executor) method. Because the executor is
/// [`Spawn`](futures_core::task::Spawn), use the
/// [`spawner()`](LocalPool::spawner) method. Because the executor is
/// single-threaded, it supports a special form of task spawning for non-`Send`
/// futures, via [`spawn_local_obj`](LocalExecutor::spawn_local_obj).
/// futures, via [`spawn_local_obj`](LocalSpawn::spawn_local_obj).
#[derive(Debug)]
pub struct LocalPool {
pool: FuturesUnordered<LocalFutureObj<'static, ()>>,
incoming: Rc<Incoming>,
}

/// A handle to a [`LocalPool`](LocalPool) that implements
/// [`Executor`](futures_core::task::Executor).
/// [`Spawn`](futures_core::task::Spawn).
#[derive(Clone, Debug)]
pub struct LocalExecutor {
pub struct LocalSpawn {
incoming: Weak<Incoming>,
}

Expand All @@ -59,7 +59,7 @@ impl Wake for ThreadNotify {
}
}

// Set up and run a basic single-threaded executor loop, invocing `f` on each
// Set up and run a basic single-threaded spawner loop, invoking `f` on each
// turn.
fn run_executor<T, F: FnMut(&LocalWaker) -> Poll<T>>(mut f: F) -> T {
let _enter = enter()
Expand Down Expand Up @@ -87,71 +87,71 @@ impl LocalPool {
}
}

/// Get a clonable handle to the pool as an executor.
pub fn executor(&self) -> LocalExecutor {
LocalExecutor {
/// Get a clonable handle to the pool as a [`Spawn`].
pub fn spawner(&self) -> LocalSpawn {
LocalSpawn {
incoming: Rc::downgrade(&self.incoming)
}
}

/// Run all tasks in the pool to completion.
///
/// The given executor, `exec`, is used as the default executor for any
/// The given spawner, `spawn`, is used as the default spawner for any
/// *newly*-spawned tasks. You can route these additional tasks back into
/// the `LocalPool` by using its executor handle:
/// the `LocalPool` by using its spawner handle:
///
/// ```
/// use futures::executor::LocalPool;
///
/// let mut pool = LocalPool::new();
/// let mut exec = pool.executor();
/// let mut spawn = pool.spawner();
///
/// // ... spawn some initial tasks using `exec.spawn()` or `exec.spawn_local()`
/// // ... spawn some initial tasks using `spawn.spawn()` or `spawn.spawn_local()`
///
/// // run *all* tasks in the pool to completion, including any newly-spawned ones.
/// pool.run(&mut exec);
/// pool.run(&mut spawn);
/// ```
///
/// The function will block the calling thread until *all* tasks in the pool
/// are complete, including any spawned while running existing tasks.
pub fn run<Exec>(&mut self, exec: &mut Exec) where Exec: Executor + Sized {
run_executor(|local_waker| self.poll_pool(local_waker, exec))
pub fn run<Sp>(&mut self, spawn: &mut Sp) where Sp: Spawn + Sized {
run_executor(|local_waker| self.poll_pool(local_waker, spawn))
}

/// Runs all the tasks in the pool until the given future completes.
///
/// The given executor, `exec`, is used as the default executor for any
/// The given spawner, `spawn`, is used as the default spawner for any
/// *newly*-spawned tasks. You can route these additional tasks back into
/// the `LocalPool` by using its executor handle:
/// the `LocalPool` by using its spawner handle:
///
/// ```
/// #![feature(pin, arbitrary_self_types, futures_api)]
/// use futures::executor::LocalPool;
/// use futures::future::ready;
///
/// let mut pool = LocalPool::new();
/// let mut exec = pool.executor();
/// let mut spawn = pool.spawner();
/// # let my_app = ready(());
///
/// // run tasks in the pool until `my_app` completes, by default spawning
/// // further tasks back onto the pool
/// pool.run_until(my_app, &mut exec);
/// pool.run_until(my_app, &mut spawn);
/// ```
///
/// The function will block the calling thread *only* until the future `f`
/// completes; there may still be incomplete tasks in the pool, which will
/// be inert after the call completes, but can continue with further use of
/// `run` or `run_until`. While the function is running, however, all tasks
/// in the pool will try to make progress.
pub fn run_until<F, Exec>(&mut self, future: F, exec: &mut Exec)
pub fn run_until<F, Sp>(&mut self, future: F, spawn: &mut Sp)
-> F::Output
where F: Future, Exec: Executor + Sized
where F: Future, Sp: Spawn + Sized
{
pin_mut!(future);

run_executor(|local_waker| {
{
let mut main_cx = Context::new(local_waker, exec);
let mut main_cx = Context::new(local_waker, spawn);

// if our main task is done, so are we
let result = future.reborrow().poll(&mut main_cx);
Expand All @@ -160,19 +160,19 @@ impl LocalPool {
}
}

self.poll_pool(local_waker, exec);
self.poll_pool(local_waker, spawn);
Poll::Pending
})
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
fn poll_pool<Exec>(&mut self, local_waker: &LocalWaker, exec: &mut Exec)
fn poll_pool<Sp>(&mut self, local_waker: &LocalWaker, spawn: &mut Sp)
-> Poll<()>
where Exec: Executor + Sized
where Sp: Spawn + Sized
{
// state for the FuturesUnordered, which will never be used
let mut pool_cx = Context::new(local_waker, exec);
let mut pool_cx = Context::new(local_waker, spawn);

loop {
// empty the incoming queue of newly-spawned tasks
Expand Down Expand Up @@ -215,7 +215,7 @@ lazy_static! {
/// Run a future to completion on the current thread.
///
/// This function will block the caller until the given future has completed.
/// The default executor for the future is a global `ThreadPool`.
/// The default spawner for the future is a global `ThreadPool`.
///
/// Use a [`LocalPool`](LocalPool) if you need finer-grained control over
/// spawned tasks.
Expand All @@ -228,7 +228,7 @@ pub fn block_on<F: Future>(f: F) -> F::Output {
///
/// When `next` is called on the resulting `BlockingStream`, the caller
/// will be blocked until the next element of the `Stream` becomes available.
/// The default executor for the future is a global `ThreadPool`.
/// The default spawner for the future is a global `ThreadPool`.
pub fn block_on_stream<S: Stream + Unpin>(stream: S) -> BlockingStream<S> {
BlockingStream { stream }
}
Expand Down Expand Up @@ -264,7 +264,7 @@ impl<S: Stream + Unpin> Iterator for BlockingStream<S> {
}
}

impl Executor for LocalExecutor {
impl Spawn for LocalSpawn {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
Expand All @@ -286,7 +286,7 @@ impl Executor for LocalExecutor {
}
}

impl LocalExecutor {
impl LocalSpawn {
/// Spawn a non-`Send` future onto the associated [`LocalPool`](LocalPool).
pub fn spawn_local_obj(
&mut self,
Expand Down
8 changes: 4 additions & 4 deletions futures-executor/src/thread_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::enter;
use crate::unpark_mutex::UnparkMutex;
use futures_core::future::{Future, FutureObj};
use futures_core::task::{self, Poll, Wake, Executor, SpawnObjError};
use futures_core::task::{self, Poll, Wake, Spawn, SpawnObjError};
use futures_util::future::FutureExt;
use futures_util::task::local_waker_ref_from_nonlocal;
use num_cpus;
Expand Down Expand Up @@ -85,12 +85,12 @@ impl ThreadPool {
ThreadPoolBuilder::new()
}

/// Runs the given future with this thread pool as the default executor for
/// Runs the given future with this thread pool as the default spawner for
/// spawning tasks.
///
/// **This function will block the calling thread** until the given future
/// is complete. While executing that future, any tasks spawned onto the
/// default executor will be routed to this thread pool.
/// default spawner will be routed to this thread pool.
///
/// Note that the function will return when the provided future completes,
/// even if some of the tasks it spawned are still running.
Expand All @@ -99,7 +99,7 @@ impl ThreadPool {
}
}

impl Executor for ThreadPool {
impl Spawn for ThreadPool {
fn spawn_obj(
&mut self,
future: FutureObj<'static, ()>,
Expand Down
Loading

0 comments on commit c02ec75

Please sign in to comment.