diff --git a/rmw_dds_common/include/rmw_dds_common/graph_cache.hpp b/rmw_dds_common/include/rmw_dds_common/graph_cache.hpp index bc08ecf..75beec5 100644 --- a/rmw_dds_common/include/rmw_dds_common/graph_cache.hpp +++ b/rmw_dds_common/include/rmw_dds_common/graph_cache.hpp @@ -26,6 +26,8 @@ #include "rcutils/logging_macros.h" #include "rmw/names_and_types.h" +#include "rmw/service_endpoint_info.h" +#include "rmw/service_endpoint_info_array.h" #include "rmw/topic_endpoint_info.h" #include "rmw/topic_endpoint_info_array.h" #include "rmw/types.h" @@ -42,6 +44,7 @@ namespace rmw_dds_common // Forward-declaration, defined at end of file. struct EntityInfo; struct ParticipantInfo; +struct ServiceEntityInfo; /// Graph cache data structure. /** @@ -86,6 +89,8 @@ class GraphCache * \param type_hash Hash of the description of the topic type. * \param participant_gid GUID of the participant. * \param qos QoS profile of the data writer. + * \param service_type_hash Optional pointer to the service type hash, + * used when the writer is part of a service (e.g., request or reply topic) * \return `true` if the cache was updated, `false` if the data writer * was already present. */ @@ -97,7 +102,8 @@ class GraphCache const std::string & type_name, const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, - const rmw_qos_profile_t & qos); + const rmw_qos_profile_t & qos, + const rosidl_type_hash_t * service_type_hash = nullptr); /// Add a data writer based on discovery. /** @@ -122,6 +128,8 @@ class GraphCache * \param type_hash Hash of the description of the topic type. * \param participant_gid GUID of the participant. * \param qos QoS profile of the data reader. + * \param service_type_hash Optional pointer to the service type hash, + * used when the reader is part of a service (e.g., request or reply topic) * \return `true` if the cache was updated, `false` if the data reader * was already present. */ @@ -133,7 +141,8 @@ class GraphCache const std::string & type_name, const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, - const rmw_qos_profile_t & qos); + const rmw_qos_profile_t & qos, + const rosidl_type_hash_t * service_type_hash = nullptr); /// Add a data reader based on discovery. /** @@ -159,6 +168,8 @@ class GraphCache * \param participant_gid GUID of the participant. * \param qos QoS profile of the entity. * \param is_reader Whether the entity is a data reader or a writer. + * \param service_type_hash Optional pointer to the service type hash, + * used when the entity is part of a service (e.g., request or reply topic) * \return `true` if the cache was updated, `false` if the entity * was already present. */ @@ -171,7 +182,8 @@ class GraphCache const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, const rmw_qos_profile_t & qos, - bool is_reader); + bool is_reader, + const rosidl_type_hash_t * service_type_hash = nullptr); /// Add a data reader or writer. /** @@ -445,6 +457,46 @@ class GraphCache rcutils_allocator_t * allocator, rmw_topic_endpoint_info_array_t * endpoints_info) const; + /// Get an array with information about the clients for a service. + /** + * \param[in] readers_info array containing data readers for the service’s reply topic. + * \param[in] writers_info array containing data writers for the service’s request topic. + * \param[in] allocator Allocator to allocate memory when populating `endpoints_info`. + * \param[out] endpoints_info A zero-initialized service endpoint information + * array to be populated with client information derived from the topic readers and writers. + * + * \return RMW_RET_INVALID_ARGUMENT if any input is null, or + * \return RMW_RET_ERROR if an unexpected error occurs, or + * \return RMW_RET_OK if successful. + */ + RMW_DDS_COMMON_PUBLIC + rmw_ret_t + get_clients_info_by_service( + const rmw_topic_endpoint_info_array_t * readers_info, + const rmw_topic_endpoint_info_array_t * writers_info, + rcutils_allocator_t * allocator, + rmw_service_endpoint_info_array_t * endpoints_info) const; + + /// Get an array with information about the servers for a service. + /** + * \param[in] readers_info array containing data readers for the service’s request topic. + * \param[in] writers_info array containing data writers for the service’s reply topic. + * \param[in] allocator Allocator to allocate memory when populating `endpoints_info`. + * \param[out] endpoints_info A zero-initialized service endpoint information array + * to be populated with server information derived from the topic readers and writers. + * + * \return RMW_RET_INVALID_ARGUMENT if any input is null, or + * \return RMW_RET_ERROR if an unexpected error occurs, or + * \return RMW_RET_OK if successful. + */ + RMW_DDS_COMMON_PUBLIC + rmw_ret_t + get_servers_info_by_service( + const rmw_topic_endpoint_info_array_t * readers_info, + const rmw_topic_endpoint_info_array_t * writers_info, + rcutils_allocator_t * allocator, + rmw_service_endpoint_info_array_t * endpoints_info) const; + /// Get all topic names and types. /** * \param[in] demangle_topic Function to demangle DDS topic names. @@ -565,10 +617,14 @@ class GraphCache /// Sequence of endpoints gids. using GidSeq = decltype(std::declval().writer_gid_seq); + /// \internal + /// Map from endpoint gids to service endpoints discovery info. + using EntityGidToServiceInfo = std::map; private: EntityGidToInfo data_writers_; EntityGidToInfo data_readers_; + EntityGidToServiceInfo data_services_; ParticipantToNodesMap participants_; std::function on_change_callback_ = nullptr; @@ -617,6 +673,20 @@ struct EntityInfo {} }; +/// Structure to represent the service discovery data from an endpoint (data reader or writer). +struct ServiceEntityInfo +{ + /// Hash of the associated service type, if applicable. + /// This field is used to associate endpoints (readers or writers) that form a service, + /// whether the middleware explicitly supports services or represents them as separate topics. + /// See: https://github.com/ros2/rmw/pull/371#issuecomment-2763634820 + rosidl_type_hash_t service_type_hash; + + explicit ServiceEntityInfo(const rosidl_type_hash_t & service_type_hash) + : service_type_hash(service_type_hash) + {} +}; + } // namespace rmw_dds_common #endif // RMW_DDS_COMMON__GRAPH_CACHE_HPP_ diff --git a/rmw_dds_common/include/rmw_dds_common/qos.hpp b/rmw_dds_common/include/rmw_dds_common/qos.hpp index 63e94f7..d0cd4c5 100644 --- a/rmw_dds_common/include/rmw_dds_common/qos.hpp +++ b/rmw_dds_common/include/rmw_dds_common/qos.hpp @@ -255,6 +255,37 @@ encode_type_hash_for_user_data_qos( const rosidl_type_hash_t & type_hash, std::string & string_out); +/// Parse USER_DATA "key=value;key=value;"" encoding, finding value of key "sertypehash" +/** + * \param[in] user_data USER_DATA qos raw bytes + * \param[in] user_data_size Length of user_data + * \param[out] type_hash_out Filled with type hash data if found, or to zero value if key not found + * \return RMW_RET_OK if key parsed successfully + * \return RMW_RET_UNSUPPORTED if key not found + * \return RMW_RET_INVALID_ARGUMENT if user_data is null + * \return RMW_RET_ERROR if sertypehash key found, but value could not be parsed + */ +RMW_DDS_COMMON_PUBLIC +rmw_ret_t +parse_sertype_hash_from_user_data( + const uint8_t * user_data, + size_t user_data_size, + rosidl_type_hash_t & type_hash_out); + +/// Encode type hash as "sertypehash=hash_string;" for use in USER_DATA QoS +/** + * \param[in] type_hash Type hash value to encode + * \param[out] string_out On success, will be set to "sertypehash=stringified_type_hash;" + * If type_hash's version is 0, string_out will be set to empty + * \return RMW_RET_OK on success, including empty string for unset version + * \return RMW_RET_BAD_ALLOC if memory allocation fails + */ +RMW_DDS_COMMON_PUBLIC +rmw_ret_t +encode_sertype_hash_for_user_data_qos( + const rosidl_type_hash_t & type_hash, + std::string & string_out); + } // namespace rmw_dds_common #endif // RMW_DDS_COMMON__QOS_HPP_ diff --git a/rmw_dds_common/src/graph_cache.cpp b/rmw_dds_common/src/graph_cache.cpp index 52f6b20..f84ab73 100644 --- a/rmw_dds_common/src/graph_cache.cpp +++ b/rmw_dds_common/src/graph_cache.cpp @@ -69,13 +69,20 @@ GraphCache::add_writer( const std::string & type_name, const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, - const rmw_qos_profile_t & qos) + const rmw_qos_profile_t & qos, + const rosidl_type_hash_t * service_type_hash) { std::lock_guard guard(mutex_); auto pair = data_writers_.emplace( std::piecewise_construct, std::forward_as_tuple(gid), std::forward_as_tuple(topic_name, type_name, type_hash, participant_gid, qos)); + if (service_type_hash) { + data_services_.emplace( + std::piecewise_construct, + std::forward_as_tuple(gid), + std::forward_as_tuple(*service_type_hash)); + } GRAPH_CACHE_CALL_ON_CHANGE_CALLBACK_IF(this, pair.second); return pair.second; } @@ -104,13 +111,20 @@ GraphCache::add_reader( const std::string & type_name, const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, - const rmw_qos_profile_t & qos) + const rmw_qos_profile_t & qos, + const rosidl_type_hash_t * service_type_hash) { std::lock_guard guard(mutex_); auto pair = data_readers_.emplace( std::piecewise_construct, std::forward_as_tuple(gid), std::forward_as_tuple(topic_name, type_name, type_hash, participant_gid, qos)); + if (service_type_hash) { + data_services_.emplace( + std::piecewise_construct, + std::forward_as_tuple(gid), + std::forward_as_tuple(*service_type_hash)); + } GRAPH_CACHE_CALL_ON_CHANGE_CALLBACK_IF(this, pair.second); return pair.second; } @@ -140,7 +154,8 @@ GraphCache::add_entity( const rosidl_type_hash_t & type_hash, const rmw_gid_t & participant_gid, const rmw_qos_profile_t & qos, - bool is_reader) + bool is_reader, + const rosidl_type_hash_t * service_type_hash) { if (is_reader) { return this->add_reader( @@ -149,7 +164,8 @@ GraphCache::add_entity( type_name, type_hash, participant_gid, - qos); + qos, + service_type_hash); } return this->add_writer( gid, @@ -157,7 +173,8 @@ GraphCache::add_entity( type_name, type_hash, participant_gid, - qos); + qos, + service_type_hash); } bool @@ -183,6 +200,7 @@ bool GraphCache::remove_writer(const rmw_gid_t & gid) { std::lock_guard guard(mutex_); + data_services_.erase(gid); bool ret = data_writers_.erase(gid) > 0; GRAPH_CACHE_CALL_ON_CHANGE_CALLBACK_IF(this, ret); return ret; @@ -192,6 +210,7 @@ bool GraphCache::remove_reader(const rmw_gid_t & gid) { std::lock_guard guard(mutex_); + data_services_.erase(gid); bool ret = data_readers_.erase(gid) > 0; GRAPH_CACHE_CALL_ON_CHANGE_CALLBACK_IF(this, ret); return ret; @@ -699,6 +718,176 @@ GraphCache::get_readers_info_by_topic( endpoints_info); } +static +rmw_ret_t +__get_entities_info_by_service( + const GraphCache::EntityGidToServiceInfo & services, + const rmw_topic_endpoint_info_array_t * readers_info, + const rmw_topic_endpoint_info_array_t * writers_info, + bool is_server, + rcutils_allocator_t * allocator, + rmw_service_endpoint_info_array_t * endpoints_info) +{ + assert(allocator); + assert(endpoints_info); + + if (0u == writers_info->size && 0u == readers_info->size) { + return RMW_RET_OK; + } + + rmw_ret_t ret = rmw_service_endpoint_info_array_init_with_size( + endpoints_info, + readers_info->size, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + std::unique_ptr< + rmw_service_endpoint_info_array_t, + std::function> + endpoints_info_delete_on_error( + endpoints_info, + [allocator](rmw_service_endpoint_info_array_t * p) { + rmw_ret_t ret = rmw_service_endpoint_info_array_fini( + p, + allocator + ); + if (RMW_RET_OK != ret) { + RCUTILS_LOG_ERROR_NAMED( + log_tag, + "Failed to destroy endpoints_info when function failed."); + } + } + ); + + auto matched_writer_info = [&writers_info] + (rmw_topic_endpoint_info_t & reader_info) -> rmw_topic_endpoint_info_t { + for(size_t i = 0; i < writers_info->size; i++) { + if(strcmp(writers_info->info_array[i].node_name, reader_info.node_name) == 0 && + strcmp(writers_info->info_array[i].node_namespace, reader_info.node_namespace) == 0 && + strcmp(writers_info->info_array[i].topic_type, reader_info.topic_type) == 0) + { + return writers_info->info_array[i]; + } + } + return rmw_get_zero_initialized_topic_endpoint_info(); + }; + + for(size_t i = 0; i < readers_info->size; i++) { + rmw_service_endpoint_info_t & endpoint_info = endpoints_info->info_array[i]; + ret = rmw_service_endpoint_info_set_node_name( + &endpoint_info, + readers_info->info_array[i].node_name, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_service_endpoint_info_set_node_namespace( + &endpoint_info, + readers_info->info_array[i].node_namespace, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_service_endpoint_info_set_endpoint_type( + &endpoint_info, + is_server ? RMW_ENDPOINT_SERVER : RMW_ENDPOINT_CLIENT); + if (RMW_RET_OK != ret) { + return ret; + } + + ret = rmw_service_endpoint_info_set_endpoint_count(&endpoint_info, 2); + if (RMW_RET_OK != ret) { + return ret; + } + ret = rmw_service_endpoint_info_set_service_type( + &endpoint_info, + readers_info->info_array[i].topic_type, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + rmw_gid_t gid; + memset(&gid.data, 0, RMW_GID_STORAGE_SIZE); + memcpy(&gid.data, readers_info->info_array[i].endpoint_gid, RMW_GID_STORAGE_SIZE); + if(services.find(gid) == services.end()) { + RCUTILS_LOG_ERROR_NAMED(log_tag, "Failed to find gid in services."); + return RMW_RET_ERROR; + } + ret = rmw_service_endpoint_info_set_service_type_hash( + &endpoint_info, + &services.at(gid).service_type_hash); + if (RMW_RET_OK != ret) { + return ret; + } + + auto writer_info = matched_writer_info(readers_info->info_array[i]); + uint8_t gids[2][RMW_GID_STORAGE_SIZE]; + memset(gids[0], 0, RMW_GID_STORAGE_SIZE * 2); + memcpy(gids[0], readers_info->info_array[i].endpoint_gid, RMW_GID_STORAGE_SIZE); + memcpy(gids[1], writer_info.endpoint_gid, RMW_GID_STORAGE_SIZE); + + ret = rmw_service_endpoint_info_set_gids( + &endpoint_info, + (const uint8_t *) gids, + 2, + RMW_GID_STORAGE_SIZE, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + + const rmw_qos_profile_t qos_profiles[2] = + {readers_info->info_array[i].qos_profile, writer_info.qos_profile}; + ret = rmw_service_endpoint_info_set_qos_profiles( + &endpoint_info, + qos_profiles, + 2, + allocator); + if (RMW_RET_OK != ret) { + return ret; + } + } + endpoints_info_delete_on_error.release(); + return RMW_RET_OK; +} + +rmw_ret_t +GraphCache::get_clients_info_by_service( + const rmw_topic_endpoint_info_array_t * readers_info, + const rmw_topic_endpoint_info_array_t * writers_info, + rcutils_allocator_t * allocator, + rmw_service_endpoint_info_array_t * endpoints_info) const +{ + std::lock_guard lock(mutex_); + return __get_entities_info_by_service( + data_services_, + readers_info, + writers_info, + false, + allocator, + endpoints_info); +} + +rmw_ret_t +GraphCache::get_servers_info_by_service( + const rmw_topic_endpoint_info_array_t * readers_info, + const rmw_topic_endpoint_info_array_t * writers_info, + rcutils_allocator_t * allocator, + rmw_service_endpoint_info_array_t * endpoints_info) const +{ + std::lock_guard lock(mutex_); + return __get_entities_info_by_service( + data_services_, + readers_info, + writers_info, + true, + allocator, + endpoints_info); +} + using NamesAndTypes = std::map>; static diff --git a/rmw_dds_common/src/qos.cpp b/rmw_dds_common/src/qos.cpp index 7604cb1..8c9e96e 100644 --- a/rmw_dds_common/src/qos.cpp +++ b/rmw_dds_common/src/qos.cpp @@ -713,4 +713,48 @@ encode_type_hash_for_user_data_qos( return RMW_RET_OK; } +rmw_ret_t +parse_sertype_hash_from_user_data( + const uint8_t * user_data, + size_t user_data_size, + rosidl_type_hash_t & type_hash_out) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(user_data, RMW_RET_INVALID_ARGUMENT); + std::vector udvec(user_data, user_data + user_data_size); + auto key_value = rmw::impl::cpp::parse_key_value(udvec); + auto typehash_it = key_value.find("sertypehash"); + if (typehash_it == key_value.end()) { + type_hash_out = rosidl_get_zero_initialized_type_hash(); + return RMW_RET_UNSUPPORTED; + } + std::string type_hash_str(typehash_it->second.begin(), typehash_it->second.end()); + if (RCUTILS_RET_OK != rosidl_parse_type_hash_string(type_hash_str.c_str(), &type_hash_out)) { + return RMW_RET_ERROR; + } + return RMW_RET_OK; +} + +rmw_ret_t +encode_sertype_hash_for_user_data_qos( + const rosidl_type_hash_t & type_hash, + std::string & string_out) +{ + if (type_hash.version == ROSIDL_TYPE_HASH_VERSION_UNSET) { + string_out.clear(); + return RMW_RET_OK; + } + auto allocator = rcutils_get_default_allocator(); + char * type_hash_c_str = nullptr; + rcutils_ret_t stringify_ret = rosidl_stringify_type_hash(&type_hash, allocator, &type_hash_c_str); + if (RCUTILS_RET_BAD_ALLOC == stringify_ret) { + return RMW_RET_BAD_ALLOC; + } + if (RCUTILS_RET_OK != stringify_ret) { + return RMW_RET_ERROR; + } + RCPPUTILS_SCOPE_EXIT(allocator.deallocate(type_hash_c_str, &allocator.state)); + string_out = "sertypehash=" + std::string(type_hash_c_str) + ";"; + return RMW_RET_OK; +} + } // namespace rmw_dds_common