| 1 | +//! Efficient read-write locking without `pthread_rwlock_t`. |
| 2 | +//! |
| 3 | +//! The readers-writer lock provided by the `pthread` library has a number of |
| 4 | +//! problems which make it a suboptimal choice for `std`: |
| 5 | +//! |
| 6 | +//! * It is non-movable, so it needs to be allocated (lazily, to make the |
| 7 | +//! constructor `const`). |
| 8 | +//! * `pthread` is an external library, meaning the fast path of acquiring an |
| 9 | +//! uncontended lock cannot be inlined. |
| 10 | +//! * Some platforms (at least glibc before version 2.25) have buggy implementations |
| 11 | +//! that can easily lead to undefined behaviour in safe Rust code when not properly |
| 12 | +//! guarded against. |
| 13 | +//! * On some platforms (e.g. macOS), the lock is very slow. |
| 14 | +//! |
| 15 | +//! Therefore, we implement our own `RwLock`! Naively, one might reach for a |
| 16 | +//! spinlock, but those [can be quite problematic] when the lock is contended. |
| 17 | +//! Instead, this readers-writer lock copies its implementation strategy from |
| 18 | +//! the Windows [SRWLOCK] and the [usync] library. Spinning is still used for the |
| 19 | +//! fast path, but it is bounded: after spinning fails, threads will locklessly |
| 20 | +//! add an information structure containing a [`Thread`] handle into a queue of |
| 21 | +//! waiters associated with the lock. The lock owner, upon releasing the lock, |
| 22 | +//! will scan through the queue and wake up threads as appropriate, which will |
| 23 | +//! then again try to acquire the lock. The resulting [`RwLock`] is: |
| 24 | +//! |
| 25 | +//! * adaptive, since it spins before doing any heavyweight parking operations |
| 26 | +//! * allocation-free, modulo the per-thread [`Thread`] handle, which is |
| 27 | +//! allocated regardless when using threads created by `std` |
| 28 | +//! * writer-preferring, though some readers may still slip through |
| 29 | +//! * unfair, which reduces context-switching and thus drastically improves |
| 30 | +//! performance |
| 31 | +//! |
| 32 | +//! and also quite fast in most cases. |
| 33 | +//! |
| 34 | +//! [can be quite problematic]: https://matklad.github.io/2020/01/02/spinlocks-considered-harmful.html |
| 35 | +//! [SRWLOCK]: https://learn.microsoft.com/en-us/windows/win32/sync/slim-reader-writer--srw--locks |
| 36 | +//! [usync]: https://crates.io/crates/usync |
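//!
//! For reference, this primitive is what ends up backing the public
//! `std::sync::RwLock` on platforms that use this implementation. A minimal
//! sketch of the semantics it provides, exercised through the public API
//! rather than the `sys`-level methods defined below:
//!
//! ```
//! use std::sync::RwLock;
//!
//! let lock = RwLock::new(0);
//!
//! // Any number of readers may hold the lock at the same time.
//! {
//!     let r1 = lock.read().unwrap();
//!     let r2 = lock.read().unwrap();
//!     assert_eq!(*r1 + *r2, 0);
//! }
//!
//! // A writer requires exclusive access.
//! *lock.write().unwrap() += 1;
//! assert_eq!(*lock.read().unwrap(), 1);
//! ```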
| 37 | +//! |
| 38 | +//! # Implementation |
| 39 | +//! |
| 40 | +//! ## State |
| 41 | +//! |
| 42 | +//! A single [`AtomicPtr`] is used as the state variable. The lowest three bits are used |
| 43 | +//! to indicate the meaning of the remaining bits: |
| 44 | +//! |
| 45 | +//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | | |
| 46 | +//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------| |
| 47 | +//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting | |
| 48 | +//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting | |
| 49 | +//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers | |
| 50 | +//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock | |
| 51 | +//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count | |
| 52 | +//! |
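//! For instance, with the constants used in the implementation below
//! (`LOCKED = 1`, `QUEUED = 2`, `QUEUE_LOCKED = 4`, and readers counted in
//! multiples of `SINGLE = 8`), some concrete state values are:
//!
//! ```text
//! 0b00000                    unlocked, no waiters
//! 0b00001                    write-locked, no waiters
//! 0b11001                    read-locked by three readers (3 * SINGLE | LOCKED)
//! <node address> | 0b011     locked, with a waiter queue starting at that node
//! ```
//!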
| 53 | +//! ## Waiter queue |
| 54 | +//! |
| 55 | +//! When threads are waiting on the lock (`QUEUED` is set), the lock state |
| 56 | +//! points to a queue of waiters, which is implemented as a linked list of |
| 57 | +//! nodes stored on the stack to avoid memory allocation. To enable lockless |
| 58 | +//! enqueuing of new nodes, the list is only singly linked upon creation. |
| 59 | +//! Because the reader count is stored in the last node of the queue while |
| 60 | +//! the lock is read-locked, threads releasing the lock must traverse the |
| 61 | +//! queue to find its last element. To avoid traversing the whole list again |
| 62 | +//! and again, a pointer to the tail found in this way is cached in the |
| 63 | +//! (current) first element of the queue. |
| 64 | +//! |
| 65 | +//! Also, while the lock is unfair for performance reasons, it is still best to |
| 66 | +//! wake the tail node first, which requires backlinks to previous nodes to be |
| 67 | +//! created. This is done at the same time as finding the tail, so a set `tail` field |
| 68 | +//! also indicates that the backlinks in the remaining portion of the queue have been created. |
| 69 | +//! |
| 70 | +//! TLDR: Here's a diagram of what the queue looks like: |
| 71 | +//! |
| 72 | +//! ```text |
| 73 | +//! state |
| 74 | +//! │ |
| 75 | +//! ▼ |
| 76 | +//! ╭───────╮ next ╭───────╮ next ╭───────╮ next ╭───────╮ |
| 77 | +//! │ ├─────►│ ├─────►│ ├─────►│ count │ |
| 78 | +//! │ │ │ │ │ │ │ │ |
| 79 | +//! │ │ │ │◄─────┤ │◄─────┤ │ |
| 80 | +//! ╰───────╯ ╰───────╯ prev ╰───────╯ prev ╰───────╯ |
| 81 | +//! │ ▲ |
| 82 | +//! └───────────────────────────┘ |
| 83 | +//! tail |
| 84 | +//! ``` |
| 85 | +//! |
| 86 | +//! Invariants: |
| 87 | +//! 1. At least one node must contain a non-null, current `tail` field. |
| 88 | +//! 2. The first non-null `tail` field must be valid and current. |
| 89 | +//! 3. All nodes preceding this node must have a correct, non-null `next` field. |
| 90 | +//! 4. All nodes following this node must have a correct, non-null `prev` field. |
| 91 | +//! |
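//! In pseudocode, the traversal implemented by `add_backlinks_and_find_tail`
//! below looks roughly like this:
//!
//! ```text
//! let mut current = head;
//! let tail = loop {
//!     match current.tail {
//!         Some(tail) => break tail,
//!         // All `next` fields up to the first node with a set `tail` are
//!         // non-null (invariant 3), so this walk is safe.
//!         None => {
//!             current.next.prev = Some(current);
//!             current = current.next;
//!         }
//!     }
//! };
//! head.tail = Some(tail);
//! ```
//!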
| 92 | +//! Access to the queue is controlled by the `QUEUE_LOCKED` bit. Threads try to |
| 93 | +//! set it in two situations: after enqueuing themselves, to eagerly add backlinks |
| 94 | +//! to the queue (which drastically improves performance), and after unlocking the |
| 95 | +//! lock, to wake the next waiter(s). In both cases the bit is set atomically as |
| 96 | +//! part of the enqueuing/unlocking operation. The thread releasing the |
| 97 | +//! `QUEUE_LOCKED` bit will check the state of the lock and wake up waiters as |
| 98 | +//! appropriate. This guarantees forward progress even if the unlocking thread |
| 99 | +//! could not acquire the queue lock. |
| 100 | +//! |
| 101 | +//! ## Memory orderings |
| 102 | +//! |
| 103 | +//! To properly synchronize changes to the data protected by the lock, the lock |
| 104 | +//! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively. |
| 105 | +//! To propagate the initialization of nodes, changes to the queue lock are also |
| 106 | +//! performed using these orderings. |
| 107 | + |
| 108 | +#![forbid(unsafe_op_in_unsafe_fn)] |
| 109 | + |
| 110 | +use crate::cell::OnceCell; |
| 111 | +use crate::hint::spin_loop; |
| 112 | +use crate::mem; |
| 113 | +use crate::ptr::{self, invalid_mut, null_mut, NonNull}; |
| 114 | +use crate::sync::atomic::{ |
| 115 | + AtomicBool, AtomicPtr, |
| 116 | + Ordering::{AcqRel, Acquire, Relaxed, Release}, |
| 117 | +}; |
| 118 | +use crate::sys_common::thread_info; |
| 119 | +use crate::thread::Thread; |
| 120 | + |
| 121 | +// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the |
| 122 | +// locking operation will be retried. |
| 123 | +// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times. |
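// With `SPIN_COUNT = 7`, that is 1 + 2 + 4 + ... + 64 = 127 spin iterations in
// total before a thread falls back to parking.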
| 124 | +const SPIN_COUNT: usize = 7; |
| 125 | + |
| 126 | +type State = *mut (); |
| 127 | +type AtomicState = AtomicPtr<()>; |
| 128 | + |
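// The state is a pointer-sized word: the lowest three bits hold the flags
// defined below, and the remaining bits hold either the reader count (in
// multiples of `SINGLE`) or a pointer to the head of the waiter queue.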
| 129 | +const UNLOCKED: State = invalid_mut(0); |
| 130 | +const LOCKED: usize = 1; |
| 131 | +const QUEUED: usize = 2; |
| 132 | +const QUEUE_LOCKED: usize = 4; |
| 133 | +const SINGLE: usize = 8; |
| 134 | +const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED); |
| 135 | + |
| 136 | +/// Marks the state as write-locked, if possible. |
| 137 | +#[inline] |
| 138 | +fn write_lock(state: State) -> Option<State> { |
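    // Adding `LOCKED` sets the `LOCKED` bit exactly when it was previously clear.
    // If the lock was already held (by a writer or by readers), the addition
    // carries out of the bit and the check below fails.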
| 139 | + let state = state.wrapping_byte_add(LOCKED); |
| 140 | + if state.addr() & LOCKED == LOCKED { Some(state) } else { None } |
| 141 | +} |
| 142 | + |
| 143 | +/// Marks the state as read-locked, if possible. |
| 144 | +#[inline] |
| 145 | +fn read_lock(state: State) -> Option<State> { |
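    // Readers may only acquire the lock if no threads are queued (`QUEUED` is
    // clear) and it is not write-locked (a state of exactly `LOCKED`, with no
    // reader count). Each reader adds `SINGLE` to the count.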
| 146 | + if state.addr() & QUEUED == 0 && state.addr() != LOCKED { |
| 147 | + Some(invalid_mut(state.addr().checked_add(SINGLE)? | LOCKED)) |
| 148 | + } else { |
| 149 | + None |
| 150 | + } |
| 151 | +} |
| 152 | + |
| 153 | +/// Masks the state, assuming it points to a queue node. |
| 154 | +/// |
| 155 | +/// # Safety |
| 156 | +/// The state must contain a valid pointer to a queue node. |
| 157 | +#[inline] |
| 158 | +unsafe fn to_node(state: State) -> NonNull<Node> { |
| 159 | + unsafe { NonNull::new_unchecked(state.mask(MASK)).cast() } |
| 160 | +} |
| 161 | + |
| 162 | +/// An atomic node pointer with relaxed operations. |
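/// Relaxed ordering is sufficient here: the initialization of nodes is
/// propagated by the acquire/release operations on the lock state and the
/// `QUEUE_LOCKED` bit (see the module documentation on memory orderings).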
| 163 | +struct AtomicLink(AtomicPtr<Node>); |
| 164 | + |
| 165 | +impl AtomicLink { |
| 166 | + fn new(v: Option<NonNull<Node>>) -> AtomicLink { |
| 167 | + AtomicLink(AtomicPtr::new(v.map_or(null_mut(), NonNull::as_ptr))) |
| 168 | + } |
| 169 | + |
| 170 | + fn get(&self) -> Option<NonNull<Node>> { |
| 171 | + NonNull::new(self.0.load(Relaxed)) |
| 172 | + } |
| 173 | + |
| 174 | + fn set(&self, v: Option<NonNull<Node>>) { |
| 175 | + self.0.store(v.map_or(null_mut(), NonNull::as_ptr), Relaxed); |
| 176 | + } |
| 177 | +} |
| 178 | + |
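// Queue nodes are aligned to 8 bytes so that the lowest three bits of a node
// pointer are always zero and can be reused for the `LOCKED`, `QUEUED` and
// `QUEUE_LOCKED` flags.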
| 179 | +#[repr(align(8))] |
| 180 | +struct Node { |
| 181 | + next: AtomicLink, |
| 182 | + prev: AtomicLink, |
| 183 | + tail: AtomicLink, |
| 184 | + write: bool, |
| 185 | + thread: OnceCell<Thread>, |
| 186 | + completed: AtomicBool, |
| 187 | +} |
| 188 | + |
| 189 | +impl Node { |
| 190 | + /// Create a new queue node. |
| 191 | + fn new(write: bool) -> Node { |
| 192 | + Node { |
| 193 | + next: AtomicLink::new(None), |
| 194 | + prev: AtomicLink::new(None), |
| 195 | + tail: AtomicLink::new(None), |
| 196 | + write, |
| 197 | + thread: OnceCell::new(), |
| 198 | + completed: AtomicBool::new(false), |
| 199 | + } |
| 200 | + } |
| 201 | + |
| 202 | + /// Prepare this node for waiting. |
| 203 | + fn prepare(&mut self) { |
| 204 | + // Fall back to creating an unnamed `Thread` handle to allow locking in |
| 205 | + // TLS destructors. |
| 206 | + self.thread |
| 207 | + .get_or_init(|| thread_info::current_thread().unwrap_or_else(|| Thread::new(None))); |
| 208 | + self.completed = AtomicBool::new(false); |
| 209 | + } |
| 210 | + |
| 211 | + /// Wait until this node is marked as completed. |
| 212 | + /// |
| 213 | + /// # Safety |
| 214 | + /// May only be called from the thread that created the node. |
| 215 | + unsafe fn wait(&self) { |
| 216 | + while !self.completed.load(Acquire) { |
| 217 | + unsafe { |
| 218 | + self.thread.get().unwrap().park(); |
| 219 | + } |
| 220 | + } |
| 221 | + } |
| 222 | + |
| 223 | + /// Atomically mark this node as completed. The node may not outlive this call. |
| 224 | + unsafe fn complete(this: NonNull<Node>) { |
| 225 | + // Since the node may be destroyed immediately after the completed flag |
| 226 | + // is set, clone the thread handle before that. |
| 227 | + let thread = unsafe { this.as_ref().thread.get().unwrap().clone() }; |
| 228 | + unsafe { |
| 229 | + this.as_ref().completed.store(true, Release); |
| 230 | + } |
| 231 | + thread.unpark(); |
| 232 | + } |
| 233 | +} |
| 234 | + |
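/// A guard that aborts the process when dropped. It is armed while a node is
/// linked into the queue, so that unwinding cannot free a node which other
/// threads may still access.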
| 235 | +struct PanicGuard; |
| 236 | + |
| 237 | +impl Drop for PanicGuard { |
| 238 | + fn drop(&mut self) { |
| 239 | + rtabort!("tried to drop node in intrusive list."); |
| 240 | + } |
| 241 | +} |
| 242 | + |
| 243 | +/// Add backlinks to the queue, returning the tail. |
| 244 | +/// |
| 245 | +/// May be called from multiple threads at the same time, while the queue is not |
| 246 | +/// modified (this happens when unlocking multiple readers). |
| 247 | +/// |
| 248 | +/// # Safety |
| 249 | +/// * `head` must point to a node in a valid queue. |
| 250 | +/// * `head` must be or be in front of the head of the queue at the time of the |
| 251 | +/// last removal. |
| 252 | +/// * The part of the queue starting with `head` must not be modified during this |
| 253 | +/// call. |
| 254 | +unsafe fn add_backlinks_and_find_tail(head: NonNull<Node>) -> NonNull<Node> { |
| 255 | + let mut current = head; |
| 256 | + let tail = loop { |
| 257 | + let c = unsafe { current.as_ref() }; |
| 258 | + match c.tail.get() { |
| 259 | + Some(tail) => break tail, |
| 260 | + // SAFETY: |
| 261 | + // All `next` fields before the first node with a `set` tail are |
| 262 | + // non-null and valid (invariant 3). |
| 263 | + None => unsafe { |
| 264 | + let next = c.next.get().unwrap_unchecked(); |
| 265 | + next.as_ref().prev.set(Some(current)); |
| 266 | + current = next; |
| 267 | + }, |
| 268 | + } |
| 269 | + }; |
| 270 | + |
| 271 | + unsafe { |
| 272 | + head.as_ref().tail.set(Some(tail)); |
| 273 | + tail |
| 274 | + } |
| 275 | +} |
| 276 | + |
| 277 | +pub struct RwLock { |
| 278 | + state: AtomicState, |
| 279 | +} |
| 280 | + |
| 281 | +impl RwLock { |
| 282 | + #[inline] |
| 283 | + pub const fn new() -> RwLock { |
| 284 | + RwLock { state: AtomicPtr::new(UNLOCKED) } |
| 285 | + } |
| 286 | + |
| 287 | + #[inline] |
| 288 | + pub fn try_read(&self) -> bool { |
| 289 | + self.state.fetch_update(Acquire, Relaxed, read_lock).is_ok() |
| 290 | + } |
| 291 | + |
| 292 | + #[inline] |
| 293 | + pub fn read(&self) { |
| 294 | + if !self.try_read() { |
| 295 | + self.lock_contended(false) |
| 296 | + } |
| 297 | + } |
| 298 | + |
| 299 | + #[inline] |
| 300 | + pub fn try_write(&self) -> bool { |
| 301 | + // Atomically set the `LOCKED` bit. This is lowered to a single atomic |
| 302 | + // instruction on most modern processors (e.g. "lock bts" on x86 and |
| 303 | + // "ldseta" on modern AArch64), and therefore is more efficient than |
| 304 | + // a `fetch_update` loop using `write_lock`, which can spuriously fail if a new node |
| 305 | + // is appended to the queue. |
| 306 | + self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0 |
| 307 | + } |
| 308 | + |
| 309 | + #[inline] |
| 310 | + pub fn write(&self) { |
| 311 | + if !self.try_write() { |
| 312 | + self.lock_contended(true) |
| 313 | + } |
| 314 | + } |
| 315 | + |
| 316 | + #[cold] |
| 317 | + fn lock_contended(&self, write: bool) { |
| 318 | + let update = if write { write_lock } else { read_lock }; |
| 319 | + let mut node = Node::new(write); |
| 320 | + let mut state = self.state.load(Relaxed); |
| 321 | + let mut count = 0; |
| 322 | + loop { |
| 323 | + if let Some(next) = update(state) { |
| 324 | + // The lock is available, try locking it. |
| 325 | + match self.state.compare_exchange_weak(state, next, Acquire, Relaxed) { |
| 326 | + Ok(_) => return, |
| 327 | + Err(new) => state = new, |
| 328 | + } |
| 329 | + } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT { |
| 330 | + // If the lock is not available and no threads are queued, spin |
| 331 | + // for a while, using exponential backoff to decrease cache |
| 332 | + // contention. |
| 333 | + for _ in 0..(1 << count) { |
| 334 | + spin_loop(); |
| 335 | + } |
| 336 | + state = self.state.load(Relaxed); |
| 337 | + count += 1; |
| 338 | + } else { |
| 339 | + // Fall back to parking. First, prepare the node. |
| 340 | + node.prepare(); |
| 341 | + |
| 342 | + // If there are threads queued, set the `next` field to a |
| 343 | + // pointer to the next node in the queue. Otherwise set it to |
| 344 | + // the lock count if the state is read-locked or to zero if it |
| 345 | + // is write-locked. |
| 346 | + node.next.0 = AtomicPtr::new(state.mask(MASK).cast()); |
| 347 | + node.prev = AtomicLink::new(None); |
| 348 | + let mut next = ptr::from_ref(&node) |
| 349 | + .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED)) |
| 350 | + as State; |
| 351 | + |
| 352 | + if state.addr() & QUEUED == 0 { |
| 353 | + // If this is the first node in the queue, set the tail field to |
| 354 | + // the node itself to ensure there is a current `tail` field in |
| 355 | + // the queue (invariants 1 and 2). This needs to use `set` to |
| 356 | + // avoid invalidating the new pointer. |
| 357 | + node.tail.set(Some(NonNull::from(&node))); |
| 358 | + } else { |
| 359 | + // Otherwise, the tail of the queue is not known. |
| 360 | + node.tail.set(None); |
| 361 | + // Try locking the queue to eagerly add backlinks. |
| 362 | + next = next.map_addr(|addr| addr | QUEUE_LOCKED); |
| 363 | + } |
| 364 | + |
| 365 | + // Register the node, using release ordering to propagate our |
| 366 | + // changes to the waking thread. |
| 367 | + if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { |
| 368 | + // The state has changed, just try again. |
| 369 | + state = new; |
| 370 | + continue; |
| 371 | + } |
| 372 | + |
| 373 | + // The node is registered, so the structure must not be |
| 374 | + // mutably accessed or destroyed while other threads may |
| 375 | + // be accessing it. Guard against unwinds using a panic |
| 376 | + // guard that aborts when dropped. |
| 377 | + let guard = PanicGuard; |
| 378 | + |
| 379 | + // If the current thread locked the queue, unlock it again, |
| 380 | + // linking it in the process. |
| 381 | + if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED { |
| 382 | + unsafe { |
| 383 | + self.unlock_queue(next); |
| 384 | + } |
| 385 | + } |
| 386 | + |
| 387 | + // Wait until the node is removed from the queue. |
| 388 | + // SAFETY: the node was created by the current thread. |
| 389 | + unsafe { |
| 390 | + node.wait(); |
| 391 | + } |
| 392 | + |
| 393 | + // The node was removed from the queue, disarm the guard. |
| 394 | + mem::forget(guard); |
| 395 | + |
| 396 | + // Reload the state and try again. |
| 397 | + state = self.state.load(Relaxed); |
| 398 | + count = 0; |
| 399 | + } |
| 400 | + } |
| 401 | + } |
| 402 | + |
| 403 | + #[inline] |
| 404 | + pub unsafe fn read_unlock(&self) { |
| 405 | + match self.state.fetch_update(Release, Acquire, |state| { |
| 406 | + if state.addr() & QUEUED == 0 { |
| 407 | + let count = state.addr() - (SINGLE | LOCKED); |
| 408 | + Some(if count > 0 { invalid_mut(count | LOCKED) } else { UNLOCKED }) |
| 409 | + } else { |
| 410 | + None |
| 411 | + } |
| 412 | + }) { |
| 413 | + Ok(_) => {} |
| 414 | + // There are waiters queued and the lock count was moved to the |
| 415 | + // tail of the queue. |
| 416 | + Err(state) => unsafe { self.read_unlock_contended(state) }, |
| 417 | + } |
| 418 | + } |
| 419 | + |
| 420 | + #[cold] |
| 421 | + unsafe fn read_unlock_contended(&self, state: State) { |
| 422 | + // The state was observed with acquire ordering above, so the current |
| 423 | + // thread will observe all node initializations. |
| 424 | + |
| 425 | + // SAFETY: |
| 426 | + // Because new read-locks cannot be acquired while threads are queued, |
| 427 | + // all queue-lock owners will observe the set `LOCKED` bit. Because they |
| 428 | + // do not modify the queue while there is a lock owner, the queue will |
| 429 | + // not be removed from here. |
| 430 | + let tail = unsafe { add_backlinks_and_find_tail(to_node(state)).as_ref() }; |
| 431 | + // The lock count is stored in the `next` field of `tail`. |
| 432 | + // Decrement it, making sure to observe all changes made to the queue |
| 433 | + // by the other lock owners by using acquire-release ordering. |
| 434 | + let was_last = tail.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0; |
| 435 | + if was_last { |
| 436 | + // SAFETY: |
| 437 | + // Other threads cannot read-lock while threads are queued. Also, |
| 438 | + // the `LOCKED` bit is still set, so there are no writers. Therefore, |
| 439 | + // the current thread exclusively owns the lock. |
| 440 | + unsafe { self.unlock_contended(state) } |
| 441 | + } |
| 442 | + } |
| 443 | + |
| 444 | + #[inline] |
| 445 | + pub unsafe fn write_unlock(&self) { |
| 446 | + if let Err(state) = |
| 447 | + self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Relaxed) |
| 448 | + { |
| 449 | + // SAFETY: |
| 450 | + // Since other threads cannot acquire the lock, the state can only |
| 451 | + // have changed because there are threads queued on the lock. |
| 452 | + unsafe { self.unlock_contended(state) } |
| 453 | + } |
| 454 | + } |
| 455 | + |
| 456 | + /// # Safety |
| 457 | + /// * The lock must be exclusively owned by this thread. |
| 458 | + /// * There must be threads queued on the lock. |
| 459 | + #[cold] |
| 460 | + unsafe fn unlock_contended(&self, mut state: State) { |
| 461 | + loop { |
| 462 | + // Atomically release the lock and try to acquire the queue lock. |
| 463 | + let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED); |
| 464 | + match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) { |
| 465 | + // The queue lock was acquired. Release it, waking up the next |
| 466 | + // waiter in the process. |
| 467 | + Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe { |
| 468 | + return self.unlock_queue(next); |
| 469 | + }, |
| 470 | + // Another thread already holds the queue lock, leave waking up |
| 471 | + // waiters to it. |
| 472 | + Ok(_) => return, |
| 473 | + Err(new) => state = new, |
| 474 | + } |
| 475 | + } |
| 476 | + } |
| 477 | + |
| 478 | + /// Unlocks the queue. If the lock is unlocked, wakes up the next eligible |
| 479 | + /// thread(s). |
| 480 | + /// |
| 481 | + /// # Safety |
| 482 | + /// The queue lock must be held by the current thread. |
| 483 | + unsafe fn unlock_queue(&self, mut state: State) { |
| 484 | + debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED); |
| 485 | + |
| 486 | + loop { |
| 487 | + let tail = unsafe { add_backlinks_and_find_tail(to_node(state)) }; |
| 488 | + |
| 489 | + if state.addr() & LOCKED == LOCKED { |
| 490 | + // Another thread has locked the lock. Leave waking up waiters |
| 491 | + // to them by releasing the queue lock. |
| 492 | + match self.state.compare_exchange_weak( |
| 493 | + state, |
| 494 | + state.mask(!QUEUE_LOCKED), |
| 495 | + Release, |
| 496 | + Acquire, |
| 497 | + ) { |
| 498 | + Ok(_) => return, |
| 499 | + Err(new) => { |
| 500 | + state = new; |
| 501 | + continue; |
| 502 | + } |
| 503 | + } |
| 504 | + } |
| 505 | + |
| 506 | + let is_writer = unsafe { tail.as_ref().write }; |
| 507 | + if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } { |
| 508 | + // `tail` is a writer and there is a node before `tail`. |
| 509 | + // Split off `tail`. |
| 510 | + |
| 511 | + // There are no set `tail` links before the node pointed to by |
| 512 | + // `state`, so the first non-null tail field will be current |
| 513 | + // (invariant 2). Invariant 4 is fulfilled since `add_backlinks_and_find_tail` |
| 514 | + // was called on this node, which ensures all backlinks are set. |
| 515 | + unsafe { |
| 516 | + to_node(state).as_ref().tail.set(Some(prev)); |
| 517 | + } |
| 518 | + |
| 519 | + // Release the queue lock. Doing this by subtraction is more |
| 520 | + // efficient on modern processors since it is a single instruction |
| 521 | + // instead of an update loop, which will fail if new threads are |
| 522 | + // added to the list. |
| 523 | + self.state.fetch_byte_sub(QUEUE_LOCKED, Release); |
| 524 | + |
| 525 | + // The tail was split off and the lock released. Mark the node as |
| 526 | + // completed. |
| 527 | + unsafe { |
| 528 | + return Node::complete(tail); |
| 529 | + } |
| 530 | + } else { |
| 531 | + // The next waiter is a reader or the queue only consists of one |
| 532 | + // waiter. Just wake all threads. |
| 533 | + |
| 534 | + // The lock cannot be locked (checked above), so mark it as |
| 535 | + // unlocked to reset the queue. |
| 536 | + if let Err(new) = |
| 537 | + self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) |
| 538 | + { |
| 539 | + state = new; |
| 540 | + continue; |
| 541 | + } |
| 542 | + |
| 543 | + let mut current = tail; |
| 544 | + loop { |
| 545 | + let prev = unsafe { current.as_ref().prev.get() }; |
| 546 | + unsafe { |
| 547 | + Node::complete(current); |
| 548 | + } |
| 549 | + match prev { |
| 550 | + Some(prev) => current = prev, |
| 551 | + None => return, |
| 552 | + } |
| 553 | + } |
| 554 | + } |
| 555 | + } |
| 556 | + } |
| 557 | +} |