From dded2a912f64fb19625314881d4a034dd6b6f847 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 26 Nov 2024 14:42:11 -0700 Subject: [PATCH 01/14] zephyr: sync: channel: Wrap channel implementation with "flavor" Taken from crossbeam channel, instead of directly storing the queue into the Receiver and Sender, instead wrap this in a enum, which will allow us to have different kinds of implementations. This is done via enum rather than traits as it would otherwise have to be dynamic, and a small dynamic decision is generally faster than the vtable that would result from using `dyn`. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 124 +++++++++++++++++++++++++------------ 1 file changed, 86 insertions(+), 38 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index c541b75d..dc13f3fd 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -36,12 +36,16 @@ mod counter; pub fn unbounded_from(queue: Queue) -> (Sender, Receiver) { let (s, r) = counter::new(queue); let s = Sender { - queue: s, - _phantom: PhantomData, + flavor: SenderFlavor::Unbounded { + queue: s, + _phantom: PhantomData, + } }; let r = Receiver { - queue: r, - _phantom: PhantomData, + flavor: ReceiverFlavor::Unbounded { + queue: r, + _phantom: PhantomData, + } }; (s, r) } @@ -81,8 +85,7 @@ impl Message { /// The sending side of a channel. pub struct Sender { - queue: counter::Sender, - _phantom: PhantomData, + flavor: SenderFlavor, } unsafe impl Send for Sender {} @@ -92,10 +95,14 @@ impl Sender { /// Sends a message over the given channel. This will perform an alloc of the message, which /// will have an accompanied free on the recipient side. pub fn send(&self, msg: T) -> Result<(), SendError> { - let msg = Box::new(Message::new(msg)); - let msg = Box::into_raw(msg); - unsafe { - self.queue.send(msg as *mut c_void); + match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + let msg = Box::new(Message::new(msg)); + let msg = Box::into_raw(msg); + unsafe { + queue.send(msg as *mut c_void); + } + } } Ok(()) } @@ -103,34 +110,52 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { - unsafe { - self.queue.release(|_| { - crate::printkln!("Release"); - true - }) + match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + unsafe { + queue.release(|_| { + crate::printkln!("Release"); + true + }) + } + } } } } impl Clone for Sender { fn clone(&self) -> Self { - Sender { - queue: self.queue.acquire(), - _phantom: PhantomData, - } + let flavor = match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + SenderFlavor::Unbounded { + queue: queue.acquire(), + _phantom: PhantomData, + } + } + }; + + Sender { flavor } + } +} + +/// The "flavor" of a sender. This maps to the type of channel. +enum SenderFlavor { + /// An unbounded queue. Messages are allocated with Box, and sent directly. + Unbounded { + queue: counter::Sender, + _phantom: PhantomData, } } impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Sender {:?}", *self.queue) + write!(f, "Sender") } } /// The receiving side of a channel. pub struct Receiver { - queue: counter::Receiver, - _phantom: PhantomData, + flavor: ReceiverFlavor, } unsafe impl Send for Receiver {} @@ -144,38 +169,61 @@ impl Receiver { /// operation can proceed. If the channel is empty and becomes disconnected, this call will /// wake up and return an error. pub fn recv(&self) -> Result { - let msg = unsafe { - self.queue.recv() - }; - let msg = msg as *mut Message; - let msg = unsafe { Box::from_raw(msg) }; - Ok(msg.data) + match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + let msg = unsafe { + queue.recv() + }; + let msg = msg as *mut Message; + let msg = unsafe { Box::from_raw(msg) }; + Ok(msg.data) + } + } } } impl Drop for Receiver { fn drop(&mut self) { - unsafe { - self.queue.release(|_| { - crate::printkln!("Release"); - true - }) + match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + unsafe { + queue.release(|_| { + crate::printkln!("Release"); + true + }) + } + } } } } impl Clone for Receiver { fn clone(&self) -> Self { - Receiver { - queue: self.queue.acquire(), - _phantom: PhantomData, - } + let flavor = match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + ReceiverFlavor::Unbounded { + queue: queue.acquire(), + _phantom: PhantomData, + } + } + }; + + Receiver { flavor } } } impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Sender {:?}", *self.queue) + write!(f, "Sender") + } +} + +/// The "flavor" of a receiver. This maps to the type of the channel. +enum ReceiverFlavor { + /// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt. + Unbounded { + queue: counter::Receiver, + _phantom: PhantomData, } } From a9b013006570d32b13f66423f6f9b53f18656124 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 26 Nov 2024 15:49:54 -0700 Subject: [PATCH 02/14] zephyr: sync: channel: Implement bounded channels In addition to unbounded channels, also implement bounded channels. These use pre-allocated message buffers, where send as well as recv can block. They are implemented by using two `z_fifo`s to hold the items, treating one as a free list. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 107 ++++++++++++++++++++++++++++++++++++- 1 file changed, 106 insertions(+), 1 deletion(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index dc13f3fd..a34e0e58 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -15,9 +15,11 @@ extern crate alloc; use alloc::boxed::Box; +use core::cell::UnsafeCell; use core::ffi::c_void; use core::fmt; use core::marker::PhantomData; +use core::mem::MaybeUninit; use crate::sys::queue::Queue; @@ -62,6 +64,29 @@ pub fn unbounded() -> (Sender, Receiver) { unbounded_from(Queue::new().unwrap()) } +/// Create a multi-producer multi-consumer channel with bounded capacity. +/// +/// The messages are allocated at channel creation time. If there are no messages at `send` time, +/// send will block (possibly waiting for a timeout). +/// +/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called +/// a rendezvous, where both threads wait until in the same region. `bounded` will panic if called +/// with a capacity of zero. +pub fn bounded(cap: usize) -> (Sender, Receiver) { + if cap == 0 { + panic!("Zero capacity queues no supported on Zephyr"); + } + + let (s, r) = counter::new(Bounded::new(cap)); + let s = Sender { + flavor: SenderFlavor::Bounded(s), + }; + let r = Receiver { + flavor: ReceiverFlavor::Bounded(r), + }; + (s, r) +} + /// The underlying type for Messages through Zephyr's [`Queue`]. /// /// This wrapper is used internally to wrap user messages through the queue. It is not useful in @@ -103,6 +128,15 @@ impl Sender { queue.send(msg as *mut c_void); } } + SenderFlavor::Bounded(chan) => { + // Retrieve a message buffer from the free list. + let buf = unsafe { chan.free.recv() }; + let buf = buf as *mut Message; + unsafe { + buf.write(Message::new(msg)); + chan.chan.send(buf as *mut c_void); + } + } } Ok(()) } @@ -119,6 +153,13 @@ impl Drop for Sender { }) } } + SenderFlavor::Bounded(chan) => { + unsafe { + chan.release(|_| { + panic!("Bounded queues cannot be dropped"); + }) + } + } } } } @@ -132,6 +173,9 @@ impl Clone for Sender { _phantom: PhantomData, } } + SenderFlavor::Bounded(chan) => { + SenderFlavor::Bounded(chan.acquire()) + } }; Sender { flavor } @@ -144,7 +188,8 @@ enum SenderFlavor { Unbounded { queue: counter::Sender, _phantom: PhantomData, - } + }, + Bounded(counter::Sender>), } impl fmt::Debug for Sender { @@ -178,6 +223,17 @@ impl Receiver { let msg = unsafe { Box::from_raw(msg) }; Ok(msg.data) } + ReceiverFlavor::Bounded(chan) => { + let rawbuf = unsafe { + chan.chan.recv() + }; + let buf = rawbuf as *mut Message; + let msg: Message = unsafe { buf.read() }; + unsafe { + chan.free.send(buf as *mut c_void); + } + Ok(msg.data) + } } } } @@ -193,6 +249,13 @@ impl Drop for Receiver { }) } } + ReceiverFlavor::Bounded(chan) => { + unsafe { + chan.release(|_| { + panic!("Bounded channels cannot be dropped"); + }) + } + } } } } @@ -206,6 +269,9 @@ impl Clone for Receiver { _phantom: PhantomData, } } + ReceiverFlavor::Bounded(chan) => { + ReceiverFlavor::Bounded(chan.acquire()) + } }; Receiver { flavor } @@ -224,6 +290,45 @@ enum ReceiverFlavor { Unbounded { queue: counter::Receiver, _phantom: PhantomData, + }, + Bounded(counter::Receiver>), +} + +type Slot = UnsafeCell>>; + +/// Bounded channel implementation. +struct Bounded { + /// The messages themselves. This Box owns the allocation of the messages, although it is + /// unsafe to drop this with any messages stored in either of the Zephyr queues. + /// + /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware + /// of. MaybeUninit allows us to create this without allocation. + _slots: Box<[Slot]>, + /// The free queue, holds messages that aren't be used. + free: Queue, + /// The channel queue. These are messages that have been sent and are waiting to be received. + chan: Queue, +} + +impl Bounded { + fn new(cap: usize) -> Self { + let slots: Box<[Slot]> = (0..cap) + .map(|_| { + UnsafeCell::new(MaybeUninit::uninit()) + }) + .collect(); + + let free = Queue::new().unwrap(); + let chan = Queue::new().unwrap(); + + // Add each of the boxes to the free list. + for chan in &slots { + unsafe { + free.send(chan.get() as *mut c_void); + } + } + + Bounded { _slots: slots, free, chan } } } From 5bf1ac6c5d0163c211d42eb01f6818e4c24446d1 Mon Sep 17 00:00:00 2001 From: David Brown Date: Tue, 26 Nov 2024 16:38:39 -0700 Subject: [PATCH 03/14] samples: philosophers: Test bounded channels as well Bounded channels only allocate on creation, and are able to block upon send until there is a message slot available. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 9 +++++++++ samples/philosophers/sample.yaml | 9 ++++++++- samples/philosophers/src/channel.rs | 20 ++++++++++++++++---- 3 files changed, 33 insertions(+), 5 deletions(-) diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 3b0ca539..3248a849 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -40,3 +40,12 @@ choice channels to synchronize. endchoice + +if SYNC_CHANNEL + config USE_BOUNDED_CHANNELS + bool "Should channel sync use dounded channels?" + default y + help + If set, the channel-based communication will use bounded channels with bounds calculated + to not ever block. +endif diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index d8aafe82..8af73988 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -32,8 +32,15 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_CONDVAR=y - sample.rust.philosopher.channel: + sample.rust.philosopher.channel_bounded: tags: introduction min_ram: 32 extra_configs: - CONFIG_SYNC_CHANNEL=y + - CONFIG_USE_BOUNDED_CHANNELS=y + sample.rust.philosopher.channel_unbounded: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CHANNEL=y + - CONFIG_USE_BOUNDED_CHANNELS=n diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs index 399bb2b3..3aef607a 100644 --- a/samples/philosophers/src/channel.rs +++ b/samples/philosophers/src/channel.rs @@ -112,10 +112,22 @@ impl ChannelSync { /// Generate a syncer out of a ChannelSync. #[allow(dead_code)] pub fn get_channel_syncer() -> Vec> { - let (cq_send, cq_recv) = channel::unbounded(); - let reply_queues = [(); NUM_PHIL].each_ref().map(|()| { - channel::unbounded() - }); + let (cq_send, cq_recv); + let reply_queues; + + if cfg!(CONFIG_USE_BOUNDED_CHANNELS) { + // Use only one message, so that send will block, to ensure that works. + (cq_send, cq_recv) = channel::bounded(1); + reply_queues = [(); NUM_PHIL].each_ref().map(|()| { + channel::bounded(1) + }); + } else { + (cq_send, cq_recv) = channel::unbounded(); + reply_queues = [(); NUM_PHIL].each_ref().map(|()| { + channel::unbounded() + }); + } + let syncer = reply_queues.into_iter().map(|rqueue| { let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue)) as Box; From d5ff559f61f0a963a2d55aba273c9bc84d51ea56 Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 27 Nov 2024 11:43:44 -0700 Subject: [PATCH 04/14] zephyr: sys: queue: Add timeout to queue recv Add an argument to the `recv` method on Queue to allow a timeout to be specified. This will allow channels to have timeout variants available. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 7 ++++--- zephyr/src/sys/queue.rs | 12 +++++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index a34e0e58..e2924b0c 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -22,6 +22,7 @@ use core::marker::PhantomData; use core::mem::MaybeUninit; use crate::sys::queue::Queue; +use crate::time::Forever; mod counter; @@ -130,7 +131,7 @@ impl Sender { } SenderFlavor::Bounded(chan) => { // Retrieve a message buffer from the free list. - let buf = unsafe { chan.free.recv() }; + let buf = unsafe { chan.free.recv(Forever) }; let buf = buf as *mut Message; unsafe { buf.write(Message::new(msg)); @@ -217,7 +218,7 @@ impl Receiver { match &self.flavor { ReceiverFlavor::Unbounded { queue, .. } => { let msg = unsafe { - queue.recv() + queue.recv(Forever) }; let msg = msg as *mut Message; let msg = unsafe { Box::from_raw(msg) }; @@ -225,7 +226,7 @@ impl Receiver { } ReceiverFlavor::Bounded(chan) => { let rawbuf = unsafe { - chan.chan.recv() + chan.chan.recv(Forever) }; let buf = rawbuf as *mut Message; let msg: Message = unsafe { buf.read() }; diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs index 713b8f8e..a2c69752 100644 --- a/zephyr/src/sys/queue.rs +++ b/zephyr/src/sys/queue.rs @@ -18,8 +18,8 @@ use zephyr_sys::{ #[cfg(CONFIG_RUST_ALLOC)] use crate::error::Result; -use crate::sys::K_FOREVER; use crate::object::{Fixed, StaticKernelObject, Wrapped}; +use crate::time::Timeout; /// A wrapper around a Zephyr `k_queue` object. pub struct Queue { @@ -62,8 +62,14 @@ impl Queue { /// Get an element from a queue. /// /// This routine removes the first data item from the [`Queue`]. - pub unsafe fn recv(&self) -> *mut c_void { - k_queue_get(self.item.get(), K_FOREVER) + /// The timeout value can be [`Forever`] to block until there is a message, [`NoWait`] to check + /// and immediately return if there is no message, or a [`Duration`] to indicate a specific + /// timeout. + pub unsafe fn recv(&self, timeout: T) -> *mut c_void + where T: Into + { + let timeout: Timeout = timeout.into(); + k_queue_get(self.item.get(), timeout.0) } } From 73fb5156d9538d309179f696696bb33bcec567ba Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 27 Nov 2024 11:57:07 -0700 Subject: [PATCH 05/14] zephyr: sync: channel: Add send timeout and try variants Add implementations for `try_send` and `send_timeout` to channels. Currently, these all use the same error code, as there is only one kind of failure possible with Zephyr's channels (and zephyr treats "try" as just a zero timeout). Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index e2924b0c..38a9ba0f 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -22,7 +22,7 @@ use core::marker::PhantomData; use core::mem::MaybeUninit; use crate::sys::queue::Queue; -use crate::time::Forever; +use crate::time::{Forever, NoWait, Timeout}; mod counter; @@ -118,9 +118,15 @@ unsafe impl Send for Sender {} unsafe impl Sync for Sender {} impl Sender { - /// Sends a message over the given channel. This will perform an alloc of the message, which - /// will have an accompanied free on the recipient side. - pub fn send(&self, msg: T) -> Result<(), SendError> { + /// Waits for a message to be sent into the channel, but only for a limited time. + /// + /// This call will block until the send operation can proceed or the operation times out. + /// + /// For unbounded channels, this will perform an allocation (and always send immediately). For + /// bounded channels, no allocation will be performed. + pub fn send_timeout(&self, msg: T, timeout: D) -> Result<(), SendError> + where D: Into, + { match &self.flavor { SenderFlavor::Unbounded { queue, .. } => { let msg = Box::new(Message::new(msg)); @@ -131,7 +137,10 @@ impl Sender { } SenderFlavor::Bounded(chan) => { // Retrieve a message buffer from the free list. - let buf = unsafe { chan.free.recv(Forever) }; + let buf = unsafe { chan.free.recv(timeout) }; + if buf.is_null() { + return Err(SendError(msg)); + } let buf = buf as *mut Message; unsafe { buf.write(Message::new(msg)); @@ -141,6 +150,23 @@ impl Sender { } Ok(()) } + + /// Sends a message over the given channel. Waiting if necessary. + /// + /// For unbounded channels, this will allocate space for a message, and immediately send it. + /// For bounded channels, this will block until a message slot is available, and then send the + /// message. + pub fn send(&self, msg: T) -> Result<(), SendError> { + self.send_timeout(msg, Forever) + } + + /// Attempts to send a message into the channel without blocking. + /// + /// This message will either send a message into the channel immediately or return an error if + /// the channel is full. The returned error contains the original message. + pub fn try_send(&self, msg: T) -> Result<(), SendError> { + self.send_timeout(msg, NoWait) + } } impl Drop for Sender { From 35bcf0d6e10b570ccbee2b63fb567beeb43cab7e Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 27 Nov 2024 12:04:34 -0700 Subject: [PATCH 06/14] zephyr: sync: channel: Add try and timeout variants of `recv` Add `try_recv` and `recv_timeout` variants to the channel Receiver. Notably, this allows the special case of a `try_recv` on a bounded channel being safe to call from IRQ context. Notably, unbounded channels are _never_ safe to use from IRQ context. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 41 ++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index 38a9ba0f..0071eb30 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -234,17 +234,22 @@ unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Receiver { - /// Blocks the current thread until a message is received or the channel is empty and - /// disconnected. + /// Waits for a message to be received from the channel, but only for a limited time. /// /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed. If the channel is empty and becomes disconnected, this call will + /// operation can proceed or the operation times out. /// wake up and return an error. - pub fn recv(&self) -> Result { + pub fn recv_timeout(&self, timeout: D) -> Result + where D: Into, + { match &self.flavor { ReceiverFlavor::Unbounded { queue, .. } => { let msg = unsafe { - queue.recv(Forever) + let msg = queue.recv(timeout); + if msg.is_null() { + return Err(RecvError); + } + msg }; let msg = msg as *mut Message; let msg = unsafe { Box::from_raw(msg) }; @@ -252,7 +257,11 @@ impl Receiver { } ReceiverFlavor::Bounded(chan) => { let rawbuf = unsafe { - chan.chan.recv(Forever) + let buf = chan.chan.recv(timeout); + if buf.is_null() { + return Err(RecvError); + } + buf }; let buf = rawbuf as *mut Message; let msg: Message = unsafe { buf.read() }; @@ -263,6 +272,26 @@ impl Receiver { } } } + + /// Blocks the current thread until a message is received or the channel is empty and + /// disconnected. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed. + pub fn recv(&self) -> Result { + self.recv_timeout(Forever) + } + + /// Attempts to receive a message from the channel without blocking. + /// + /// This method will either receive a message from the channel immediately, or return an error + /// if the channel is empty. + /// + /// This method is safe to use from IRQ context, if and only if the channel was created as a + /// bounded channel. + pub fn try_recv(&self) -> Result { + self.recv_timeout(NoWait) + } } impl Drop for Receiver { From bdea9783358cbf6a47c44d7f5442f5db50da3d73 Mon Sep 17 00:00:00 2001 From: David Brown Date: Wed, 27 Nov 2024 12:09:10 -0700 Subject: [PATCH 07/14] zephyr: sync: channel: Add IRQ safety comment Add a bit of commentary as to the limited situations where it safe to use channels from IRQ context. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index 0071eb30..0042e3f9 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -10,6 +10,18 @@ //! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr. //! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an //! internal `Arc` inside them, but without the overhead of an actual Arc. +//! +//! ## IRQ safety +//! +//! These channels are usable from IRQ context on Zephyr in very limited situations. Notably, all +//! of the following must be true: +//! - The channel has been created with `bounded()`, which pre-allocates all of the messages. +//! - If the type `T` has a Drop implementation, this implementation can be called from IRQ context. +//! - Only `try_send` or `try_recv` are used on the channel. +//! +//! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops +//! received message. If the message is *always* sent over another channel or otherwise not +//! dropped, it *might* be safe to use these messages. extern crate alloc; From 1bea58e761287577f2e03158f8b0d0ca23d6afe9 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 12:31:58 -0700 Subject: [PATCH 08/14] samples: philosophers: typo fix Fix a typo in the description of the Kconfig setting. Signed-off-by: David Brown --- samples/philosophers/Kconfig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 3248a849..2f2ab346 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -43,7 +43,7 @@ endchoice if SYNC_CHANNEL config USE_BOUNDED_CHANNELS - bool "Should channel sync use dounded channels?" + bool "Should channel sync use bounded channels?" default y help If set, the channel-based communication will use bounded channels with bounds calculated From e5c269d544e0e07483d803a0c14e1c0235bd80aa Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 12:32:34 -0700 Subject: [PATCH 09/14] zephyr: sync: channel: Panic on drop Change behavior to panic if all handles are dropped. Add a comment describing this behavior, and the reason behind it. It could also be made to leak, but still should be documented. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index 0042e3f9..4b22b676 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -22,6 +22,21 @@ //! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops //! received message. If the message is *always* sent over another channel or otherwise not //! dropped, it *might* be safe to use these messages. +//! +//! ## Dropping of Sender/Receiver +//! +//! Crossbeam channels support detecting when all senders or all receivers have been dropped on a +//! channel, which will cause the handles on the other end to error, including waking up current +//! threads waiting on those channels. +//! +//! At this time, this isn't implementable in Zephyr, as there is no API to wake up all threads +//! blocked on a given `k_queue`. As such, this scenario is not supported. What actually happens +//! is that when all senders or receivers on a channel are dropped, operations on the other end of +//! the channel may just block (or queue forever with unbounded queues). If all handles (both +//! sender and receiver) are dropped, the last drop will cause a panic. It maybe be better to just +//! leak the entire channel, as any data associated with the channels would be leaked at this point, +//! including the underlying Zephyr `k_queue`. Until APIs are added to Zephyr to allow the channel +//! information to be safely freed, these can't actually be freed. extern crate alloc; @@ -187,15 +202,14 @@ impl Drop for Sender { SenderFlavor::Unbounded { queue, .. } => { unsafe { queue.release(|_| { - crate::printkln!("Release"); - true + panic!("Unbounded queues cannot currently be dropped"); }) } } SenderFlavor::Bounded(chan) => { unsafe { chan.release(|_| { - panic!("Bounded queues cannot be dropped"); + panic!("Bounded queues cannot currently be dropped"); }) } } From bcd74812959a77e42d9b5609b4e62877ba4d4af2 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 12:33:27 -0700 Subject: [PATCH 10/14] zephyr: sync: channel: Rename loop variable Rename this loop variable to better match what each element is, and avoid a conflict with a variable in an outer scope. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index 4b22b676..fd647420 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -404,9 +404,9 @@ impl Bounded { let chan = Queue::new().unwrap(); // Add each of the boxes to the free list. - for chan in &slots { + for slot in &slots { unsafe { - free.send(chan.get() as *mut c_void); + free.send(slot.get() as *mut c_void); } } From 67381563b092ad6c0081d7177c59a4ac76ccc48e Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 13:00:41 -0700 Subject: [PATCH 11/14] zephyr: sync: channel: Add safety comments Add comments describe the safey of unsafe blocks. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 42 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index fd647420..e60db8a0 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -141,6 +141,8 @@ pub struct Sender { flavor: SenderFlavor, } +// SAFETY: We implement Send and Sync for the Sender itself, as long as the underlying data can be +// sent. The underlying zephyr primitives used for the channel provide the Sync safety. unsafe impl Send for Sender {} unsafe impl Sync for Sender {} @@ -158,12 +160,18 @@ impl Sender { SenderFlavor::Unbounded { queue, .. } => { let msg = Box::new(Message::new(msg)); let msg = Box::into_raw(msg); + // SAFETY: Zephyr requires, for as long as the message remains in the queue, that + // the first `usize` of the message be available for its use, and that the message + // not be moved. The `into_raw` of the box consumes the box, so this is entirely a + // raw pointer with no references from the Rust code. The item is not used until it + // has been removed from the queue. unsafe { queue.send(msg as *mut c_void); } } SenderFlavor::Bounded(chan) => { // Retrieve a message buffer from the free list. + // SAFETY: Please see the safety discussion on `Bounded` on what makes this safe. let buf = unsafe { chan.free.recv(timeout) }; if buf.is_null() { return Err(SendError(msg)); @@ -200,6 +208,8 @@ impl Drop for Sender { fn drop(&mut self) { match &self.flavor { SenderFlavor::Unbounded { queue, .. } => { + // SAFETY: It is not possible to free from Zephyr queues. This means drop has to + // either leak or panic. We will panic for now. unsafe { queue.release(|_| { panic!("Unbounded queues cannot currently be dropped"); @@ -207,6 +217,8 @@ impl Drop for Sender { } } SenderFlavor::Bounded(chan) => { + // SAFETY: It is not possible to free from Zephyr queues. This means drop has to + // either leak or panic. We will panic for now. unsafe { chan.release(|_| { panic!("Bounded queues cannot currently be dropped"); @@ -256,6 +268,8 @@ pub struct Receiver { flavor: ReceiverFlavor, } +// SAFETY: We implement Send and Sync for the Receiver itself, as long as the underlying data can be +// sent. The underlying zephyr primitives used for the channel provide the Sync safety. unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} @@ -270,6 +284,7 @@ impl Receiver { { match &self.flavor { ReceiverFlavor::Unbounded { queue, .. } => { + // SAFETY: Messages were sent by converting a Box through `into_raw()`. let msg = unsafe { let msg = queue.recv(timeout); if msg.is_null() { @@ -278,10 +293,15 @@ impl Receiver { msg }; let msg = msg as *mut Message; + // SAFETY: After receiving the message from the queue's `recv` method, Zephyr will + // no longer use the `usize` at the beginning, and it is safe for us to convert the + // message back into a box, copy the field out of it, an allow the Box itself to be + // freed. let msg = unsafe { Box::from_raw(msg) }; Ok(msg.data) } ReceiverFlavor::Bounded(chan) => { + // SAFETY: Please see the safety discussion on Bounded. let rawbuf = unsafe { let buf = chan.chan.recv(timeout); if buf.is_null() { @@ -324,14 +344,17 @@ impl Drop for Receiver { fn drop(&mut self) { match &self.flavor { ReceiverFlavor::Unbounded { queue, .. } => { + // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic. + // Chose panic for now. unsafe { queue.release(|_| { - crate::printkln!("Release"); - true + panic!("Unnbounded channel cannot be dropped"); }) } } ReceiverFlavor::Bounded(chan) => { + // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic. + // Chose panic for now. unsafe { chan.release(|_| { panic!("Bounded channels cannot be dropped"); @@ -378,6 +401,20 @@ enum ReceiverFlavor { type Slot = UnsafeCell>>; +// SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box. +// This Box is held for an eventual future implementation that is able to free the messages, once +// they have all been taken from Zephyr's knowledge. For now, they could also be leaked. +// +// There are two `Queue`s used here: `free` is the free list of messages that are not being sent, +// and `chan` for messages that have been sent but not received. Initially, all slots are placed on +// the `free` queue. At any time, outside of the calls in this module, each slot must live inside +// of one of the two queues. This means that the messages cannot be moved or accessed, except +// inside of the individual send/receive operations. Zephyr makes use of the initial `usize` field +// at the beginning of each Slot. +// +// We use MaybeUninit for the messages to avoid needing to initialize the messages. The individual +// messages are accessed through pointers when they are retrieved from the Zephyr `Queue`, so these +// values are never marked as initialized. /// Bounded channel implementation. struct Bounded { /// The messages themselves. This Box owns the allocation of the messages, although it is @@ -405,6 +442,7 @@ impl Bounded { // Add each of the boxes to the free list. for slot in &slots { + // SAFETY: See safety discussion on `Bounded`. unsafe { free.send(slot.get() as *mut c_void); } From 6b38734f8071f402d80522e74712dd409bffe211 Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 13:08:13 -0700 Subject: [PATCH 12/14] zephyr: sync: channel: Pin the bounded message slots Although this is a private field and never accessed within this code, explicitly mark it as `Pin` to make it clear that it is important that the data never be moved. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index e60db8a0..503db963 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -47,6 +47,7 @@ use core::ffi::c_void; use core::fmt; use core::marker::PhantomData; use core::mem::MaybeUninit; +use core::pin::Pin; use crate::sys::queue::Queue; use crate::time::{Forever, NoWait, Timeout}; @@ -404,6 +405,8 @@ type Slot = UnsafeCell>>; // SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box. // This Box is held for an eventual future implementation that is able to free the messages, once // they have all been taken from Zephyr's knowledge. For now, they could also be leaked. +// It is a `Pin>` because it is important that the data never be moved, as we maintain +// pointers to the items in Zephyr queues. // // There are two `Queue`s used here: `free` is the free list of messages that are not being sent, // and `chan` for messages that have been sent but not received. Initially, all slots are placed on @@ -422,7 +425,7 @@ struct Bounded { /// /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware /// of. MaybeUninit allows us to create this without allocation. - _slots: Box<[Slot]>, + _slots: Pin]>>, /// The free queue, holds messages that aren't be used. free: Queue, /// The channel queue. These are messages that have been sent and are waiting to be received. @@ -436,12 +439,13 @@ impl Bounded { UnsafeCell::new(MaybeUninit::uninit()) }) .collect(); + let slots = Box::into_pin(slots); let free = Queue::new().unwrap(); let chan = Queue::new().unwrap(); // Add each of the boxes to the free list. - for slot in &slots { + for slot in slots.as_ref().iter() { // SAFETY: See safety discussion on `Bounded`. unsafe { free.send(slot.get() as *mut c_void); From 635c6ff080330f7245fc669e696ed6c6186f392d Mon Sep 17 00:00:00 2001 From: David Brown Date: Fri, 29 Nov 2024 14:04:57 -0700 Subject: [PATCH 13/14] CI: Fix twister invocation The `west twister` command seems to be broken right now. Fix this by directly invoking the twister script better. Signed-off-by: David Brown --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f6a7fece..3f12beb7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,6 +77,6 @@ jobs: lscpu df -h - west twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \ + ../zephyr/scripts/twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \ --timeout-multiplier 2 \ $(cat etc/platforms.txt) From 1b3198a9692437fc516d6e86826313c1a1ed3b75 Mon Sep 17 00:00:00 2001 From: David Brown Date: Thu, 5 Dec 2024 12:22:43 -0700 Subject: [PATCH 14/14] zephyr: sync: channel: Fix typo in panic message Fix so that it has valid grammar. Signed-off-by: David Brown --- zephyr/src/sync/channel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index 503db963..7dba4bb5 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -103,7 +103,7 @@ pub fn unbounded() -> (Sender, Receiver) { /// with a capacity of zero. pub fn bounded(cap: usize) -> (Sender, Receiver) { if cap == 0 { - panic!("Zero capacity queues no supported on Zephyr"); + panic!("Zero capacity queues are not supported on Zephyr"); } let (s, r) = counter::new(Bounded::new(cap));