diff --git a/trio/_core/__init__.py b/trio/_core/__init__.py index fda027e193..bf73a73c11 100644 --- a/trio/_core/__init__.py +++ b/trio/_core/__init__.py @@ -32,7 +32,7 @@ reattach_detached_coroutine_object ) -from ._entry_queue import TrioToken +from ._entry_queue import TrioToken, TrioEntryHandle from ._parking_lot import ParkingLot diff --git a/trio/_core/_entry_queue.py b/trio/_core/_entry_queue.py index 97b1c56fa4..4f42af4bce 100644 --- a/trio/_core/_entry_queue.py +++ b/trio/_core/_entry_queue.py @@ -1,10 +1,12 @@ from collections import deque import threading +import warnings import attr from .. import _core from ._wakeup_socketpair import WakeupSocketpair +from .._deprecate import deprecated __all__ = ["TrioToken"] @@ -33,6 +35,9 @@ class EntryQueue: # just to make 1 assignment, so that's atomic WRT a signal anyway. lock = attr.ib(factory=threading.RLock) + live_handles = attr.ib(default=0) + live_handles_lock = attr.ib(factory=threading.Lock) + async def task(self): assert _core.currently_ki_protected() # RLock has two implementations: a signal-safe version in _thread, and @@ -122,6 +127,136 @@ def run_sync_soon(self, sync_fn, *args, idempotent=False): self.queue.append((sync_fn, args)) self.wakeup.wakeup_thread_and_signal_safe() + def open_handle(self): + with self.live_handles_lock: + self.live_handles += 1 + return TrioEntryHandle(self) + + +class TrioEntryHandle: + def __init__(self, reentry_queue): + self._reentry_queue = reentry_queue + self._thread = threading.current_thread() + self._lock = threading.RLock() + + def run_sync_soon(self, sync_fn, *args, idempotent=False): + """Schedule a call to ``sync_fn(*args)`` to occur in the context of a + Trio task. + + Most functions in Trio are only safe to call from inside the main Trio + thread, and never from signal handlers or ``__del__`` methods. But + this method is safe to call from the main thread, from other threads, + from signal handlers, and from ``__del__`` methods. This is the + fundamental primitive used to re-enter the Trio run loop from outside + of it. + + The call will happen "soon", but there's no guarantee about exactly + when, and no mechanism provided for finding out when it's happened. + If you need this, you'll have to build your own. + + The call is effectively run as part of a system task (see + :func:`~trio.hazmat.spawn_system_task`). In particular this means + that: + + * :exc:`KeyboardInterrupt` protection is *enabled* by default. Your + function won't be interrupted by control-C. + + * If ``sync_fn`` raises an exception, then it's converted into a + :exc:`~trio.TrioInternalError` and *all* tasks are cancelled. You + should be careful that ``sync_fn`` doesn't raise an exception. + + All calls with ``idempotent=False`` are processed in strict + first-in first-out order. + + If ``idempotent=True``, then ``sync_fn`` and ``args`` must be + hashable, and Trio will make a best-effort attempt to discard any call + submission which is equal to an already-pending call. Trio will make + an attempt to process these in first-in first-out order, but no + guarantees. (Currently processing is FIFO on CPython 3.6+ and on PyPy, + but not CPython 3.5.) + + Any ordering guarantees apply separately to ``idempotent=False`` + and ``idempotent=True`` calls; there's no rule for how calls in the + different categories are ordered with respect to each other. + + Raises: + trio.ClosedResourceError: If this handle has already been closed. + trio.RunFinishedError: If the associated call to :func:`trio.run` + has already exited. + + """ + with self._lock: + if self._reentry_queue is None: + raise _core.ClosedResourceError + self._reentry_queue.run_sync_soon( + sync_fn, *args, idempotent=idempotent + ) + + def close(self): + """Close this entry handle. + + After the entry handle is closed, it cannot be used. + + This method is thread-safe: you can call it from any thread at any + time. It is *not* reentrant-safe: you should *never* call it from a + signal handler or ``__del__`` method. + + .. warning:: It is very important to always close your entry handles + when you are done with them! If you don't, Trio won't be able to + detect when a program deadlocks. + + """ + with self._lock: + # This code can't mutate anything – see the comment below. + if self._reentry_queue is None: + return + rq = self._reentry_queue + + # We have the lock, so we can't be racing with another thread + # calling close() or run_sync_soon(). And by assumption, no signal + # handler or __del__ method can call close(). So we don't need to + # worry about multiple close() calls racing with each other. + # + # BUT, a signal handler or __del__ method *could* call + # run_sync_soon at any point during this call, i.e. the + # interpreter could pause interpreting this function while it uses + # our thread to execute a call to run_sync_soon, and we need to + # handle that correctly. + # + # The next line is the barrier for any reentrant run_sync_soon + # calls. If they get called before this assignment takes effect, + # they're fine; we haven't done anything yet. If not, then they'll + # raise ClosedResourceError. + self._reentry_queue = None + + with rq.live_handles_lock: + rq.live_handles -= 1 + if rq.live_handles == 0: + try: + rq.run_sync_soon(lambda: None) + except _core.RunFinishedError: + pass + + def __del__(self): + # Closing ourselves reliably from __del__ would be very tricky, and if + # anyone did rely on it then it would almost certainly break the + # deadlock detector. (If you rely on the GC to detect when you're + # deadlocked, that's a problem, because deadlocked programs don't tend + # to trigger the GC.) So instead we just issue a noisy warning. + if self._reentry_queue is not None: + warnings.warn( + RuntimeWarning( + "failed to close TrioEntryHandle. this is a bug!" + ), + source=self, + ) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + self.close() + class TrioToken: """An opaque object representing a single call to :func:`trio.run`. @@ -147,6 +282,7 @@ class TrioToken: def __init__(self, reentry_queue): self._reentry_queue = reentry_queue + @deprecated("0.12.0", issue=1085, instead="open_trio_entry_handle") def run_sync_soon(self, sync_fn, *args, idempotent=False): """Schedule a call to ``sync_fn(*args)`` to occur in the context of a Trio task. diff --git a/trio/_core/_run.py b/trio/_core/_run.py index 96da9262bf..edf3ca2316 100644 --- a/trio/_core/_run.py +++ b/trio/_core/_run.py @@ -1482,6 +1482,14 @@ def current_trio_token(self): self.trio_token = TrioToken(self.entry_queue) return self.trio_token + @_public + def open_trio_entry_handle(self): + """Open a new `TrioEntryHandle`, to let you reenter the current call + to `trio.run` from other contexts. + + """ + return self.entry_queue.open_handle() + ################ # KI handling ################ diff --git a/trio/_signals.py b/trio/_signals.py index 2ebb4a0a5a..5817182aef 100644 --- a/trio/_signals.py +++ b/trio/_signals.py @@ -154,14 +154,14 @@ def open_signal_receiver(*signals): "Sorry, open_signal_receiver is only possible when running in " "Python interpreter's main thread" ) - token = trio.hazmat.current_trio_token() - queue = SignalReceiver() + with trio.hazmat.open_trio_entry_handle() as entry_handle: + queue = SignalReceiver() - def handler(signum, _): - token.run_sync_soon(queue._add, signum, idempotent=True) + def handler(signum, _): + entry_handle.run_sync_soon(queue._add, signum, idempotent=True) - try: - with _signal_handler(signals, handler): - yield queue - finally: - queue._redeliver_remaining() + try: + with _signal_handler(signals, handler): + yield queue + finally: + queue._redeliver_remaining() diff --git a/trio/hazmat.py b/trio/hazmat.py index 5fe32c03d9..0c0b14b492 100644 --- a/trio/hazmat.py +++ b/trio/hazmat.py @@ -20,7 +20,8 @@ permanently_detach_coroutine_object, reattach_detached_coroutine_object, current_statistics, reschedule, remove_instrument, add_instrument, current_clock, current_root_task, checkpoint_if_cancelled, - spawn_system_task, wait_readable, wait_writable, notify_closing + spawn_system_task, wait_readable, wait_writable, notify_closing, + open_trio_entry_handle, TrioEntryHandle ) # Unix-specific symbols diff --git a/trio/testing/_mock_clock.py b/trio/testing/_mock_clock.py index 73b27f3467..d1e31c9e25 100644 --- a/trio/testing/_mock_clock.py +++ b/trio/testing/_mock_clock.py @@ -177,8 +177,8 @@ def _real_to_virtual(self, real): return self._virtual_base + virtual_offset def start_clock(self): - token = _core.current_trio_token() - token.run_sync_soon(self._maybe_spawn_autojump_task) + with _core.open_trio_entry_handle() as handle: + handle.run_sync_soon(self._maybe_spawn_autojump_task) def current_time(self): return self._real_to_virtual(self._real_clock()) diff --git a/trio/tests/test_signals.py b/trio/tests/test_signals.py index 7ae930403c..2d46f1e064 100644 --- a/trio/tests/test_signals.py +++ b/trio/tests/test_signals.py @@ -70,8 +70,8 @@ async def test_open_signal_receiver_conflict(): # processed. async def wait_run_sync_soon_idempotent_queue_barrier(): ev = trio.Event() - token = _core.current_trio_token() - token.run_sync_soon(ev.set, idempotent=True) + with _core.open_trio_entry_handle() as handle: + handle.run_sync_soon(ev.set, idempotent=True) await ev.wait()