From 144b6acfc53d12e9dc5b0f95961fdc4b606ec7d7 Mon Sep 17 00:00:00 2001
From: Lucio Franco
Date: Mon, 31 Aug 2020 14:55:29 +0000
Subject: [PATCH 01/10] rt: Allow concurrent block_on's with basic_scheduler

This allows us to concurrently call `Runtime::block_on` with the
basic_scheduler, allowing other threads to steal the dedicated parker.
---
 tokio/src/park/thread.rs             |  12 +-
 tokio/src/runtime/basic_scheduler.rs | 172 ++++++++++++++++++++++-----
 tokio/src/runtime/builder.rs         |   2 +-
 tokio/src/runtime/mod.rs             |  23 +---
 tokio/src/util/mod.rs                |   4 +
 tokio/tests/rt_common.rs             |  35 ++++++
 6 files changed, 191 insertions(+), 57 deletions(-)

diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs
index 9ed41310fa5..df40f9010a7 100644
--- a/tokio/src/park/thread.rs
+++ b/tokio/src/park/thread.rs
@@ -21,8 +21,8 @@ pub(crate) struct UnparkThread {
 #[derive(Debug)]
 struct Inner {
     state: AtomicUsize,
-    mutex: Mutex<()>,
-    condvar: Condvar,
+    mutex: Arc<Mutex<()>>,
+    condvar: Arc<Condvar>,
 }

 const EMPTY: usize = 0;
@@ -37,11 +37,15 @@ thread_local! {
 impl ParkThread {
     pub(crate) fn new() -> Self {
+        ParkThread::with_condvar(Arc::new(Condvar::new()), Arc::new(Mutex::new(())))
+    }
+
+    pub(crate) fn with_condvar(condvar: Arc<Condvar>, mutex: Arc<Mutex<()>>) -> Self {
         Self {
             inner: Arc::new(Inner {
                 state: AtomicUsize::new(EMPTY),
-                mutex: Mutex::new(()),
-                condvar: Condvar::new(),
+                mutex,
+                condvar,
             }),
         }
     }

diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs
index 48cff709d46..860ea4034e5 100644
--- a/tokio/src/runtime/basic_scheduler.rs
+++ b/tokio/src/runtime/basic_scheduler.rs
@@ -1,22 +1,35 @@
-use crate::park::{Park, Unpark};
+use crate::loom;
+use crate::loom::sync::{Condvar, Mutex};
+use crate::park::{Park, ParkThread, Unpark};
 use crate::runtime;
 use crate::runtime::task::{self, JoinHandle, Schedule, Task};
 use crate::util::linked_list::LinkedList;
-use crate::util::{waker_ref, Wake};
+use crate::util::{waker_ref, Wake, WakerRef};

 use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::Poll::Ready;
 use std::time::Duration;

 /// Executes tasks on the current thread
-pub(crate) struct BasicScheduler
<P>
-where
-    P: Park,
-{
+pub(crate) struct BasicScheduler<P: Park> {
+    /// Inner with the dedicated parker P
+    inner: Mutex<Option<Inner<P>>>,
+
+    /// Sync items used to notify other threads that called
+    /// block_on concurrently that the dedicated driver is available
+    /// to steal
+    condvar: loom::sync::Arc<Condvar>,
+    mutex: loom::sync::Arc<Mutex<()>>,
+
+    /// Sendable task spawner
+    spawner: Spawner,
+}
+
+struct Inner<P: Park> {
     /// Scheduler run queue
     ///
     /// When the scheduler is executed, the queue is removed from `self` and
@@ -59,7 +72,7 @@ struct Shared {
     unpark: Box<dyn Unpark>,
 }

-/// Thread-local context
+/// Thread-local context.
 struct Context {
     /// Shared scheduler state
     shared: Arc<Shared>,

     /// Local queue
     tasks: RefCell<Tasks>,
 }

-/// Initial queue capacity
+/// Initial queue capacity.
 const INITIAL_CAPACITY: usize = 64;

 /// Max number of tasks to poll per tick.
 const MAX_TASKS_PER_TICK: usize = 61;

-/// How often ot check the remote queue first
+/// How often to check the remote queue first.
 const REMOTE_FIRST_INTERVAL: u8 = 31;

-// Tracks the current BasicScheduler
+// Tracks the current BasicScheduler.
 scoped_thread_local!(static CURRENT: Context);

 impl
<P>
BasicScheduler
<P>
@@ -87,19 +100,28 @@ where
     pub(crate) fn new(park: P) -> BasicScheduler
<P>
{
         let unpark = Box::new(park.unpark());

-        BasicScheduler {
+        let spawner = Spawner {
+            shared: Arc::new(Shared {
+                queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                unpark: unpark as Box<dyn Unpark>,
+            }),
+        };
+
+        let inner = Mutex::new(Some(Inner {
             tasks: Some(Tasks {
                 owned: LinkedList::new(),
                 queue: VecDeque::with_capacity(INITIAL_CAPACITY),
             }),
-            spawner: Spawner {
-                shared: Arc::new(Shared {
-                    queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
-                    unpark: unpark as Box<dyn Unpark>,
-                }),
-            },
+            spawner: spawner.clone(),
             tick: 0,
             park,
+        }));
+
+        BasicScheduler {
+            inner,
+            spawner,
+            condvar: loom::sync::Arc::new(Condvar::new()),
+            mutex: loom::sync::Arc::new(Mutex::new(())),
         }
     }

@@ -108,7 +130,6 @@ where
     }

     /// Spawns a future onto the thread pool
-    #[allow(dead_code)]
     pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
     where
         F: Future + Send + 'static,
@@ -117,13 +138,60 @@ where
         self.spawner.spawn(future)
     }

-    pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
-    where
-        F: Future,
-    {
+    pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
+        // If we can steal the dedicated parker than lets block_on that
+        // otherwise, lets block_on and attempt to steal it back if we can.
+        if let Some(mut inner) = self.take_inner() {
+            inner.block_on(future)
+        } else {
+            // TODO: should this be false or true? In the origina block_on for
+            // basic_scheduler we have false?
+            let enter = crate::runtime::enter(false);
+
+            let mut park = ParkThread::with_condvar(self.condvar.clone(), self.mutex.clone());
+            let waker = park.unpark().into_waker();
+            let mut cx = std::task::Context::from_waker(&waker);
+
+            pin!(future);
+
+            loop {
+                if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
+                    return v;
+                }
+
+                // Check if we can steal the driver
+                // TODO: Consider using an atomic load here intead of locking
+                // the mutex.
+                if let Some(mut inner) = self.take_inner() {
+                    // We will enter again on in the inner implementation below
+                    drop(enter);
+                    return inner.block_on(future);
+                }
+
+                // Park this thread waiting for some external wake up from
+                // a waker or a notification that we can steal the driver again.
+                park.park().expect("failed to park");
+            }
+        }
+    }
+
+    fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
+        let mut lock = self.inner.lock().unwrap();
+        let inner = lock.take()?;
+
+        Some(InnerGuard {
+            inner: Some(inner),
+            scheduler: &self,
+        })
+    }
+}
+
+impl<P: Park> Inner
<P>
{
+    /// Block on the future provided and drive the runtime's driver.
+    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
         enter(self, |scheduler, context| {
             let _enter = runtime::enter(false);
-            let waker = waker_ref(&scheduler.spawner.shared);
+            let waker = scheduler.spawner.waker_ref();
             let mut cx = std::task::Context::from_waker(&waker);

             pin!(future);
@@ -178,16 +246,16 @@ where

 /// Enter the scheduler context. This sets the queue and other necessary
 /// scheduler state in the thread-local
-fn enter<F, R, P>(scheduler: &mut BasicScheduler
<P>
, f: F) -> R
+fn enter<F, R, P>(scheduler: &mut Inner
<P>
, f: F) -> R
 where
-    F: FnOnce(&mut BasicScheduler
<P>
, &Context) -> R,
+    F: FnOnce(&mut Inner
<P>
, &Context) -> R,
     P: Park,
 {
     // Ensures the run queue is placed back in the `BasicScheduler` instance
     // once `block_on` returns.`
     struct Guard<'a, P: Park> {
         context: Option<Context>,
-        scheduler: &'a mut BasicScheduler
<P>
,
+        scheduler: &'a mut Inner
<P>
,
     }

     impl<P: Park> Drop for Guard<'_, P> {
@@ -214,12 +282,15 @@ where
     CURRENT.set(context, || f(scheduler, context))
 }

-impl
<P>
Drop for BasicScheduler
<P>
-where
-    P: Park,
-{
+impl<P: Park> Drop for BasicScheduler
<P>
{ fn drop(&mut self) { - enter(self, |scheduler, context| { + let mut inner = { + let mut lock = self.inner.lock().expect("BasicScheduler Inner lock"); + lock.take() + .expect("Oh no! We never placed the Inner state back!") + }; + + enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations #[allow(clippy::while_let_loop)] loop { @@ -269,6 +340,10 @@ impl Spawner { fn pop(&self) -> Option>> { self.shared.queue.lock().unwrap().pop_front() } + + fn waker_ref(&self) -> WakerRef<'_> { + waker_ref(&self.shared) + } } impl fmt::Debug for Spawner { @@ -325,3 +400,36 @@ impl Wake for Shared { arc_self.unpark.unpark(); } } + +// ===== InnerGuard ===== + +/// Used to ensure we always place the Inner value +/// back into its slot in `BasicScheduler` even if the +/// future panics. +struct InnerGuard<'a, P: Park> { + inner: Option>, + scheduler: &'a BasicScheduler
<P>
, +} + +impl InnerGuard<'_, P> { + fn block_on(&mut self, future: F) -> F::Output { + // The only time inner gets set to `None` is if we have dropped + // already so this unwrap is safe. + self.inner.as_mut().unwrap().block_on(future) + } +} + +impl Drop for InnerGuard<'_, P> { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + let mut lock = self.scheduler.inner.lock().unwrap(); + lock.replace(inner); + + // Wake up possible other threads + // notifying them that they might need + // to steal the driver. + drop(self.scheduler.mutex.lock().unwrap()); + self.scheduler.condvar.notify_one(); + } + } +} diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index db01cf5871e..1920201d810 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -463,7 +463,7 @@ cfg_rt_core! { let blocking_spawner = blocking_pool.spawner().clone(); Ok(Runtime { - kind: Kind::Basic(Mutex::new(Some(scheduler))), + kind: Kind::Basic(scheduler), handle: Handle { spawner, io_handle, diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 9d26446bf7e..38015127eaa 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -293,7 +293,7 @@ enum Kind { /// Execute all tasks on the current-thread. #[cfg(feature = "rt-core")] - Basic(Mutex>>), + Basic(BasicScheduler), /// Execute tasks across multiple threads. #[cfg(feature = "rt-threaded")] @@ -398,7 +398,7 @@ impl Runtime { Kind::Shell(_) => panic!("task execution disabled"), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.spawn(future), - Kind::Basic(_exec) => self.handle.spawner.spawn(future), + Kind::Basic(exec) => exec.spawn(future), } } @@ -458,24 +458,7 @@ impl Runtime { } } #[cfg(feature = "rt-core")] - Kind::Basic(exec) => { - // TODO(lucio): clean this up and move this impl into - // `basic_scheduler.rs`, this is hacky and bad but will work for - // now. - let exec_temp = { - let mut lock = exec.lock().unwrap(); - lock.take() - }; - - if let Some(mut exec_temp) = exec_temp { - let res = exec_temp.block_on(future); - exec.lock().unwrap().replace(exec_temp); - res - } else { - let mut enter = crate::runtime::enter(true); - enter.block_on(future).unwrap() - } - } + Kind::Basic(exec) => exec.block_on(future), #[cfg(feature = "rt-threaded")] Kind::ThreadPool(exec) => exec.block_on(future), }) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index c5439f4878b..278d6343784 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -12,6 +12,10 @@ mod rand; mod wake; pub(crate) use wake::{waker_ref, Wake}; +cfg_rt_core! { + pub(crate) use wake::WakerRef; +} + cfg_rt_threaded! { pub(crate) use rand::FastRand; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 35e2ea81a02..e3c75cb99d5 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -574,6 +574,41 @@ rt_test! 
{ }); } + #[test] + fn always_active_parker() { + // This test it to show that we will always have + // an active parker even if we call block_on concurrently + + // run multiple times to catch any odd concurrency bugs + for _ in 0..100 { + let rt = rt(); + let rt2 = rt.clone(); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + let jh1 = thread::spawn(move || { + rt.block_on(async move { + rx2.await.unwrap(); + time::delay_for(Duration::from_millis(5)).await; + tx1.send(()).unwrap(); + }); + }); + + let jh2 = thread::spawn(move || { + rt2.block_on(async move { + tx2.send(()).unwrap(); + time::delay_for(Duration::from_millis(5)).await; + rx1.await.unwrap(); + time::delay_for(Duration::from_millis(5)).await; + }); + }); + + jh1.join().unwrap(); + jh2.join().unwrap(); + } + } + #[test] // IOCP requires setting the "max thread" concurrency value. The sane, // default, is to set this to the number of cores. Threads that poll I/O From 61133a353334a07d33f80a7c17c520d1dd1ba286 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 1 Sep 2020 14:05:48 +0000 Subject: [PATCH 02/10] Reduce iterations for test --- tokio/tests/rt_common.rs | 41 +++++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index e3c75cb99d5..a8968be1b96 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -579,34 +579,31 @@ rt_test! { // This test it to show that we will always have // an active parker even if we call block_on concurrently - // run multiple times to catch any odd concurrency bugs - for _ in 0..100 { - let rt = rt(); - let rt2 = rt.clone(); - - let (tx1, rx1) = oneshot::channel(); - let (tx2, rx2) = oneshot::channel(); + let rt = rt(); + let rt2 = rt.clone(); - let jh1 = thread::spawn(move || { - rt.block_on(async move { - rx2.await.unwrap(); - time::delay_for(Duration::from_millis(5)).await; - tx1.send(()).unwrap(); - }); - }); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); - let jh2 = thread::spawn(move || { - rt2.block_on(async move { - tx2.send(()).unwrap(); - time::delay_for(Duration::from_millis(5)).await; - rx1.await.unwrap(); + let jh1 = thread::spawn(move || { + rt.block_on(async move { + rx2.await.unwrap(); time::delay_for(Duration::from_millis(5)).await; + tx1.send(()).unwrap(); }); + }); + + let jh2 = thread::spawn(move || { + rt2.block_on(async move { + tx2.send(()).unwrap(); + time::delay_for(Duration::from_millis(5)).await; + rx1.await.unwrap(); + time::delay_for(Duration::from_millis(5)).await; }); + }); - jh1.join().unwrap(); - jh2.join().unwrap(); - } + jh1.join().unwrap(); + jh2.join().unwrap(); } #[test] From cb9136838a724323aaf3751870ec73ebfe8abddf Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 8 Sep 2020 14:39:11 -0500 Subject: [PATCH 03/10] Refactor to use `VecDeque` --- tokio/src/park/thread.rs | 12 ++--- tokio/src/runtime/basic_scheduler.rs | 74 ++++++++++++++++------------ 2 files changed, 46 insertions(+), 40 deletions(-) diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index df40f9010a7..9ed41310fa5 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -21,8 +21,8 @@ pub(crate) struct UnparkThread { #[derive(Debug)] struct Inner { state: AtomicUsize, - mutex: Arc>, - condvar: Arc, + mutex: Mutex<()>, + condvar: Condvar, } const EMPTY: usize = 0; @@ -37,15 +37,11 @@ thread_local! 
{ impl ParkThread { pub(crate) fn new() -> Self { - ParkThread::with_condvar(Arc::new(Condvar::new()), Arc::new(Mutex::new(()))) - } - - pub(crate) fn with_condvar(condvar: Arc, mutex: Arc>) -> Self { Self { inner: Arc::new(Inner { state: AtomicUsize::new(EMPTY), - mutex, - condvar, + mutex: Mutex::new(()), + condvar: Condvar::new(), }), } } diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index d1785a0cbfc..848a1ad07b5 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,5 +1,4 @@ -use crate::loom; -use crate::loom::sync::{Condvar, Mutex}; +use crate::loom::sync::Mutex; use crate::park::{Park, ParkThread, Unpark}; use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; @@ -11,7 +10,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::Arc; -use std::task::Poll::Ready; +use std::task::{Poll::Ready, Waker}; use std::time::Duration; /// Executes tasks on the current thread @@ -19,11 +18,8 @@ pub(crate) struct BasicScheduler { /// Inner with the dedicated parker P inner: Mutex>>, - /// Sync items used to notify other threads that called - /// block_on concurrently that the dedicated driver is available - /// to steal - condvar: loom::sync::Arc, - mutex: loom::sync::Arc>, + /// Queue of threads available to steal the dedicated parker P. + steal_queue: Mutex>, /// Sendable task spawner spawner: Spawner, @@ -120,8 +116,7 @@ where BasicScheduler { inner, spawner, - condvar: loom::sync::Arc::new(Condvar::new()), - mutex: loom::sync::Arc::new(Mutex::new(())), + steal_queue: Mutex::new(VecDeque::new()), } } @@ -139,16 +134,15 @@ where } pub(crate) fn block_on(&self, future: F) -> F::Output { - // If we can steal the dedicated parker than lets block_on that - // otherwise, lets block_on and attempt to steal it back if we can. + // If we can steal the dedicated parker then lets block_on that + // Otherwise, we'll block_on the future and attempt to steal the + // parker later, if we can. if let Some(mut inner) = self.take_inner() { inner.block_on(future) } else { - // TODO: should this be false or true? In the origina block_on for - // basic_scheduler we have false? let enter = crate::runtime::enter(false); - let mut park = ParkThread::with_condvar(self.condvar.clone(), self.mutex.clone()); + let mut park = ParkThread::new(); let waker = park.unpark().into_waker(); let mut cx = std::task::Context::from_waker(&waker); @@ -159,17 +153,23 @@ where return v; } - // Check if we can steal the driver + // Check if we can steal the dedicated parker P. + // // TODO: Consider using an atomic load here intead of locking // the mutex. if let Some(mut inner) = self.take_inner() { // We will enter again on in the inner implementation below drop(enter); return inner.block_on(future); + } else { + let waker = park.unpark().into_waker(); + let mut lock = self.steal_queue.lock().unwrap(); + lock.push_back(waker); } - // Park this thread waiting for some external wake up from - // a waker or a notification that we can steal the driver again. + // Park this thread, waiting for some external wakeup: either + // from the future we are currently polling or a wakeup from the + // block_on that contains the parker, notifying us to steal the parker. park.park().expect("failed to park"); } } @@ -284,11 +284,17 @@ where impl Drop for BasicScheduler
<P>
{ fn drop(&mut self) { - let mut inner = { - let mut lock = self.inner.lock().expect("BasicScheduler Inner lock"); - lock.take() - .expect("Oh no! We never placed the Inner state back!") - }; + // Avoid double panicking, since it makes debugging much harder. + if std::thread::panicking() { + return; + } + + let mut inner = self + .inner + .lock() + .unwrap() + .take() + .expect("Oh no! We never placed the Inner state back, this is a bug!"); enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations @@ -404,7 +410,7 @@ impl Wake for Shared { // ===== InnerGuard ===== /// Used to ensure we always place the Inner value -/// back into its slot in `BasicScheduler` even if the +/// back into its slot in `BasicScheduler`, even if the /// future panics. struct InnerGuard<'a, P: Park> { inner: Option>, @@ -421,15 +427,19 @@ impl InnerGuard<'_, P> { impl Drop for InnerGuard<'_, P> { fn drop(&mut self) { + // Avoid double panicking, since it makes debugging much harder. + if std::thread::panicking() { + return; + } + if let Some(inner) = self.inner.take() { - let mut lock = self.scheduler.inner.lock().unwrap(); - lock.replace(inner); - - // Wake up possible other threads - // notifying them that they might need - // to steal the driver. - drop(self.scheduler.mutex.lock().unwrap()); - self.scheduler.condvar.notify_one(); + self.scheduler.inner.lock().unwrap().replace(inner); + + // Wake up other possible threads that could steal + // the dedicated parker P. + if let Some(waker) = self.scheduler.steal_queue.lock().unwrap().pop_front() { + waker.wake(); + } } } } From 22dd7d5c7e0b61ea476d1a59b6baa474dd442a1d Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 8 Sep 2020 15:17:09 -0500 Subject: [PATCH 04/10] Avoid double panic in drops --- tokio/src/runtime/basic_scheduler.rs | 40 ++++++++++++++++------------ 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 848a1ad07b5..820a1876456 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -284,17 +284,18 @@ where impl Drop for BasicScheduler
<P>
{ fn drop(&mut self) { - // Avoid double panicking, since it makes debugging much harder. - if std::thread::panicking() { - return; - } - - let mut inner = self - .inner - .lock() - .unwrap() - .take() - .expect("Oh no! We never placed the Inner state back, this is a bug!"); + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + let mut inner = if let Ok(lock) = &mut self.inner.lock() { + lock.take() + .expect("Oh no! We never placed the Inner state back, this is a bug!") + } else { + if std::thread::panicking() { + return; + } else { + panic!("Inner lock poisoned"); + } + }; enter(&mut inner, |scheduler, context| { // Loop required here to ensure borrow is dropped between iterations @@ -427,13 +428,18 @@ impl InnerGuard<'_, P> { impl Drop for InnerGuard<'_, P> { fn drop(&mut self) { - // Avoid double panicking, since it makes debugging much harder. - if std::thread::panicking() { - return; - } - if let Some(inner) = self.inner.take() { - self.scheduler.inner.lock().unwrap().replace(inner); + // Avoid a double panic if we are currently panicking and + // the lock may be poisoned. + if let Ok(lock) = &mut self.scheduler.inner.lock() { + lock.replace(inner); + } else { + if std::thread::panicking() { + return; + } else { + panic!("Inner lock poisoned"); + } + } // Wake up other possible threads that could steal // the dedicated parker P. From e7682bfd673b6c6b9b3f4731934c67a318348f6a Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 22 Sep 2020 19:35:49 +0000 Subject: [PATCH 05/10] Use `Notify` and clean up panicability of drops --- tokio/src/runtime/basic_scheduler.rs | 141 +++++++++++++++------------ tokio/src/sync/mod.rs | 7 ++ 2 files changed, 83 insertions(+), 65 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 820a1876456..8cbd14363f0 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,31 +1,36 @@ use crate::loom::sync::Mutex; -use crate::park::{Park, ParkThread, Unpark}; +use crate::park::{CachedParkThread, Park, Unpark}; use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; +use crate::sync::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; -use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::Arc; -use std::task::{Poll::Ready, Waker}; +use std::task::Poll::Ready; use std::time::Duration; +use std::{cell::RefCell, sync::PoisonError}; /// Executes tasks on the current thread pub(crate) struct BasicScheduler { - /// Inner with the dedicated parker P - inner: Mutex>>, - - /// Queue of threads available to steal the dedicated parker P. - steal_queue: Mutex>, + /// Inner state guarded by a mutex that is shared + /// between all `block_on` calls. + inner: Mutex>, /// Sendable task spawner spawner: Spawner, } struct Inner { + scheduler: Option>, + steal_queue: Arc, +} + +/// The inner scheduler that owns the task queue and the main parker P. +struct Scheduler { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -89,10 +94,7 @@ const REMOTE_FIRST_INTERVAL: u8 = 31; // Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl
<P>
BasicScheduler
<P>
-where
-    P: Park,
-{
+impl<P: Park> BasicScheduler
<P>
{
     pub(crate) fn new(park: P) -> BasicScheduler
<P>
{ let unpark = Box::new(park.unpark()); @@ -103,7 +105,7 @@ where }), }; - let inner = Mutex::new(Some(Inner { + let scheduler = Some(Scheduler { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), @@ -111,13 +113,14 @@ where spawner: spawner.clone(), tick: 0, park, - })); + }); - BasicScheduler { - inner, - spawner, - steal_queue: Mutex::new(VecDeque::new()), - } + let inner = Mutex::new(Inner { + scheduler, + steal_queue: Arc::new(Notify::new()), + }); + + BasicScheduler { inner, spawner } } pub(crate) fn spawner(&self) -> &Spawner { @@ -142,29 +145,39 @@ where } else { let enter = crate::runtime::enter(false); - let mut park = ParkThread::new(); + let mut park = CachedParkThread::new(); let waker = park.unpark().into_waker(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); + let notifier = { + let lock = self.inner.lock().unwrap(); + lock.steal_queue.clone() + }; + + let mut notified = Box::pin(notifier.notified()); + loop { - if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { - return v; + if let Ready(_) = notified.as_mut().poll(&mut cx) { + // Check if we can steal the dedicated parker P. + // + // TODO: Consider using an atomic load here intead of locking + // the mutex. + if let Some(mut inner) = self.take_inner() { + // We will enter again on in the inner implementation below + drop(enter); + return inner.block_on(future); + } else { + // Since the notify future polled to ready, it will panic if polled again + // beyond ready. To avoid this, lets create a new future. This allocation is + // unfortunate but should be extremely rare. + notified = Box::pin(notifier.notified()); + } } - // Check if we can steal the dedicated parker P. - // - // TODO: Consider using an atomic load here intead of locking - // the mutex. - if let Some(mut inner) = self.take_inner() { - // We will enter again on in the inner implementation below - drop(enter); - return inner.block_on(future); - } else { - let waker = park.unpark().into_waker(); - let mut lock = self.steal_queue.lock().unwrap(); - lock.push_back(waker); + if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { + return v; } // Park this thread, waiting for some external wakeup: either @@ -177,16 +190,16 @@ where fn take_inner(&self) -> Option> { let mut lock = self.inner.lock().unwrap(); - let inner = lock.take()?; + let inner = lock.scheduler.take()?; Some(InnerGuard { inner: Some(inner), - scheduler: &self, + basic_scheduler: &self, }) } } -impl Inner
<P>
{
+impl<P: Park> Scheduler
<P>
{ /// Block on the future provided and drive the runtime's driver. fn block_on(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { @@ -246,16 +259,16 @@ impl Inner
<P>
{ /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local -fn enter(scheduler: &mut Inner
<P>
, f: F) -> R +fn enter(scheduler: &mut Scheduler
<P>
, f: F) -> R where - F: FnOnce(&mut Inner
<P>
, &Context) -> R, + F: FnOnce(&mut Scheduler
<P>
, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option, - scheduler: &'a mut Inner
<P>
, + scheduler: &'a mut Scheduler
<P>
, } impl Drop for Guard<'_, P> { @@ -286,14 +299,14 @@ impl Drop for BasicScheduler
<P>
{ fn drop(&mut self) { // Avoid a double panic if we are currently panicking and // the lock may be poisoned. - let mut inner = if let Ok(lock) = &mut self.inner.lock() { - lock.take() - .expect("Oh no! We never placed the Inner state back, this is a bug!") - } else { - if std::thread::panicking() { - return; - } else { - panic!("Inner lock poisoned"); + + let mut inner = { + let mut lock = self.inner.lock().unwrap_or_else(PoisonError::into_inner); + + match lock.scheduler.take() { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), } }; @@ -414,8 +427,8 @@ impl Wake for Shared { /// back into its slot in `BasicScheduler`, even if the /// future panics. struct InnerGuard<'a, P: Park> { - inner: Option>, - scheduler: &'a BasicScheduler
<P>
, + inner: Option>, + basic_scheduler: &'a BasicScheduler
<P>
, } impl InnerGuard<'_, P> { @@ -428,24 +441,22 @@ impl InnerGuard<'_, P> { impl Drop for InnerGuard<'_, P> { fn drop(&mut self) { - if let Some(inner) = self.inner.take() { - // Avoid a double panic if we are currently panicking and - // the lock may be poisoned. - if let Ok(lock) = &mut self.scheduler.inner.lock() { - lock.replace(inner); - } else { - if std::thread::panicking() { - return; - } else { - panic!("Inner lock poisoned"); - } - } + if let Some(scheduler) = self.inner.take() { + // We can ignore the poison error here since we are + // just replacing the state. + let mut lock = self + .basic_scheduler + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner); + + // Replace old scheduler back into the state to allow + // other threads to pick it up and drive it. + lock.scheduler.replace(scheduler); // Wake up other possible threads that could steal // the dedicated parker P. - if let Some(waker) = self.scheduler.steal_queue.lock().unwrap().pop_front() { - waker.wake(); - } + lock.steal_queue.notify_one() } } } diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 5d66512d5ad..522987a26ce 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -458,6 +458,13 @@ cfg_sync! { pub mod watch; } +cfg_not_sync! { + cfg_rt_core! { + mod notify; + pub(crate) use notify::Notify; + } +} + cfg_not_sync! { cfg_atomic_waker_impl! { mod task; From 8d1edfa2df8f6f2b1467fb263c79932e5c126496 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Tue, 22 Sep 2020 20:48:06 +0000 Subject: [PATCH 06/10] Allow unreachable pub in non-sync settings for Notify --- tokio/src/runtime/basic_scheduler.rs | 8 ++++---- tokio/src/sync/notify.rs | 7 +++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 8cbd14363f0..6597fc99643 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -26,7 +26,7 @@ pub(crate) struct BasicScheduler { struct Inner { scheduler: Option>, - steal_queue: Arc, + notify: Arc, } /// The inner scheduler that owns the task queue and the main parker P. @@ -117,7 +117,7 @@ impl BasicScheduler
<P>
{ let inner = Mutex::new(Inner { scheduler, - steal_queue: Arc::new(Notify::new()), + notify: Arc::new(Notify::new()), }); BasicScheduler { inner, spawner } @@ -153,7 +153,7 @@ impl BasicScheduler
<P>
{ let notifier = { let lock = self.inner.lock().unwrap(); - lock.steal_queue.clone() + lock.notify.clone() }; let mut notified = Box::pin(notifier.notified()); @@ -456,7 +456,7 @@ impl Drop for InnerGuard<'_, P> { // Wake up other possible threads that could steal // the dedicated parker P. - lock.steal_queue.notify_one() + lock.notify.notify_one() } } } diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4321c974608..02dcbe842b2 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -1,3 +1,10 @@ +// Allow `unreachable_pub` warnings when sync is not enabled +// due to the usage of `Notify` within the `rt-core` feature set. +// When this module is compiled with `sync` enabled we will warn on +// this lint. When `rt-core` is enabled we use `pub(crate)` which +// triggers this warning but it is safe to ignore in this case. +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub))] + use crate::loom::sync::atomic::AtomicU8; use crate::loom::sync::Mutex; use crate::util::linked_list::{self, LinkedList}; From 0d758d9cd0531155103b4df2a392befc0600cd94 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 23 Sep 2020 14:15:29 +0000 Subject: [PATCH 07/10] Remove `Arc` around `Notify` --- tokio/src/runtime/basic_scheduler.rs | 72 +++++++++++++--------------- 1 file changed, 32 insertions(+), 40 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 6597fc99643..df324bbabf0 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -18,19 +18,18 @@ use std::{cell::RefCell, sync::PoisonError}; pub(crate) struct BasicScheduler { /// Inner state guarded by a mutex that is shared /// between all `block_on` calls. - inner: Mutex>, + inner: Mutex>>, + + /// Notifier for waking up other threads to steal the + /// parker. + notify: Notify, /// Sendable task spawner spawner: Spawner, } -struct Inner { - scheduler: Option>, - notify: Arc, -} - /// The inner scheduler that owns the task queue and the main parker P. -struct Scheduler { +struct Inner { /// Scheduler run queue /// /// When the scheduler is executed, the queue is removed from `self` and @@ -105,7 +104,7 @@ impl BasicScheduler
<P>
{ }), }; - let scheduler = Some(Scheduler { + let inner = Mutex::new(Some(Inner { tasks: Some(Tasks { owned: LinkedList::new(), queue: VecDeque::with_capacity(INITIAL_CAPACITY), @@ -113,14 +112,13 @@ impl BasicScheduler
<P>
{ spawner: spawner.clone(), tick: 0, park, - }); + })); - let inner = Mutex::new(Inner { - scheduler, - notify: Arc::new(Notify::new()), - }); - - BasicScheduler { inner, spawner } + BasicScheduler { + inner, + notify: Notify::new(), + spawner, + } } pub(crate) fn spawner(&self) -> &Spawner { @@ -144,19 +142,13 @@ impl BasicScheduler
<P>
{ inner.block_on(future) } else { let enter = crate::runtime::enter(false); - let mut park = CachedParkThread::new(); let waker = park.unpark().into_waker(); let mut cx = std::task::Context::from_waker(&waker); pin!(future); - let notifier = { - let lock = self.inner.lock().unwrap(); - lock.notify.clone() - }; - - let mut notified = Box::pin(notifier.notified()); + let mut notified = Box::pin(self.notify.notified()); loop { if let Ready(_) = notified.as_mut().poll(&mut cx) { @@ -172,7 +164,7 @@ impl BasicScheduler
<P>
{ // Since the notify future polled to ready, it will panic if polled again // beyond ready. To avoid this, lets create a new future. This allocation is // unfortunate but should be extremely rare. - notified = Box::pin(notifier.notified()); + notified = Box::pin(self.notify.notified()); } } @@ -189,8 +181,7 @@ impl BasicScheduler
<P>
{ } fn take_inner(&self) -> Option> { - let mut lock = self.inner.lock().unwrap(); - let inner = lock.scheduler.take()?; + let inner = self.inner.lock().unwrap().take()?; Some(InnerGuard { inner: Some(inner), @@ -199,7 +190,7 @@ impl BasicScheduler
<P>
{ } } -impl Scheduler
<P>
{ +impl Inner
<P>
{ /// Block on the future provided and drive the runtime's driver. fn block_on(&mut self, future: F) -> F::Output { enter(self, |scheduler, context| { @@ -259,16 +250,16 @@ impl Scheduler
<P>
{ /// Enter the scheduler context. This sets the queue and other necessary /// scheduler state in the thread-local -fn enter(scheduler: &mut Scheduler
<P>
, f: F) -> R +fn enter(scheduler: &mut Inner
<P>
, f: F) -> R where - F: FnOnce(&mut Scheduler
<P>
, &Context) -> R, + F: FnOnce(&mut Inner
<P>
, &Context) -> R, P: Park, { // Ensures the run queue is placed back in the `BasicScheduler` instance // once `block_on` returns.` struct Guard<'a, P: Park> { context: Option, - scheduler: &'a mut Scheduler
<P>
, + scheduler: &'a mut Inner
<P>
, } impl Drop for Guard<'_, P> { @@ -300,14 +291,15 @@ impl Drop for BasicScheduler
<P>
{ // Avoid a double panic if we are currently panicking and // the lock may be poisoned. - let mut inner = { - let mut lock = self.inner.lock().unwrap_or_else(PoisonError::into_inner); - - match lock.scheduler.take() { - Some(inner) => inner, - None if std::thread::panicking() => return, - None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), - } + let mut inner = match self + .inner + .lock() + .unwrap_or_else(PoisonError::into_inner) + .take() + { + Some(inner) => inner, + None if std::thread::panicking() => return, + None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), }; enter(&mut inner, |scheduler, context| { @@ -427,7 +419,7 @@ impl Wake for Shared { /// back into its slot in `BasicScheduler`, even if the /// future panics. struct InnerGuard<'a, P: Park> { - inner: Option>, + inner: Option>, basic_scheduler: &'a BasicScheduler
<P>
, } @@ -452,11 +444,11 @@ impl Drop for InnerGuard<'_, P> { // Replace old scheduler back into the state to allow // other threads to pick it up and drive it. - lock.scheduler.replace(scheduler); + lock.replace(scheduler); // Wake up other possible threads that could steal // the dedicated parker P. - lock.notify.notify_one() + self.basic_scheduler.notify.notify_one() } } } From 93ffbad782741dc3c4cde758597de0d560b57ec9 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 23 Sep 2020 14:37:10 +0000 Subject: [PATCH 08/10] Add more allowed warnings --- tokio/src/sync/notify.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 5ae387209ff..d319e8aae8f 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -3,7 +3,7 @@ // When this module is compiled with `sync` enabled we will warn on // this lint. When `rt-core` is enabled we use `pub(crate)` which // triggers this warning but it is safe to ignore in this case. -#![cfg_attr(not(feature = "sync"), allow(unreachable_pub))] +#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))] use crate::loom::sync::atomic::AtomicU8; use crate::loom::sync::Mutex; From 6b6558d344f05992a3b0aacd64e9c804240d9df7 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 23 Sep 2020 17:51:34 +0000 Subject: [PATCH 09/10] Refactor block_on to use `Enter::block_on` --- tokio/src/runtime/basic_scheduler.rs | 69 ++++++++++++---------------- 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index df324bbabf0..c6be7d08844 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,16 +1,16 @@ -use crate::loom::sync::Mutex; use crate::park::{CachedParkThread, Park, Unpark}; use crate::runtime; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; use crate::sync::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; +use crate::{future::poll_fn, loom::sync::Mutex}; use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::Arc; -use std::task::Poll::Ready; +use std::task::Poll::{Pending, Ready}; use std::time::Duration; use std::{cell::RefCell, sync::PoisonError}; @@ -135,47 +135,36 @@ impl BasicScheduler
<P>
{ } pub(crate) fn block_on(&self, future: F) -> F::Output { - // If we can steal the dedicated parker then lets block_on that - // Otherwise, we'll block_on the future and attempt to steal the - // parker later, if we can. - if let Some(mut inner) = self.take_inner() { - inner.block_on(future) - } else { - let enter = crate::runtime::enter(false); - let mut park = CachedParkThread::new(); - let waker = park.unpark().into_waker(); - let mut cx = std::task::Context::from_waker(&waker); - - pin!(future); - - let mut notified = Box::pin(self.notify.notified()); + pin!(future); + + // Attempt to steal the dedicated parker and block_on the future if we can there, + // othwerwise, lets select on a notification that the parker is available + // or the future is complete. + loop { + if let Some(inner) = &mut self.take_inner() { + return inner.block_on(future); + } else { + let mut enter = crate::runtime::enter(false); + + let notified = self.notify.notified(); + pin!(notified); + + if let Some(out) = enter + .block_on(poll_fn(|cx| { + if notified.as_mut().poll(cx).is_ready() { + return Ready(None); + } - loop { - if let Ready(_) = notified.as_mut().poll(&mut cx) { - // Check if we can steal the dedicated parker P. - // - // TODO: Consider using an atomic load here intead of locking - // the mutex. - if let Some(mut inner) = self.take_inner() { - // We will enter again on in the inner implementation below - drop(enter); - return inner.block_on(future); - } else { - // Since the notify future polled to ready, it will panic if polled again - // beyond ready. To avoid this, lets create a new future. This allocation is - // unfortunate but should be extremely rare. - notified = Box::pin(self.notify.notified()); - } - } + if let Ready(out) = future.as_mut().poll(cx) { + return Ready(Some(out)); + } - if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { - return v; + Pending + })) + .expect("Failed to `Enter::block_on`") + { + return out; } - - // Park this thread, waiting for some external wakeup: either - // from the future we are currently polling or a wakeup from the - // block_on that contains the parker, notifying us to steal the parker. - park.park().expect("failed to park"); } } } From 28c4c0855e4f0636dc29a4268261f5288e459341 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Wed, 23 Sep 2020 17:55:50 +0000 Subject: [PATCH 10/10] Fix imports --- tokio/src/runtime/basic_scheduler.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index c6be7d08844..0c0e95a6504 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -1,18 +1,18 @@ -use crate::park::{CachedParkThread, Park, Unpark}; -use crate::runtime; +use crate::future::poll_fn; +use crate::loom::sync::Mutex; +use crate::park::{Park, Unpark}; use crate::runtime::task::{self, JoinHandle, Schedule, Task}; use crate::sync::Notify; use crate::util::linked_list::{Link, LinkedList}; use crate::util::{waker_ref, Wake, WakerRef}; -use crate::{future::poll_fn, loom::sync::Mutex}; +use std::cell::RefCell; use std::collections::VecDeque; use std::fmt; use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, PoisonError}; use std::task::Poll::{Pending, Ready}; use std::time::Duration; -use std::{cell::RefCell, sync::PoisonError}; /// Executes tasks on the current thread pub(crate) struct BasicScheduler { @@ -183,7 +183,7 @@ impl Inner
<P>
{
     /// Block on the future provided and drive the runtime's driver.
     fn block_on<F: Future>(&mut self, future: F) -> F::Output {
         enter(self, |scheduler, context| {
-            let _enter = runtime::enter(false);
+            let _enter = crate::runtime::enter(false);
             let waker = scheduler.spawner.waker_ref();
             let mut cx = std::task::Context::from_waker(&waker);
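
[Editor's note -- the sketch below is not part of the patch series. It is a
minimal illustration of the behavior these ten commits enable: calling
`block_on` concurrently from several threads on one basic-scheduler runtime.
It assumes a tokio build from this branch, with the 0.2-era
`Builder::basic_scheduler()` constructor and the `block_on(&self)` signature
this series relies on; the tests' `rt()` helper returns a cloneable handle,
approximated here with `Arc<Runtime>`. Treat it as a usage sketch under
those assumptions, not as the released tokio API.]

    use std::sync::Arc;
    use std::thread;

    fn main() {
        // One current-thread ("basic") scheduler runtime, shared across threads.
        let rt = Arc::new(
            tokio::runtime::Builder::new()
                .basic_scheduler()
                .build()
                .unwrap(),
        );
        let rt2 = rt.clone();

        let (tx, rx) = tokio::sync::oneshot::channel::<u32>();

        // Two concurrent `block_on` calls on the same runtime: whichever
        // thread takes the dedicated parker drives the runtime's driver,
        // while the other thread waits on the scheduler's `Notify` and may
        // steal the parker once it is handed back (see `InnerGuard::drop`).
        let a = thread::spawn(move || rt.block_on(async { rx.await.unwrap() }));
        let b = thread::spawn(move || rt2.block_on(async { tx.send(42).unwrap() }));

        b.join().unwrap();
        assert_eq!(a.join().unwrap(), 42);
    }

[Before PATCH 01, the second `block_on` either waited on the mutex-wrapped
scheduler in `Runtime::block_on` or fell back to a bare `enter.block_on`
(the path the TODO in PATCH 01 calls "hacky and bad"); after the series,
both calls make progress and the dedicated parker is always owned by
exactly one of them.]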