Skip to content

Commit 0fe1f21

Browse files
authored
Fix RAII for subscription, client, and service (#463)
* Fix RAII for subscription, client, and service Signed-off-by: Michael X. Grey <[email protected]> * Fix style Signed-off-by: Michael X. Grey <[email protected]> * Fix compilation errors Signed-off-by: Michael X. Grey <[email protected]> --------- Signed-off-by: Michael X. Grey <[email protected]> Signed-off-by: Michael X. Grey <[email protected]>
1 parent caacf4d commit 0fe1f21

File tree

6 files changed

+72
-33
lines changed

6 files changed

+72
-33
lines changed

rclrs/src/client.rs

+9-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use rosidl_runtime_rs::Message;
1111
use crate::{
1212
error::{RclReturnCode, ToResult},
1313
rcl_bindings::*,
14-
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
14+
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1515
};
1616

1717
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -76,14 +76,18 @@ where
7676
pub(crate) handle: Arc<ClientHandle>,
7777
requests: Mutex<HashMap<RequestId, RequestValue<T::Response>>>,
7878
futures: Arc<Mutex<HashMap<RequestId, oneshot::Sender<T::Response>>>>,
79+
/// Ensure the parent node remains alive as long as the subscription is held.
80+
/// This implementation will change in the future.
81+
#[allow(unused)]
82+
node: Arc<Node>,
7983
}
8084

8185
impl<T> Client<T>
8286
where
8387
T: rosidl_runtime_rs::Service,
8488
{
8589
/// Creates a new client.
86-
pub(crate) fn new(node_handle: Arc<NodeHandle>, topic: &str) -> Result<Self, RclrsError>
90+
pub(crate) fn new(node: &Arc<Node>, topic: &str) -> Result<Self, RclrsError>
8791
// This uses pub(crate) visibility to avoid instantiating this struct outside
8892
// [`Node::create_client`], see the struct's documentation for the rationale
8993
where
@@ -102,7 +106,7 @@ where
102106
let client_options = unsafe { rcl_client_get_default_options() };
103107

104108
{
105-
let rcl_node = node_handle.rcl_node.lock().unwrap();
109+
let rcl_node = node.handle.rcl_node.lock().unwrap();
106110
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
107111

108112
// SAFETY:
@@ -126,7 +130,7 @@ where
126130

127131
let handle = Arc::new(ClientHandle {
128132
rcl_client: Mutex::new(rcl_client),
129-
node_handle,
133+
node_handle: Arc::clone(&node.handle),
130134
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
131135
});
132136

@@ -136,6 +140,7 @@ where
136140
futures: Arc::new(Mutex::new(
137141
HashMap::<RequestId, oneshot::Sender<T::Response>>::new(),
138142
)),
143+
node: Arc::clone(node),
139144
})
140145
}
141146

rclrs/src/node.rs

+6-15
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,11 @@ impl Node {
225225
///
226226
/// [1]: crate::Client
227227
// TODO: make client's lifetime depend on node's lifetime
228-
pub fn create_client<T>(&self, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
228+
pub fn create_client<T>(self: &Arc<Self>, topic: &str) -> Result<Arc<Client<T>>, RclrsError>
229229
where
230230
T: rosidl_runtime_rs::Service,
231231
{
232-
let client = Arc::new(Client::<T>::new(Arc::clone(&self.handle), topic)?);
232+
let client = Arc::new(Client::<T>::new(self, topic)?);
233233
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
234234
Ok(client)
235235
}
@@ -292,19 +292,15 @@ impl Node {
292292
///
293293
/// [1]: crate::Service
294294
pub fn create_service<T, F>(
295-
&self,
295+
self: &Arc<Self>,
296296
topic: &str,
297297
callback: F,
298298
) -> Result<Arc<Service<T>>, RclrsError>
299299
where
300300
T: rosidl_runtime_rs::Service,
301301
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
302302
{
303-
let service = Arc::new(Service::<T>::new(
304-
Arc::clone(&self.handle),
305-
topic,
306-
callback,
307-
)?);
303+
let service = Arc::new(Service::<T>::new(self, topic, callback)?);
308304
{ self.services_mtx.lock().unwrap() }
309305
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
310306
Ok(service)
@@ -314,20 +310,15 @@ impl Node {
314310
///
315311
/// [1]: crate::Subscription
316312
pub fn create_subscription<T, Args>(
317-
&self,
313+
self: &Arc<Self>,
318314
topic: &str,
319315
qos: QoSProfile,
320316
callback: impl SubscriptionCallback<T, Args>,
321317
) -> Result<Arc<Subscription<T>>, RclrsError>
322318
where
323319
T: Message,
324320
{
325-
let subscription = Arc::new(Subscription::<T>::new(
326-
Arc::clone(&self.handle),
327-
topic,
328-
qos,
329-
callback,
330-
)?);
321+
let subscription = Arc::new(Subscription::<T>::new(self, topic, qos, callback)?);
331322
{ self.subscriptions_mtx.lock() }
332323
.unwrap()
333324
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);

rclrs/src/parameter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ impl ParameterInterface {
789789
}
790790
}
791791

792-
pub(crate) fn create_services(&self, node: &Node) -> Result<(), RclrsError> {
792+
pub(crate) fn create_services(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
793793
*self.services.lock().unwrap() =
794794
Some(ParameterService::new(node, self.parameter_map.clone())?);
795795
Ok(())

rclrs/src/parameter/service.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ fn set_parameters_atomically(
239239

240240
impl ParameterService {
241241
pub(crate) fn new(
242-
node: &Node,
242+
node: &Arc<Node>,
243243
parameter_map: Arc<Mutex<ParameterMap>>,
244244
) -> Result<Self, RclrsError> {
245245
let fqn = node.fully_qualified_name();

rclrs/src/service.rs

+9-8
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use rosidl_runtime_rs::Message;
99
use crate::{
1010
error::{RclReturnCode, ToResult},
1111
rcl_bindings::*,
12-
MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
12+
MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1313
};
1414

1515
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -73,18 +73,18 @@ where
7373
pub(crate) handle: Arc<ServiceHandle>,
7474
/// The callback function that runs when a request was received.
7575
pub callback: Mutex<ServiceCallback<T::Request, T::Response>>,
76+
/// Ensure the parent node remains alive as long as the subscription is held.
77+
/// This implementation will change in the future.
78+
#[allow(unused)]
79+
node: Arc<Node>,
7680
}
7781

7882
impl<T> Service<T>
7983
where
8084
T: rosidl_runtime_rs::Service,
8185
{
8286
/// Creates a new service.
83-
pub(crate) fn new<F>(
84-
node_handle: Arc<NodeHandle>,
85-
topic: &str,
86-
callback: F,
87-
) -> Result<Self, RclrsError>
87+
pub(crate) fn new<F>(node: &Arc<Node>, topic: &str, callback: F) -> Result<Self, RclrsError>
8888
// This uses pub(crate) visibility to avoid instantiating this struct outside
8989
// [`Node::create_service`], see the struct's documentation for the rationale
9090
where
@@ -104,7 +104,7 @@ where
104104
let service_options = unsafe { rcl_service_get_default_options() };
105105

106106
{
107-
let rcl_node = node_handle.rcl_node.lock().unwrap();
107+
let rcl_node = node.handle.rcl_node.lock().unwrap();
108108
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
109109
unsafe {
110110
// SAFETY:
@@ -127,12 +127,13 @@ where
127127

128128
let handle = Arc::new(ServiceHandle {
129129
rcl_service: Mutex::new(rcl_service),
130-
node_handle,
130+
node_handle: Arc::clone(&node.handle),
131131
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
132132
});
133133

134134
Ok(Self {
135135
handle,
136+
node: Arc::clone(node),
136137
callback: Mutex::new(Box::new(callback)),
137138
})
138139
}

rclrs/src/subscription.rs

+46-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
error::{RclReturnCode, ToResult},
1111
qos::QoSProfile,
1212
rcl_bindings::*,
13-
NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
13+
Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX,
1414
};
1515

1616
mod callback;
@@ -84,6 +84,10 @@ where
8484
pub(crate) handle: Arc<SubscriptionHandle>,
8585
/// The callback function that runs when a message was received.
8686
pub callback: Mutex<AnySubscriptionCallback<T>>,
87+
/// Ensure the parent node remains alive as long as the subscription is held.
88+
/// This implementation will change in the future.
89+
#[allow(unused)]
90+
node: Arc<Node>,
8791
message: PhantomData<T>,
8892
}
8993

@@ -93,7 +97,7 @@ where
9397
{
9498
/// Creates a new subscription.
9599
pub(crate) fn new<Args>(
96-
node_handle: Arc<NodeHandle>,
100+
node: &Arc<Node>,
97101
topic: &str,
98102
qos: QoSProfile,
99103
callback: impl SubscriptionCallback<T, Args>,
@@ -117,7 +121,7 @@ where
117121
subscription_options.qos = qos.into();
118122

119123
{
120-
let rcl_node = node_handle.rcl_node.lock().unwrap();
124+
let rcl_node = node.handle.rcl_node.lock().unwrap();
121125
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
122126
unsafe {
123127
// SAFETY:
@@ -139,13 +143,14 @@ where
139143

140144
let handle = Arc::new(SubscriptionHandle {
141145
rcl_subscription: Mutex::new(rcl_subscription),
142-
node_handle,
146+
node_handle: Arc::clone(&node.handle),
143147
in_use_by_wait_set: Arc::new(AtomicBool::new(false)),
144148
});
145149

146150
Ok(Self {
147151
handle,
148152
callback: Mutex::new(callback.into_callback()),
153+
node: Arc::clone(node),
149154
message: PhantomData,
150155
})
151156
}
@@ -396,4 +401,41 @@ mod tests {
396401
);
397402
Ok(())
398403
}
404+
405+
#[test]
406+
fn test_node_subscription_raii() {
407+
use crate::*;
408+
use std::sync::atomic::Ordering;
409+
410+
let mut executor = Context::default().create_basic_executor();
411+
412+
let triggered = Arc::new(AtomicBool::new(false));
413+
let inner_triggered = Arc::clone(&triggered);
414+
let callback = move |_: msg::Empty| {
415+
inner_triggered.store(true, Ordering::Release);
416+
};
417+
418+
let (_subscription, publisher) = {
419+
let node = executor
420+
.create_node(&format!("test_node_subscription_raii_{}", line!()))
421+
.unwrap();
422+
423+
let qos = QoSProfile::default().keep_all().reliable();
424+
let subscription = node
425+
.create_subscription::<msg::Empty, _>("test_topic", qos, callback)
426+
.unwrap();
427+
let publisher = node
428+
.create_publisher::<msg::Empty>("test_topic", qos)
429+
.unwrap();
430+
431+
(subscription, publisher)
432+
};
433+
434+
publisher.publish(msg::Empty::default()).unwrap();
435+
let start_time = std::time::Instant::now();
436+
while !triggered.load(Ordering::Acquire) {
437+
assert!(executor.spin(SpinOptions::spin_once()).is_empty());
438+
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
439+
}
440+
}
399441
}

0 commit comments

Comments
 (0)