Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rcl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ find_package(rmw REQUIRED)
find_package(rmw_implementation REQUIRED)
find_package(rosidl_runtime_c REQUIRED)
find_package(tracetools REQUIRED)
find_package(common_content_filter REQUIRED)

include(cmake/rcl_set_symbol_visibility_hidden.cmake)
include(cmake/get_default_rcl_logging_implementation.cmake)
Expand Down Expand Up @@ -81,6 +82,7 @@ ament_target_dependencies(${PROJECT_NAME}
${RCL_LOGGING_IMPL}
"rosidl_runtime_c"
"tracetools"
"common_content_filter"
)

# Causes the visibility macros to use dllexport rather than dllimport,
Expand Down Expand Up @@ -121,6 +123,7 @@ ament_export_dependencies(rcutils)
ament_export_dependencies(${RCL_LOGGING_IMPL})
ament_export_dependencies(rosidl_runtime_c)
ament_export_dependencies(tracetools)
ament_export_dependencies(common_content_filter)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
Expand Down
1 change: 1 addition & 0 deletions rcl/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
<depend>rmw_implementation</depend>
<depend>rosidl_runtime_c</depend>
<depend>tracetools</depend>
<depend>common_content_filter</depend>

<test_depend>ament_cmake_gtest</test_depend>
<test_depend>ament_lint_auto</test_depend>
Expand Down
132 changes: 127 additions & 5 deletions rcl/src/rcl/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ extern "C"

#include <stdio.h>

#include "common_content_filter/api.h"

#include "rcl/error_handling.h"
#include "rcl/node.h"
#include "rcutils/logging_macros.h"
Expand All @@ -42,6 +44,54 @@ rcl_get_zero_initialized_subscription()
return null_subscription;
}

static
bool
rcl_subscription_common_content_filter_set(
const rcl_subscription_t * subscription,
const rmw_subscription_content_filter_options_t * options)
{
if (!subscription->impl->common_content_filter) {
subscription->impl->common_content_filter =
common_content_filter_create(subscription->impl->type_support);
if (!subscription->impl->common_content_filter) {
RCL_SET_ERROR_MSG("Failed to create common content filter");
return false;
}
RCUTILS_LOG_DEBUG_NAMED(
ROS_PACKAGE_NAME, "common content filter is created for topic '%s'",
rcl_subscription_get_topic_name(subscription));
}

if (!common_content_filter_set(
subscription->impl->common_content_filter,
options))
{
RCL_SET_ERROR_MSG("Failed to set common content filter");
return false;
}

return true;
}

static
bool
rcl_subscription_common_content_filter_is_relevant(
const rcl_subscription_t * subscription,
void * data,
bool serialized)
{
if (subscription->impl->common_content_filter &&
common_content_filter_is_enabled(subscription->impl->common_content_filter))
{
return common_content_filter_evaluate(
subscription->impl->common_content_filter,
data,
serialized);
}

return true;
}

rcl_ret_t
rcl_subscription_init(
rcl_subscription_t * subscription,
Expand Down Expand Up @@ -120,6 +170,22 @@ rcl_subscription_init(
options->qos.avoid_ros_namespace_conventions;
// options
subscription->impl->options = *options;
subscription->impl->type_support = type_support;

if (options->rmw_subscription_options.content_filter_options) {
// Content filter topic not supported (or not enabled as some failed cases) on DDS.
// TODO(iuhilnehc-ynos): enable common content filter with an environment variable
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on DDS.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on DDS.
// (e.g. FORCE_COMMON_CONTENT_FILTER) regardless of whether cft is enabled on RMW implementation.

if (!subscription->impl->rmw_handle->is_cft_enabled) {
if (!rcl_subscription_common_content_filter_set(
subscription,
options->rmw_subscription_options.content_filter_options))
{
goto fail;
}
}
}

RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized");
ret = RCL_RET_OK;
TRACEPOINT(
Expand Down Expand Up @@ -147,6 +213,10 @@ rcl_subscription_init(
RCUTILS_SAFE_FWRITE_TO_STDERR("\n");
}

if (subscription->impl->common_content_filter) {
common_content_filter_destroy(subscription->impl->common_content_filter);
}

allocator->deallocate(subscription->impl, allocator->state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -190,6 +260,10 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node)
result = RCL_RET_ERROR;
}

if (subscription->impl->common_content_filter) {
common_content_filter_destroy(subscription->impl->common_content_filter);
}

allocator.deallocate(subscription->impl, allocator.state);
subscription->impl = NULL;
}
Expand Down Expand Up @@ -432,7 +506,9 @@ rcl_subscription_is_cft_enabled(const rcl_subscription_t * subscription)
if (!rcl_subscription_is_valid(subscription)) {
return false;
}
return subscription->impl->rmw_handle->is_cft_enabled;
return subscription->impl->rmw_handle->is_cft_enabled ||
(subscription->impl->common_content_filter &&
common_content_filter_is_enabled(subscription->impl->common_content_filter));
}

rcl_ret_t
Expand All @@ -454,8 +530,13 @@ rcl_subscription_set_content_filter(
&options->rmw_subscription_content_filter_options);

if (ret != RMW_RET_OK) {
RCL_SET_ERROR_MSG(rmw_get_error_string().str);
return rcl_convert_rmw_ret_to_rcl_ret(ret);
rcl_reset_error();
if (!rcl_subscription_common_content_filter_set(
subscription,
&options->rmw_subscription_content_filter_options))
{
return RMW_RET_ERROR;
}
}

// copy options into subscription_options
Expand Down Expand Up @@ -489,8 +570,19 @@ rcl_subscription_get_content_filter(
subscription->impl->rmw_handle,
allocator,
&options->rmw_subscription_content_filter_options);

return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
// If options can be get from DDS, it's unnecessary to get them from common content filter.
if (rmw_ret != RMW_RET_OK) {
rcl_reset_error();
if (!common_content_filter_get(
subscription->impl->common_content_filter,
allocator,
&options->rmw_subscription_content_filter_options))
{
RCL_SET_ERROR_MSG("Failed to get content filter");
return RMW_RET_ERROR;
}
}
return RMW_RET_OK;
}

rcl_ret_t
Expand Down Expand Up @@ -525,6 +617,16 @@ rcl_take(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with common content filter
if (!rcl_subscription_common_content_filter_is_relevant(
subscription,
ros_message,
false))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down Expand Up @@ -604,6 +706,16 @@ rcl_take_serialized_message(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with common content filter
if (!rcl_subscription_common_content_filter_is_relevant(
subscription,
serialized_message,
true))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down Expand Up @@ -640,6 +752,16 @@ rcl_take_loaned_message(
if (!taken) {
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

// filter ros message with common content filter
if (!rcl_subscription_common_content_filter_is_relevant(
subscription,
*loaned_message,
false))
{
return RCL_RET_SUBSCRIPTION_TAKE_FAILED;
}

return RCL_RET_OK;
}

Expand Down
3 changes: 3 additions & 0 deletions rcl/src/rcl/subscription_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
#include "rmw/rmw.h"

#include "rcl/subscription.h"
#include "rosidl_runtime_c/message_type_support_struct.h"

struct rcl_subscription_impl_s
{
rcl_subscription_options_t options;
rmw_qos_profile_t actual_qos;
rmw_subscription_t * rmw_handle;
void * common_content_filter;
const rosidl_message_type_support_t * type_support;
};

#endif // RCL__SUBSCRIPTION_IMPL_H_
Loading