Skip to content

Commit a10ff3b

Browse files
FlovaBrad Martinbmartin427jmachowinski
authored
Backport of the Events Executor (#1391) (#1435)
* Introduce EventsExecutor implementation (#1389) Signed-off-by: Brad Martin <[email protected]> Signed-off-by: Brad Martin <[email protected]> Signed-off-by: Florian Vahl <[email protected]> Co-authored-by: Brad Martin <[email protected]> Co-authored-by: Brad Martin <[email protected]> Co-authored-by: Janosch Machowinski <[email protected]>
1 parent dc598ab commit a10ff3b

22 files changed

+3291
-320
lines changed

rclpy/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ pybind11_add_module(_rclpy_pybind11
8484
src/rclpy/destroyable.cpp
8585
src/rclpy/duration.cpp
8686
src/rclpy/clock_event.cpp
87+
src/rclpy/events_executor/delayed_event_thread.cpp
88+
src/rclpy/events_executor/events_executor.cpp
89+
src/rclpy/events_executor/events_queue.cpp
90+
src/rclpy/events_executor/rcl_support.cpp
91+
src/rclpy/events_executor/timers_manager.cpp
8792
src/rclpy/exceptions.cpp
8893
src/rclpy/graph.cpp
8994
src/rclpy/guard_condition.cpp
@@ -176,6 +181,7 @@ if(BUILD_TESTING)
176181
test/test_create_node.py
177182
test/test_create_while_spinning.py
178183
test/test_destruction.py
184+
test/test_events_executor.py
179185
test/test_executor.py
180186
test/test_expand_topic_name.py
181187
test/test_guard_condition.py

rclpy/rclpy/event_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ def _default_incompatible_type_callback(event):
261261
pass
262262

263263
if self.matched:
264-
event_handlers.append(QoSEventHandler(
264+
event_handlers.append(EventHandler(
265265
callback_group=callback_group,
266266
callback=self.matched,
267267
event_type=QoSSubscriptionEventType.RCL_SUBSCRIPTION_MATCHED,
@@ -372,7 +372,7 @@ def _default_incompatible_type_callback(event):
372372
pass
373373

374374
if self.matched:
375-
event_handlers.append(QoSEventHandler(
375+
event_handlers.append(EventHandler(
376376
callback_group=callback_group,
377377
callback=self.matched,
378378
event_type=QoSPublisherEventType.RCL_PUBLISHER_MATCHED,
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# Copyright 2024-2025 Brad Martin
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from .events_executor import EventsExecutor # noqa: F401
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright 2024-2025 Brad Martin
2+
# Copyright 2024 Merlin Labs, Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
import faulthandler
16+
import typing
17+
18+
import rclpy.executors
19+
from rclpy.impl.implementation_singleton import rclpy_implementation as _rclpy
20+
21+
22+
# Try to look like we inherit from the rclpy Executor for type checking purposes without
23+
# getting any of the code from the base class.
24+
def EventsExecutor(*, context: typing.Optional[rclpy.Context] = None) -> rclpy.executors.Executor:
25+
if context is None:
26+
context = rclpy.get_default_context()
27+
28+
# For debugging purposes, if anything goes wrong in C++ make sure we also get a
29+
# Python backtrace dumped with the crash.
30+
faulthandler.enable()
31+
32+
ex = typing.cast(rclpy.executors.Executor, _rclpy.EventsExecutor(context))
33+
34+
# rclpy.Executor does this too. Note, the context itself is smart enough to check
35+
# for bound methods, and check whether the instances they're bound to still exist at
36+
# callback time, so we don't have to worry about tearing down this stale callback at
37+
# destruction time.
38+
# TODO(bmartin427) This should really be done inside of the EventsExecutor
39+
# implementation itself, but I'm unable to figure out a pybind11 incantation that
40+
# allows me to pass this bound method call from C++.
41+
context.on_shutdown(ex.wake)
42+
43+
return ex

rclpy/rclpy/task.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import inspect
1717
import sys
1818
import threading
19+
from typing import Callable
1920
import warnings
2021
import weakref
2122

@@ -197,6 +198,20 @@ def add_done_callback(self, callback):
197198
if invoke:
198199
callback(self)
199200

201+
def remove_done_callback(self, callback: Callable[['Future'], None]) -> bool:
202+
"""
203+
Remove a previously-added done callback.
204+
205+
Returns true if the given callback was found and removed. Always fails if the Future was
206+
already complete.
207+
"""
208+
with self._lock:
209+
try:
210+
self._callbacks.remove(callback)
211+
except ValueError:
212+
return False
213+
return True
214+
200215

201216
class Task(Future):
202217
"""

rclpy/src/rclpy/_rclpy_pybind11.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "duration.hpp"
3333
#include "clock_event.hpp"
3434
#include "event_handle.hpp"
35+
#include "events_executor/events_executor.hpp"
3536
#include "exceptions.hpp"
3637
#include "graph.hpp"
3738
#include "guard_condition.hpp"
@@ -247,4 +248,6 @@ PYBIND11_MODULE(_rclpy_pybind11, m) {
247248
rclpy::define_signal_handler_api(m);
248249
rclpy::define_clock_event(m);
249250
rclpy::define_lifecycle_api(m);
251+
252+
rclpy::events_executor::define_events_executor(m);
250253
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 Brad Martin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
#include "events_executor/delayed_event_thread.hpp"
15+
16+
#include <utility>
17+
18+
namespace rclpy
19+
{
20+
namespace events_executor
21+
{
22+
23+
DelayedEventThread::DelayedEventThread(EventsQueue * events_queue)
24+
: events_queue_(events_queue), thread_([this]() {RunThread();})
25+
{
26+
}
27+
28+
DelayedEventThread::~DelayedEventThread()
29+
{
30+
{
31+
std::unique_lock<std::mutex> lock(mutex_);
32+
done_ = true;
33+
}
34+
cv_.notify_one();
35+
thread_.join();
36+
}
37+
38+
void DelayedEventThread::EnqueueAt(
39+
std::chrono::steady_clock::time_point when, std::function<void()> handler)
40+
{
41+
{
42+
std::unique_lock<std::mutex> lock(mutex_);
43+
when_ = when;
44+
handler_ = handler;
45+
}
46+
cv_.notify_one();
47+
}
48+
49+
void DelayedEventThread::RunThread()
50+
{
51+
std::unique_lock<std::mutex> lock(mutex_);
52+
while (!done_) {
53+
if (handler_) {
54+
// Make sure we don't dispatch a stale wait if it changes while we're waiting.
55+
const auto latched_when = when_;
56+
if (
57+
(std::cv_status::timeout == cv_.wait_until(lock, latched_when)) && handler_ &&
58+
(when_ <= latched_when))
59+
{
60+
auto handler = std::move(handler_);
61+
handler_ = {};
62+
events_queue_->Enqueue(std::move(handler));
63+
}
64+
} else {
65+
// Wait indefinitely until we get signaled that there's something worth looking at.
66+
cv_.wait(lock);
67+
}
68+
}
69+
}
70+
71+
} // namespace events_executor
72+
} // namespace rclpy
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright 2025 Brad Martin
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
16+
#define RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_
17+
18+
#include <chrono>
19+
#include <condition_variable>
20+
#include <functional>
21+
#include <mutex>
22+
#include <thread>
23+
24+
#include "events_executor/events_queue.hpp"
25+
26+
namespace rclpy
27+
{
28+
namespace events_executor
29+
{
30+
31+
/// This object manages posting an event handler to an EventsQueue after a specified delay. The
32+
/// current delay may be changed or canceled at any time. This is done by way of a self-contained
33+
/// child thread to perform the wait.
34+
class DelayedEventThread
35+
{
36+
public:
37+
/// The pointer is aliased and must live for the lifetime of this object.
38+
explicit DelayedEventThread(EventsQueue *);
39+
~DelayedEventThread();
40+
41+
/// Schedules an event handler to be enqueued at the specified time point. Replaces any previous
42+
/// wait and handler, which will never be dispatched if it has not been already.
43+
void EnqueueAt(std::chrono::steady_clock::time_point when, std::function<void()> handler);
44+
45+
/// Cancels any previously-scheduled handler.
46+
void Cancel() {EnqueueAt({}, {});}
47+
48+
private:
49+
void RunThread();
50+
51+
EventsQueue * const events_queue_;
52+
std::mutex mutex_;
53+
bool done_{};
54+
std::condition_variable cv_;
55+
std::chrono::steady_clock::time_point when_;
56+
std::function<void()> handler_;
57+
std::thread thread_;
58+
};
59+
60+
} // namespace events_executor
61+
} // namespace rclpy
62+
63+
#endif // RCLPY__EVENTS_EXECUTOR__DELAYED_EVENT_THREAD_HPP_

0 commit comments

Comments
 (0)