Skip to content

Commit 414fdbd

Browse files
authored
Add functions for waiting for publishers and subscribers (ros2#907)
These blocking functions are especially useful for tests where we want to wait for some number of publishers/subscribers to be available before proceeding with some other checks. Update tests to use new graph API We can simplify some tests by reusing the new graph functions. Signed-off-by: Jacob Perron <[email protected]>
1 parent b7784eb commit 414fdbd

File tree

6 files changed

+401
-166
lines changed

6 files changed

+401
-166
lines changed

rcl/include/rcl/graph.h

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ extern "C"
2626
#include <rmw/get_topic_names_and_types.h>
2727
#include <rmw/topic_endpoint_info_array.h>
2828

29+
#include "rcutils/time.h"
2930
#include "rcutils/types.h"
3031

3132
#include "rosidl_runtime_c/service_type_support_struct.h"
@@ -581,6 +582,98 @@ rcl_count_subscribers(
581582
const char * topic_name,
582583
size_t * count);
583584

585+
/// Wait for there to be a specified number of publishers on a given topic.
586+
/**
587+
* The `node` parameter must point to a valid node.
588+
* The nodes graph guard condition is used by this function, and therefore the caller should
589+
* take care not to use the guard condition concurrently in any other wait sets.
590+
*
591+
* The `allocator` parameter must point to a valid allocator.
592+
*
593+
* The `topic_name` parameter must not be `NULL`, and must not be an empty string.
594+
* It should also follow the topic name rules.
595+
*
596+
* This function blocks and will return when the number of publishers for `topic_name`
597+
* is greater than or equal to the `count` parameter, or the specified `timeout` is reached.
598+
*
599+
* The `timeout` parameter is in nanoseconds.
600+
* The timeout is based on system time elapsed.
601+
* A negative value disables the timeout (i.e. this function blocks until the number of
602+
* publishers is greater than or equals to `count`).
603+
*
604+
* The `success` parameter must point to a valid bool.
605+
* The `success` parameter is the output for this function and will be set.
606+
*
607+
* <hr>
608+
* Attribute | Adherence
609+
* ------------------ | -------------
610+
* Allocates Memory | Yes
611+
* Thread-Safe | No
612+
* Uses Atomics | No
613+
* Lock-Free | Maybe [1]
614+
* <i>[1] implementation may need to protect the data structure with a lock</i>
615+
*
616+
* \param[in] node the handle to the node being used to query the ROS graph
617+
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
618+
* \param[in] topic_name the name of the topic in question
619+
* \param[in] count number of publishers to wait for
620+
* \param[in] timeout maximum duration to wait for publishers
621+
* \param[out] success `true` if the number of publishers is equal to or greater than count, or
622+
* `false` if a timeout occurred waiting for publishers.
623+
* \return #RCL_RET_OK if there was no errors, or
624+
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
625+
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
626+
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or
627+
* \return #RCL_RET_ERROR if an unspecified error occurred.
628+
*/
629+
RCL_PUBLIC
630+
RCL_WARN_UNUSED
631+
rcl_ret_t
632+
rcl_wait_for_publishers(
633+
const rcl_node_t * node,
634+
rcl_allocator_t * allocator,
635+
const char * topic_name,
636+
const size_t count,
637+
rcutils_duration_value_t timeout,
638+
bool * success);
639+
640+
/// Wait for there to be a specified number of subscribers on a given topic.
641+
/**
642+
* \see rcl_wait_for_publishers
643+
*
644+
* <hr>
645+
* Attribute | Adherence
646+
* ------------------ | -------------
647+
* Allocates Memory | Yes
648+
* Thread-Safe | No
649+
* Uses Atomics | No
650+
* Lock-Free | Maybe [1]
651+
* <i>[1] implementation may need to protect the data structure with a lock</i>
652+
*
653+
* \param[in] node the handle to the node being used to query the ROS graph
654+
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
655+
* \param[in] topic_name the name of the topic in question
656+
* \param[in] count number of subscribers to wait for
657+
* \param[in] timeout maximum duration to wait for subscribers
658+
* \param[out] success `true` if the number of subscribers is equal to or greater than count, or
659+
* `false` if a timeout occurred waiting for subscribers.
660+
* \return #RCL_RET_OK if there was no errors, or
661+
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
662+
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
663+
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or
664+
* \return #RCL_RET_ERROR if an unspecified error occurred.
665+
*/
666+
RCL_PUBLIC
667+
RCL_WARN_UNUSED
668+
rcl_ret_t
669+
rcl_wait_for_subscribers(
670+
const rcl_node_t * node,
671+
rcl_allocator_t * allocator,
672+
const char * topic_name,
673+
const size_t count,
674+
rcutils_duration_value_t timeout,
675+
bool * success);
676+
584677
/// Return a list of all publishers to a topic.
585678
/**
586679
* The `node` parameter must point to a valid node.

rcl/src/rcl/graph.c

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,12 @@ extern "C"
2020
#include "rcl/graph.h"
2121

2222
#include "rcl/error_handling.h"
23+
#include "rcl/guard_condition.h"
24+
#include "rcl/wait.h"
2325
#include "rcutils/allocator.h"
26+
#include "rcutils/error_handling.h"
2427
#include "rcutils/macros.h"
28+
#include "rcutils/time.h"
2529
#include "rcutils/types.h"
2630
#include "rmw/error_handling.h"
2731
#include "rmw/get_node_info_and_types.h"
@@ -452,6 +456,179 @@ rcl_count_subscribers(
452456
return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
453457
}
454458

459+
typedef rcl_ret_t (* count_entities_func_t)(
460+
const rcl_node_t * node,
461+
const char * topic_name,
462+
size_t * count);
463+
464+
rcl_ret_t
465+
_rcl_wait_for_entities(
466+
const rcl_node_t * node,
467+
rcl_allocator_t * allocator,
468+
const char * topic_name,
469+
const size_t expected_count,
470+
rcutils_duration_value_t timeout,
471+
bool * success,
472+
count_entities_func_t count_entities_func)
473+
{
474+
if (!rcl_node_is_valid(node)) {
475+
return RCL_RET_NODE_INVALID;
476+
}
477+
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
478+
RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
479+
RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT);
480+
481+
rcl_ret_t ret = RCL_RET_OK;
482+
*success = false;
483+
484+
// We can avoid waiting if there are already the expected number of publishers
485+
size_t count = 0u;
486+
ret = count_entities_func(node, topic_name, &count);
487+
if (ret != RCL_RET_OK) {
488+
// Error message already set
489+
return ret;
490+
}
491+
if (expected_count <= count) {
492+
*success = true;
493+
return RCL_RET_OK;
494+
}
495+
496+
// Create a wait set and add the node graph guard condition to it
497+
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
498+
ret = rcl_wait_set_init(
499+
&wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator);
500+
if (ret != RCL_RET_OK) {
501+
// Error message already set
502+
return ret;
503+
}
504+
505+
const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node);
506+
if (!guard_condition) {
507+
// Error message already set
508+
ret = RCL_RET_ERROR;
509+
goto cleanup;
510+
}
511+
512+
// Add it to the wait set
513+
ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL);
514+
if (ret != RCL_RET_OK) {
515+
// Error message already set
516+
goto cleanup;
517+
}
518+
519+
// Get current time
520+
// We use system time to be consistent with the clock used by rcl_wait()
521+
rcutils_time_point_value_t start;
522+
rcutils_ret_t time_ret = rcutils_system_time_now(&start);
523+
if (time_ret != RCUTILS_RET_OK) {
524+
rcutils_error_string_t error = rcutils_get_error_string();
525+
rcutils_reset_error();
526+
RCL_SET_ERROR_MSG(error.str);
527+
ret = RCL_RET_ERROR;
528+
goto cleanup;
529+
}
530+
531+
// Wait for expected count or timeout
532+
rcl_ret_t wait_ret;
533+
while (true) {
534+
// Use separate 'wait_ret' code to avoid returning spurious TIMEOUT value
535+
wait_ret = rcl_wait(&wait_set, timeout);
536+
if (wait_ret != RCL_RET_OK && wait_ret != RCL_RET_TIMEOUT) {
537+
// Error message already set
538+
ret = wait_ret;
539+
break;
540+
}
541+
542+
// Check count
543+
ret = count_entities_func(node, topic_name, &count);
544+
if (ret != RCL_RET_OK) {
545+
// Error already set
546+
break;
547+
}
548+
if (expected_count <= count) {
549+
*success = true;
550+
break;
551+
}
552+
553+
// If we're not waiting indefinitely, compute time remaining
554+
if (timeout >= 0) {
555+
rcutils_time_point_value_t now;
556+
time_ret = rcutils_system_time_now(&now);
557+
if (time_ret != RCUTILS_RET_OK) {
558+
rcutils_error_string_t error = rcutils_get_error_string();
559+
rcutils_reset_error();
560+
RCL_SET_ERROR_MSG(error.str);
561+
ret = RCL_RET_ERROR;
562+
break;
563+
}
564+
timeout = timeout - (now - start);
565+
if (timeout <= 0) {
566+
ret = RCL_RET_TIMEOUT;
567+
break;
568+
}
569+
}
570+
571+
// Clear wait set for next iteration
572+
ret = rcl_wait_set_clear(&wait_set);
573+
if (ret != RCL_RET_OK) {
574+
// Error message already set
575+
break;
576+
}
577+
}
578+
579+
rcl_ret_t cleanup_ret;
580+
cleanup:
581+
// Cleanup
582+
cleanup_ret = rcl_wait_set_fini(&wait_set);
583+
if (cleanup_ret != RCL_RET_OK) {
584+
// If we got two unexpected errors, return the earlier error
585+
if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) {
586+
// Error message already set
587+
ret = cleanup_ret;
588+
}
589+
}
590+
591+
return ret;
592+
}
593+
594+
rcl_ret_t
595+
rcl_wait_for_publishers(
596+
const rcl_node_t * node,
597+
rcl_allocator_t * allocator,
598+
const char * topic_name,
599+
const size_t expected_count,
600+
rcutils_duration_value_t timeout,
601+
bool * success)
602+
{
603+
return _rcl_wait_for_entities(
604+
node,
605+
allocator,
606+
topic_name,
607+
expected_count,
608+
timeout,
609+
success,
610+
rcl_count_publishers);
611+
}
612+
613+
rcl_ret_t
614+
rcl_wait_for_subscribers(
615+
const rcl_node_t * node,
616+
rcl_allocator_t * allocator,
617+
const char * topic_name,
618+
const size_t expected_count,
619+
rcutils_duration_value_t timeout,
620+
bool * success)
621+
{
622+
return _rcl_wait_for_entities(
623+
node,
624+
allocator,
625+
topic_name,
626+
expected_count,
627+
timeout,
628+
success,
629+
rcl_count_subscribers);
630+
}
631+
455632
typedef rmw_ret_t (* get_topic_endpoint_info_func_t)(
456633
const rmw_node_t * node,
457634
rcutils_allocator_t * allocator,

0 commit comments

Comments
 (0)