diff --git a/Cargo.toml b/Cargo.toml
index a0dd67f..88a83db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,7 +12,6 @@ exclude = ["assets/*"]
 
 [dependencies]
 seize = "0.4"
-atomic-wait = "1"
 serde = { version = "1", optional = true }
 
 [dev-dependencies]
diff --git a/src/raw/mod.rs b/src/raw/mod.rs
index aac9e75..ae1471d 100644
--- a/src/raw/mod.rs
+++ b/src/raw/mod.rs
@@ -6,7 +6,7 @@
 use std::borrow::Borrow;
 use std::hash::{BuildHasher, Hash};
 use std::marker::PhantomData;
 use std::mem::{self, MaybeUninit};
-use std::sync::atomic::{fence, AtomicPtr, AtomicU32, AtomicUsize, Ordering};
+use std::sync::atomic::{fence, AtomicPtr, AtomicU8, AtomicUsize, Ordering};
 use std::sync::Mutex;
 use std::{hint, panic, ptr};
@@ -50,7 +50,7 @@ pub struct State {
     // but not necessarily copied.
     pub claim: AtomicUsize,
     // The status of the resize.
-    pub status: AtomicU32,
+    pub status: AtomicU8,
     // A thread parker for blocking on copy operations.
     pub parker: Parker,
     // Entries whose retirement has been deferred by later tables.
@@ -66,7 +66,7 @@ impl Default for State {
             allocating: Mutex::new(()),
             copied: AtomicUsize::new(0),
             claim: AtomicUsize::new(0),
-            status: AtomicU32::new(State::PENDING),
+            status: AtomicU8::new(State::PENDING),
             parker: Parker::default(),
             deferred: seize::Deferred::new(),
             collector: ptr::null(),
@@ -76,11 +76,13 @@
 
 impl State {
     // A resize is in-progress.
-    pub const PENDING: u32 = 0;
+    pub const PENDING: u8 = 0;
+
     // The resize has been aborted, continue to the next table.
-    pub const ABORTED: u32 = 1;
+    pub const ABORTED: u8 = 1;
+
     // The resize was complete and the table was promoted.
-    pub const PROMOTED: u32 = 2;
+    pub const PROMOTED: u8 = 2;
 }
 
 // The result of an insert operation.
@@ -1814,13 +1816,14 @@ where
             // This table doesn't have space for the next entry.
             //
             // Abort the current resize.
-            next.state().status.store(State::ABORTED, Ordering::Relaxed);
+            next.state().status.store(State::ABORTED, Ordering::SeqCst);
 
             // Allocate the next table.
             let allocated = self.as_ref(next).get_or_alloc_next(None);
 
             // Wake anyone waiting for us to finish.
-            atomic_wait::wake_all(&next.state().status);
+            let state = next.state();
+            state.parker.unpark(&state.status);
 
             // Retry in a new table.
             next = allocated;
@@ -1841,6 +1844,7 @@
             }
         }
 
+        let state = next.state();
         // We copied all that we can, wait for the table to be promoted.
         for spun in 0.. {
             // Avoid spinning in tests, which can hide race conditions.
@@ -1850,7 +1854,7 @@
                 7
             };
 
-            let status = next.state().status.load(Ordering::Relaxed);
+            let status = state.status.load(Ordering::SeqCst);
 
             // If this copy was aborted, we have to retry in the new table.
             if status == State::ABORTED {
@@ -1859,7 +1863,6 @@
 
             // The copy has completed.
             if status == State::PROMOTED {
-                fence(Ordering::Acquire);
                 return next;
             }
 
@@ -1874,7 +1877,9 @@
             }
 
             // Park until the table is promoted.
-            atomic_wait::wait(&next.state().status, State::PENDING);
+            state
+                .parker
+                .park(&state.status, |status| status == State::PENDING);
         }
     }
 }
@@ -1975,6 +1980,7 @@
             return next;
         }
 
+        let state = next.state();
         for spun in 0.. {
             // Avoid spinning in tests, which can hide race conditions.
             const SPIN_WAIT: usize = if cfg!(any(test, debug_assertions)) { 1 } else { 7 };
@@ -1984,7 +1990,7 @@
 
             // The copy has completed.
-            let status = next.state().status.load(Ordering::Acquire);
+            let status = state.status.load(Ordering::SeqCst);
             if status == State::PROMOTED {
                 return next;
             }
 
@@ -2000,7 +2006,9 @@
             }
 
             // Park until the table is promoted.
-            atomic_wait::wait(&next.state().status, State::PENDING);
+            state
+                .parker
+                .park(&state.status, |status| status == State::PENDING);
         }
     }
 }
@@ -2135,11 +2143,13 @@ where
     // Returns `true` if the table was promoted.
     #[inline]
     fn try_promote(&self, next: Table, copied: usize, guard: &impl Guard) -> bool {
+        let state = next.state();
+
         // Update the copy count.
         let copied = if copied > 0 {
-            next.state().copied.fetch_add(copied, Ordering::AcqRel) + copied
+            state.copied.fetch_add(copied, Ordering::AcqRel) + copied
         } else {
-            next.state().copied.load(Ordering::Acquire)
+            state.copied.load(Ordering::Acquire)
         };
 
         // If we copied all the entries in the table, we can try to promote.
@@ -2164,9 +2174,7 @@
                 .is_ok()
             {
                 // Successfully promoted the table.
-                next.state()
-                    .status
-                    .store(State::PROMOTED, Ordering::Release);
+                state.status.store(State::PROMOTED, Ordering::SeqCst);
 
                 unsafe {
                     // Retire the old table.
@@ -2182,7 +2190,7 @@
             }
 
             // Wake up any writers waiting for the resize to complete.
-            atomic_wait::wake_all(&next.state().status);
+            state.parker.unpark(&state.status);
             return true;
         }
     }
diff --git a/src/raw/utils/parker.rs b/src/raw/utils/parker.rs
index 8a9e623..b5db5f6 100644
--- a/src/raw/utils/parker.rs
+++ b/src/raw/utils/parker.rs
@@ -1,5 +1,5 @@
 use std::collections::HashMap;
-use std::sync::atomic::{AtomicPtr, AtomicU64, Ordering};
+use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicU8, Ordering};
 use std::sync::Mutex;
 use std::thread::{self, Thread};
@@ -25,7 +25,7 @@ struct State {
 
 impl Parker {
     // Block the current thread until the park condition is false.
-    pub fn park<T>(&self, atomic: &AtomicPtr<T>, should_park: impl Fn(*mut T) -> bool) {
+    pub fn park<T>(&self, atomic: &impl Atomic<T>, should_park: impl Fn(T) -> bool) {
         let key = atomic as *const _ as usize;
 
         loop {
@@ -89,7 +89,7 @@ impl Parker {
     // Unpark all threads waiting on the given atomic.
     //
     // Note that any modifications must be `SeqCst` to be visible to unparked threads.
-    pub fn unpark<T>(&self, atomic: &AtomicPtr<T>) {
+    pub fn unpark<T>(&self, atomic: &impl Atomic<T>) {
         let key = atomic as *const _ as usize;
 
         // Fast-path, no one waiting to be unparked.
@@ -113,3 +113,21 @@
         }
     }
 }
+
+/// A generic atomic variable.
+pub trait Atomic<T> {
+    /// Load the value using the given ordering.
+    fn load(&self, ordering: Ordering) -> T;
+}
+
+impl<T> Atomic<*mut T> for AtomicPtr<T> {
+    fn load(&self, ordering: Ordering) -> *mut T {
+        self.load(ordering)
+    }
+}
+
+impl Atomic<u8> for AtomicU8 {
+    fn load(&self, ordering: Ordering) -> u8 {
+        self.load(ordering)
+    }
+}
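As a quick sanity check on the new API shape, here is a minimal usage sketch of the `Parker`/`Atomic` pair that replaces the `atomic_wait::wait`/`wake_all` calls above. `Parker` is the crate-internal type from `src/raw/utils/parker.rs`; the `Arc` scaffolding, the `example` function, and the standalone status constants are hypothetical and exist only to illustrate the park/unpark handshake.

    use std::sync::atomic::{AtomicU8, Ordering};
    use std::sync::Arc;
    use std::thread;

    // Hypothetical mirrors of `State::PENDING` / `State::PROMOTED` above.
    const PENDING: u8 = 0;
    const PROMOTED: u8 = 2;

    fn example(parker: Arc<Parker>, status: Arc<AtomicU8>) {
        let waiter = {
            let (parker, status) = (parker.clone(), status.clone());
            thread::spawn(move || {
                // Sleep while the status is still PENDING. The predicate receives
                // the loaded value directly (via the `Atomic` trait), rather than
                // the single `u32` comparison value `atomic_wait::wait` accepted.
                parker.park(&*status, |status| status == PENDING);
                assert_eq!(status.load(Ordering::SeqCst), PROMOTED);
            })
        };

        // Publish the new status with `SeqCst` so it is visible to unparked
        // threads (see the doc comment on `Parker::unpark`), then wake everyone
        // parked on this atomic.
        status.store(PROMOTED, Ordering::SeqCst);
        parker.unpark(&*status);

        waiter.join().unwrap();
    }

The `Atomic` trait exists so `park` and `unpark` can accept both the `AtomicPtr` used elsewhere in the crate and the new `AtomicU8` resize status; since the parker keys waiters by the atomic's address, both sides of the handshake must pass the same atomic.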