Skip to content

Add TakerSubscription #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion rclrs/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ use crate::{
IntoNodeSubscriptionCallback, LogParams, Logger, ParameterBuilder, ParameterInterface,
ParameterVariant, Parameters, Promise, Publisher, PublisherOptions, PublisherState, RclrsError,
Service, ServiceOptions, ServiceState, Subscription, SubscriptionOptions, SubscriptionState,
TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState, ENTITY_LIFECYCLE_MUTEX,
TakerSubscription, TimeSource, ToLogParams, Worker, WorkerOptions, WorkerState,
ENTITY_LIFECYCLE_MUTEX,
};

/// A processing unit that can communicate with other nodes. See the API of
Expand Down Expand Up @@ -785,6 +786,85 @@ impl NodeState {
)
}

/// Creates a [`TakerSubscription`].
///
/// # Behavior
///
/// This subscription uses no callback and calling [`spin`][1] on the
/// node's executor will have no effect, nor is it required to receive
/// messages.
///
/// In order to receive messages, use [`take`][2] or one of its variants.
///
/// ```no_run
/// # use rclrs::*;
/// # let executor = Context::default().create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// use example_interfaces::msg::String as StringMsg;
///
/// let subscription =
/// node.create_taker_subscription::<StringMsg>("topic".keep_last(1))?;
///
/// loop {
/// if let Some(msg) = subscription.take()? {
/// println!("{}", msg.data);
/// }
/// std::thread::sleep(std::time::Duration::from_millis(100));
/// }
/// # Ok::<(), RclrsError>(())
/// ```
///
/// [TakerSubscription]s can also be used in a [`WaitSet`][3] to wait for
/// messages from one or more subscriptions.
///
/// ```no_run
/// # use rclrs::*;
/// # let context = Context::default();
/// # let executor = context.create_basic_executor();
/// # let node = executor.create_node("my_node").unwrap();
/// use std::sync::Arc;
/// use example_interfaces::msg::String as StringMsg;
///
/// let subscription =
/// Arc::new(node.create_taker_subscription::<StringMsg>("topic")?);
///
/// // `_lifecycle` must be named to avoid being dropped, which would cause
/// // the waitable to be dropped from the WaitSet.
/// let (waitable, _lifecycle) =
/// Waitable::new(Box::new(Arc::clone(&subscription)), None);
///
/// let mut waitset = WaitSet::new(&context)?;
/// waitset.add([waitable])?;
///
/// loop {
/// waitset.wait(None, |_| Ok(()))?;
///
/// if let Some(msg) = subscription.take()? {
/// println!("{}", msg.data);
/// }
/// }
/// # Ok::<(), RclrsError>(())
/// ```
///
/// # Subscription Options
///
/// See [`create_subscription`][4] for examples
/// of setting the subscription options.
///
/// [1]: crate::Executor::spin
/// [2]: crate::TakerSubscription::take
/// [3]: crate::WaitSet
/// [4]: crate::NodeState::create_subscription
pub fn create_taker_subscription<'a, T>(
&self,
options: impl Into<SubscriptionOptions<'a>>,
) -> Result<TakerSubscription<T>, RclrsError>
where
T: Message,
{
TakerSubscription::create(options, &self.handle)
}

/// Creates a [`Subscription`] with an async callback.
///
/// # Behavior
Expand Down
234 changes: 193 additions & 41 deletions rclrs/src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use std::{
any::Any,
ffi::{CStr, CString},
marker::PhantomData,
sync::{Arc, Mutex, MutexGuard},
};

use rosidl_runtime_rs::{Message, RmwMessage};

use crate::{
error::ToResult, qos::QoSProfile, rcl_bindings::*, IntoPrimitiveOptions, Node, NodeHandle,
RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclrsError, Waitable, WaitableLifecycle,
WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
RclPrimitive, RclPrimitiveHandle, RclPrimitiveKind, RclReturnCode, RclrsError, Waitable,
WaitableLifecycle, WorkScope, Worker, WorkerCommands, ENTITY_LIFECYCLE_MUTEX,
};

mod any_subscription_callback;
Expand Down Expand Up @@ -117,47 +118,9 @@ where
node_handle: &Arc<NodeHandle>,
commands: &Arc<WorkerCommands>,
) -> Result<Arc<Self>, RclrsError> {
let SubscriptionOptions { topic, qos } = options.into();
let callback = Arc::new(Mutex::new(callback));

// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
let type_support =
<T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;

// SAFETY: No preconditions for this function.
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
rcl_subscription_options.qos = qos.into();

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
// SAFETY:
// * The rcl_subscription is zero-initialized as mandated by this function.
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription.
// * The topic name and the options are copied by this function, so they can be dropped afterwards.
// * The entity lifecycle mutex is locked to protect against the risk of global
// variables in the rmw implementation being unsafely modified during cleanup.
rcl_subscription_init(
&mut rcl_subscription,
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&rcl_subscription_options,
)
.ok()?;
}
}

let handle = Arc::new(SubscriptionHandle {
rcl_subscription: Mutex::new(rcl_subscription),
node_handle: Arc::clone(node_handle),
});
let handle = SubscriptionHandle::create::<T>(options, node_handle)?;

let (waitable, lifecycle) = Waitable::new(
Box::new(SubscriptionExecutable {
Expand Down Expand Up @@ -292,6 +255,52 @@ struct SubscriptionHandle {
}

impl SubscriptionHandle {
fn create<'a, T: Message>(
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this logic into SubscriptionHandle to be used by both SubscriptionBase and TakerSubscription

options: impl Into<SubscriptionOptions<'a>>,
node_handle: &Arc<NodeHandle>,
) -> Result<Arc<Self>, RclrsError> {
let SubscriptionOptions { topic, qos } = options.into();

// SAFETY: Getting a zero-initialized value is always safe.
let mut rcl_subscription = unsafe { rcl_get_zero_initialized_subscription() };
let type_support =
<T as Message>::RmwMsg::get_type_support() as *const rosidl_message_type_support_t;
let topic_c_string = CString::new(topic).map_err(|err| RclrsError::StringContainsNul {
err,
s: topic.into(),
})?;

// SAFETY: No preconditions for this function.
let mut rcl_subscription_options = unsafe { rcl_subscription_get_default_options() };
rcl_subscription_options.qos = qos.into();

{
let rcl_node = node_handle.rcl_node.lock().unwrap();
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
unsafe {
// SAFETY:
// * The rcl_subscription is zero-initialized as mandated by this function.
// * The rcl_node is kept alive by the NodeHandle because it is a dependency of the subscription.
// * The topic name and the options are copied by this function, so they can be dropped afterwards.
// * The entity lifecycle mutex is locked to protect against the risk of global
// variables in the rmw implementation being unsafely modified during cleanup.
rcl_subscription_init(
&mut rcl_subscription,
&*rcl_node,
type_support,
topic_c_string.as_ptr(),
&rcl_subscription_options,
)
.ok()?;
}
}

Ok(Arc::new(Self {
rcl_subscription: Mutex::new(rcl_subscription),
node_handle: Arc::clone(node_handle),
}))
}

fn lock(&self) -> MutexGuard<rcl_subscription_t> {
self.rcl_subscription.lock().unwrap()
}
Expand Down Expand Up @@ -408,6 +417,109 @@ impl Drop for SubscriptionHandle {
}
}

/// Struct for receiving messages of type `T` without a callback.
///
/// Create a subscription using [`NodeState::create_taker_subscription()`][1].
///
/// Calling [`spin`][2] on the node's executor will have no effect on this subscription.
///
/// When a subscription is created, it may take some time to get "matched" with a corresponding
/// publisher.
///
/// [1]: crate::NodeState::create_taker_subscription
/// [2]: crate::Executor::spin
pub struct TakerSubscription<T: Message> {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Named TakerSubscription rather than the discussed SubscriptionTaker to be closer to WorkerSubscription

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I think SubscriptionTaker makes more sense grammatically.

In this case Taker is the main object, and Subscription is describing the taker. Whereas for WorkerSubscription, the Subscription is the main object, and Worker is describing the subscription.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want Take to come first, then maybe TakingSubscription could work grammatically. That way Taking is describing the Subscription.

handle: Arc<SubscriptionHandle>,
_phantom: PhantomData<T>,
}

impl<T: Message> TakerSubscription<T> {
/// Used by [`Node`][crate::Node] to create a new taker subscription.
pub(crate) fn create<'a>(
options: impl Into<SubscriptionOptions<'a>>,
node_handle: &Arc<NodeHandle>,
) -> Result<Self, RclrsError> {
let handle = SubscriptionHandle::create::<T>(options, node_handle)?;

Ok(Self {
handle,
_phantom: PhantomData,
})
}

/// Fetches a new message.
///
/// When there is no new message, this will return `Ok(None)`.
pub fn take(&self) -> Result<Option<T>, RclrsError> {
self.take_with_info().map(|res| res.map(|msg| msg.0))
}

/// Fetches a new message and its associated [`MessageInfo`][1].
///
/// When there is no new message, this will return `Ok(None)`.
///
/// [1]: crate::MessageInfo
pub fn take_with_info(&self) -> Result<Option<(T, MessageInfo)>, RclrsError> {
match self.handle.take() {
Ok(msg) => Ok(Some(msg)),
Err(RclrsError::RclError {
code: RclReturnCode::SubscriptionTakeFailed,
..
}) => Ok(None),
Err(e) => Err(e),
}
}

/// Obtains a read-only handle to a message owned by the middleware.
///
/// When there is no new message, this will return `Ok(None)`.
///
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][1]. See its documentation
/// for more information.
///
/// [1]: crate::Publisher::borrow_loaned_message
pub fn take_loaned(&self) -> Result<Option<ReadOnlyLoanedMessage<T>>, RclrsError> {
self.take_loaned_with_info().map(|res| res.map(|msg| msg.0))
}

/// Obtains a read-only handle to a message owned by the middleware and its associated
/// [`MessageInfo`][1].
///
/// When there is no new message, this will return `Ok(None)`.
///
/// This is the counterpart to [`Publisher::borrow_loaned_message()`][2]. See its documentation
/// for more information.
///
/// [1]: crate::MessageInfo
/// [2]: crate::Publisher::borrow_loaned_message
pub fn take_loaned_with_info(
&self,
) -> Result<Option<(ReadOnlyLoanedMessage<T>, MessageInfo)>, RclrsError> {
match self.handle.take_loaned() {
Ok(msg) => Ok(Some(msg)),
Err(RclrsError::RclError {
code: RclReturnCode::SubscriptionTakeFailed,
..
}) => Ok(None),
Err(e) => Err(e),
}
}
}

impl<T: Message> RclPrimitive for Arc<TakerSubscription<T>> {
Copy link
Author

@harrisonmg harrisonmg Jul 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented for use with WaitSet, see examples in unit test below and in NodeState::create_taker_subscription docs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One recommendation I would make for the executor is to include a watch sender and trigger it each time a new message is available. That way a user with a TakerSubscription could use Receiver::changed to .await an update signal so they know that a message is ready to be taken.

This will allow the TakerSubscription to work well with async programs.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But also we can't use Arc<TakerSubscription<T>> for this because then the wait set will never know when to drop this from its list.

If you take a look at Subscription::lifecycle you'll see that the struct the user holds onto needs to contain a WaitableLifecycle. That struct keeps the associated RclPrimitive inside the wait set. Once the WaitableLifecycle gets dropped, the RclPrimitive will be removed from the wait set.

In the current state of this PR, this RclPrimitive will be dropped from the wait set immediately because you're dropping the lifecycle right away (and you named it _lifecycle up above to escape the compiler warning). Since this RclPrimitive implementation is doing nothing, I suppose it doesn't matter that it's being dropped from the wait set, but then we should do one of the following:

  • Don't bother implementing RclPrimitive here and don't bother adding anything to the wait set for a TakerSubscription.
  • Follow the pattern of the other primitive implementations and create separate structs for the RclPrimitive versus the user-facing object. Then have the RclPrimitive just trigger a signal so users can know when a message is available.

unsafe fn execute(&mut self, _payload: &mut dyn Any) -> Result<(), RclrsError> {
Ok(())
}

fn kind(&self) -> RclPrimitiveKind {
RclPrimitiveKind::Subscription
}

fn handle(&self) -> RclPrimitiveHandle {
RclPrimitiveHandle::Subscription(self.handle.lock())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -520,4 +632,44 @@ mod tests {
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
}
}

#[test]
fn test_taker_subscription() -> Result<(), RclrsError> {
use crate::*;
use std::time::Duration;

let context = Context::default();
let executor = context.create_basic_executor();
let node = executor.create_node("test_node_taker_subscription")?;

let publisher = node.create_publisher::<msg::Empty>("test_topic")?;
let subscriber = Arc::new(node.create_taker_subscription::<msg::Empty>("test_topic")?);

let (waitable, _lifecycle) = Waitable::new(Box::new(Arc::clone(&subscriber)), None);
let mut waitset = WaitSet::new(&context)?;
waitset.add([waitable])?;

let timeout = Some(Duration::from_millis(100));

publisher.publish(msg::Empty::default())?;
waitset.wait(timeout, |_| Ok(()))?;

assert!(subscriber.take()?.is_some());

assert!(subscriber.take()?.is_none());
assert!(subscriber.take_with_info()?.is_none());

publisher.publish(msg::Empty::default())?;
waitset.wait(timeout, |_| Ok(()))?;

assert!(subscriber.take_with_info()?.is_some());

assert!(subscriber.take()?.is_none());
assert!(subscriber.take_with_info()?.is_none());

publisher.publish(msg::Empty::default())?;
waitset.wait(timeout, |_| Ok(()))?;

Ok(())
}
}
Loading