Commit 6d0e01b

rt: Allow concurrent block_on's with basic_scheduler
This allows `Runtime::block_on` to be called concurrently on the basic_scheduler by letting other threads steal the dedicated parker.
1 parent 8270774 commit 6d0e01b
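
The mechanism, in short: the scheduler's dedicated parker now lives in a shared slot. The first thread to call block_on takes it and drives the runtime; any other concurrent caller polls its future on a plain thread parker and waits on a shared condvar, retrying the steal whenever the driver is returned. Below is a minimal, self-contained sketch of that protocol using only std types. The `DriverSlot` type and its methods are illustrative, not tokio's API, and the actual future polling is reduced to a placeholder.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Stand-in for the runtime's dedicated parker/driver.
struct Driver;

/// Shared slot holding the driver, plus a condvar used to tell waiting
/// threads that the driver has been put back.
struct DriverSlot {
    driver: Mutex<Option<Driver>>,
    returned: Condvar,
}

impl DriverSlot {
    fn new() -> Self {
        DriverSlot {
            driver: Mutex::new(Some(Driver)),
            returned: Condvar::new(),
        }
    }

    /// Try to steal the driver; only one thread can win.
    fn steal(&self) -> Option<Driver> {
        self.driver.lock().unwrap().take()
    }

    /// Put the driver back and wake one waiter so it can try to steal it.
    fn put_back(&self, driver: Driver) {
        *self.driver.lock().unwrap() = Some(driver);
        self.returned.notify_one();
    }

    /// Block until the driver is back in the slot. The caller still has to
    /// race for it again via `steal`.
    fn wait_for_return(&self) {
        let mut guard = self.driver.lock().unwrap();
        while guard.is_none() {
            guard = self.returned.wait(guard).unwrap();
        }
    }
}

/// Shape of the new block_on: steal the driver if possible, otherwise wait
/// for it (the real code also keeps polling the future on a fallback parker).
fn block_on_sketch(slot: &DriverSlot, id: usize) {
    loop {
        if let Some(driver) = slot.steal() {
            // Winner: drive the runtime with the dedicated parker.
            println!("thread {} stole the driver", id);
            slot.put_back(driver);
            return;
        }
        // Loser: wait until the driver is returned, then race again.
        slot.wait_for_return();
    }
}

fn main() {
    let slot = Arc::new(DriverSlot::new());
    let handles: Vec<_> = (0..2)
        .map(|i| {
            let slot = slot.clone();
            thread::spawn(move || block_on_sketch(&slot, i))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}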

File tree: 6 files changed, +185 -57 lines changed

tokio/src/park/thread.rs

Lines changed: 8 additions & 4 deletions
@@ -21,8 +21,8 @@ pub(crate) struct UnparkThread {
 #[derive(Debug)]
 struct Inner {
     state: AtomicUsize,
-    mutex: Mutex<()>,
-    condvar: Condvar,
+    mutex: Arc<Mutex<()>>,
+    condvar: Arc<Condvar>,
 }
 
 const EMPTY: usize = 0;
@@ -37,11 +37,15 @@ thread_local! {
 
 impl ParkThread {
     pub(crate) fn new() -> Self {
+        ParkThread::with_condvar(Arc::new(Condvar::new()), Arc::new(Mutex::new(())))
+    }
+
+    pub(crate) fn with_condvar(condvar: Arc<Condvar>, mutex: Arc<Mutex<()>>) -> Self {
         Self {
             inner: Arc::new(Inner {
                 state: AtomicUsize::new(EMPTY),
-                mutex: Mutex::new(()),
-                condvar: Condvar::new(),
+                mutex,
+                condvar,
             }),
         }
     }
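
The change above only swaps the owned Mutex/Condvar for Arc-wrapped ones and adds `with_condvar`, so a fallback parker created inside `BasicScheduler::block_on` can share its condvar with the scheduler's "driver returned" notification. A rough std-only illustration of that shared-condvar wakeup follows; the boolean flag stands in for the atomic state word the real `ParkThread` uses, and all names here are illustrative.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    // One mutex/condvar pair shared by the "fallback parker" and the
    // "scheduler". The real ParkThread keeps an AtomicUsize state word next
    // to a Mutex<()>; a boolean under the mutex keeps this sketch free of
    // missed-wakeup races.
    let shared = Arc::new((Mutex::new(false), Condvar::new()));

    let parker = {
        let shared = Arc::clone(&shared);
        thread::spawn(move || {
            let (lock, condvar) = &*shared;
            let mut notified = lock.lock().unwrap();
            while !*notified {
                notified = condvar.wait(notified).unwrap();
            }
            // Woken: this is where a losing block_on caller would retry
            // stealing the dedicated driver.
        })
    };

    // "Scheduler" side: set the flag and notify, mirroring what happens
    // when the driver is placed back into its slot.
    let (lock, condvar) = &*shared;
    *lock.lock().unwrap() = true;
    condvar.notify_one();

    parker.join().unwrap();
}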

tokio/src/runtime/basic_scheduler.rs

Lines changed: 135 additions & 32 deletions
@@ -1,22 +1,35 @@
-use crate::park::{Park, Unpark};
+use crate::loom;
+use crate::loom::sync::{Condvar, Mutex};
+use crate::park::{Park, ParkThread, Unpark};
 use crate::runtime;
 use crate::runtime::task::{self, JoinHandle, Schedule, Task};
 use crate::util::linked_list::LinkedList;
-use crate::util::{waker_ref, Wake};
+use crate::util::{waker_ref, Wake, WakerRef};
 
 use std::cell::RefCell;
 use std::collections::VecDeque;
 use std::fmt;
 use std::future::Future;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 use std::task::Poll::Ready;
 use std::time::Duration;
 
 /// Executes tasks on the current thread
-pub(crate) struct BasicScheduler<P>
-where
-    P: Park,
-{
+pub(crate) struct BasicScheduler<P: Park> {
+    /// Inner state with the dedicated parker P
+    inner: Mutex<Option<Inner<P>>>,
+
+    /// Sync items used to notify other threads that called
+    /// block_on concurrently that the dedicated driver is available
+    /// to steal
+    condvar: loom::sync::Arc<Condvar>,
+    mutex: loom::sync::Arc<Mutex<()>>,
+
+    /// Sendable task spawner
+    spawner: Spawner,
+}
+
+struct Inner<P: Park> {
     /// Scheduler run queue
     ///
     /// When the scheduler is executed, the queue is removed from `self` and
@@ -59,7 +72,7 @@ struct Shared {
     unpark: Box<dyn Unpark>,
 }
 
-/// Thread-local context
+/// Thread-local context.
 struct Context {
     /// Shared scheduler state
     shared: Arc<Shared>,
@@ -68,16 +81,16 @@ struct Context {
     tasks: RefCell<Tasks>,
 }
 
-/// Initial queue capacity
+/// Initial queue capacity.
 const INITIAL_CAPACITY: usize = 64;
 
 /// Max number of tasks to poll per tick.
 const MAX_TASKS_PER_TICK: usize = 61;
 
-/// How often ot check the remote queue first
+/// How often to check the remote queue first.
 const REMOTE_FIRST_INTERVAL: u8 = 31;
 
-// Tracks the current BasicScheduler
+// Tracks the current BasicScheduler.
 scoped_thread_local!(static CURRENT: Context);
 
 impl<P> BasicScheduler<P>
@@ -87,19 +100,28 @@ where
     pub(crate) fn new(park: P) -> BasicScheduler<P> {
         let unpark = Box::new(park.unpark());
 
-        BasicScheduler {
+        let spawner = Spawner {
+            shared: Arc::new(Shared {
+                queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
+                unpark: unpark as Box<dyn Unpark>,
+            }),
+        };
+
+        let inner = Mutex::new(Some(Inner {
             tasks: Some(Tasks {
                 owned: LinkedList::new(),
                 queue: VecDeque::with_capacity(INITIAL_CAPACITY),
             }),
-            spawner: Spawner {
-                shared: Arc::new(Shared {
-                    queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
-                    unpark: unpark as Box<dyn Unpark>,
-                }),
-            },
+            spawner: spawner.clone(),
             tick: 0,
             park,
+        }));
+
+        BasicScheduler {
+            inner,
+            spawner,
+            condvar: loom::sync::Arc::new(Condvar::new()),
+            mutex: loom::sync::Arc::new(Mutex::new(())),
         }
     }
 
@@ -108,7 +130,6 @@ where
     }
 
     /// Spawns a future onto the thread pool
-    #[allow(dead_code)]
    pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
    where
        F: Future + Send + 'static,
@@ -117,13 +138,55 @@ where
         self.spawner.spawn(future)
     }
 
-    pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
-    where
-        F: Future,
-    {
+    pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
+        // If we can steal the dedicated parker, then let's block_on with it;
+        // otherwise, let's block_on here and attempt to steal it back when we can.
+        if let Some(mut inner) = self.take_inner() {
+            inner.block_on(future)
+        } else {
+            // TODO: should this be false or true? In the original block_on for
+            // basic_scheduler we have false?
+            let enter = crate::runtime::enter(false);
+
+            let mut park = ParkThread::with_condvar(self.condvar.clone(), self.mutex.clone());
+            let waker = park.unpark().into_waker();
+            let mut cx = std::task::Context::from_waker(&waker);
+
+            pin!(future);
+
+            loop {
+                if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
+                    return v;
+                }
+
+                if let Some(mut inner) = self.take_inner() {
+                    // We will enter again in the inner implementation below
+                    drop(enter);
+                    return inner.block_on(future);
+                }
+
+                park.park().expect("failed to park");
+            }
+        }
+    }
+
+    fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
+        let mut lock = self.inner.lock().unwrap();
+        let inner = lock.take()?;
+
+        Some(InnerGuard {
+            inner: Some(inner),
+            scheduler: &self,
+        })
+    }
+}
+
+impl<P: Park> Inner<P> {
+    /// Block on the future provided and drive the runtime's driver.
+    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
         enter(self, |scheduler, context| {
             let _enter = runtime::enter(false);
-            let waker = waker_ref(&scheduler.spawner.shared);
+            let waker = scheduler.spawner.waker_ref();
             let mut cx = std::task::Context::from_waker(&waker);
 
             pin!(future);
@@ -178,16 +241,16 @@ where
 
 /// Enter the scheduler context. This sets the queue and other necessary
 /// scheduler state in the thread-local
-fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
+fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
 where
-    F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
+    F: FnOnce(&mut Inner<P>, &Context) -> R,
     P: Park,
 {
     // Ensures the run queue is placed back in the `BasicScheduler` instance
     // once `block_on` returns.`
     struct Guard<'a, P: Park> {
         context: Option<Context>,
-        scheduler: &'a mut BasicScheduler<P>,
+        scheduler: &'a mut Inner<P>,
     }
 
     impl<P: Park> Drop for Guard<'_, P> {
@@ -214,12 +277,15 @@ where
     CURRENT.set(context, || f(scheduler, context))
 }
 
-impl<P> Drop for BasicScheduler<P>
-where
-    P: Park,
-{
+impl<P: Park> Drop for BasicScheduler<P> {
     fn drop(&mut self) {
-        enter(self, |scheduler, context| {
+        let mut inner = {
+            let mut lock = self.inner.lock().expect("BasicScheduler Inner lock");
+            lock.take()
+                .expect("Oh no! We never placed the Inner state back!")
+        };
+
+        enter(&mut inner, |scheduler, context| {
             // Loop required here to ensure borrow is dropped between iterations
             #[allow(clippy::while_let_loop)]
             loop {
@@ -269,6 +335,10 @@ impl Spawner {
     fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
         self.shared.queue.lock().unwrap().pop_front()
     }
+
+    fn waker_ref(&self) -> WakerRef<'_> {
+        waker_ref(&self.shared)
+    }
 }
 
 impl fmt::Debug for Spawner {
@@ -325,3 +395,36 @@ impl Wake for Shared {
         arc_self.unpark.unpark();
     }
 }
+
+// ===== InnerGuard =====
+
+/// Used to ensure we always place the Inner value
+/// back into its slot in `BasicScheduler` even if the
+/// future panics.
+struct InnerGuard<'a, P: Park> {
+    inner: Option<Inner<P>>,
+    scheduler: &'a BasicScheduler<P>,
+}
+
+impl<P: Park> InnerGuard<'_, P> {
+    fn block_on<F: Future>(&mut self, future: F) -> F::Output {
+        // The only time inner gets set to `None` is if we have dropped
+        // already, so this unwrap is safe.
+        self.inner.as_mut().unwrap().block_on(future)
+    }
+}
+
+impl<P: Park> Drop for InnerGuard<'_, P> {
+    fn drop(&mut self) {
+        if let Some(inner) = self.inner.take() {
+            let mut lock = self.scheduler.inner.lock().unwrap();
+            lock.replace(inner);
+
+            // Wake up any other threads, notifying them that they
+            // might need to steal the driver.
+            drop(self.scheduler.mutex.lock().unwrap());
+            self.scheduler.condvar.notify_one();
+        }
+    }
+}
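
`InnerGuard` is the piece that makes the hand-off robust: whichever thread checked the scheduler's `Inner` out of its slot puts it back on drop, even if the future being polled panics, and then notifies one waiting thread so it can try to steal the driver. A generic, std-only sketch of that return-on-drop pattern follows; the `Slot`/`SlotGuard` names and API are illustrative, not tokio's.

use std::sync::{Condvar, Mutex};

/// A shared slot whose value can be checked out by at most one thread at a time.
struct Slot<T> {
    value: Mutex<Option<T>>,
    returned: Condvar,
}

/// Guard returned by `Slot::take`. On drop it puts the value back and
/// notifies one waiter, even if the holding thread is unwinding from a panic.
struct SlotGuard<'a, T> {
    value: Option<T>,
    slot: &'a Slot<T>,
}

impl<T> Slot<T> {
    fn new(value: T) -> Self {
        Slot {
            value: Mutex::new(Some(value)),
            returned: Condvar::new(),
        }
    }

    /// Check the value out, if nobody else currently holds it.
    fn take(&self) -> Option<SlotGuard<'_, T>> {
        let value = self.value.lock().unwrap().take()?;
        Some(SlotGuard {
            value: Some(value),
            slot: self,
        })
    }
}

impl<T> SlotGuard<'_, T> {
    fn get_mut(&mut self) -> &mut T {
        // `value` is only `None` once the guard has been dropped,
        // so this unwrap cannot fail here.
        self.value.as_mut().unwrap()
    }
}

impl<T> Drop for SlotGuard<'_, T> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            *self.slot.value.lock().unwrap() = Some(value);
            // Wake one thread that may be waiting to steal the value back.
            self.slot.returned.notify_one();
        }
    }
}

fn main() {
    let slot = Slot::new(vec![1, 2, 3]);

    // Even if the code using the guard panics, Drop puts the value back.
    let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
        let mut guard = slot.take().expect("value is in the slot");
        guard.get_mut().push(4);
        panic!("future panicked while being polled");
    }));

    // The value was returned by SlotGuard::drop during unwinding.
    assert_eq!(slot.take().unwrap().get_mut(), &vec![1, 2, 3, 4]);
}

Because `Drop` runs during unwinding, a panic inside `block_on` cannot permanently strand the dedicated driver; the next caller simply steals it back.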

tokio/src/runtime/builder.rs

Lines changed: 1 addition & 1 deletion
@@ -463,7 +463,7 @@ cfg_rt_core! {
         let blocking_spawner = blocking_pool.spawner().clone();
 
         Ok(Runtime {
-            kind: Kind::Basic(Mutex::new(Some(scheduler))),
+            kind: Kind::Basic(scheduler),
             handle: Handle {
                 spawner,
                 io_handle,

tokio/src/runtime/mod.rs

Lines changed: 3 additions & 20 deletions
@@ -293,7 +293,7 @@ enum Kind {
 
     /// Execute all tasks on the current-thread.
     #[cfg(feature = "rt-core")]
-    Basic(Mutex<Option<BasicScheduler<time::Driver>>>),
+    Basic(BasicScheduler<time::Driver>),
 
     /// Execute tasks across multiple threads.
     #[cfg(feature = "rt-threaded")]
@@ -398,7 +398,7 @@ impl Runtime {
             Kind::Shell(_) => panic!("task execution disabled"),
             #[cfg(feature = "rt-threaded")]
             Kind::ThreadPool(exec) => exec.spawn(future),
-            Kind::Basic(_exec) => self.handle.spawner.spawn(future),
+            Kind::Basic(exec) => exec.spawn(future),
         }
     }
 
@@ -458,24 +458,7 @@ impl Runtime {
                 }
             }
             #[cfg(feature = "rt-core")]
-            Kind::Basic(exec) => {
-                // TODO(lucio): clean this up and move this impl into
-                // `basic_scheduler.rs`, this is hacky and bad but will work for
-                // now.
-                let exec_temp = {
-                    let mut lock = exec.lock().unwrap();
-                    lock.take()
-                };
-
-                if let Some(mut exec_temp) = exec_temp {
-                    let res = exec_temp.block_on(future);
-                    exec.lock().unwrap().replace(exec_temp);
-                    res
-                } else {
-                    let mut enter = crate::runtime::enter(true);
-                    enter.block_on(future).unwrap()
-                }
-            }
+            Kind::Basic(exec) => exec.block_on(future),
             #[cfg(feature = "rt-threaded")]
             Kind::ThreadPool(exec) => exec.block_on(future),
         })

tokio/src/util/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -12,6 +12,10 @@ mod rand;
 mod wake;
 pub(crate) use wake::{waker_ref, Wake};
 
+cfg_rt_core! {
+    pub(crate) use wake::WakerRef;
+}
+
 cfg_rt_threaded! {
     pub(crate) use rand::FastRand;

tokio/tests/rt_common.rs

Lines changed: 34 additions & 0 deletions
@@ -574,6 +574,40 @@ rt_test! {
         });
     }
 
+    #[test]
+    fn always_active_parker() {
+        // This test shows that we will always have an active parker
+        // even if we call block_on concurrently.
+
+        for _ in 0..100 {
+            let rt = rt();
+            let rt2 = rt.clone();
+
+            let (tx1, rx1) = oneshot::channel();
+            let (tx2, rx2) = oneshot::channel();
+
+            let jh1 = thread::spawn(move || {
+                rt.block_on(async move {
+                    rx2.await.unwrap();
+                    time::delay_for(Duration::from_millis(5)).await;
+                    tx1.send(()).unwrap();
+                });
+            });
+
+            let jh2 = thread::spawn(move || {
+                rt2.block_on(async move {
+                    tx2.send(()).unwrap();
+                    time::delay_for(Duration::from_millis(5)).await;
+                    rx1.await.unwrap();
+                    time::delay_for(Duration::from_millis(5)).await;
+                });
+            });
+
+            jh1.join().unwrap();
+            jh2.join().unwrap();
+        }
+    }
+
     #[test]
     // IOCP requires setting the "max thread" concurrency value. The sane,
     // default, is to set this to the number of cores. Threads that poll I/O
