3 changes: 2 additions & 1 deletion spellcheck.dic
@@ -1,4 +1,4 @@
-311
+312
 &
 +
 <
@@ -32,6 +32,7 @@
 8MB
 ABI
 accessors
+adaptively
 adaptor
 adaptors
 Adaptors
2 changes: 2 additions & 0 deletions tokio/src/runtime/blocking/mod.rs
@@ -6,6 +6,8 @@
 mod pool;
 pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};

+mod sharded_queue;
+
 cfg_fs! {
     pub(crate) use pool::spawn_mandatory_blocking;
 }
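The new `sharded_queue` module itself is not shown in this diff. For review context, here is a minimal, self-contained sketch of the idea its call sites in `pool.rs` imply: mutex-protected `VecDeque` shards plus a condvar-style parking mechanism for idle workers. All names and the fixed shard count are assumptions for this sketch; the real module additionally reports `Shutdown`/`Timeout`/`Spurious` wakeups via `WaitResult` and adapts its shard count to the pool size.

```rust
// Sketch only: inferred from the call sites below, not the PR's actual module.
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};

const NUM_SHARDS: usize = 8; // assumed fixed here; the PR sizes this adaptively

pub struct ShardedQueue<T> {
    shards: Vec<Mutex<VecDeque<T>>>,
    /// Shutdown flag; also serves as the mutex idle workers park on.
    park: Mutex<bool>,
    condvar: Condvar,
}

impl<T> ShardedQueue<T> {
    pub fn new() -> Self {
        ShardedQueue {
            shards: (0..NUM_SHARDS).map(|_| Mutex::new(VecDeque::new())).collect(),
            park: Mutex::new(false),
            condvar: Condvar::new(),
        }
    }

    /// Push onto a shard chosen from the caller's hint, then wake one waiter.
    pub fn push(&self, value: T, hint: usize) {
        self.shards[hint % NUM_SHARDS].lock().unwrap().push_back(value);
        self.condvar.notify_one();
    }

    /// Pop from the preferred shard first, then steal from the others, so a
    /// worker keeps touching the same lock on the hot path.
    pub fn pop(&self, preferred: usize) -> Option<T> {
        for i in 0..NUM_SHARDS {
            let shard = (preferred + i) % NUM_SHARDS;
            if let Some(v) = self.shards[shard].lock().unwrap().pop_front() {
                return Some(v);
            }
        }
        None
    }

    pub fn shutdown(&self) {
        *self.park.lock().unwrap() = true;
        self.condvar.notify_all();
    }

    pub fn is_shutdown(&self) -> bool {
        *self.park.lock().unwrap()
    }
}
```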
229 changes: 112 additions & 117 deletions tokio/src/runtime/blocking/pool.rs
@@ -1,16 +1,17 @@
 //! Thread pool for blocking operations

-use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::sync::{Arc, Mutex};
 use crate::loom::thread;
 use crate::runtime::blocking::schedule::BlockingSchedule;
+use crate::runtime::blocking::sharded_queue::{ShardedQueue, WaitResult};
 use crate::runtime::blocking::{shutdown, BlockingTask};
 use crate::runtime::builder::ThreadNameFn;
 use crate::runtime::task::{self, JoinHandle};
 use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
 use crate::util::metric_atomics::MetricAtomicUsize;
 use crate::util::trace::{blocking_task, SpawnMeta};

-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::fmt;
 use std::io;
 use std::sync::atomic::Ordering;
@@ -74,11 +75,11 @@ impl SpawnerMetrics {
 }

 struct Inner {
-    /// State shared between worker threads.
-    shared: Mutex<Shared>,
+    /// Sharded queue for task distribution.
+    queue: ShardedQueue,

-    /// Pool threads wait on this.
-    condvar: Condvar,
+    /// State shared between worker threads (thread management only).
+    shared: Mutex<Shared>,

     /// Spawned threads use this name.
     thread_name: ThreadNameFn,
@@ -103,8 +104,6 @@ struct Inner {
 }

 struct Shared {
-    queue: VecDeque<Task>,
-    num_notify: u32,
     shutdown: bool,
     shutdown_tx: Option<shutdown::Sender>,
     /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
@@ -214,16 +213,14 @@ impl BlockingPool {
         BlockingPool {
             spawner: Spawner {
                 inner: Arc::new(Inner {
+                    queue: ShardedQueue::new(),
                     shared: Mutex::new(Shared {
-                        queue: VecDeque::new(),
-                        num_notify: 0,
                         shutdown: false,
                         shutdown_tx: Some(shutdown_tx),
                         last_exiting_thread: None,
                         worker_threads: HashMap::new(),
                         worker_thread_index: 0,
                     }),
-                    condvar: Condvar::new(),
                     thread_name: builder.thread_name.clone(),
                     stack_size: builder.thread_stack_size,
                     after_start: builder.after_start.clone(),

shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
self.spawner.inner.queue.shutdown();

let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
let workers = std::mem::take(&mut shared.worker_threads);
@@ -391,9 +388,8 @@ impl Spawner {
     }

     fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
-        let mut shared = self.inner.shared.lock();
-
-        if shared.shutdown {
+        // Check for shutdown without holding the lock.
+        if self.inner.queue.is_shutdown() {
             // Shutdown the task: it's fine to shutdown this task (even if
             // mandatory) because it was scheduled after the shutdown of the
             // runtime began.
@@ -403,52 +399,61 @@ impl Spawner {
             return Err(SpawnError::ShuttingDown);
         }

-        shared.queue.push_back(task);
+        // Get the thread count for adaptive sharding.
+        let num_threads = self.inner.metrics.num_threads();
+
+        // Push to the sharded queue, which adapts its shard count to the thread count.
+        self.inner.queue.push(task, num_threads);
         self.inner.metrics.inc_queue_depth();

+        // Check whether we need to spawn a new thread or notify an idle one.
         if self.inner.metrics.num_idle_threads() == 0 {
-            // No threads are able to process the task.
-
-            if self.inner.metrics.num_threads() == self.inner.thread_cap {
-                // At max number of threads
-            } else {
-                assert!(shared.shutdown_tx.is_some());
-                let shutdown_tx = shared.shutdown_tx.clone();
-
-                if let Some(shutdown_tx) = shutdown_tx {
-                    let id = shared.worker_thread_index;
-
-                    match self.spawn_thread(shutdown_tx, rt, id) {
-                        Ok(handle) => {
-                            self.inner.metrics.inc_num_threads();
-                            shared.worker_thread_index += 1;
-                            shared.worker_threads.insert(id, handle);
-                        }
-                        Err(ref e)
-                            if is_temporary_os_thread_error(e)
-                                && self.inner.metrics.num_threads() > 0 =>
-                        {
-                            // OS temporarily failed to spawn a new thread.
-                            // The task will be picked up eventually by a currently
-                            // busy thread.
-                        }
-                        Err(e) => {
-                            // The OS refused to spawn the thread and there is no thread
-                            // to pick up the task that has just been pushed to the queue.
-                            return Err(SpawnError::NoThreads(e));
-                        }
-                    }
-                }
-            }
-        } else {
-            // Notify an idle worker thread. The notification counter
-            // is used to count the needed amount of notifications
-            // exactly. Thread libraries may generate spurious
-            // wakeups, this counter is used to keep us in a
-            // consistent state.
-            self.inner.metrics.dec_num_idle_threads();
-            shared.num_notify += 1;
-            self.inner.condvar.notify_one();
+            // No idle threads; we might need to spawn one.
+            if num_threads < self.inner.thread_cap {
+                // Try to spawn a new thread.
+                let mut shared = self.inner.shared.lock();
+
+                // Double-check conditions after acquiring the lock: if the pool
+                // shut down while we were acquiring it, the task is already in
+                // the queue and will be drained during shutdown.
+                if shared.shutdown {
+                    return Ok(());
+                }
+
+                // Re-check the thread count; another thread might have spawned one.
+                if self.inner.metrics.num_threads() < self.inner.thread_cap {
+                    if let Some(shutdown_tx) = shared.shutdown_tx.clone() {
+                        let id = shared.worker_thread_index;
+
+                        match self.spawn_thread(shutdown_tx, rt, id) {
+                            Ok(handle) => {
+                                self.inner.metrics.inc_num_threads();
+                                shared.worker_thread_index += 1;
+                                shared.worker_threads.insert(id, handle);
+                            }
+                            Err(ref e)
+                                if is_temporary_os_thread_error(e)
+                                    && self.inner.metrics.num_threads() > 0 =>
+                            {
+                                // The OS temporarily failed to spawn a new thread.
+                                // The task will eventually be picked up by a
+                                // currently busy thread.
+                            }
+                            Err(e) => {
+                                // The OS refused to spawn the thread and there is
+                                // no thread to pick up the task that has just been
+                                // pushed to the queue.
+                                return Err(SpawnError::NoThreads(e));
+                            }
+                        }
+                    }
+                }
+            } else {
+                // At max threads; notify anyway in case workers are waiting.
+                self.inner.queue.notify_one();
+            }
+        } else {
+            // There are idle threads waiting; notify one.
+            self.inner.queue.notify_one();
         }

         Ok(())
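`push(task, num_threads)` passes the live thread count so the queue can size its active shard set adaptively. A hypothetical illustration of one such scheme follows; the constant, the counter, and the modulo mapping are assumptions for this sketch, not the PR's code.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Upper bound on shards; an assumed constant for this sketch.
const MAX_SHARDS: usize = 8;

/// Round-robin push counter used to spread tasks across active shards.
static NEXT: AtomicUsize = AtomicUsize::new(0);

/// Map a push onto one of the active shards. With few worker threads, few
/// shards are active, so workers rarely scan empty shards; with many
/// threads, more shards are active, spreading lock contention.
fn shard_for_push(num_threads: usize) -> usize {
    let active = num_threads.clamp(1, MAX_SHARDS);
    NEXT.fetch_add(1, Ordering::Relaxed) % active
}
```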
@@ -505,90 +510,80 @@ impl Inner {
             f();
         }

-        let mut shared = self.shared.lock();
+        // Use worker_thread_id as the preferred shard so each worker keeps
+        // hitting the same shard on the hot path.
+        let preferred_shard = worker_thread_id;
         let mut join_on_thread = None;

         'main: loop {
-            // BUSY
-            while let Some(task) = shared.queue.pop_front() {
+            // BUSY: process tasks from the queue.
+            while let Some(task) = self.queue.pop(preferred_shard) {
                 self.metrics.dec_queue_depth();
-                drop(shared);
                 task.run();
-
-                shared = self.shared.lock();
             }

-            // IDLE
+            // Check for shutdown before going idle.
+            if self.queue.is_shutdown() {
+                break;
+            }
+
+            // IDLE: wait for new tasks.
             self.metrics.inc_num_idle_threads();

-            while !shared.shutdown {
-                let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
-
-                shared = lock_result.0;
-                let timeout_result = lock_result.1;
-
-                if shared.num_notify != 0 {
-                    // We have received a legitimate wakeup,
-                    // acknowledge it by decrementing the counter
-                    // and transition to the BUSY state.
-                    shared.num_notify -= 1;
-                    break;
-                }
-
-                // Even if the condvar "timed out", if the pool is entering the
-                // shutdown phase, we want to perform the cleanup logic.
-                if !shared.shutdown && timeout_result.timed_out() {
-                    // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
-                    // This isn't done when shutting down, because the thread calling shutdown will
-                    // handle joining everything.
-                    let my_handle = shared.worker_threads.remove(&worker_thread_id);
-                    join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
-
-                    break 'main;
-                }
-
-                // Spurious wakeup detected, go back to sleep.
-            }
-
-            if shared.shutdown {
-                // Drain the queue
-                while let Some(task) = shared.queue.pop_front() {
-                    self.metrics.dec_queue_depth();
-                    drop(shared);
-
-                    task.shutdown_or_run_if_mandatory();
-
-                    shared = self.shared.lock();
-                }
-
-                // Work was produced, and we "took" it (by decrementing num_notify).
-                // This means that num_idle was decremented once for our wakeup.
-                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
-                self.metrics.inc_num_idle_threads();
-                // NOTE: Technically we should also do num_notify++ and notify again,
-                // but since we're shutting down anyway, that won't be necessary.
-                break;
-            }
+            loop {
+                match self.queue.wait_for_task(preferred_shard, self.keep_alive) {
+                    WaitResult::Task(task) => {
+                        // Got a task; process it and return to the BUSY loop.
+                        self.metrics.dec_num_idle_threads();
+                        self.metrics.dec_queue_depth();
+                        task.run();
+                        break;
+                    }
+                    WaitResult::Shutdown => {
+                        // Shutdown was initiated while we were idle.
+                        self.metrics.dec_num_idle_threads();
+                        break 'main;
+                    }
+                    WaitResult::Timeout => {
+                        // Keep-alive elapsed with no work; this thread exits.
+                        self.metrics.dec_num_idle_threads();
+
+                        // We'll join the prior timed-out thread's JoinHandle after
+                        // dropping the lock. This isn't done when shutting down,
+                        // because the thread calling shutdown handles joining
+                        // everything.
+                        let mut shared = self.shared.lock();
+                        if !shared.shutdown {
+                            let my_handle = shared.worker_threads.remove(&worker_thread_id);
+                            join_on_thread =
+                                std::mem::replace(&mut shared.last_exiting_thread, my_handle);
+                        }
+
+                        // Exit the main loop and terminate the thread.
+                        break 'main;
+                    }
+                    WaitResult::Spurious => {
+                        // Spurious wakeup: re-check the queue, then keep waiting.
+                        if let Some(task) = self.queue.pop(preferred_shard) {
+                            self.metrics.dec_num_idle_threads();
+                            self.metrics.dec_queue_depth();
+                            task.run();
+                            break;
+                        }
+                    }
+                }
+            }
         }

+        // Drain remaining tasks if shutting down. Every exit path above has
+        // already decremented the idle counter, so no further idle accounting
+        // is needed here.
+        if self.queue.is_shutdown() {
+            self.queue.drain(|task| {
+                self.metrics.dec_queue_depth();
+                task.shutdown_or_run_if_mandatory();
+            });
+        }
+
         // Thread exit
         self.metrics.dec_num_threads();

-        // num_idle should now be tracked exactly, panic
-        // with a descriptive message if it is not the
-        // case.
-        let prev_idle = self.metrics.dec_num_idle_threads();
-        assert!(
-            prev_idle >= self.metrics.num_idle_threads(),
-            "num_idle_threads underflowed on thread exit"
-        );
-
-        if shared.shutdown && self.metrics.num_threads() == 0 {
-            self.condvar.notify_one();
-        }
-
-        drop(shared);
-
         if let Some(f) = &self.before_stop {
             f();