diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f6a7fece..3f12beb7 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -77,6 +77,6 @@ jobs: lscpu df -h - west twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \ + ../zephyr/scripts/twister -M all -T samples -T tests -v --inline-logs --integration -j 4 \ --timeout-multiplier 2 \ $(cat etc/platforms.txt) diff --git a/samples/philosophers/Kconfig b/samples/philosophers/Kconfig index 3b0ca539..2f2ab346 100644 --- a/samples/philosophers/Kconfig +++ b/samples/philosophers/Kconfig @@ -40,3 +40,12 @@ choice channels to synchronize. endchoice + +if SYNC_CHANNEL + config USE_BOUNDED_CHANNELS + bool "Should channel sync use bounded channels?" + default y + help + If set, the channel-based communication will use bounded channels with bounds calculated + to not ever block. +endif diff --git a/samples/philosophers/sample.yaml b/samples/philosophers/sample.yaml index d8aafe82..8af73988 100644 --- a/samples/philosophers/sample.yaml +++ b/samples/philosophers/sample.yaml @@ -32,8 +32,15 @@ tests: min_ram: 32 extra_configs: - CONFIG_SYNC_CONDVAR=y - sample.rust.philosopher.channel: + sample.rust.philosopher.channel_bounded: tags: introduction min_ram: 32 extra_configs: - CONFIG_SYNC_CHANNEL=y + - CONFIG_USE_BOUNDED_CHANNELS=y + sample.rust.philosopher.channel_unbounded: + tags: introduction + min_ram: 32 + extra_configs: + - CONFIG_SYNC_CHANNEL=y + - CONFIG_USE_BOUNDED_CHANNELS=n diff --git a/samples/philosophers/src/channel.rs b/samples/philosophers/src/channel.rs index 399bb2b3..3aef607a 100644 --- a/samples/philosophers/src/channel.rs +++ b/samples/philosophers/src/channel.rs @@ -112,10 +112,22 @@ impl ChannelSync { /// Generate a syncer out of a ChannelSync. #[allow(dead_code)] pub fn get_channel_syncer() -> Vec> { - let (cq_send, cq_recv) = channel::unbounded(); - let reply_queues = [(); NUM_PHIL].each_ref().map(|()| { - channel::unbounded() - }); + let (cq_send, cq_recv); + let reply_queues; + + if cfg!(CONFIG_USE_BOUNDED_CHANNELS) { + // Use only one message, so that send will block, to ensure that works. + (cq_send, cq_recv) = channel::bounded(1); + reply_queues = [(); NUM_PHIL].each_ref().map(|()| { + channel::bounded(1) + }); + } else { + (cq_send, cq_recv) = channel::unbounded(); + reply_queues = [(); NUM_PHIL].each_ref().map(|()| { + channel::unbounded() + }); + } + let syncer = reply_queues.into_iter().map(|rqueue| { let item = Box::new(ChannelSync::new(cq_send.clone(), rqueue)) as Box; diff --git a/zephyr/src/sync/channel.rs b/zephyr/src/sync/channel.rs index c541b75d..7dba4bb5 100644 --- a/zephyr/src/sync/channel.rs +++ b/zephyr/src/sync/channel.rs @@ -10,16 +10,47 @@ //! In other words, `zephyr::sys::Queue` is a Rust friendly implementation of `k_queue` in Zephyr. //! This module provides `Sender` and `Receiver`, which can be cloned and behave as if they had an //! internal `Arc` inside them, but without the overhead of an actual Arc. +//! +//! ## IRQ safety +//! +//! These channels are usable from IRQ context on Zephyr in very limited situations. Notably, all +//! of the following must be true: +//! - The channel has been created with `bounded()`, which pre-allocates all of the messages. +//! - If the type `T` has a Drop implementation, this implementation can be called from IRQ context. +//! - Only `try_send` or `try_recv` are used on the channel. +//! +//! The requirement for Drop is only strictly true if the IRQ handler calls `try_recv` and drops +//! received message. If the message is *always* sent over another channel or otherwise not +//! dropped, it *might* be safe to use these messages. +//! +//! ## Dropping of Sender/Receiver +//! +//! Crossbeam channels support detecting when all senders or all receivers have been dropped on a +//! channel, which will cause the handles on the other end to error, including waking up current +//! threads waiting on those channels. +//! +//! At this time, this isn't implementable in Zephyr, as there is no API to wake up all threads +//! blocked on a given `k_queue`. As such, this scenario is not supported. What actually happens +//! is that when all senders or receivers on a channel are dropped, operations on the other end of +//! the channel may just block (or queue forever with unbounded queues). If all handles (both +//! sender and receiver) are dropped, the last drop will cause a panic. It maybe be better to just +//! leak the entire channel, as any data associated with the channels would be leaked at this point, +//! including the underlying Zephyr `k_queue`. Until APIs are added to Zephyr to allow the channel +//! information to be safely freed, these can't actually be freed. extern crate alloc; use alloc::boxed::Box; +use core::cell::UnsafeCell; use core::ffi::c_void; use core::fmt; use core::marker::PhantomData; +use core::mem::MaybeUninit; +use core::pin::Pin; use crate::sys::queue::Queue; +use crate::time::{Forever, NoWait, Timeout}; mod counter; @@ -36,12 +67,16 @@ mod counter; pub fn unbounded_from(queue: Queue) -> (Sender, Receiver) { let (s, r) = counter::new(queue); let s = Sender { - queue: s, - _phantom: PhantomData, + flavor: SenderFlavor::Unbounded { + queue: s, + _phantom: PhantomData, + } }; let r = Receiver { - queue: r, - _phantom: PhantomData, + flavor: ReceiverFlavor::Unbounded { + queue: r, + _phantom: PhantomData, + } }; (s, r) } @@ -58,6 +93,29 @@ pub fn unbounded() -> (Sender, Receiver) { unbounded_from(Queue::new().unwrap()) } +/// Create a multi-producer multi-consumer channel with bounded capacity. +/// +/// The messages are allocated at channel creation time. If there are no messages at `send` time, +/// send will block (possibly waiting for a timeout). +/// +/// At this time, Zephyr does not support crossbeam's 0 capacity queues, which are also called +/// a rendezvous, where both threads wait until in the same region. `bounded` will panic if called +/// with a capacity of zero. +pub fn bounded(cap: usize) -> (Sender, Receiver) { + if cap == 0 { + panic!("Zero capacity queues are not supported on Zephyr"); + } + + let (s, r) = counter::new(Bounded::new(cap)); + let s = Sender { + flavor: SenderFlavor::Bounded(s), + }; + let r = Receiver { + flavor: ReceiverFlavor::Bounded(r), + }; + (s, r) +} + /// The underlying type for Messages through Zephyr's [`Queue`]. /// /// This wrapper is used internally to wrap user messages through the queue. It is not useful in @@ -81,101 +139,320 @@ impl Message { /// The sending side of a channel. pub struct Sender { - queue: counter::Sender, - _phantom: PhantomData, + flavor: SenderFlavor, } +// SAFETY: We implement Send and Sync for the Sender itself, as long as the underlying data can be +// sent. The underlying zephyr primitives used for the channel provide the Sync safety. unsafe impl Send for Sender {} unsafe impl Sync for Sender {} impl Sender { - /// Sends a message over the given channel. This will perform an alloc of the message, which - /// will have an accompanied free on the recipient side. - pub fn send(&self, msg: T) -> Result<(), SendError> { - let msg = Box::new(Message::new(msg)); - let msg = Box::into_raw(msg); - unsafe { - self.queue.send(msg as *mut c_void); + /// Waits for a message to be sent into the channel, but only for a limited time. + /// + /// This call will block until the send operation can proceed or the operation times out. + /// + /// For unbounded channels, this will perform an allocation (and always send immediately). For + /// bounded channels, no allocation will be performed. + pub fn send_timeout(&self, msg: T, timeout: D) -> Result<(), SendError> + where D: Into, + { + match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + let msg = Box::new(Message::new(msg)); + let msg = Box::into_raw(msg); + // SAFETY: Zephyr requires, for as long as the message remains in the queue, that + // the first `usize` of the message be available for its use, and that the message + // not be moved. The `into_raw` of the box consumes the box, so this is entirely a + // raw pointer with no references from the Rust code. The item is not used until it + // has been removed from the queue. + unsafe { + queue.send(msg as *mut c_void); + } + } + SenderFlavor::Bounded(chan) => { + // Retrieve a message buffer from the free list. + // SAFETY: Please see the safety discussion on `Bounded` on what makes this safe. + let buf = unsafe { chan.free.recv(timeout) }; + if buf.is_null() { + return Err(SendError(msg)); + } + let buf = buf as *mut Message; + unsafe { + buf.write(Message::new(msg)); + chan.chan.send(buf as *mut c_void); + } + } } Ok(()) } + + /// Sends a message over the given channel. Waiting if necessary. + /// + /// For unbounded channels, this will allocate space for a message, and immediately send it. + /// For bounded channels, this will block until a message slot is available, and then send the + /// message. + pub fn send(&self, msg: T) -> Result<(), SendError> { + self.send_timeout(msg, Forever) + } + + /// Attempts to send a message into the channel without blocking. + /// + /// This message will either send a message into the channel immediately or return an error if + /// the channel is full. The returned error contains the original message. + pub fn try_send(&self, msg: T) -> Result<(), SendError> { + self.send_timeout(msg, NoWait) + } } impl Drop for Sender { fn drop(&mut self) { - unsafe { - self.queue.release(|_| { - crate::printkln!("Release"); - true - }) + match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + // SAFETY: It is not possible to free from Zephyr queues. This means drop has to + // either leak or panic. We will panic for now. + unsafe { + queue.release(|_| { + panic!("Unbounded queues cannot currently be dropped"); + }) + } + } + SenderFlavor::Bounded(chan) => { + // SAFETY: It is not possible to free from Zephyr queues. This means drop has to + // either leak or panic. We will panic for now. + unsafe { + chan.release(|_| { + panic!("Bounded queues cannot currently be dropped"); + }) + } + } } } } impl Clone for Sender { fn clone(&self) -> Self { - Sender { - queue: self.queue.acquire(), - _phantom: PhantomData, - } + let flavor = match &self.flavor { + SenderFlavor::Unbounded { queue, .. } => { + SenderFlavor::Unbounded { + queue: queue.acquire(), + _phantom: PhantomData, + } + } + SenderFlavor::Bounded(chan) => { + SenderFlavor::Bounded(chan.acquire()) + } + }; + + Sender { flavor } } } +/// The "flavor" of a sender. This maps to the type of channel. +enum SenderFlavor { + /// An unbounded queue. Messages are allocated with Box, and sent directly. + Unbounded { + queue: counter::Sender, + _phantom: PhantomData, + }, + Bounded(counter::Sender>), +} + impl fmt::Debug for Sender { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Sender {:?}", *self.queue) + write!(f, "Sender") } } /// The receiving side of a channel. pub struct Receiver { - queue: counter::Receiver, - _phantom: PhantomData, + flavor: ReceiverFlavor, } +// SAFETY: We implement Send and Sync for the Receiver itself, as long as the underlying data can be +// sent. The underlying zephyr primitives used for the channel provide the Sync safety. unsafe impl Send for Receiver {} unsafe impl Sync for Receiver {} impl Receiver { + /// Waits for a message to be received from the channel, but only for a limited time. + /// + /// If the channel is empty and not disconnected, this call will block until the receive + /// operation can proceed or the operation times out. + /// wake up and return an error. + pub fn recv_timeout(&self, timeout: D) -> Result + where D: Into, + { + match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + // SAFETY: Messages were sent by converting a Box through `into_raw()`. + let msg = unsafe { + let msg = queue.recv(timeout); + if msg.is_null() { + return Err(RecvError); + } + msg + }; + let msg = msg as *mut Message; + // SAFETY: After receiving the message from the queue's `recv` method, Zephyr will + // no longer use the `usize` at the beginning, and it is safe for us to convert the + // message back into a box, copy the field out of it, an allow the Box itself to be + // freed. + let msg = unsafe { Box::from_raw(msg) }; + Ok(msg.data) + } + ReceiverFlavor::Bounded(chan) => { + // SAFETY: Please see the safety discussion on Bounded. + let rawbuf = unsafe { + let buf = chan.chan.recv(timeout); + if buf.is_null() { + return Err(RecvError); + } + buf + }; + let buf = rawbuf as *mut Message; + let msg: Message = unsafe { buf.read() }; + unsafe { + chan.free.send(buf as *mut c_void); + } + Ok(msg.data) + } + } + } + /// Blocks the current thread until a message is received or the channel is empty and /// disconnected. /// /// If the channel is empty and not disconnected, this call will block until the receive - /// operation can proceed. If the channel is empty and becomes disconnected, this call will - /// wake up and return an error. + /// operation can proceed. pub fn recv(&self) -> Result { - let msg = unsafe { - self.queue.recv() - }; - let msg = msg as *mut Message; - let msg = unsafe { Box::from_raw(msg) }; - Ok(msg.data) + self.recv_timeout(Forever) + } + + /// Attempts to receive a message from the channel without blocking. + /// + /// This method will either receive a message from the channel immediately, or return an error + /// if the channel is empty. + /// + /// This method is safe to use from IRQ context, if and only if the channel was created as a + /// bounded channel. + pub fn try_recv(&self) -> Result { + self.recv_timeout(NoWait) } } impl Drop for Receiver { fn drop(&mut self) { - unsafe { - self.queue.release(|_| { - crate::printkln!("Release"); - true - }) + match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic. + // Chose panic for now. + unsafe { + queue.release(|_| { + panic!("Unnbounded channel cannot be dropped"); + }) + } + } + ReceiverFlavor::Bounded(chan) => { + // SAFETY: As the Zephyr channel cannot be freed we must either leak or panic. + // Chose panic for now. + unsafe { + chan.release(|_| { + panic!("Bounded channels cannot be dropped"); + }) + } + } } } } impl Clone for Receiver { fn clone(&self) -> Self { - Receiver { - queue: self.queue.acquire(), - _phantom: PhantomData, - } + let flavor = match &self.flavor { + ReceiverFlavor::Unbounded { queue, .. } => { + ReceiverFlavor::Unbounded { + queue: queue.acquire(), + _phantom: PhantomData, + } + } + ReceiverFlavor::Bounded(chan) => { + ReceiverFlavor::Bounded(chan.acquire()) + } + }; + + Receiver { flavor } } } impl fmt::Debug for Receiver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Sender {:?}", *self.queue) + write!(f, "Sender") + } +} + +/// The "flavor" of a receiver. This maps to the type of the channel. +enum ReceiverFlavor { + /// An unbounded queue. Messages were allocated with Box, and will be freed upon receipt. + Unbounded { + queue: counter::Receiver, + _phantom: PhantomData, + }, + Bounded(counter::Receiver>), +} + +type Slot = UnsafeCell>>; + +// SAFETY: A Bounded channel contains an array of messages that are allocated together in a Box. +// This Box is held for an eventual future implementation that is able to free the messages, once +// they have all been taken from Zephyr's knowledge. For now, they could also be leaked. +// It is a `Pin>` because it is important that the data never be moved, as we maintain +// pointers to the items in Zephyr queues. +// +// There are two `Queue`s used here: `free` is the free list of messages that are not being sent, +// and `chan` for messages that have been sent but not received. Initially, all slots are placed on +// the `free` queue. At any time, outside of the calls in this module, each slot must live inside +// of one of the two queues. This means that the messages cannot be moved or accessed, except +// inside of the individual send/receive operations. Zephyr makes use of the initial `usize` field +// at the beginning of each Slot. +// +// We use MaybeUninit for the messages to avoid needing to initialize the messages. The individual +// messages are accessed through pointers when they are retrieved from the Zephyr `Queue`, so these +// values are never marked as initialized. +/// Bounded channel implementation. +struct Bounded { + /// The messages themselves. This Box owns the allocation of the messages, although it is + /// unsafe to drop this with any messages stored in either of the Zephyr queues. + /// + /// The UnsafeCell is needed to indicate that this data is handled outside of what Rust is aware + /// of. MaybeUninit allows us to create this without allocation. + _slots: Pin]>>, + /// The free queue, holds messages that aren't be used. + free: Queue, + /// The channel queue. These are messages that have been sent and are waiting to be received. + chan: Queue, +} + +impl Bounded { + fn new(cap: usize) -> Self { + let slots: Box<[Slot]> = (0..cap) + .map(|_| { + UnsafeCell::new(MaybeUninit::uninit()) + }) + .collect(); + let slots = Box::into_pin(slots); + + let free = Queue::new().unwrap(); + let chan = Queue::new().unwrap(); + + // Add each of the boxes to the free list. + for slot in slots.as_ref().iter() { + // SAFETY: See safety discussion on `Bounded`. + unsafe { + free.send(slot.get() as *mut c_void); + } + } + + Bounded { _slots: slots, free, chan } } } diff --git a/zephyr/src/sys/queue.rs b/zephyr/src/sys/queue.rs index 713b8f8e..a2c69752 100644 --- a/zephyr/src/sys/queue.rs +++ b/zephyr/src/sys/queue.rs @@ -18,8 +18,8 @@ use zephyr_sys::{ #[cfg(CONFIG_RUST_ALLOC)] use crate::error::Result; -use crate::sys::K_FOREVER; use crate::object::{Fixed, StaticKernelObject, Wrapped}; +use crate::time::Timeout; /// A wrapper around a Zephyr `k_queue` object. pub struct Queue { @@ -62,8 +62,14 @@ impl Queue { /// Get an element from a queue. /// /// This routine removes the first data item from the [`Queue`]. - pub unsafe fn recv(&self) -> *mut c_void { - k_queue_get(self.item.get(), K_FOREVER) + /// The timeout value can be [`Forever`] to block until there is a message, [`NoWait`] to check + /// and immediately return if there is no message, or a [`Duration`] to indicate a specific + /// timeout. + pub unsafe fn recv(&self, timeout: T) -> *mut c_void + where T: Into + { + let timeout: Timeout = timeout.into(); + k_queue_get(self.item.get(), timeout.0) } }