Commit 9c77f32

Janosch Machowinski authored and committed
feat: Implemented a timer manager that can handle any timer type
Signed-off-by: Janosch Machowinski <[email protected]>
1 parent 8230d15 commit 9c77f32

File tree

1 file changed

+356
-0
lines changed

@@ -0,0 +1,356 @@
#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <vector>
#include <map>

// additional standard headers for the facilities used below
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

#include <rcl/timer.h>
#include <rclcpp/timer.hpp>
#include <rclcpp/clock.hpp>
#include <rclcpp/context.hpp>
#include <rclcpp/utilities.hpp>

#include <inttypes.h>

namespace rclcpp::executors
{

/**
 * @brief A class for managing a queue of timers
 *
 * This class holds a queue of timers of one type (RCL_ROS_TIME, RCL_SYSTEM_TIME or RCL_STEADY_TIME).
 * The queue manages an internal map of the timers, ordered by the next time a timer will be
 * ready. Each time a timer is ready, its callback is invoked from the internal thread.
 */
class TimerQueue
{
  struct TimerData
  {
    std::shared_ptr<const rcl_timer_t> rcl_ref;
    std::function<void()> timer_ready_callback;
  };

public:
  TimerQueue(rcl_clock_type_t timer_type)
  : timer_type(timer_type),
    used_clock_for_timers(timer_type),
    trigger_thread([this]() {
        timer_thread();
      })
  {
  }

  ~TimerQueue()
  {
    stop();

    trigger_thread.join();
  }

  void stop()
  {
    running = false;
    used_clock_for_timers.cancel_sleep_or_wait();
  }

  /**
   * @brief Removes a timer from the queue.
   * This function is thread safe.
   *
   * Removes the timer, if it was added to this queue.
   * Ignores timers that are not part of this queue.
   *
   * @param timer the timer to remove.
   */
  void remove_timer(const rclcpp::TimerBase::SharedPtr & timer)
  {
    rcl_clock_t * clock_type_of_timer;

    std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle();

    if (rcl_timer_clock(
        const_cast<rcl_timer_t *>(handle.get()),
        &clock_type_of_timer) != RCL_RET_OK)
    {
      assert(false);
    }

    if (clock_type_of_timer->type != timer_type) {
      // timer is handled by another queue
      return;
    }

    timer->clear_on_reset_callback();

    std::scoped_lock l(mutex);

    auto it = std::find_if(
      all_timers.begin(), all_timers.end(),
      [rcl_ref = timer->get_timer_handle()](const std::unique_ptr<TimerData> & d)
      {
        return d->rcl_ref == rcl_ref;
      });

    if (it != all_timers.end()) {
      const TimerData * data_ptr = it->get();

      auto it2 = std::find_if(
        running_timers.begin(), running_timers.end(), [data_ptr](const auto & e) {
          return e.second == data_ptr;
        });

      // the timer may not be in the running map, e.g. if it was canceled
      if (it2 != running_timers.end()) {
        running_timers.erase(it2);
      }
      all_timers.erase(it);
    }

    used_clock_for_timers.cancel_sleep_or_wait();
  }

  /**
   * @brief Adds a new timer to the queue.
   * This function is thread safe.
   *
   * This function will ignore any timer that does not have a matching clock type.
   *
   * @param timer the timer to add.
   * @param timer_ready_callback callback that should be called when the timer is ready.
   */
  void add_timer(
    const rclcpp::TimerBase::SharedPtr & timer,
    const std::function<void()> & timer_ready_callback)
  {
    rcl_clock_t * clock_type_of_timer;

    std::shared_ptr<const rcl_timer_t> handle = timer->get_timer_handle();

    if (rcl_timer_clock(
        const_cast<rcl_timer_t *>(handle.get()),
        &clock_type_of_timer) != RCL_RET_OK)
    {
      assert(false);
    }

    if (clock_type_of_timer->type != timer_type) {
      // timer is handled by another queue
      return;
    }

    std::unique_ptr<TimerData> data = std::make_unique<TimerData>();
    data->timer_ready_callback = timer_ready_callback;
    data->rcl_ref = std::move(handle);

    timer->set_on_reset_callback(
      [data_ptr = data.get(), this](size_t) {
        std::scoped_lock l(mutex);
        if (!remove_if_dropped(data_ptr)) {
          add_timer_to_running_map(data_ptr);
        }
      });

    {
      std::scoped_lock l(mutex);
      add_timer_to_running_map(data.get());

      all_timers.emplace_back(std::move(data));
    }

    // wake up the timer thread as a new timer was added
    used_clock_for_timers.cancel_sleep_or_wait();
  }

private:
  /**
   * Checks if the timer is still referenced; if not, deletes it from the queue.
   *
   * @param timer_data The timer to check
   * @return true if removed / invalid
   */
  bool remove_if_dropped(const TimerData * timer_data)
  {
    if (timer_data->rcl_ref.use_count() == 1) {
      // we hold the only reference, the timer was deleted by the user
      auto it = std::find_if(
        all_timers.begin(), all_timers.end(), [timer_data](const std::unique_ptr<TimerData> & e) {
          return timer_data == e.get();
        });

      if (it != all_timers.end()) {
        all_timers.erase(it);
      }
      return true;
    }
    return false;
  }

  /**
   * @brief Adds the given timer_data to the map of running timers, if valid.
   *
   * Advances the rcl timer.
   * Computes the next call time of the timer.
   * Re-adds the timer to the map of running timers.
   */
  void add_timer_to_running_map(const TimerData * timer_data)
  {
    rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(timer_data->rcl_ref.get()));
    if (ret == RCL_RET_TIMER_CANCELED) {
      return;
    }

    int64_t next_call_time;

    ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time);

    if (ret == RCL_RET_OK) {
      running_timers.emplace(std::chrono::nanoseconds(next_call_time), timer_data);
    }

    // wake up the timer thread so that it can pick up the timer
    used_clock_for_timers.cancel_sleep_or_wait();
  }

  /**
   * Returns the time when the next timer becomes ready.
   */
  std::chrono::nanoseconds get_next_timer_ready_time() const
  {
    if (running_timers.empty()) {
      // can't use std::chrono::nanoseconds::max, as wait_for
      // internally computes end time by using ::now() + timeout
      // as a workaround, we use some absurd high timeout
      return std::chrono::nanoseconds(used_clock_for_timers.now().nanoseconds()) +
             std::chrono::hours(10000);
    }
    return running_timers.begin()->first;
  }

  void call_ready_timer_callbacks()
  {
    auto readd_timer_to_running_map = [this](TimerMap::node_type && e)
      {
        const auto & timer_data = e.mapped();
        if (remove_if_dropped(timer_data)) {
          return;
        }

        int64_t next_call_time;

        auto ret = rcl_timer_get_next_call_time(timer_data->rcl_ref.get(), &next_call_time);

        if (ret == RCL_RET_OK) {
          e.key() = std::chrono::nanoseconds(next_call_time);
          running_timers.insert(std::move(e));
        }
      };

    while (!running_timers.empty()) {
      if (remove_if_dropped(running_timers.begin()->second)) {
        // the TimerData was deleted, drop the dangling entry from the running map as well
        running_timers.erase(running_timers.begin());
        continue;
      }

      int64_t time_until_call;

      const rcl_timer_t * rcl_timer_ref = running_timers.begin()->second->rcl_ref.get();
      auto ret = rcl_timer_get_time_until_next_call(rcl_timer_ref, &time_until_call);
      if (ret == RCL_RET_TIMER_CANCELED) {
        running_timers.erase(running_timers.begin());
        continue;
      }

      if (time_until_call <= 0) {
        // advance next call time
        rcl_ret_t ret = rcl_timer_call(const_cast<rcl_timer_t *>(rcl_timer_ref));
        if (ret == RCL_RET_TIMER_CANCELED) {
          running_timers.erase(running_timers.begin());
          continue;
        }

        // timer is ready, execute callback
        running_timers.begin()->second->timer_ready_callback();
        readd_timer_to_running_map(running_timers.extract(running_timers.begin()));
        continue;
      }
      break;
    }
  }

  void timer_thread()
  {
    while (running && rclcpp::ok()) {
      std::chrono::nanoseconds next_wakeup_time;
      {
        std::scoped_lock l(mutex);
        call_ready_timer_callbacks();

        next_wakeup_time = get_next_timer_ready_time();
      }
      try {
        // RCUTILS_LOG_ERROR_NAMED("rclcpp", "TimerQueue::timer_thread before sleep, next wakeup time %+" PRId64, next_wakeup_time.count());
        used_clock_for_timers.sleep_until(rclcpp::Time(next_wakeup_time.count(), timer_type));
      } catch (const std::runtime_error &) {
        // there is a race on shutdown, where the context may become invalid while we call sleep_until
        running = false;
      }
    }
    thread_terminated = true;
  }

  rcl_clock_type_t timer_type;

  Context::SharedPtr clock_sleep_context;

  rclcpp::Clock used_clock_for_timers;

  std::mutex mutex;

  std::atomic_bool running = true;
  std::atomic_bool thread_terminated = false;

  std::vector<std::unique_ptr<TimerData>> all_timers;

  using TimerMap = std::multimap<std::chrono::nanoseconds, const TimerData *>;
  TimerMap running_timers;

  std::thread trigger_thread;

  std::condition_variable thread_conditional;
};

class TimerManager
{
  std::array<TimerQueue, 3> timer_queues;

public:
  TimerManager()
  : timer_queues{RCL_ROS_TIME, RCL_SYSTEM_TIME, RCL_STEADY_TIME}
  {
  }

  void remove_timer(const rclcpp::TimerBase::SharedPtr & timer)
  {
    for (TimerQueue & q : timer_queues) {
      q.remove_timer(timer);
    }
  }

  void add_timer(
    const rclcpp::TimerBase::SharedPtr & timer,
    const std::function<void()> & timer_ready_callback)
  {
    for (TimerQueue & q : timer_queues) {
      q.add_timer(timer, timer_ready_callback);
    }
  }

  void stop()
  {
    for (TimerQueue & q : timer_queues) {
      q.stop();
    }
  }
};
}  // namespace rclcpp::executors
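
For orientation, here is a minimal usage sketch of the new TimerManager. It is not part of this commit; the header's install path is not shown in the diff, so the include is left as a placeholder, and the node name and the puts-based ready callback are illustrative assumptions. A wall timer runs on the steady clock, so the RCL_STEADY_TIME queue picks it up and invokes the ready callback from its internal thread; a real executor would hand the timer over to its own worker thread instead of doing work in that callback.

#include <chrono>
#include <cstdio>
#include <thread>

#include <rclcpp/rclcpp.hpp>
// #include "..."  // this header; its install path is not shown in the diff

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);
  auto node = std::make_shared<rclcpp::Node>("timer_manager_demo");

  // the timer's own callback is left empty; we only want the manager's notification
  auto timer = node->create_wall_timer(std::chrono::milliseconds(100), []() {});

  rclcpp::executors::TimerManager manager;

  // the ready callback runs on the TimerQueue's internal thread
  manager.add_timer(timer, []() {std::puts("timer is ready");});

  std::this_thread::sleep_for(std::chrono::seconds(1));

  manager.remove_timer(timer);
  manager.stop();
  rclcpp::shutdown();
  return 0;
}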
