From 046d5c6aba35b4a42203e6b60b4808b456f5e2ad Mon Sep 17 00:00:00 2001
From: Claude
Date: Wed, 3 Dec 2025 23:54:08 +0000
Subject: [PATCH 1/2] rt: improve spawn_blocking scalability with sharded queue

The blocking pool's task queue was protected by a single mutex, causing
severe contention when many threads spawn blocking tasks concurrently.
This resulted in nearly linear degradation with thread count: 16
concurrent threads took ~18x longer than a single thread.

Replace the single-mutex queue with a sharded queue that distributes
tasks across 16 lock-protected shards. The implementation adapts to the
concurrency level by using fewer shards when the thread count is low,
maintaining cache locality while avoiding contention at scale.

Benchmark results (spawning 100 batches of 16 tasks per thread; the last
column is the change in elapsed time, so negative is faster):

| Concurrency | Before   | After   | Time change |
|-------------|----------|---------|-------------|
| 1 thread    | 13.3ms   | 17.8ms  | +34%        |
| 2 threads   | 26.0ms   | 20.1ms  | -23%        |
| 4 threads   | 45.4ms   | 27.5ms  | -39%        |
| 8 threads   | 111.5ms  | 20.3ms  | -82%        |
| 16 threads  | 247.8ms  | 22.4ms  | -91%        |

The overhead at 1 thread comes from the sharded infrastructure. This is
an acceptable trade-off given the dramatic improvement at higher
concurrency, where the original design suffered from lock contention.
---
 spellcheck.dic                              |   3 +-
 tokio/src/runtime/blocking/mod.rs           |   2 +
 tokio/src/runtime/blocking/pool.rs          | 229 +++++++++--------
 tokio/src/runtime/blocking/sharded_queue.rs | 242 ++++++++++++++++++++
 4 files changed, 358 insertions(+), 118 deletions(-)
 create mode 100644 tokio/src/runtime/blocking/sharded_queue.rs

diff --git a/spellcheck.dic b/spellcheck.dic
index e377506bac6..525e2a93b88 100644
--- a/spellcheck.dic
+++ b/spellcheck.dic
@@ -1,4 +1,4 @@
-311
+312
 &
 +
 <
@@ -32,6 +32,7 @@
 8MB
 ABI
 accessors
+adaptively
 adaptor
 adaptors
 Adaptors
diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs
index c42924be77d..0eaf33fc394 100644
--- a/tokio/src/runtime/blocking/mod.rs
+++ b/tokio/src/runtime/blocking/mod.rs
@@ -6,6 +6,8 @@
 mod pool;
 pub(crate) use pool::{spawn_blocking, BlockingPool, Spawner};
 
+mod sharded_queue;
+
 cfg_fs! {
     pub(crate) use pool::spawn_mandatory_blocking;
 }
diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs
index 04edcac1d19..1cfb3a68b6a 100644
--- a/tokio/src/runtime/blocking/pool.rs
+++ b/tokio/src/runtime/blocking/pool.rs
@@ -1,8 +1,9 @@
 //! Thread pool for blocking operations
 
-use crate::loom::sync::{Arc, Condvar, Mutex};
+use crate::loom::sync::{Arc, Mutex};
 use crate::loom::thread;
 use crate::runtime::blocking::schedule::BlockingSchedule;
+use crate::runtime::blocking::sharded_queue::{ShardedQueue, WaitResult};
 use crate::runtime::blocking::{shutdown, BlockingTask};
 use crate::runtime::builder::ThreadNameFn;
 use crate::runtime::task::{self, JoinHandle};
@@ -10,7 +11,7 @@ use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
 use crate::util::metric_atomics::MetricAtomicUsize;
 use crate::util::trace::{blocking_task, SpawnMeta};
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::HashMap;
 use std::fmt;
 use std::io;
 use std::sync::atomic::Ordering;
@@ -74,11 +75,11 @@ impl SpawnerMetrics {
 }
 
 struct Inner {
-    /// State shared between worker threads.
-    shared: Mutex<Shared>,
+    /// Sharded queue for task distribution.
+    queue: ShardedQueue,
 
-    /// Pool threads wait on this.
-    condvar: Condvar,
+    /// State shared between worker threads (thread management only).
+    shared: Mutex<Shared>,
 
     /// Spawned threads use this name.
     thread_name: ThreadNameFn,
@@ -103,8 +104,6 @@ struct Inner {
 }
 
 struct Shared {
-    queue: VecDeque<Task>,
-    num_notify: u32,
     shutdown: bool,
     shutdown_tx: Option<shutdown::Sender>,
     /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
@@ -214,16 +213,14 @@ impl BlockingPool {
         BlockingPool {
             spawner: Spawner {
                 inner: Arc::new(Inner {
+                    queue: ShardedQueue::new(),
                     shared: Mutex::new(Shared {
-                        queue: VecDeque::new(),
-                        num_notify: 0,
                         shutdown: false,
                         shutdown_tx: Some(shutdown_tx),
                         last_exiting_thread: None,
                         worker_threads: HashMap::new(),
                         worker_thread_index: 0,
                     }),
-                    condvar: Condvar::new(),
                     thread_name: builder.thread_name.clone(),
                     stack_size: builder.thread_stack_size,
                     after_start: builder.after_start.clone(),
@@ -253,7 +250,7 @@ impl BlockingPool {
         shared.shutdown = true;
         shared.shutdown_tx = None;
-        self.spawner.inner.condvar.notify_all();
+        self.spawner.inner.queue.shutdown();
 
         let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
         let workers = std::mem::take(&mut shared.worker_threads);
@@ -391,9 +388,8 @@ impl Spawner {
     }
 
     fn spawn_task(&self, task: Task, rt: &Handle) -> Result<(), SpawnError> {
-        let mut shared = self.inner.shared.lock();
-
-        if shared.shutdown {
+        // Check shutdown without holding the lock
+        if self.inner.queue.is_shutdown() {
             // Shutdown the task: it's fine to shutdown this task (even if
             // mandatory) because it was scheduled after the shutdown of the
             // runtime began.
@@ -403,52 +399,61 @@ impl Spawner {
             return Err(SpawnError::ShuttingDown);
         }
 
-        shared.queue.push_back(task);
+        // Get thread count for adaptive sharding
+        let num_threads = self.inner.metrics.num_threads();
+
+        // Push to the sharded queue - uses adaptive shard count based on thread count
+        self.inner.queue.push(task, num_threads);
         self.inner.metrics.inc_queue_depth();
 
+        // Check if we need to spawn a new thread or notify an idle one
         if self.inner.metrics.num_idle_threads() == 0 {
-            // No threads are able to process the task.
-
-            if self.inner.metrics.num_threads() == self.inner.thread_cap {
-                // At max number of threads
-            } else {
-                assert!(shared.shutdown_tx.is_some());
-                let shutdown_tx = shared.shutdown_tx.clone();
-
-                if let Some(shutdown_tx) = shutdown_tx {
-                    let id = shared.worker_thread_index;
+            // No idle threads - might need to spawn one
+            if num_threads < self.inner.thread_cap {
+                // Try to spawn a new thread
+                let mut shared = self.inner.shared.lock();
+
+                // Double-check conditions after acquiring the lock
+                if shared.shutdown {
+                    // Queue was shutdown while we were acquiring the lock
+                    // The task is already in the queue, so it will be drained during shutdown
+                    return Ok(());
+                }
 
-                    match self.spawn_thread(shutdown_tx, rt, id) {
-                        Ok(handle) => {
-                            self.inner.metrics.inc_num_threads();
-                            shared.worker_thread_index += 1;
-                            shared.worker_threads.insert(id, handle);
-                        }
-                        Err(ref e)
-                            if is_temporary_os_thread_error(e)
-                                && self.inner.metrics.num_threads() > 0 =>
-                        {
-                            // OS temporarily failed to spawn a new thread.
-                            // The task will be picked up eventually by a currently
-                            // busy thread.
-                        }
-                        Err(e) => {
-                            // The OS refused to spawn the thread and there is no thread
-                            // to pick up the task that has just been pushed to the queue.
-                            return Err(SpawnError::NoThreads(e));
-                        }
-                    }
-                }
-            }
+                // Re-check thread count (another thread might have spawned one)
+                if self.inner.metrics.num_threads() < self.inner.thread_cap {
+                    if let Some(shutdown_tx) = shared.shutdown_tx.clone() {
+                        let id = shared.worker_thread_index;
+
+                        match self.spawn_thread(shutdown_tx, rt, id) {
+                            Ok(handle) => {
+                                self.inner.metrics.inc_num_threads();
+                                shared.worker_thread_index += 1;
+                                shared.worker_threads.insert(id, handle);
+                            }
+                            Err(ref e)
+                                if is_temporary_os_thread_error(e)
+                                    && self.inner.metrics.num_threads() > 0 =>
+                            {
+                                // OS temporarily failed to spawn a new thread.
+                                // The task will be picked up eventually by a currently
+                                // busy thread.
+                            }
+                            Err(e) => {
+                                // The OS refused to spawn the thread and there is no thread
+                                // to pick up the task that has just been pushed to the queue.
+                                return Err(SpawnError::NoThreads(e));
+                            }
+                        }
+                    }
+                }
+            } else {
+                // At max threads, notify anyway in case threads are waiting
+                self.inner.queue.notify_one();
+            }
         } else {
-            // Notify an idle worker thread. The notification counter
-            // is used to count the needed amount of notifications
-            // exactly. Thread libraries may generate spurious
-            // wakeups, this counter is used to keep us in a
-            // consistent state.
-            self.inner.metrics.dec_num_idle_threads();
-            shared.num_notify += 1;
-            self.inner.condvar.notify_one();
+            // There are idle threads waiting, notify one
+            self.inner.queue.notify_one();
         }
 
         Ok(())
@@ -505,90 +510,80 @@ impl Inner {
             f();
         }
 
-        let mut shared = self.shared.lock();
+        // Use worker_thread_id as the preferred shard
+        let preferred_shard = worker_thread_id;
 
         let mut join_on_thread = None;
 
        'main: loop {
-            // BUSY
-            while let Some(task) = shared.queue.pop_front() {
+            // BUSY: Process tasks from the queue
+            while let Some(task) = self.queue.pop(preferred_shard) {
                 self.metrics.dec_queue_depth();
-                drop(shared);
 
                 task.run();
+            }
 
-                shared = self.shared.lock();
+            // Check for shutdown before going idle
+            if self.queue.is_shutdown() {
+                break;
             }
 
-            // IDLE
+            // IDLE: Wait for new tasks
            self.metrics.inc_num_idle_threads();
 
-            while !shared.shutdown {
-                let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
-
-                shared = lock_result.0;
-                let timeout_result = lock_result.1;
-
-                if shared.num_notify != 0 {
-                    // We have received a legitimate wakeup,
-                    // acknowledge it by decrementing the counter
-                    // and transition to the BUSY state.
-                    shared.num_notify -= 1;
-                    break;
-                }
-
-                // Even if the condvar "timed out", if the pool is entering the
-                // shutdown phase, we want to perform the cleanup logic.
-                if !shared.shutdown && timeout_result.timed_out() {
-                    // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
-                    // This isn't done when shutting down, because the thread calling shutdown will
-                    // handle joining everything.
-                    let my_handle = shared.worker_threads.remove(&worker_thread_id);
-                    join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);
+            loop {
+                match self.queue.wait_for_task(preferred_shard, self.keep_alive) {
+                    WaitResult::Task(task) => {
+                        // Got a task, process it
+                        self.metrics.dec_num_idle_threads();
+                        self.metrics.dec_queue_depth();
+                        task.run();
+                        // Go back to busy loop
+                        break;
+                    }
+                    WaitResult::Shutdown => {
+                        // Shutdown initiated
+                        self.metrics.dec_num_idle_threads();
+                        break 'main;
+                    }
+                    WaitResult::Timeout => {
+                        // Timed out, exit this thread
+                        self.metrics.dec_num_idle_threads();
+
+                        // Clean up thread handle
+                        let mut shared = self.shared.lock();
+                        if !shared.shutdown {
+                            let my_handle = shared.worker_threads.remove(&worker_thread_id);
+                            join_on_thread =
+                                std::mem::replace(&mut shared.last_exiting_thread, my_handle);
+                        }
 
-                    break 'main;
+                        // Exit the main loop and terminate the thread
+                        break 'main;
+                    }
+                    WaitResult::Spurious => {
+                        // Spurious wakeup, check for tasks and continue waiting
+                        if let Some(task) = self.queue.pop(preferred_shard) {
+                            self.metrics.dec_num_idle_threads();
+                            self.metrics.dec_queue_depth();
+                            task.run();
+                            break;
+                        }
+                        // Continue waiting
+                    }
                 }
-
-                // Spurious wakeup detected, go back to sleep.
             }
+        }
 
-            if shared.shutdown {
-                // Drain the queue
-                while let Some(task) = shared.queue.pop_front() {
-                    self.metrics.dec_queue_depth();
-                    drop(shared);
-
-                    task.shutdown_or_run_if_mandatory();
-
-                    shared = self.shared.lock();
-                }
-
-                // Work was produced, and we "took" it (by decrementing num_notify).
-                // This means that num_idle was decremented once for our wakeup.
-                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
-                self.metrics.inc_num_idle_threads();
-                // NOTE: Technically we should also do num_notify++ and notify again,
-                // but since we're shutting down anyway, that won't be necessary.
-                break;
-            }
+        // Drain remaining tasks if shutting down
+        if self.queue.is_shutdown() {
+            self.queue.drain(|task| {
+                self.metrics.dec_queue_depth();
+                task.shutdown_or_run_if_mandatory();
+            });
         }
 
         // Thread exit
         self.metrics.dec_num_threads();
 
-        // num_idle should now be tracked exactly, panic
-        // with a descriptive message if it is not the
-        // case.
-        let prev_idle = self.metrics.dec_num_idle_threads();
-        assert!(
-            prev_idle >= self.metrics.num_idle_threads(),
-            "num_idle_threads underflowed on thread exit"
-        );
-
-        if shared.shutdown && self.metrics.num_threads() == 0 {
-            self.condvar.notify_one();
-        }
-
-        drop(shared);
-
         if let Some(f) = &self.before_stop {
             f();
         }
diff --git a/tokio/src/runtime/blocking/sharded_queue.rs b/tokio/src/runtime/blocking/sharded_queue.rs
new file mode 100644
index 00000000000..6211746e10c
--- /dev/null
+++ b/tokio/src/runtime/blocking/sharded_queue.rs
@@ -0,0 +1,242 @@
+//! A sharded concurrent queue for the blocking pool.
+//!
+//! This implementation distributes tasks across multiple shards to reduce
+//! lock contention when many threads are spawning blocking tasks concurrently.
+//! The push operations use per-shard locking, while notifications use a global
+//! condvar for simplicity.
+//!
+//! The queue adapts to the current concurrency level by using fewer shards
+//! when there are few threads, which improves cache locality and reduces
+//! lock contention on the active shards.
+
+use crate::loom::sync::{Condvar, Mutex};
+
+use std::collections::VecDeque;
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::time::Duration;
+
+use super::pool::Task;
+
+/// Number of shards. Must be a power of 2.
+const NUM_SHARDS: usize = 16;
+
+/// A single shard containing a queue protected by its own mutex.
+struct Shard {
+    /// The task queue for this shard.
+    queue: Mutex<VecDeque<Task>>,
+    /// Number of tasks in this shard's queue. Used for fast empty checks.
+    len: AtomicUsize,
+}
+
+impl Shard {
+    fn new() -> Self {
+        Shard {
+            queue: Mutex::new(VecDeque::new()),
+            len: AtomicUsize::new(0),
+        }
+    }
+
+    /// Push a task to this shard's queue.
+    fn push(&self, task: Task) {
+        let mut queue = self.queue.lock();
+        queue.push_back(task);
+        // Store with Release to ensure the task is visible before len is updated
+        self.len.store(queue.len(), Release);
+    }
+
+    /// Try to pop a task from this shard's queue.
+    fn pop(&self) -> Option<Task> {
+        // Fast path: check if empty without locking
+        if self.len.load(Acquire) == 0 {
+            return None;
+        }
+
+        let mut queue = self.queue.lock();
+        let task = queue.pop_front();
+        if task.is_some() {
+            self.len.store(queue.len(), Release);
+        }
+        task
+    }
+}
+
+/// A sharded queue that distributes tasks across multiple shards.
+pub(super) struct ShardedQueue {
+    /// The shards - each with its own mutex-protected queue.
+    shards: [Shard; NUM_SHARDS],
+    /// Atomic counter for round-robin task distribution.
+    push_index: AtomicUsize,
+    /// Tracks the highest shard index that has ever been pushed to.
+    /// This allows `pop()` to skip checking shards that have never had tasks,
+    /// which is important for maintaining low overhead at low concurrency.
+    /// Only increases, never decreases (even when shards become empty).
+    max_shard_pushed: AtomicUsize,
+    /// Global shutdown flag.
+    shutdown: AtomicBool,
+    /// Global condition variable for worker notifications.
+    /// We use a single condvar to avoid the complexity of per-shard waiting.
+    condvar: Condvar,
+    /// Mutex to pair with the condvar. Only held during wait, not during push/pop.
+    condvar_mutex: Mutex<()>,
+}
+
+/// Calculate the effective number of shards to use based on thread count.
+/// Uses fewer shards at low concurrency for better cache locality.
+#[inline]
+fn effective_shards(num_threads: usize) -> usize {
+    match num_threads {
+        0..=2 => 2,
+        3..=4 => 4,
+        5..=8 => 8,
+        _ => NUM_SHARDS,
+    }
+}
+
+impl ShardedQueue {
+    pub(super) fn new() -> Self {
+        ShardedQueue {
+            shards: std::array::from_fn(|_| Shard::new()),
+            push_index: AtomicUsize::new(0),
+            max_shard_pushed: AtomicUsize::new(0),
+            shutdown: AtomicBool::new(false),
+            condvar: Condvar::new(),
+            condvar_mutex: Mutex::new(()),
+        }
+    }
+
+    /// Push a task to the queue.
+    ///
+    /// `num_threads` is a hint about the current thread count, used to
+    /// adaptively choose how many shards to distribute across.
+    pub(super) fn push(&self, task: Task, num_threads: usize) {
+        let num_shards = effective_shards(num_threads);
+        let mask = num_shards - 1;
+        let index = self.push_index.fetch_add(1, Relaxed) & mask;
+
+        // Update max_shard_pushed BEFORE pushing the task.
+        // AcqRel ensures the subsequent push cannot be reordered before this.
+        self.max_shard_pushed.fetch_max(index, AcqRel);
+
+        self.shards[index].push(task);
+    }
+
+    /// Notify one waiting worker that a task is available.
+    pub(super) fn notify_one(&self) {
+        self.condvar.notify_one();
+    }
+
+    /// Notify all waiting workers (used during shutdown).
+    pub(super) fn notify_all(&self) {
+        self.condvar.notify_all();
+    }
+
+    /// Try to pop a task, checking the preferred shard first, then others.
+    ///
+    /// Only checks shards up to `max_shard_pushed` since tasks can only exist
+    /// in shards that have been pushed to.
+    pub(super) fn pop(&self, preferred_shard: usize) -> Option<Task> {
+        // Only check shards that have ever had tasks pushed to them
+        let max_shard = self.max_shard_pushed.load(Acquire);
+        let num_shards_to_check = max_shard + 1;
+
+        // Check shards starting from preferred, wrapping within active range
+        let start = preferred_shard % num_shards_to_check;
+        for i in 0..num_shards_to_check {
+            let index = (start + i) % num_shards_to_check;
+            if let Some(task) = self.shards[index].pop() {
+                return Some(task);
+            }
+        }
+
+        None
+    }
+
+    /// Drain all tasks from the queue, calling the provided closure on each.
+    pub(super) fn drain<F>(&self, mut f: F)
+    where
+        F: FnMut(Task),
+    {
+        for shard in &self.shards {
+            loop {
+                let mut queue = shard.queue.lock();
+                if let Some(task) = queue.pop_front() {
+                    shard.len.store(queue.len(), Release);
+                    drop(queue);
+                    f(task);
+                } else {
+                    break;
+                }
+            }
+        }
+    }
+
+    /// Set the shutdown flag and wake all workers.
+    pub(super) fn shutdown(&self) {
+        self.shutdown.store(true, Release);
+        self.notify_all();
+    }
+
+    /// Check if shutdown has been initiated.
+    pub(super) fn is_shutdown(&self) -> bool {
+        self.shutdown.load(Acquire)
+    }
+
+    /// Wait for a task with timeout.
+    pub(super) fn wait_for_task(&self, preferred_shard: usize, timeout: Duration) -> WaitResult {
+        if self.is_shutdown() {
+            return WaitResult::Shutdown;
+        }
+
+        // Try to pop without waiting first
+        if let Some(task) = self.pop(preferred_shard) {
+            return WaitResult::Task(task);
+        }
+
+        // Acquire the condvar mutex before waiting
+        let guard = self.condvar_mutex.lock();
+
+        // Double-check shutdown and tasks after acquiring lock, as state may
+        // have changed while we were waiting for the lock
+        if self.is_shutdown() {
+            return WaitResult::Shutdown;
+        }
+        if let Some(task) = self.pop(preferred_shard) {
+            return WaitResult::Task(task);
+        }
+
+        // Wait for notification or timeout
+        let result = self.condvar.wait_timeout(guard, timeout).unwrap();
+        let timed_out = result.1.timed_out();
+
+        // Drop the lock before doing further work
+        drop(result.0);
+
+        if self.is_shutdown() {
+            return WaitResult::Shutdown;
+        }
+
+        // Try to get a task
+        if let Some(task) = self.pop(preferred_shard) {
+            return WaitResult::Task(task);
+        }
+
+        if timed_out {
+            WaitResult::Timeout
+        } else {
+            WaitResult::Spurious
+        }
+    }
+}
+
+/// Result of waiting for a task.
+pub(super) enum WaitResult {
+    /// A task was found.
+    Task(Task),
+    /// The wait timed out.
+    Timeout,
+    /// Shutdown was initiated.
+    Shutdown,
+    /// Spurious wakeup, no task found.
+    Spurious,
+}

From 4255d3d86bb4bfa64f35cc7e6761c23aec91f6ac Mon Sep 17 00:00:00 2001
From: Alex Gaynor
Date: Tue, 9 Dec 2025 21:33:33 -0500
Subject: [PATCH 2/2] remove len from Shard; it was only used for a fast path
 in pop

---
 tokio/src/runtime/blocking/sharded_queue.rs | 17 +----------------
 1 file changed, 1 insertion(+), 16 deletions(-)

diff --git a/tokio/src/runtime/blocking/sharded_queue.rs b/tokio/src/runtime/blocking/sharded_queue.rs
index 6211746e10c..83647cddc6f 100644
--- a/tokio/src/runtime/blocking/sharded_queue.rs
+++ b/tokio/src/runtime/blocking/sharded_queue.rs
@@ -25,15 +25,12 @@ const NUM_SHARDS: usize = 16;
 struct Shard {
     /// The task queue for this shard.
     queue: Mutex<VecDeque<Task>>,
-    /// Number of tasks in this shard's queue. Used for fast empty checks.
-    len: AtomicUsize,
 }
 
 impl Shard {
     fn new() -> Self {
         Shard {
             queue: Mutex::new(VecDeque::new()),
-            len: AtomicUsize::new(0),
         }
     }
 
@@ -41,23 +38,12 @@ impl Shard {
     fn push(&self, task: Task) {
         let mut queue = self.queue.lock();
         queue.push_back(task);
-        // Store with Release to ensure the task is visible before len is updated
-        self.len.store(queue.len(), Release);
     }
 
     /// Try to pop a task from this shard's queue.
     fn pop(&self) -> Option<Task> {
-        // Fast path: check if empty without locking
-        if self.len.load(Acquire) == 0 {
-            return None;
-        }
-
         let mut queue = self.queue.lock();
-        let task = queue.pop_front();
-        if task.is_some() {
-            self.len.store(queue.len(), Release);
-        }
-        task
+        queue.pop_front()
     }
 }
 
@@ -161,7 +147,6 @@ impl ShardedQueue {
             loop {
                 let mut queue = shard.queue.lock();
                 if let Some(task) = queue.pop_front() {
-                    shard.len.store(queue.len(), Release);
                     drop(queue);
                     f(task);
                 } else {
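
For reference, the shard-selection arithmetic that patch 1 places in
`ShardedQueue::push` can be exercised on its own. The sketch below mirrors
`effective_shards` and the round-robin `push_index & mask` step from the
patch; the `main` driver and the `num_threads = 6` value are illustrative
assumptions, not part of the patch. Because every value `effective_shards`
returns is a power of two, `& mask` is equivalent to `% num_shards` without
a division, which is why NUM_SHARDS must stay a power of 2.

use std::sync::atomic::{AtomicUsize, Ordering::Relaxed};

const NUM_SHARDS: usize = 16;

// Mirror of the patch's `effective_shards`: fewer shards at low
// concurrency for cache locality, more shards at high concurrency.
fn effective_shards(num_threads: usize) -> usize {
    match num_threads {
        0..=2 => 2,
        3..=4 => 4,
        5..=8 => 8,
        _ => NUM_SHARDS,
    }
}

fn main() {
    let push_index = AtomicUsize::new(0);

    let num_threads = 6; // hypothetical current pool size
    let num_shards = effective_shards(num_threads); // 8 shards for 6 threads
    let mask = num_shards - 1;

    for _ in 0..10 {
        // `& mask` wraps the shared counter within the active shard range.
        let index = push_index.fetch_add(1, Relaxed) & mask;
        println!("push -> shard {index}"); // cycles 0, 1, ..., 7, 0, 1, ...
        assert!(index < num_shards);
    }
}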
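
The double-check in `wait_for_task` (pop, lock, pop again, then wait) exists
to close the window between the unlocked fast-path check and acquiring
`condvar_mutex`, where a push and its notification could otherwise be missed.
Below is a stripped-down model of that ordering, using `std::sync` in place
of the loom wrappers; the `MiniQueue` and `gate` names are invented for this
sketch.

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

struct MiniQueue {
    items: Mutex<VecDeque<u32>>, // stands in for the sharded queues
    condvar: Condvar,
    gate: Mutex<()>, // paired with the condvar; not held during push
}

impl MiniQueue {
    fn pop(&self) -> Option<u32> {
        self.items.lock().unwrap().pop_front()
    }

    fn push(&self, item: u32) {
        self.items.lock().unwrap().push_back(item);
        self.condvar.notify_one();
    }

    fn wait_for_item(&self, timeout: Duration) -> Option<u32> {
        // Fast path: no gate taken at all.
        if let Some(item) = self.pop() {
            return Some(item);
        }
        let gate = self.gate.lock().unwrap();
        // Re-check while holding the gate: a push that landed after the
        // fast-path check but before lock() is caught here instead of
        // being slept through.
        if let Some(item) = self.pop() {
            return Some(item);
        }
        let (gate, _timed_out) = self.condvar.wait_timeout(gate, timeout).unwrap();
        drop(gate);
        self.pop() // may still be None: timeout or spurious wakeup
    }
}

fn main() {
    let q = Arc::new(MiniQueue {
        items: Mutex::new(VecDeque::new()),
        condvar: Condvar::new(),
        gate: Mutex::new(()),
    });
    let q2 = Arc::clone(&q);
    let waiter = std::thread::spawn(move || q2.wait_for_item(Duration::from_secs(1)));
    q.push(7);
    assert_eq!(waiter.join().unwrap(), Some(7));
}

Because `push` notifies without holding the gate, a notification can still
land between the locked re-check and the `wait_timeout` call; in that case
the waiter sleeps until the timeout, which the pool bounds with `keep_alive`.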
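
The harness behind the benchmark table in patch 1's commit message is not
part of the patch. One plausible shape for it, matching the description of
N OS threads each spawning 100 batches of 16 `spawn_blocking` tasks (the
structure and parameters below are assumptions, not the actual harness):

use std::time::Instant;

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();

    let concurrency = 16; // OS threads issuing spawn_blocking calls
    let start = Instant::now();

    let spawners: Vec<_> = (0..concurrency)
        .map(|_| {
            let handle = rt.handle().clone();
            std::thread::spawn(move || {
                for _ in 0..100 {
                    // One batch: 16 blocking tasks, awaited together.
                    let tasks: Vec<_> = (0..16)
                        .map(|_| handle.spawn_blocking(|| std::hint::black_box(1)))
                        .collect();
                    handle.block_on(async {
                        for task in tasks {
                            task.await.unwrap();
                        }
                    });
                }
            })
        })
        .collect();

    for spawner in spawners {
        spawner.join().unwrap();
    }
    println!("{concurrency} threads: {:?}", start.elapsed());
}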