Skip to content

Commit cf5e530

Browse files
committed
fallback to blocking in mpsc channels
1 parent 8af67ba commit cf5e530

File tree

4 files changed

+540
-139
lines changed

4 files changed

+540
-139
lines changed

library/std/src/sync/mpmc/array.rs

+127-35
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,18 @@ pub(crate) struct Channel<T> {
8686
receivers: SyncWaker,
8787
}
8888

89+
/// The state of the channel after calling `start_recv` or `start_send`.
90+
#[derive(PartialEq, Eq)]
91+
enum Status {
92+
/// The channel is ready to read or write to.
93+
Ready,
94+
/// There is currently a send or receive in progress holding up the queue.
95+
/// All operations must block to preserve linearizability.
96+
InProgress,
97+
/// The channel is empty.
98+
Empty,
99+
}
100+
89101
impl<T> Channel<T> {
90102
/// Creates a bounded channel of capacity `cap`.
91103
pub(crate) fn with_capacity(cap: usize) -> Self {
@@ -122,7 +134,7 @@ impl<T> Channel<T> {
122134
}
123135

124136
/// Attempts to reserve a slot for sending a message.
125-
fn start_send(&self, token: &mut Token) -> bool {
137+
fn start_send(&self, token: &mut Token) -> Status {
126138
let backoff = Backoff::new();
127139
let mut tail = self.tail.load(Ordering::Relaxed);
128140

@@ -131,7 +143,7 @@ impl<T> Channel<T> {
131143
if tail & self.mark_bit != 0 {
132144
token.array.slot = ptr::null();
133145
token.array.stamp = 0;
134-
return true;
146+
return Status::Ready;
135147
}
136148

137149
// Deconstruct the tail.
@@ -166,7 +178,7 @@ impl<T> Channel<T> {
166178
// Prepare the token for the follow-up call to `write`.
167179
token.array.slot = slot as *const Slot<T> as *const u8;
168180
token.array.stamp = tail + 1;
169-
return true;
181+
return Status::Ready;
170182
}
171183
Err(_) => {
172184
backoff.spin_light();
@@ -180,10 +192,16 @@ impl<T> Channel<T> {
180192
// If the head lags one lap behind the tail as well...
181193
if head.wrapping_add(self.one_lap) == tail {
182194
// ...then the channel is full.
183-
return false;
195+
return Status::Empty;
196+
}
197+
198+
// The head was advanced but the stamp hasn't been updated yet,
199+
// meaning a receive is in-progress. Spin for a bit waiting for
200+
// the receive to complete before falling back to blocking.
201+
if !backoff.try_spin_light() {
202+
return Status::InProgress;
184203
}
185204

186-
backoff.spin_light();
187205
tail = self.tail.load(Ordering::Relaxed);
188206
} else {
189207
// Snooze because we need to wait for the stamp to get updated.
@@ -200,10 +218,10 @@ impl<T> Channel<T> {
200218
return Err(msg);
201219
}
202220

203-
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
221+
let slot: &Slot<T> = unsafe { &*token.array.slot.cast::<Slot<T>>() };
204222

205223
// Write the message into the slot and update the stamp.
206-
slot.msg.get().write(MaybeUninit::new(msg));
224+
unsafe { slot.msg.get().write(MaybeUninit::new(msg)) }
207225
slot.stamp.store(token.array.stamp, Ordering::Release);
208226

209227
// Wake a sleeping receiver.
@@ -212,7 +230,7 @@ impl<T> Channel<T> {
212230
}
213231

214232
/// Attempts to reserve a slot for receiving a message.
215-
fn start_recv(&self, token: &mut Token) -> bool {
233+
fn start_recv(&self, token: &mut Token) -> Status {
216234
let backoff = Backoff::new();
217235
let mut head = self.head.load(Ordering::Relaxed);
218236

@@ -249,7 +267,7 @@ impl<T> Channel<T> {
249267
// Prepare the token for the follow-up call to `read`.
250268
token.array.slot = slot as *const Slot<T> as *const u8;
251269
token.array.stamp = head.wrapping_add(self.one_lap);
252-
return true;
270+
return Status::Ready;
253271
}
254272
Err(_) => {
255273
backoff.spin_light();
@@ -267,14 +285,20 @@ impl<T> Channel<T> {
267285
// ...then receive an error.
268286
token.array.slot = ptr::null();
269287
token.array.stamp = 0;
270-
return true;
288+
return Status::Ready;
271289
} else {
272290
// Otherwise, the receive operation is not ready.
273-
return false;
291+
return Status::Empty;
274292
}
275293
}
276294

277-
backoff.spin_light();
295+
// The tail was advanced but the stamp hasn't been updated yet,
296+
// meaning a send is in-progress. Spin for a bit waiting for
297+
// the send to complete before falling back to blocking.
298+
if !backoff.try_spin_light() {
299+
return Status::InProgress;
300+
}
301+
278302
head = self.head.load(Ordering::Relaxed);
279303
} else {
280304
// Snooze because we need to wait for the stamp to get updated.
@@ -291,10 +315,10 @@ impl<T> Channel<T> {
291315
return Err(());
292316
}
293317

294-
let slot: &Slot<T> = &*(token.array.slot as *const Slot<T>);
318+
let slot: &Slot<T> = unsafe { &*token.array.slot.cast::<Slot<T>>() };
295319

296320
// Read the message from the slot and update the stamp.
297-
let msg = slot.msg.get().read().assume_init();
321+
let msg = unsafe { slot.msg.get().read().assume_init() };
298322
slot.stamp.store(token.array.stamp, Ordering::Release);
299323

300324
// Wake a sleeping sender.
@@ -304,11 +328,13 @@ impl<T> Channel<T> {
304328

305329
/// Attempts to send a message into the channel.
306330
pub(crate) fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
307-
let token = &mut Token::default();
308-
if self.start_send(token) {
309-
unsafe { self.write(token, msg).map_err(TrySendError::Disconnected) }
310-
} else {
311-
Err(TrySendError::Full(msg))
331+
match self.send_blocking(msg, None, false) {
332+
Ok(None) => Ok(()),
333+
Ok(Some(msg)) => Err(TrySendError::Full(msg)),
334+
Err(SendTimeoutError::Disconnected(msg)) => Err(TrySendError::Disconnected(msg)),
335+
Err(SendTimeoutError::Timeout(_)) => {
336+
unreachable!("called recv_blocking with deadline: None")
337+
}
312338
}
313339
}
314340

@@ -318,12 +344,43 @@ impl<T> Channel<T> {
318344
msg: T,
319345
deadline: Option<Instant>,
320346
) -> Result<(), SendTimeoutError<T>> {
347+
self.send_blocking(msg, deadline, true)
348+
.map(|value| assert!(value.is_none(), "called send_blocking with block: true"))
349+
}
350+
351+
/// Sends a message into the channel.
352+
///
353+
/// Blocks until a message is sent if `should_block` is `true`. Otherwise, returns `Ok(Some(msg))` if
354+
/// the channel is full.
355+
///
356+
/// Note this method may still block when `should_block` is `false` if the channel is in an inconsistent state.
357+
pub(crate) fn send_blocking(
358+
&self,
359+
msg: T,
360+
deadline: Option<Instant>,
361+
should_block: bool,
362+
) -> Result<Option<T>, SendTimeoutError<T>> {
321363
let token = &mut Token::default();
364+
let mut state = self.senders.start();
322365
loop {
323-
// Try sending a message.
324-
if self.start_send(token) {
325-
let res = unsafe { self.write(token, msg) };
326-
return res.map_err(SendTimeoutError::Disconnected);
366+
// Try sending a message several times.
367+
let backoff = Backoff::new();
368+
loop {
369+
match self.start_send(token) {
370+
Status::Ready => {
371+
let res = unsafe { self.write(token, msg) };
372+
return res.map(|_| None).map_err(SendTimeoutError::Disconnected);
373+
}
374+
// If the channel is full, return or block immediately.
375+
Status::Empty if !should_block => return Ok(Some(msg)),
376+
Status::Empty => break,
377+
// Otherwise spin for a bit before blocking.
378+
Status::InProgress => {}
379+
}
380+
381+
if !backoff.try_spin_light() {
382+
break;
383+
}
327384
}
328385

329386
if let Some(d) = deadline {
@@ -335,7 +392,7 @@ impl<T> Channel<T> {
335392
Context::with(|cx| {
336393
// Prepare for blocking until a receiver wakes us up.
337394
let oper = Operation::hook(token);
338-
self.senders.register(oper, cx);
395+
self.senders.register(oper, cx, &state);
339396

340397
// Has the channel become ready just now?
341398
if !self.is_full() || self.is_disconnected() {
@@ -353,28 +410,61 @@ impl<T> Channel<T> {
353410
Selected::Operation(_) => {}
354411
}
355412
});
413+
414+
state.unpark();
356415
}
357416
}
358417

359418
/// Attempts to receive a message without blocking.
360419
pub(crate) fn try_recv(&self) -> Result<T, TryRecvError> {
361-
let token = &mut Token::default();
362-
363-
if self.start_recv(token) {
364-
unsafe { self.read(token).map_err(|_| TryRecvError::Disconnected) }
365-
} else {
366-
Err(TryRecvError::Empty)
420+
match self.recv_blocking(None, false) {
421+
Ok(Some(value)) => Ok(value),
422+
Ok(None) => Err(TryRecvError::Empty),
423+
Err(RecvTimeoutError::Disconnected) => Err(TryRecvError::Disconnected),
424+
Err(RecvTimeoutError::Timeout) => {
425+
unreachable!("called recv_blocking with deadline: None")
426+
}
367427
}
368428
}
369429

370430
/// Receives a message from the channel.
371431
pub(crate) fn recv(&self, deadline: Option<Instant>) -> Result<T, RecvTimeoutError> {
432+
self.recv_blocking(deadline, true)
433+
.map(|value| value.expect("called recv_blocking with block: true"))
434+
}
435+
436+
/// Receives a message from the channel.
437+
///
438+
/// Blocks until a message is received if `should_block` is `true`. Otherwise, returns `Ok(None)` if
439+
/// the channel is full.
440+
///
441+
/// Note this may still block when `should_block` is `false` if the channel is in an inconsistent state.
442+
pub(crate) fn recv_blocking(
443+
&self,
444+
deadline: Option<Instant>,
445+
should_block: bool,
446+
) -> Result<Option<T>, RecvTimeoutError> {
372447
let token = &mut Token::default();
448+
let mut state = self.receivers.start();
373449
loop {
374-
// Try receiving a message.
375-
if self.start_recv(token) {
376-
let res = unsafe { self.read(token) };
377-
return res.map_err(|_| RecvTimeoutError::Disconnected);
450+
// Try receiving a message several times.
451+
let backoff = Backoff::new();
452+
loop {
453+
match self.start_recv(token) {
454+
Status::Ready => {
455+
let res = unsafe { self.read(token) };
456+
return res.map(Some).map_err(|_| RecvTimeoutError::Disconnected);
457+
}
458+
// If the channel is empty, return or block immediately.
459+
Status::Empty if !should_block => return Ok(None),
460+
Status::Empty => break,
461+
// Otherwise spin for a bit before blocking.
462+
Status::InProgress => {}
463+
}
464+
465+
if !backoff.try_spin_light() {
466+
break;
467+
}
378468
}
379469

380470
if let Some(d) = deadline {
@@ -386,7 +476,7 @@ impl<T> Channel<T> {
386476
Context::with(|cx| {
387477
// Prepare for blocking until a sender wakes us up.
388478
let oper = Operation::hook(token);
389-
self.receivers.register(oper, cx);
479+
self.receivers.register(oper, cx, &state);
390480

391481
// Has the channel become ready just now?
392482
if !self.is_empty() || self.is_disconnected() {
@@ -406,6 +496,8 @@ impl<T> Channel<T> {
406496
Selected::Operation(_) => {}
407497
}
408498
});
499+
500+
state.unpark();
409501
}
410502
}
411503

0 commit comments

Comments
 (0)