Skip to content

Allow and track multiple publisher to same topic #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "performance_test/ros2/communication.hpp"
#include "performance_test/ros2/tracker.hpp"
#include "performance_test/ros2/events_logger.hpp"
#include "performance_test/ros2/names_utilities.hpp"

using namespace std::chrono_literals;

Expand Down Expand Up @@ -178,12 +179,10 @@ friend class System;
&Node::_request<Srv>,
this,
service.name,
size
size,
period
);

// store the frequency of this client task
std::get<1>(_clients.at(service.name)).set_frequency(1000000 / period.count());

this->add_timer(period, client_task);

}
Expand Down Expand Up @@ -271,6 +270,8 @@ friend class System;
auto msg = std::make_shared<Msg>();
resize_msg(msg->data, msg->header, size);

// set the node name
msg->header.node_id = item_name_to_id(this->get_fully_qualified_name());
// get the frequency value that we stored when creating the publisher
msg->header.frequency = 1000000.0 / period.count();
// set the tracking count for this message
Expand All @@ -288,6 +289,8 @@ friend class System;
auto msg = std::make_unique<Msg>();
resize_msg(msg->data, msg->header, size);

// set the node name
msg->header.node_id = item_name_to_id(this->get_fully_qualified_name());
// get the frequency value that we stored when creating the publisher
msg->header.frequency = 1000000.0 / period.count();
// set the tracking count for this message
Expand Down Expand Up @@ -337,7 +340,7 @@ friend class System;


template <typename Srv>
void _request(const std::string& name, size_t size)
void _request(const std::string& name, size_t size, std::chrono::microseconds period)
{

(void)size;
Expand Down Expand Up @@ -374,7 +377,8 @@ friend class System;
// Create request
auto request = std::make_shared<typename Srv::Request>();
// get the frequency value that we stored when creating the publisher
request->header.frequency = tracker.frequency();
request->header.node_id = item_name_to_id(this->get_fully_qualified_name());
request->header.frequency = 1000000.0 / period.count();
request->header.tracking_number = tracking_number;
request->header.stamp = this->now();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class Stat
}

_n++;
_sum += x;
Ex += x - K;
Ex2 += (x - K) * (x - K);
}
Expand Down Expand Up @@ -77,11 +78,20 @@ class Stat
return _min;
}

double sum() const
{
if (_n == 0) {
return std::nan("");
}
return _sum;
}

unsigned long int n() const { return _n; }

private:
double _max;
double _min;
double _sum = 0;
double K;
double Ex = 0;
double Ex2 = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#pragma once

#include <iostream>
#include <unordered_map>

#include "rclcpp/time.hpp"

Expand Down Expand Up @@ -55,14 +56,12 @@ class Tracker

unsigned long int received() const { return _received_messages; }

size_t size() const { return _size; }
float size() const { return _size.mean(); }

float frequency() const { return _frequency; }
float frequency() const { return _frequency.sum(); }

Stat<unsigned long int> stat() const { return _stat; }

void set_frequency(float f) { _frequency = f; }

unsigned long int last() const { return _last_latency; }

private:
Expand All @@ -75,10 +74,11 @@ class Tracker
unsigned long int _received_messages = 0;
unsigned long int _late_messages = 0;
unsigned long int _too_late_messages = 0;
size_t _size = 0;
float _frequency = 0;
Stat<size_t> _size;
Stat<float> _frequency;
Stat<unsigned long int> _stat;
TrackingNumber _tracking_number_count = 0;
// A node-name indexed map to store the publisher tracking number to track.
std::unordered_map<size_t, TrackingNumber> _tracking_number_count_map;
TrackingOptions _tracking_options;
};
}
29 changes: 16 additions & 13 deletions performances/performance_test/src/ros2/tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@ void performance_test::Tracker::scan(
const rclcpp::Time& now,
std::shared_ptr<EventsLogger> elog)
{
// If this is first message received store some info about it
if (stat().n() == 0) {
_size = header.size;
_frequency = header.frequency;
}

// Compute latency
rclcpp::Time stamp(header.stamp.sec, header.stamp.nanosec, RCL_ROS_TIME);
auto lat = std::chrono::nanoseconds((now - stamp).nanoseconds());
Expand All @@ -35,16 +29,23 @@ void performance_test::Tracker::scan(

if (_tracking_options.is_enabled){

auto it = _tracking_number_count_map.find(header.node_id);
// If this is first message received for the node store some info about it
if (it == _tracking_number_count_map.end()) {
_size.add_sample(header.size);
_frequency.add_sample(header.frequency);
it = _tracking_number_count_map.insert(it, {header.node_id, header.tracking_number});
}

// Check if we received the correct message. The assumption here is
// that the messages arrive in chronological order
if (header.tracking_number == _tracking_number_count) {
_tracking_number_count++;
} else {
if (header.tracking_number == it->second) {
it->second++;
} else if (header.tracking_number > it->second) {
// We missed some mesages...
long unsigned int n_lost = header.tracking_number - _tracking_number_count;
long unsigned int n_lost = header.tracking_number - it->second;
_lost_messages += n_lost;
_tracking_number_count = header.tracking_number + 1;

it->second = header.tracking_number + 1;
// Log the event
if (elog != nullptr){
EventsLogger::Event ev;
Expand All @@ -60,10 +61,12 @@ void performance_test::Tracker::scan(
ev.description = description.str();
elog->write_event(ev);
}
} else {
std::cerr << "warning: order of message is not sequential! Is node_name unique for each publisher?" << std::endl;
}

// Check if the message latency qualifies the message as a lost or late message.
const int period_us = 1000000 / _frequency;
const int period_us = 1000000 / header.frequency;
const unsigned int latency_late_threshold_us = std::min(_tracking_options.late_absolute_us,
_tracking_options.late_percentage * period_us / 100);
const unsigned int latency_too_late_threshold_us = std::min(_tracking_options.too_late_absolute_us,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
builtin_interfaces/Time stamp
uint64 node_id
uint32 tracking_number
float32 frequency
uint32 size