Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw CancelledException into cancelled tasks #1392

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rclpy/rclpy/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,3 +155,6 @@ class ROSInterruptException(Exception):

def __init__(self) -> None:
Exception.__init__(self, 'rclpy.shutdown() has been called')

class CancelledException(BaseException):
"""The Future or Task was cancelled."""
16 changes: 4 additions & 12 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ def __init__(self, *, context: Optional[Context] = None) -> None:
self._nodes_lock = RLock()
# Tasks to be executed (oldest first) 3-tuple Task, Entity, Node
self._tasks: List[Tuple[Task, Optional[WaitableEntityType], Optional[Node]]] = []
# Prevent creating tasks while filtering old tasks
self._tasks_lock = Lock()
# This is triggered when wait_for_ready_callbacks should rebuild the wait list
self._guard = GuardCondition(
Expand Down Expand Up @@ -608,20 +609,15 @@ def _wait_for_ready_callbacks(
nodes_to_use = self.get_nodes()

# Yield tasks in-progress before waiting for new work
tasks = None
with self._tasks_lock:
tasks = list(self._tasks)
if tasks:
for task, entity, node in tasks:
if self._tasks:
for task, entity, node in self._tasks:
if (not task.executing() and not task.done() and
(node is None or node in nodes_to_use)):
yielded_work = True
yield task, entity, node
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))
self._tasks = list(filter(lambda t_e_n: not (t_e_n[0].done() and t_e_n[0].exception_claimed()), self._tasks))

# Gather entities that can be waited on
subscriptions: List[Subscription] = []
Expand Down Expand Up @@ -863,10 +859,6 @@ def _spin_once_impl(
pass
else:
handler()
if handler.exception() is not None:
raise handler.exception()

handler.result() # raise any exceptions

def spin_once(self, timeout_sec: Optional[float] = None) -> None:
self._spin_once_impl(timeout_sec)
Expand Down
116 changes: 73 additions & 43 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
Optional, TYPE_CHECKING, TypeVar, Union)
import warnings
import weakref
from enum import StrEnum, auto

from rclpy.exceptions import CancelledException

if TYPE_CHECKING:
from rclpy.executors import Executor
Expand All @@ -31,19 +34,25 @@ def _fake_weakref() -> None:
return None


class FutureState(StrEnum):
"""States defining the lifecycle of a future"""
PENDING = auto()
CANCELLED = auto()
FINISHED = auto()


class Future(Generic[T]):
"""Represent the outcome of a task in the future."""

def __init__(self, *, executor: Optional['Executor'] = None) -> None:
# true if the task is done or cancelled
self._done = False
# true if the task is cancelled
self._cancelled = False
self._state = FutureState.PENDING
# the final return value of the handler
self._result: Optional[T] = None
# An exception raised by the handler when called
self._exception: Optional[Exception] = None
self._exception_fetched = False
# Indicates if the __await__ method was called
self._awaited = False
# callbacks to be scheduled after this task completes
self._callbacks: List[Callable[['Future[T]'], None]] = []
# Lock for threadsafety
Expand All @@ -61,15 +70,17 @@ def __del__(self) -> None:

def __await__(self) -> Generator[None, None, Optional[T]]:
# Yield if the task is not finished
while not self._done and not self._cancelled:
self._awaited = True
while not self.done():
yield
return self.result()

def cancel(self) -> None:
"""Request cancellation of the running task if it is not done already."""
with self._lock:
if not self._done:
self._cancelled = True
if not self.done():
self._state = FutureState.CANCELLED

self._schedule_or_invoke_done_callbacks()

def cancelled(self) -> bool:
Expand All @@ -78,15 +89,15 @@ def cancelled(self) -> bool:

:return: True if the task was cancelled
"""
return self._cancelled
return self._state == FutureState.CANCELLED

def done(self) -> bool:
"""
Indicate if the task has finished executing.

:return: True if the task is finished or raised while it was executing
"""
return self._done
return self._state != FutureState.PENDING

def result(self) -> Optional[T]:
"""
Expand Down Expand Up @@ -118,8 +129,7 @@ def set_result(self, result: T) -> None:
"""
with self._lock:
self._result = result
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED
self._schedule_or_invoke_done_callbacks()

def set_exception(self, exception: Exception) -> None:
Expand All @@ -131,8 +141,7 @@ def set_exception(self, exception: Exception) -> None:
with self._lock:
self._exception = exception
self._exception_fetched = False
self._done = True
self._cancelled = False
self._state = FutureState.FINISHED
self._schedule_or_invoke_done_callbacks()

def _schedule_or_invoke_done_callbacks(self) -> None:
Expand Down Expand Up @@ -181,7 +190,7 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
"""
invoke = False
with self._lock:
if self._done:
if self.done():
assert self._executor is not None
executor = self._executor()
if executor is not None:
Expand All @@ -195,6 +204,19 @@ def add_done_callback(self, callback: Callable[['Future[T]'], None]) -> None:
if invoke:
callback(self)

def exception_claimed(self) -> bool:
"""
Indicate if an outside entity claimed the result

:return: True if the task ended without exception or the exception was retrieved, False if the task is not done
:raises: Exception if the task ended in one and the task is not awaited or the exception was not retrieved
"""
if not self.done():
return False
if self._result or self._awaited or (self._exception and self._exception_fetched):
return True

raise self._exception

class Task(Future[T]):
"""
Expand Down Expand Up @@ -229,6 +251,8 @@ def __init__(self,
self._executing = False
# Lock acquired to prevent task from executing in parallel with itself
self._task_lock = threading.Lock()
# True if the user requested to cancel the task
self._must_cancel = False

def __call__(self) -> None:
"""
Expand All @@ -240,42 +264,48 @@ def __call__(self) -> None:
The return value of the handler is stored as the task result.
"""
if (
self._done or
self._cancelled or
self.done() or
self._executing or
not self._task_lock.acquire(blocking=False)
):
return
try:
if self._done:
return
self._executing = True

if inspect.iscoroutine(self._handler):
# Execute a coroutine
handler = cast(Coroutine[None, None, T], self._handler)
try:
self._executing = True

if inspect.iscoroutine(self._handler):
# Execute a coroutine
handler = cast(Coroutine[None, None, T], self._handler)
try:
if self._must_cancel:
handler.throw(CancelledException())
else:
handler.send(None)
except StopIteration as e:
# The coroutine finished; store the result
handler.close()
self.set_result(e.value)
self._complete_task()
except Exception as e:
self.set_exception(e)
self._complete_task()
else:
# Execute a normal function
try:
assert self._handler is not None and callable(self._handler)
self.set_result(self._handler(*self._args, **self._kwargs))
except Exception as e:
self.set_exception(e)
self._complete_task()
return
except StopIteration as e:
# The coroutine finished; store the result
self.set_result(e.value)
except CancelledException as e:
with self._lock:
self._exception = e
self._exception_fetched = False
super().cancel()
except Exception as e:
self.set_exception(e)

self._executing = False
finally:
self._task_lock.release()
else:
# Execute a normal function
try:
assert self._handler is not None and callable(self._handler)
self.set_result(self._handler(*self._args, **self._kwargs))
except Exception as e:
self.set_exception(e)

self._complete_task()
self._executing = False
self._task_lock.release()

def cancel(self) -> None:
self._must_cancel = True

def _complete_task(self) -> None:
"""Cleanup after task finished."""
Expand Down