Skip to content

Commit

Permalink
Check if Task(Future) is canceled. (#1377)
Browse files Browse the repository at this point in the history
* Check if Task(Future) is canceled.

Signed-off-by: Tomoya Fujita <[email protected]>

* Close cancelled coroutine (#1394)

* Add FutureState

Signed-off-by: Nadav Elkabets <[email protected]>

* Close canceled coroutine

Signed-off-by: Nadav Elkabets <[email protected]>

* Fixed behavior in test

Signed-off-by: Nadav Elkabets <[email protected]>

---------

Signed-off-by: Nadav Elkabets <[email protected]>
Signed-off-by: Tomoya Fujita <[email protected]>

* address flake8 and pep257 failures.

Signed-off-by: Tomoya Fujita <[email protected]>

* Cancelled future is not done (#1397)

* Remove redundant coro.close

Signed-off-by: nadav <[email protected]>

* Only finished future is done

Signed-off-by: nadav <[email protected]>

* Add function _pending and fix checks

Signed-off-by: = <[email protected]>

* Replace check in done from pending to finished

Signed-off-by: = <[email protected]>

* Adapt test to new behavior

Signed-off-by: = <[email protected]>

* Add tests

Signed-off-by: = <[email protected]>

* Make changes within active task mutex

Signed-off-by: = <[email protected]>

---------

Signed-off-by: nadav <[email protected]>
Signed-off-by: = <[email protected]>

* keep the consistent behavior to avoid exception, and adjusted some tests accordingly.

Signed-off-by: Tomoya Fujita <[email protected]>

* revert doc section to raise the exception.

Signed-off-by: Tomoya Fujita <[email protected]>

* remove StrEnum and put logical operator in the beginning of line.

Signed-off-by: Tomoya Fujita <[email protected]>

* add more test to check Task state.

Signed-off-by: Tomoya Fujita <[email protected]>

---------

Signed-off-by: Tomoya Fujita <[email protected]>
Signed-off-by: Nadav Elkabets <[email protected]>
Signed-off-by: nadav <[email protected]>
Signed-off-by: = <[email protected]>
Co-authored-by: Nadav Elkabets <[email protected]>
Co-authored-by: Nadav Elkabets <[email protected]>
(cherry picked from commit 9a144bf)

# Conflicts:
#	rclpy/rclpy/executors.py
#	rclpy/rclpy/task.py
#	rclpy/test/test_executor.py
  • Loading branch information
fujitatomoya authored and mergify[bot] committed Jan 17, 2025
1 parent 85415f7 commit 134e0c7
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 11 deletions.
22 changes: 22 additions & 0 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,15 +317,35 @@ def spin_until_future_complete(
future.add_done_callback(lambda x: self.wake())

if timeout_sec is None or timeout_sec < 0:
<<<<<<< HEAD
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_sec)
=======
while (
self._context.ok()
and not future.done()
and not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_sec)
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

<<<<<<< HEAD
while self._context.ok() and not future.done() and not self._is_shutdown:
self.spin_once_until_future_complete(future, timeout_left)
=======
while (
self._context.ok()
and not future.done()
and not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_left)
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
now = time.monotonic()

if now >= end:
Expand Down Expand Up @@ -577,6 +597,8 @@ def _wait_for_ready_callbacks(
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))

# Gather entities that can be waited on
subscriptions: List[Subscription] = []
Expand Down
66 changes: 55 additions & 11 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from enum import Enum
import inspect
import sys
import threading
Expand All @@ -24,6 +25,7 @@ def _fake_weakref():
return None


<<<<<<< HEAD
class Future:
"""Represent the outcome of a task in the future."""

Expand All @@ -32,6 +34,21 @@ def __init__(self, *, executor=None):
self._done = False
# true if the task is cancelled
self._cancelled = False
=======
class FutureState(Enum):
"""States defining the lifecycle of a future."""

PENDING = 'PENDING'
CANCELLED = 'CANCELLED'
FINISHED = 'FINISHED'


class Future(Generic[T]):
"""Represent the outcome of a task in the future."""

def __init__(self, *, executor: Optional['Executor'] = None) -> None:
self._state = FutureState.PENDING
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
# the final return value of the handler
self._result = None
# An exception raised by the handler when called
Expand All @@ -53,15 +70,24 @@ def __del__(self):

def __await__(self):
# Yield if the task is not finished
while not self._done:
while self._pending():
yield
return self.result()

<<<<<<< HEAD
def cancel(self):
=======
def _pending(self) -> bool:
return self._state == FutureState.PENDING

def cancel(self) -> None:
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
"""Request cancellation of the running task if it is not done already."""
with self._lock:
if not self._done:
self._cancelled = True
if not self._pending():
return

self._state = FutureState.CANCELLED
self._schedule_or_invoke_done_callbacks()

def cancelled(self):
Expand All @@ -71,7 +97,7 @@ def cancelled(self):
:return: True if the task was cancelled
:rtype: bool
"""
return self._cancelled
return self._state == FutureState.CANCELLED

def done(self):
"""
Expand All @@ -80,7 +106,7 @@ def done(self):
:return: True if the task is finished or raised while it was executing
:rtype: bool
"""
return self._done
return self._state == FutureState.FINISHED

def result(self):
"""
Expand Down Expand Up @@ -111,8 +137,8 @@ def set_result(self, result):
"""
with self._lock:
self._result = result
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED

self._schedule_or_invoke_done_callbacks()

def set_exception(self, exception):
Expand All @@ -124,8 +150,8 @@ def set_exception(self, exception):
with self._lock:
self._exception = exception
self._exception_fetched = False
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED

self._schedule_or_invoke_done_callbacks()

def _schedule_or_invoke_done_callbacks(self):
Expand Down Expand Up @@ -173,7 +199,12 @@ def add_done_callback(self, callback):
"""
invoke = False
with self._lock:
<<<<<<< HEAD
if self._done:
=======
if not self._pending():
assert self._executor is not None
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
executor = self._executor()
if executor is not None:
executor.create_task(callback, self)
Expand Down Expand Up @@ -226,10 +257,14 @@ def __call__(self):
The return value of the handler is stored as the task result.
"""
if self._done or self._executing or not self._task_lock.acquire(blocking=False):
if (
not self._pending() or
self._executing or
not self._task_lock.acquire(blocking=False)
):
return
try:
if self._done:
if not self._pending():
return
self._executing = True

Expand All @@ -239,7 +274,10 @@ def __call__(self):
self._handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
<<<<<<< HEAD
self._handler.close()
=======
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
self.set_result(e.value)
self._complete_task()
except Exception as e:
Expand Down Expand Up @@ -271,3 +309,9 @@ def executing(self):
:rtype: bool
"""
return self._executing

def cancel(self) -> None:
if self._pending() and inspect.iscoroutine(self._handler):
self._handler.close()

super().cancel()
24 changes: 24 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,31 @@ async def coroutine():
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

<<<<<<< HEAD
def test_create_task_normal_function(self):
=======
def test_create_task_coroutine_cancel(self) -> None:
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

async def coroutine():
return 'Sentinel Result'

future = executor.create_task(coroutine)
self.assertFalse(future.done())
self.assertFalse(future.cancelled())

future.cancel()
self.assertTrue(future.cancelled())

executor.spin_until_future_complete(future)
self.assertFalse(future.done())
self.assertTrue(future.cancelled())
self.assertEqual(None, future.result())

def test_create_task_normal_function(self) -> None:
>>>>>>> 9a144bf (Check if Task(Future) is canceled. (#1377))
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)
Expand Down
33 changes: 33 additions & 0 deletions rclpy/test/test_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,39 @@ def cb(fut):
f.add_done_callback(cb)
assert called

def test_set_result_on_done_future_without_exception(self) -> None:
f = Future()
f.set_result(None)
self.assertTrue(f.done())
self.assertFalse(f.cancelled())
f.set_result(None)
self.assertTrue(f.done())
self.assertFalse(f.cancelled())

def test_set_result_on_cancelled_future_without_exception(self) -> None:
f = Future()
f.cancel()
self.assertTrue(f.cancelled())
self.assertFalse(f.done())
f.set_result(None)
self.assertTrue(f.done())

def test_set_exception_on_done_future_without_exception(self) -> None:
f = Future()
f.set_result(None)
self.assertIsNone(f.exception())
f.set_exception(Exception())
f.set_result(None)
self.assertIsNotNone(f.exception())

def test_set_exception_on_cancelled_future_without_exception(self) -> None:
f = Future()
f.cancel()
self.assertTrue(f.cancelled())
self.assertIsNone(f.exception())
f.set_exception(Exception())
self.assertIsNotNone(f.exception())


if __name__ == '__main__':
unittest.main()

0 comments on commit 134e0c7

Please sign in to comment.