diff --git a/spellcheck.dic b/spellcheck.dic index e377506bac6..525e2a93b88 100644 --- a/spellcheck.dic +++ b/spellcheck.dic @@ -1,4 +1,4 @@ -311 +312 & + < @@ -32,6 +32,7 @@ 8MB ABI accessors +adaptively adaptor adaptors Adaptors diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index c42924be77d..0eaf33fc394 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -6,6 +6,8 @@ mod pool; pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner}; +mod sharded_queue; + cfg_fs! { pub(crate) use pool::spawn_mandatory_blocking; } diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 04edcac1d19..1cfb3a68b6a 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -1,8 +1,9 @@ //! Thread pool for blocking operations -use crate::loom::sync::{Arc, Condvar, Mutex}; +use crate::loom::sync::{Arc, Mutex}; use crate::loom::thread; use crate::runtime::blocking::schedule::BlockingSchedule; +use crate::runtime::blocking::sharded_queue::{ShardedQueue, WaitResult}; use crate::runtime::blocking::{shutdown, BlockingTask}; use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; @@ -10,7 +11,7 @@ use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD}; use crate::util::metric_atomics::MetricAtomicUsize; use crate::util::trace::{blocking_task, SpawnMeta}; -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt; use std::io; use std::sync::atomic::Ordering; @@ -74,11 +75,11 @@ impl SpawnerMetrics { } struct Inner { - /// State shared between worker threads. - shared: Mutex, + /// Sharded queue for task distribution. + queue: ShardedQueue, - /// Pool threads wait on this. - condvar: Condvar, + /// State shared between worker threads (thread management only). + shared: Mutex, /// Spawned threads use this name. thread_name: ThreadNameFn, @@ -103,8 +104,6 @@ struct Inner { } struct Shared { - queue: VecDeque, - num_notify: u32, shutdown: bool, shutdown_tx: Option, /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out @@ -214,16 +213,14 @@ impl BlockingPool { BlockingPool { spawner: Spawner { inner: Arc::new(Inner { + queue: ShardedQueue::new(), shared: Mutex::new(Shared { - queue: VecDeque::new(), - num_notify: 0, shutdown: false, shutdown_tx: Some(shutdown_tx), last_exiting_thread: None, worker_threads: HashMap::new(), worker_thread_index: 0, }), - condvar: Condvar::new(), thread_name: builder.thread_name.clone(), stack_size: builder.thread_stack_size, after_start: builder.after_start.clone(), @@ -253,7 +250,7 @@ impl BlockingPool { shared.shutdown = true; shared.shutdown_tx = None; - self.spawner.inner.condvar.notify_all(); + self.spawner.inner.queue.shutdown(); let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread); let workers = std::mem::take(&mut shared.worker_threads); @@ -391,9 +388,8 @@ impl Spawner { } fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> { - let mut shared = self.inner.shared.lock(); - - if shared.shutdown { + // Check shutdown without holding the lock + if self.inner.queue.is_shutdown() { // Shutdown the task: it's fine to shutdown this task (even if // mandatory) because it was scheduled after the shutdown of the // runtime began. @@ -403,52 +399,61 @@ impl Spawner { return Err(SpawnError::ShuttingDown); } - shared.queue.push_back(task); + // Get thread count for adaptive sharding + let num_threads = self.inner.metrics.num_threads(); + + // Push to the sharded queue - uses adaptive shard count based on thread count + self.inner.queue.push(task, num_threads); self.inner.metrics.inc_queue_depth(); + // Check if we need to spawn a new thread or notify an idle one if self.inner.metrics.num_idle_threads() == 0 { - // No threads are able to process the task. - - if self.inner.metrics.num_threads() == self.inner.thread_cap { - // At max number of threads - } else { - assert!(shared.shutdown_tx.is_some()); - let shutdown_tx = shared.shutdown_tx.clone(); - - if let Some(shutdown_tx) = shutdown_tx { - let id = shared.worker_thread_index; + // No idle threads - might need to spawn one + if num_threads < self.inner.thread_cap { + // Try to spawn a new thread + let mut shared = self.inner.shared.lock(); + + // Double-check conditions after acquiring the lock + if shared.shutdown { + // Queue was shutdown while we were acquiring the lock + // The task is already in the queue, so it will be drained during shutdown + return Ok(()); + } - match self.spawn_thread(shutdown_tx, rt, id) { - Ok(handle) => { - self.inner.metrics.inc_num_threads(); - shared.worker_thread_index += 1; - shared.worker_threads.insert(id, handle); - } - Err(ref e) - if is_temporary_os_thread_error(e) - && self.inner.metrics.num_threads() > 0 => - { - // OS temporarily failed to spawn a new thread. - // The task will be picked up eventually by a currently - // busy thread. - } - Err(e) => { - // The OS refused to spawn the thread and there is no thread - // to pick up the task that has just been pushed to the queue. - return Err(SpawnError::NoThreads(e)); + // Re-check thread count (another thread might have spawned one) + if self.inner.metrics.num_threads() < self.inner.thread_cap { + if let Some(shutdown_tx) = shared.shutdown_tx.clone() { + let id = shared.worker_thread_index; + + match self.spawn_thread(shutdown_tx, rt, id) { + Ok(handle) => { + self.inner.metrics.inc_num_threads(); + shared.worker_thread_index += 1; + shared.worker_threads.insert(id, handle); + } + Err(ref e) + if is_temporary_os_thread_error(e) + && self.inner.metrics.num_threads() > 0 => + { + // OS temporarily failed to spawn a new thread. + // The task will be picked up eventually by a currently + // busy thread. + } + Err(e) => { + // The OS refused to spawn the thread and there is no thread + // to pick up the task that has just been pushed to the queue. + return Err(SpawnError::NoThreads(e)); + } } } } + } else { + // At max threads, notify anyway in case threads are waiting + self.inner.queue.notify_one(); } } else { - // Notify an idle worker thread. The notification counter - // is used to count the needed amount of notifications - // exactly. Thread libraries may generate spurious - // wakeups, this counter is used to keep us in a - // consistent state. - self.inner.metrics.dec_num_idle_threads(); - shared.num_notify += 1; - self.inner.condvar.notify_one(); + // There are idle threads waiting, notify one + self.inner.queue.notify_one(); } Ok(()) @@ -505,90 +510,80 @@ impl Inner { f(); } - let mut shared = self.shared.lock(); + // Use worker_thread_id as the preferred shard + let preferred_shard = worker_thread_id; let mut join_on_thread = None; 'main: loop { - // BUSY - while let Some(task) = shared.queue.pop_front() { + // BUSY: Process tasks from the queue + while let Some(task) = self.queue.pop(preferred_shard) { self.metrics.dec_queue_depth(); - drop(shared); task.run(); + } - shared = self.shared.lock(); + // Check for shutdown before going idle + if self.queue.is_shutdown() { + break; } - // IDLE + // IDLE: Wait for new tasks self.metrics.inc_num_idle_threads(); - while !shared.shutdown { - let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap(); - - shared = lock_result.0; - let timeout_result = lock_result.1; - - if shared.num_notify != 0 { - // We have received a legitimate wakeup, - // acknowledge it by decrementing the counter - // and transition to the BUSY state. - shared.num_notify -= 1; - break; - } - - // Even if the condvar "timed out", if the pool is entering the - // shutdown phase, we want to perform the cleanup logic. - if !shared.shutdown && timeout_result.timed_out() { - // We'll join the prior timed-out thread's JoinHandle after dropping the lock. - // This isn't done when shutting down, because the thread calling shutdown will - // handle joining everything. - let my_handle = shared.worker_threads.remove(&worker_thread_id); - join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle); + loop { + match self.queue.wait_for_task(preferred_shard, self.keep_alive) { + WaitResult::Task(task) => { + // Got a task, process it + self.metrics.dec_num_idle_threads(); + self.metrics.dec_queue_depth(); + task.run(); + // Go back to busy loop + break; + } + WaitResult::Shutdown => { + // Shutdown initiated + self.metrics.dec_num_idle_threads(); + break 'main; + } + WaitResult::Timeout => { + // Timed out, exit this thread + self.metrics.dec_num_idle_threads(); + + // Clean up thread handle + let mut shared = self.shared.lock(); + if !shared.shutdown { + let my_handle = shared.worker_threads.remove(&worker_thread_id); + join_on_thread = + std::mem::replace(&mut shared.last_exiting_thread, my_handle); + } - break 'main; + // Exit the main loop and terminate the thread + break 'main; + } + WaitResult::Spurious => { + // Spurious wakeup, check for tasks and continue waiting + if let Some(task) = self.queue.pop(preferred_shard) { + self.metrics.dec_num_idle_threads(); + self.metrics.dec_queue_depth(); + task.run(); + break; + } + // Continue waiting + } } - - // Spurious wakeup detected, go back to sleep. } + } - if shared.shutdown { - // Drain the queue - while let Some(task) = shared.queue.pop_front() { - self.metrics.dec_queue_depth(); - drop(shared); - - task.shutdown_or_run_if_mandatory(); - - shared = self.shared.lock(); - } - - // Work was produced, and we "took" it (by decrementing num_notify). - // This means that num_idle was decremented once for our wakeup. - // But, since we are exiting, we need to "undo" that, as we'll stay idle. - self.metrics.inc_num_idle_threads(); - // NOTE: Technically we should also do num_notify++ and notify again, - // but since we're shutting down anyway, that won't be necessary. - break; - } + // Drain remaining tasks if shutting down + if self.queue.is_shutdown() { + self.queue.drain(|task| { + self.metrics.dec_queue_depth(); + task.shutdown_or_run_if_mandatory(); + }); } // Thread exit self.metrics.dec_num_threads(); - // num_idle should now be tracked exactly, panic - // with a descriptive message if it is not the - // case. - let prev_idle = self.metrics.dec_num_idle_threads(); - assert!( - prev_idle >= self.metrics.num_idle_threads(), - "num_idle_threads underflowed on thread exit" - ); - - if shared.shutdown && self.metrics.num_threads() == 0 { - self.condvar.notify_one(); - } - - drop(shared); - if let Some(f) = &self.before_stop { f(); } diff --git a/tokio/src/runtime/blocking/sharded_queue.rs b/tokio/src/runtime/blocking/sharded_queue.rs new file mode 100644 index 00000000000..83647cddc6f --- /dev/null +++ b/tokio/src/runtime/blocking/sharded_queue.rs @@ -0,0 +1,227 @@ +//! A sharded concurrent queue for the blocking pool. +//! +//! This implementation distributes tasks across multiple shards to reduce +//! lock contention when many threads are spawning blocking tasks concurrently. +//! The push operations use per-shard locking, while notifications use a global +//! condvar for simplicity. +//! +//! The queue adapts to the current concurrency level by using fewer shards +//! when there are few threads, which improves cache locality and reduces +//! lock contention on the active shards. + +use crate::loom::sync::{Condvar, Mutex}; + +use std::collections::VecDeque; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::atomic::{AtomicBool, AtomicUsize}; +use std::time::Duration; + +use super::pool::Task; + +/// Number of shards. Must be a power of 2. +const NUM_SHARDS: usize = 16; + +/// A single shard containing a queue protected by its own mutex. +struct Shard { + /// The task queue for this shard. + queue: Mutex>, +} + +impl Shard { + fn new() -> Self { + Shard { + queue: Mutex::new(VecDeque::new()), + } + } + + /// Push a task to this shard's queue. + fn push(&self, task: Task) { + let mut queue = self.queue.lock(); + queue.push_back(task); + } + + /// Try to pop a task from this shard's queue. + fn pop(&self) -> Option { + let mut queue = self.queue.lock(); + queue.pop_front() + } +} + +/// A sharded queue that distributes tasks across multiple shards. +pub(super) struct ShardedQueue { + /// The shards - each with its own mutex-protected queue. + shards: [Shard; NUM_SHARDS], + /// Atomic counter for round-robin task distribution. + push_index: AtomicUsize, + /// Tracks the highest shard index that has ever been pushed to. + /// This allows `pop()` to skip checking shards that have never had tasks, + /// which is important for maintaining low overhead at low concurrency. + /// Only increases, never decreases (even when shards become empty). + max_shard_pushed: AtomicUsize, + /// Global shutdown flag. + shutdown: AtomicBool, + /// Global condition variable for worker notifications. + /// We use a single condvar to avoid the complexity of per-shard waiting. + condvar: Condvar, + /// Mutex to pair with the condvar. Only held during wait, not during push/pop. + condvar_mutex: Mutex<()>, +} + +/// Calculate the effective number of shards to use based on thread count. +/// Uses fewer shards at low concurrency for better cache locality. +#[inline] +fn effective_shards(num_threads: usize) -> usize { + match num_threads { + 0..=2 => 2, + 3..=4 => 4, + 5..=8 => 8, + _ => NUM_SHARDS, + } +} + +impl ShardedQueue { + pub(super) fn new() -> Self { + ShardedQueue { + shards: std::array::from_fn(|_| Shard::new()), + push_index: AtomicUsize::new(0), + max_shard_pushed: AtomicUsize::new(0), + shutdown: AtomicBool::new(false), + condvar: Condvar::new(), + condvar_mutex: Mutex::new(()), + } + } + + /// Push a task to the queue. + /// + /// `num_threads` is a hint about the current thread count, used to + /// adaptively choose how many shards to distribute across. + pub(super) fn push(&self, task: Task, num_threads: usize) { + let num_shards = effective_shards(num_threads); + let mask = num_shards - 1; + let index = self.push_index.fetch_add(1, Relaxed) & mask; + + // Update max_shard_pushed BEFORE pushing the task. + // AcqRel ensures the subsequent push cannot be reordered before this. + self.max_shard_pushed.fetch_max(index, AcqRel); + + self.shards[index].push(task); + } + + /// Notify one waiting worker that a task is available. + pub(super) fn notify_one(&self) { + self.condvar.notify_one(); + } + + /// Notify all waiting workers (used during shutdown). + pub(super) fn notify_all(&self) { + self.condvar.notify_all(); + } + + /// Try to pop a task, checking the preferred shard first, then others. + /// + /// Only checks shards up to `max_shard_pushed` since tasks can only exist + /// in shards that have been pushed to. + pub(super) fn pop(&self, preferred_shard: usize) -> Option { + // Only check shards that have ever had tasks pushed to them + let max_shard = self.max_shard_pushed.load(Acquire); + let num_shards_to_check = max_shard + 1; + + // Check shards starting from preferred, wrapping within active range + let start = preferred_shard % num_shards_to_check; + for i in 0..num_shards_to_check { + let index = (start + i) % num_shards_to_check; + if let Some(task) = self.shards[index].pop() { + return Some(task); + } + } + + None + } + + /// Drain all tasks from the queue, calling the provided closure on each. + pub(super) fn drain(&self, mut f: F) + where + F: FnMut(Task), + { + for shard in &self.shards { + loop { + let mut queue = shard.queue.lock(); + if let Some(task) = queue.pop_front() { + drop(queue); + f(task); + } else { + break; + } + } + } + } + + /// Set the shutdown flag and wake all workers. + pub(super) fn shutdown(&self) { + self.shutdown.store(true, Release); + self.notify_all(); + } + + /// Check if shutdown has been initiated. + pub(super) fn is_shutdown(&self) -> bool { + self.shutdown.load(Acquire) + } + + /// Wait for a task with timeout. + pub(super) fn wait_for_task(&self, preferred_shard: usize, timeout: Duration) -> WaitResult { + if self.is_shutdown() { + return WaitResult::Shutdown; + } + + // Try to pop without waiting first + if let Some(task) = self.pop(preferred_shard) { + return WaitResult::Task(task); + } + + // Acquire the condvar mutex before waiting + let guard = self.condvar_mutex.lock(); + + // Double-check shutdown and tasks after acquiring lock, as state may + // have changed while we were waiting for the lock + if self.is_shutdown() { + return WaitResult::Shutdown; + } + if let Some(task) = self.pop(preferred_shard) { + return WaitResult::Task(task); + } + + // Wait for notification or timeout + let result = self.condvar.wait_timeout(guard, timeout).unwrap(); + let timed_out = result.1.timed_out(); + + // Drop the lock before doing further work + drop(result.0); + + if self.is_shutdown() { + return WaitResult::Shutdown; + } + + // Try to get a task + if let Some(task) = self.pop(preferred_shard) { + return WaitResult::Task(task); + } + + if timed_out { + WaitResult::Timeout + } else { + WaitResult::Spurious + } + } +} + +/// Result of waiting for a task. +pub(super) enum WaitResult { + /// A task was found. + Task(Task), + /// The wait timed out. + Timeout, + /// Shutdown was initiated. + Shutdown, + /// Spurious wakeup, no task found. + Spurious, +}