-
Notifications
You must be signed in to change notification settings - Fork 32
Implement Bounded channels #33
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Taken from crossbeam channel, instead of directly storing the queue into the Receiver and Sender, instead wrap this in a enum, which will allow us to have different kinds of implementations. This is done via enum rather than traits as it would otherwise have to be dynamic, and a small dynamic decision is generally faster than the vtable that would result from using `dyn`. Signed-off-by: David Brown <[email protected]>
In addition to unbounded channels, also implement bounded channels. These use pre-allocated message buffers, where send as well as recv can block. They are implemented by using two `z_fifo`s to hold the items, treating one as a free list. Signed-off-by: David Brown <[email protected]>
Bounded channels only allocate on creation, and are able to block upon send until there is a message slot available. Signed-off-by: David Brown <[email protected]>
Add an argument to the `recv` method on Queue to allow a timeout to be specified. This will allow channels to have timeout variants available. Signed-off-by: David Brown <[email protected]>
Add implementations for `try_send` and `send_timeout` to channels. Currently, these all use the same error code, as there is only one kind of failure possible with Zephyr's channels (and zephyr treats "try" as just a zero timeout). Signed-off-by: David Brown <[email protected]>
Add `try_recv` and `recv_timeout` variants to the channel Receiver. Notably, this allows the special case of a `try_recv` on a bounded channel being safe to call from IRQ context. Notably, unbounded channels are _never_ safe to use from IRQ context. Signed-off-by: David Brown <[email protected]>
Add a bit of commentary as to the limited situations where it safe to use channels from IRQ context. Signed-off-by: David Brown <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only some nitpicking comments. LGTM overall. But I will repeat myself by saying that I find the safety tough to review without // SAFETY comments :).
zephyr/src/sync/channel.rs
Outdated
ReceiverFlavor::Unbounded { queue, .. } => { | ||
unsafe { | ||
queue.release(|_| { | ||
crate::printkln!("Release"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reads like it is just a temporary debug print?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Honestly, maybe it should just be a panic. Not really sure what to do other than just leak the underlying channel. In order to actually close them, Zephyr needs a new API to be able to wake up all blockers on a queue, not just one of them.
/// An unbounded queue. Messages are allocated with Box, and sent directly. | ||
Unbounded { | ||
queue: counter::Sender<Queue>, | ||
_phantom: PhantomData<T>, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I personally would have put them over the type where they were used (same below, would have kept Sender + Receiver Flavor together).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps I should move them to their own file, that's how crossbeam does it. But crossbeam also has many more flavors, and the code is a lot more complex, as they are actually implementing the channel, not just using an underlying k_queue
like this does.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I'm not quite sure I understand where you would put this definition. You would group the flavors together rather than keeping the sender and receiver things together. I'm not sure I'd find that clearer.
zephyr/src/sync/channel.rs
Outdated
// Add each of the boxes to the free list. | ||
for chan in &slots { | ||
unsafe { | ||
free.send(chan.get() as *mut c_void); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find this pretty tough to review without SAFETY comments. The Box itself does not disallow moving the contents of it (through swap). Yet, this relies on pointers into it. So should the Slot or Box be wrapped in Pin
?
I know that it does not change anything in practice. But I feel like it makes it a bit easier to reason about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could be, which might make it clearer. But, it isn't public, so doesn't leak anything. I'll add the pin.
/// The timeout value can be [`Forever`] to block until there is a message, [`NoWait`] to check | ||
/// and immediately return if there is no message, or a [`Duration`] to indicate a specific | ||
/// timeout. | ||
pub unsafe fn recv<T>(&self, timeout: T) -> *mut c_void |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not relevant for this comment, but: pub unsafe
? Should this be pub(crate) unsafe
(probably the same for the other functions on this impl)? It reads like it is just an internal wrapper.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, definitely 'pub'. This is part of the public interface. The k_queue
is perfectly usable outside of this crate, it is just unsafe to do so. But there is no reason to stop external users from being able to do so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, but if the intention is to create a safe wrapper for unsafe Zephyr API calls.
Would it not be better to wrap the k_queue_append call in an unsafe block, instead of declaring the send unsafe?
Otherwise the user of this function will have to wrap the code using this API in an unsafe block.
The same goes for the other unsafe APIs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is that k_queue_append
is never safe. It takes a lot of stuff around it to make it safe. The idea is that the sys
types are rust-language interfaces, but are still unsafe. Then in sync
or other places will be the usable interfaces. The sys
ones are available as there might be cases where someone needs the lower level functionality.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Fix a typo in the description of the Kconfig setting. Signed-off-by: David Brown <[email protected]>
Change behavior to panic if all handles are dropped. Add a comment describing this behavior, and the reason behind it. It could also be made to leak, but still should be documented. Signed-off-by: David Brown <[email protected]>
Rename this loop variable to better match what each element is, and avoid a conflict with a variable in an outer scope. Signed-off-by: David Brown <[email protected]>
Add comments describe the safey of unsafe blocks. Signed-off-by: David Brown <[email protected]>
Although this is a private field and never accessed within this code, explicitly mark it as `Pin` to make it clear that it is important that the data never be moved. Signed-off-by: David Brown <[email protected]>
I have updated based on the review feedback, other than one that I'm not sure I agree with. These changes cosmetic, mostly adding comments. The |
The `west twister` command seems to be broken right now. Fix this by directly invoking the twister script better. Signed-off-by: David Brown <[email protected]>
zephyr/src/sync/channel.rs
Outdated
/// with a capacity of zero. | ||
pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { | ||
if cap == 0 { | ||
panic!("Zero capacity queues no supported on Zephyr"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change message to: Zero capacity queues are not supported on Zephyr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it would be even better with: Zero capacity queues are not supported by Zephyr
.collect(); | ||
let slots = Box::into_pin(slots); | ||
|
||
let free = Queue::new().unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Allocating memory could fail. So maybe queue::new should return an Option or Result that can be handled here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At least in stable rust, allocation failures result in a panic. There is nightly provision for handling it, but it doesn't seem to be through returns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. If it is not the convention. I just thought it might be better to let the user of these modules decide what should happen in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. If it is not the convention. I just thought it might be better to let the user of these modules decide what should happen in this case.
I think we'll have to wait for Rust development itself to figure out how they're going to solve this. Nothing in 'alloc' returns an error on memory exhaustion. They call to a function that can either panic overall, or stop the present thread.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
} | ||
} | ||
|
||
/// The "flavor" of a sender. This maps to the type of channel. | ||
enum SenderFlavor<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be more clear to a user what this type means, if it was called something like ChannelType.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps, but I'm trying to keep the naming and such as close to the crossbeam-channel implementation as possible, where it is called this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok
/// If the channel is empty and not disconnected, this call will block until the receive | ||
/// operation can proceed or the operation times out. | ||
/// wake up and return an error. | ||
pub fn recv_timeout<D>(&self, timeout: D) -> Result<T, RecvError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about calling this method receive and let the timeout be an Option?
Makes the name more easily read and also covers the recv function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, I'm trying to be compatible with crossbeam-channel, which has defined the API this way. The timeouts in Zephyr already support NoWait and Forever, but doing it this way allows the code to be used on the host with crossbeam-channel without changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
/// | ||
/// This method is safe to use from IRQ context, if and only if the channel was created as a | ||
/// bounded channel. | ||
pub fn try_recv(&self) -> Result<T, RecvError> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method could be called get_message to more clearly say that it is retrieving a message from the channel, if there is one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise, just keeping compatible with an existing implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
Fix so that it has valid grammar. Signed-off-by: David Brown <[email protected]>
Whereas the existing unbounded channels perform an allocation for each message sent, these bounded channels do a single allocation on initialization (of the channel, the zephyr queues used to implement it do an allocation). This allows send to block as well as recv.
This is the first part, coming soon will be send and receive with a timeout. This will allow using bounded channels from irq context (with no timeout).
Fixes #34