diff --git a/changelog.d/18964.misc b/changelog.d/18964.misc new file mode 100644 index 00000000000..69be53ad27b --- /dev/null +++ b/changelog.d/18964.misc @@ -0,0 +1 @@ +Fix `run_coroutine_in_background(...)` incorrectly handling logcontext. diff --git a/synapse/logging/context.py b/synapse/logging/context.py index aa4b98e7c76..b5b434f3a8d 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -802,13 +802,15 @@ def run_in_background( deferred returned by the function completes. To explain how the log contexts work here: - - When `run_in_background` is called, the current context is stored ("original"), - we kick off the background task in the current context, and we restore that - original context before returning - - When the background task finishes, we don't want to leak our context into the - reactor which would erroneously get attached to the next operation picked up by - the event loop. We add a callback to the deferred which will clear the logging - context after it finishes and yields control back to the reactor. + - When `run_in_background` is called, the calling logcontext is stored + ("original"), we kick off the background task in the current context, and we + restore that original context before returning. + - For a completed deferred, that's the end of the story. + - For an incomplete deferred, when the background task finishes, we don't want to + leak our context into the reactor which would erroneously get attached to the + next operation picked up by the event loop. We add a callback to the deferred + which will clear the logging context after it finishes and yields control back to + the reactor. Useful for wrapping functions that return a deferred or coroutine, which you don't yield or await on (for instance because you want to pass it to @@ -857,22 +859,36 @@ def run_in_background( # The deferred has already completed if d.called and not d.paused: - # The function should have maintained the logcontext, so we can - # optimise out the messing about + # If the function messes with logcontexts, we can assume it follows the Synapse + # logcontext rules (Rules for functions returning awaitables: "If the awaitable + # is already complete, the function returns with the same logcontext it started + # with."). If it function doesn't touch logcontexts at all, we can also assume + # the logcontext is unchanged. + # + # Either way, the function should have maintained the calling logcontext, so we + # can avoid messing with it further. Additionally, if the deferred has already + # completed, then it would be a mistake to then add a deferred callback (below) + # to reset the logcontext to the sentinel logcontext as that would run + # immediately (remember our goal is to maintain the calling logcontext when we + # return). return d - # The function may have reset the context before returning, so we need to restore it - # now. + # Since the function we called may follow the Synapse logcontext rules (Rules for + # functions returning awaitables: "If the awaitable is incomplete, the function + # clears the logcontext before returning"), the function may have reset the + # logcontext before returning, so we need to restore the calling logcontext now + # before we return ourselves. # # Our goal is to have the caller logcontext unchanged after firing off the # background task and returning. set_current_context(calling_context) - # The original logcontext will be restored when the deferred completes, but - # there is nothing waiting for it, so it will get leaked into the reactor (which - # would then get picked up by the next thing the reactor does). We therefore - # need to reset the logcontext here (set the `sentinel` logcontext) before - # yielding control back to the reactor. + # If the function we called is playing nice and following the Synapse logcontext + # rules, it will restore original calling logcontext when the deferred completes; + # but there is nothing waiting for it, so it will get leaked into the reactor (which + # would then get picked up by the next thing the reactor does). We therefore need to + # reset the logcontext here (set the `sentinel` logcontext) before yielding control + # back to the reactor. # # (If this feels asymmetric, consider it this way: we are # effectively forking a new thread of execution. We are @@ -894,10 +910,9 @@ def run_coroutine_in_background( Useful for wrapping coroutines that you don't yield or await on (for instance because you want to pass it to deferred.gatherResults()). - This is a special case of `run_in_background` where we can accept a - coroutine directly rather than a function. We can do this because coroutines - do not run until called, and so calling an async function without awaiting - cannot change the log contexts. + This is a special case of `run_in_background` where we can accept a coroutine + directly rather than a function. We can do this because coroutines do not continue + running once they have yielded. This is an ergonomic helper so we can do this: ```python @@ -908,33 +923,7 @@ def run_coroutine_in_background( run_in_background(lambda: func1(arg1)) ``` """ - calling_context = current_context() - - # Wrap the coroutine in a deferred, which will have the side effect of executing the - # coroutine in the background. - d = defer.ensureDeferred(coroutine) - - # The function may have reset the context before returning, so we need to restore it - # now. - # - # Our goal is to have the caller logcontext unchanged after firing off the - # background task and returning. - set_current_context(calling_context) - - # The original logcontext will be restored when the deferred completes, but - # there is nothing waiting for it, so it will get leaked into the reactor (which - # would then get picked up by the next thing the reactor does). We therefore - # need to reset the logcontext here (set the `sentinel` logcontext) before - # yielding control back to the reactor. - # - # (If this feels asymmetric, consider it this way: we are - # effectively forking a new thread of execution. We are - # probably currently within a ``with LoggingContext()`` block, - # which is supposed to have a single entry and exit point. But - # by spawning off another deferred, we are effectively - # adding a new exit point.) - d.addBoth(_set_context_cb, SENTINEL_CONTEXT) - return d + return run_in_background(lambda: coroutine) T = TypeVar("T") diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 0ecf712bab7..651dd844835 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -22,7 +22,6 @@ import logging from typing import Callable, Generator, cast -import twisted.python.failure from twisted.internet import defer, reactor as _reactor from synapse.logging.context import ( @@ -33,6 +32,7 @@ current_context, make_deferred_yieldable, nested_logging_context, + run_coroutine_in_background, run_in_background, ) from synapse.types import ISynapseReactor @@ -249,73 +249,80 @@ async def competing_callback() -> None: # Back to the sentinel context self._check_test_key("sentinel") - def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: - sentinel_context = current_context() + async def _test_run_in_background(self, function: Callable[[], object]) -> None: + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") - callback_completed = False + callback_finished = False with LoggingContext("foo"): - # fire off function, but don't wait on it. - d2 = run_in_background(function) + # Fire off the function, but don't wait on it. + deferred = run_in_background(function) + self._check_test_key("foo") - def cb(res: object) -> object: - nonlocal callback_completed - callback_completed = True - return res + def callback(result: object) -> object: + nonlocal callback_finished + callback_finished = True + # Pass through the result + return result - d2.addCallback(cb) + # We `addBoth` because when exceptions happen, we still want to mark the + # callback as finished so that the test can complete and we see the + # underlying error. + deferred.addBoth(callback) self._check_test_key("foo") - # now wait for the function under test to have run, and check that - # the logcontext is left in a sane state. - d2 = defer.Deferred() - - def check_logcontext() -> None: - if not callback_completed: - reactor.callLater(0.01, check_logcontext) - return + # Now wait for the function under test to have run, and check that + # the logcontext is left in a sane state. + while not callback_finished: + await clock.sleep(0) + self._check_test_key("foo") - # make sure that the context was reset before it got thrown back - # into the reactor - try: - self.assertIs(current_context(), sentinel_context) - d2.callback(None) - except BaseException: - d2.errback(twisted.python.failure.Failure()) - - reactor.callLater(0.01, check_logcontext) + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) - # test is done once d2 finishes - return d2 + # Back to the sentinel context + self._check_test_key("sentinel") @logcontext_clean - def test_run_in_background_with_blocking_fn(self) -> defer.Deferred: + async def test_run_in_background_with_blocking_fn(self) -> None: async def blocking_function() -> None: await Clock(reactor).sleep(0) - return self._test_run_in_background(blocking_function) + await self._test_run_in_background(blocking_function) @logcontext_clean - def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred: + async def test_run_in_background_with_non_blocking_fn(self) -> None: @defer.inlineCallbacks def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: with PreserveLoggingContext(): yield defer.succeed(None) - return self._test_run_in_background(nonblocking_function) + await self._test_run_in_background(nonblocking_function) @logcontext_clean - def test_run_in_background_with_chained_deferred(self) -> defer.Deferred: + async def test_run_in_background_with_chained_deferred(self) -> None: # a function which returns a deferred which looks like it has been # called, but is actually paused def testfunc() -> defer.Deferred: return make_deferred_yieldable(_chained_deferred_function()) - return self._test_run_in_background(testfunc) + await self._test_run_in_background(testfunc) @logcontext_clean - def test_run_in_background_with_coroutine(self) -> defer.Deferred: + async def test_run_in_background_with_coroutine(self) -> None: + """ + Test `run_in_background` with a coroutine that yields control back to the + reactor. + + This will stress the logic around incomplete deferreds in `run_in_background`. + """ + async def testfunc() -> None: self._check_test_key("foo") d = defer.ensureDeferred(Clock(reactor).sleep(0)) @@ -323,14 +330,111 @@ async def testfunc() -> None: await d self._check_test_key("foo") - return self._test_run_in_background(testfunc) + await self._test_run_in_background(testfunc) @logcontext_clean - def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: + async def test_run_in_background_with_nonblocking_coroutine(self) -> None: + """ + Test `run_in_background` with a "nonblocking" coroutine (never yields control + back to the reactor). + + This will stress the logic around completed deferreds in `run_in_background`. + """ + async def testfunc() -> None: self._check_test_key("foo") - return self._test_run_in_background(testfunc) + await self._test_run_in_background(testfunc) + + @logcontext_clean + async def test_run_coroutine_in_background(self) -> None: + """ + Test `run_coroutine_in_background` with a coroutine that yields control back to the + reactor. + + This will stress the logic around incomplete deferreds in `run_coroutine_in_background`. + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # The callback should have the same logcontext as the caller + self._check_test_key("foo") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("foo") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + run_coroutine_in_background(competing_callback()) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + @logcontext_clean + async def test_run_coroutine_in_background_with_nonblocking_coroutine(self) -> None: + """ + Test `run_coroutine_in_background` with a "nonblocking" coroutine (never yields control + back to the reactor). + + This will stress the logic around completed deferreds in `run_coroutine_in_background`. + """ + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # The callback should have the same logcontext as the caller + self._check_test_key("foo") + + with LoggingContext("competing"): + # We `await` here but there is nothing to wait for here since the + # deferred is already complete so we should immediately continue + # executing in the same context. + await defer.succeed(None) + + self._check_test_key("competing") + + self._check_test_key("foo") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + run_coroutine_in_background(competing_callback()) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") @logcontext_clean @defer.inlineCallbacks