diff --git a/src/sync/chase_lev.rs b/src/sync/chase_lev.rs index 68113a4e4..b2e7d34a5 100644 --- a/src/sync/chase_lev.rs +++ b/src/sync/chase_lev.rs @@ -102,6 +102,19 @@ pub enum Steal { Data(T), } +/// When stealing half of the data, this is an enumeration of the possible +/// outcomes. +#[derive(PartialEq, Eq, Debug)] +pub enum StealHalf { + /// The deque was empty at the time of stealing. + Empty, + /// The stealer lost the race for stealing data, and a retry may return more + /// data. + Abort, + /// The stealer has successfully stolen some data. + Data(Vec), +} + // An internal buffer used by the chase-lev deque. This structure is actually // implemented as a circular buffer, and is used as the intermediate storage of // the data in the deque. @@ -137,6 +150,12 @@ impl Stealer { pub fn steal(&self) -> Steal { self.deque.steal() } + + /// Steals half of the work off the end of the queue (opposite of the + /// worker's end) + pub fn steal_half(&self) -> StealHalf { + self.deque.steal_half() + } } impl Clone for Stealer { @@ -254,6 +273,40 @@ impl Deque { } } + fn steal_half(&self) -> StealHalf { + let guard = epoch::pin(); + + let t = self.top.load(Acquire); + fence(SeqCst); + let b = self.bottom.load(Acquire); + + let size = b - t; + if size <= 0 { + return StealHalf::Empty; + } + + let half = (size + 1) / 2; + + unsafe { + let a = self.array.load(Acquire, &guard).unwrap(); + + let mut data = Vec::with_capacity(half as usize); + for i in t..t + half { + data.push(a.get(i)); + } + + if self.top.compare_and_swap(t, t + half, SeqCst) == t { + StealHalf::Data(data) + } else { + while data.len() > 0 { + mem::forget(data.pop()); + } + StealHalf::Abort + } + } + + } + // potentially shrink the array. This can be called only from the worker. unsafe fn maybe_shrink(&self, b: isize, t: isize, guard: &epoch::Guard) { let a = self.array.load(SeqCst, guard).unwrap();