diff --git a/changelog.d/18907.misc b/changelog.d/18907.misc
new file mode 100644
index 00000000000..4fca9ec8fbe
--- /dev/null
+++ b/changelog.d/18907.misc
@@ -0,0 +1 @@
+Remove `sentinel` logcontext usage in `Clock` utilities like `looping_call` and `call_later`.
diff --git a/synapse/util/clock.py b/synapse/util/clock.py
index 043b06a1087..d28dbac3579 100644
--- a/synapse/util/clock.py
+++ b/synapse/util/clock.py
@@ -23,6 +23,7 @@ from typing_extensions import ParamSpec
 
 from twisted.internet import defer, task
+from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IDelayedCall
 from twisted.internet.task import LoopingCall
 
@@ -46,6 +47,8 @@ class Clock:
 
     async def sleep(self, seconds: float) -> None:
         d: defer.Deferred[float] = defer.Deferred()
+        # Start the task in the `sentinel` logcontext, to avoid leaking the current
+        # context into the reactor once it finishes.
         with context.PreserveLoggingContext():
             self._reactor.callLater(seconds, d.callback, seconds)
             await d
@@ -74,8 +77,9 @@ def looping_call(
         this functionality thanks to this function being a thin wrapper around
         `twisted.internet.task.LoopingCall`.
 
-        Note that the function will be called with no logcontext, so if it is anything
-        other than trivial, you probably want to wrap it in run_as_background_process.
+        Note that the function will be called with a generic `looping_call` logcontext, so
+        if it is anything other than a trivial task, you probably want to wrap it in
+        `run_as_background_process` to give it a more specific label and to track metrics.
 
         Args:
             f: The function to call repeatedly.
@@ -97,8 +101,9 @@ def looping_call_now(
         As with `looping_call`: subsequent calls are not scheduled until after the
         the Awaitable returned by a previous call has finished.
 
-        Also as with `looping_call`: the function is called with no logcontext and
-        you probably want to wrap it in `run_as_background_process`.
+        Note that the function will be called with a generic `looping_call` logcontext, so
+        if it is anything other than a trivial task, you probably want to wrap it in
+        `run_as_background_process` to give it a more specific label and to track metrics.
 
         Args:
             f: The function to call repeatedly.
@@ -117,9 +122,43 @@ def _looping_call_common(
         **kwargs: P.kwargs,
     ) -> LoopingCall:
         """Common functionality for `looping_call` and `looping_call_now`"""
-        call = task.LoopingCall(f, *args, **kwargs)
+
+        def wrapped_f(*args: P.args, **kwargs: P.kwargs) -> Deferred:
+            assert context.current_context() is context.SENTINEL_CONTEXT, (
+                "Expected `looping_call` callback from the reactor to start with the sentinel logcontext "
+                f"but saw {context.current_context()}. In other words, another task shouldn't have "
+                "leaked their logcontext to us."
+            )
+
+            # Because this is a callback from the reactor, we will be using the
+            # `sentinel` log context at this point. We want the function to log with
+            # some logcontext because we want to know which server the logs came from.
+            #
+            # We use `PreserveLoggingContext` to prevent our new `looping_call`
+            # logcontext from finishing as soon as we exit this function, in case `f`
+            # returns an awaitable/deferred which would continue running and may try to
+            # restore the `looping_call` context when it's done (because it's trying to
+            # adhere to the Synapse logcontext rules).
+            #
+            # This also ensures that we return to the `sentinel` context when we exit
+            # this function and yield control back to the reactor to avoid leaking the
+            # current logcontext to the reactor (which would then get picked up and
+            # associated with the next thing the reactor does).
+            with context.PreserveLoggingContext(context.LoggingContext("looping_call")):
+                # We use `run_in_background` to reset the logcontext after `f` (or the
+                # awaitable returned by `f`) completes to avoid leaking the current
+                # logcontext to the reactor.
+                return context.run_in_background(f, *args, **kwargs)
+
+        call = task.LoopingCall(wrapped_f, *args, **kwargs)
         call.clock = self._reactor
-        d = call.start(msec / 1000.0, now=now)
+        # If `now=True`, the function will be called here immediately, so we need to be
+        # in the sentinel context now.
+        #
+        # We want to start the task in the `sentinel` logcontext, to avoid leaking the
+        # current context into the reactor after the function finishes.
+        with context.PreserveLoggingContext():
+            d = call.start(msec / 1000.0, now=now)
         d.addErrback(log_failure, "Looping call died", consumeErrors=False)
         return call
 
@@ -128,8 +167,9 @@ def call_later(
     ) -> IDelayedCall:
         """Call something later
 
-        Note that the function will be called with no logcontext, so if it is anything
-        other than trivial, you probably want to wrap it in run_as_background_process.
+        Note that the function will be called with a generic `call_later` logcontext, so
+        if it is anything other than a trivial task, you probably want to wrap it in
+        `run_as_background_process` to give it a more specific label and to track metrics.
 
         Args:
             delay: How long to wait in seconds.
@@ -139,11 +179,33 @@
         """
 
        def wrapped_callback(*args: Any, **kwargs: Any) -> None:
-            with context.PreserveLoggingContext():
-                callback(*args, **kwargs)
+            assert context.current_context() is context.SENTINEL_CONTEXT, (
+                "Expected `call_later` callback from the reactor to start with the sentinel logcontext "
+                f"but saw {context.current_context()}. In other words, another task shouldn't have "
+                "leaked their logcontext to us."
+            )
 
-        with context.PreserveLoggingContext():
-            return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+            # Because this is a callback from the reactor, we will be using the
+            # `sentinel` log context at this point. We want the function to log with
+            # some logcontext because we want to know which server the logs came from.
+            #
+            # We use `PreserveLoggingContext` to prevent our new `call_later`
+            # logcontext from finishing as soon as we exit this function, in case `callback`
+            # returns an awaitable/deferred which would continue running and may try to
+            # restore the `call_later` context when it's done (because it's trying to
+            # adhere to the Synapse logcontext rules).
+            #
+            # This also ensures that we return to the `sentinel` context when we exit
+            # this function and yield control back to the reactor to avoid leaking the
+            # current logcontext to the reactor (which would then get picked up and
+            # associated with the next thing the reactor does).
+            with context.PreserveLoggingContext(context.LoggingContext("call_later")):
+                # We use `run_in_background` to reset the logcontext after `callback` (or the
+                # awaitable returned by `callback`) completes to avoid leaking the current
+                # logcontext to the reactor.
+                context.run_in_background(callback, *args, **kwargs)
+
+        return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
 
     def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
         try:
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index 80a22044ddc..26819e2d3c5 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -31,6 +31,7 @@
 
 import synapse.rest.admin
 from synapse.api.errors import Codes, SynapseError
+from synapse.logging.context import make_deferred_yieldable
 from synapse.push.emailpusher import EmailPusher
 from synapse.rest.client import login, room
 from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource
@@ -89,7 +90,7 @@ def sendmail(*args: Any, **kwargs: Any) -> Deferred:
             # This mocks out synapse.reactor.send_email._sendmail.
             d: Deferred = Deferred()
             self.email_attempts.append((d, args, kwargs))
-            return d
+            return make_deferred_yieldable(d)
 
         hs.get_send_email_handler()._sendmail = sendmail  # type: ignore[assignment]
 
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 43912d05dae..0ecf712bab7 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -19,6 +19,7 @@
 #
 #
 
+import logging
 from typing import Callable, Generator, cast
 
 import twisted.python.failure
@@ -28,6 +29,7 @@
     SENTINEL_CONTEXT,
     LoggingContext,
     PreserveLoggingContext,
+    _Sentinel,
     current_context,
     make_deferred_yieldable,
     nested_logging_context,
@@ -36,7 +38,10 @@
 from synapse.types import ISynapseReactor
 from synapse.util.clock import Clock
 
-from .. import unittest
+from tests import unittest
+from tests.unittest import logcontext_clean
+
+logger = logging.getLogger(__name__)
 
 reactor = cast(ISynapseReactor, _reactor)
 
@@ -44,33 +49,212 @@ class LoggingContextTestCase(unittest.TestCase):
 
     def _check_test_key(self, value: str) -> None:
         context = current_context()
-        assert isinstance(context, LoggingContext)
-        self.assertEqual(context.name, value)
-
+        assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), (
+            f"Expected LoggingContext({value}) but saw {context}"
+        )
+        self.assertEqual(
+            str(context), value, f"Expected LoggingContext({value}) but saw {context}"
+        )
+
+    @logcontext_clean
     def test_with_context(self) -> None:
         with LoggingContext("test"):
             self._check_test_key("test")
 
+    @logcontext_clean
     async def test_sleep(self) -> None:
+        """
+        Test `Clock.sleep`
+        """
         clock = Clock(reactor)
 
+        # Sanity check that we start in the sentinel context
+        self._check_test_key("sentinel")
+
+        callback_finished = False
+
         async def competing_callback() -> None:
-            with LoggingContext("competing"):
-                await clock.sleep(0)
-                self._check_test_key("competing")
+            nonlocal callback_finished
+            try:
+                # A callback from the reactor should start with the sentinel context. In
+                # other words, another task shouldn't have leaked their context to us.
+ self._check_test_key("sentinel") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("sentinel") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True reactor.callLater(0, lambda: defer.ensureDeferred(competing_callback())) - with LoggingContext("one"): + with LoggingContext("foo"): + await clock.sleep(0) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + @logcontext_clean + async def test_looping_call(self) -> None: + """ + Test `Clock.looping_call` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + with LoggingContext("foo"): + lc = clock.looping_call( + lambda: defer.ensureDeferred(competing_callback()), 0 + ) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + await clock.sleep(0) + self._check_test_key("foo") + + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + + # Back to the sentinel context + self._check_test_key("sentinel") + + # Stop the looping call to prevent "Reactor was unclean" errors + lc.stop() + + @logcontext_clean + async def test_looping_call_now(self) -> None: + """ + Test `Clock.looping_call_now` + """ + clock = Clock(reactor) + + # Sanity check that we start in the sentinel context + self._check_test_key("sentinel") + + callback_finished = False + + async def competing_callback() -> None: + nonlocal callback_finished + try: + # A `looping_call` callback should have *some* logcontext since we should know + # which server spawned this loop and which server the logs came from. + self._check_test_key("looping_call") + + with LoggingContext("competing"): + await clock.sleep(0) + self._check_test_key("competing") + + self._check_test_key("looping_call") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. 
+                callback_finished = True
+
+        with LoggingContext("foo"):
+            lc = clock.looping_call_now(
+                lambda: defer.ensureDeferred(competing_callback()), 0
+            )
+            self._check_test_key("foo")
+            await clock.sleep(0)
+            self._check_test_key("foo")
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished, which means the test probably didn't wait long enough",
+        )
+
+        # Back to the sentinel context
+        self._check_test_key("sentinel")
+
+        # Stop the looping call to prevent "Reactor was unclean" errors
+        lc.stop()
+
+    @logcontext_clean
+    async def test_call_later(self) -> None:
+        """
+        Test `Clock.call_later`
+        """
+        clock = Clock(reactor)
+
+        # Sanity check that we start in the sentinel context
+        self._check_test_key("sentinel")
+
+        callback_finished = False
+
+        async def competing_callback() -> None:
+            nonlocal callback_finished
+            try:
+                # A `call_later` callback should have *some* logcontext since we should know
+                # which server scheduled this call and which server the logs came from.
+                self._check_test_key("call_later")
+
+                with LoggingContext("competing"):
+                    await clock.sleep(0)
+                    self._check_test_key("competing")
+
+                self._check_test_key("call_later")
+            finally:
+                # When exceptions happen, we still want to mark the callback as finished
+                # so that the test can complete and we see the underlying error.
+                callback_finished = True
+
+        with LoggingContext("foo"):
+            clock.call_later(0, lambda: defer.ensureDeferred(competing_callback()))
+            self._check_test_key("foo")
+            await clock.sleep(0)
+            self._check_test_key("foo")
             await clock.sleep(0)
-            self._check_test_key("one")
+            self._check_test_key("foo")
+
+        self.assertTrue(
+            callback_finished,
+            "Callback never finished, which means the test probably didn't wait long enough",
+        )
+
+        # Back to the sentinel context
+        self._check_test_key("sentinel")
 
     def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred:
         sentinel_context = current_context()
 
         callback_completed = False
 
-        with LoggingContext("one"):
+        with LoggingContext("foo"):
             # fire off function, but don't wait on it.
             d2 = run_in_background(function)
 
@@ -81,7 +265,7 @@ def cb(res: object) -> object:
 
         d2.addCallback(cb)
 
-        self._check_test_key("one")
+        self._check_test_key("foo")
 
         # now wait for the function under test to have run, and check that
         # the logcontext is left in a sane state.
@@ -105,12 +289,14 @@ def check_logcontext() -> None:
         # test is done once d2 finishes
         return d2
 
+    @logcontext_clean
     def test_run_in_background_with_blocking_fn(self) -> defer.Deferred:
         async def blocking_function() -> None:
             await Clock(reactor).sleep(0)
 
         return self._test_run_in_background(blocking_function)
 
+    @logcontext_clean
     def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred:
         @defer.inlineCallbacks
         def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]:
@@ -119,6 +305,7 @@ def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]:
 
         return self._test_run_in_background(nonblocking_function)
 
+    @logcontext_clean
     def test_run_in_background_with_chained_deferred(self) -> defer.Deferred:
         # a function which returns a deferred which looks like it has been
         # called, but is actually paused
@@ -127,22 +314,25 @@ def testfunc() -> defer.Deferred:
 
         return self._test_run_in_background(testfunc)
 
+    @logcontext_clean
     def test_run_in_background_with_coroutine(self) -> defer.Deferred:
         async def testfunc() -> None:
-            self._check_test_key("one")
+            self._check_test_key("foo")
             d = defer.ensureDeferred(Clock(reactor).sleep(0))
             self.assertIs(current_context(), SENTINEL_CONTEXT)
             await d
-            self._check_test_key("one")
+            self._check_test_key("foo")
 
         return self._test_run_in_background(testfunc)
 
+    @logcontext_clean
     def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred:
         async def testfunc() -> None:
-            self._check_test_key("one")
+            self._check_test_key("foo")
 
         return self._test_run_in_background(testfunc)
 
+    @logcontext_clean
     @defer.inlineCallbacks
     def test_make_deferred_yieldable(
         self,
@@ -156,7 +346,7 @@ def blocking_function() -> defer.Deferred:
 
         sentinel_context = current_context()
 
-        with LoggingContext("one"):
+        with LoggingContext("foo"):
             d1 = make_deferred_yieldable(blocking_function())
             # make sure that the context was reset by make_deferred_yieldable
             self.assertIs(current_context(), sentinel_context)
@@ -164,15 +354,16 @@ def blocking_function() -> defer.Deferred:
             yield d1
 
             # now it should be restored
-            self._check_test_key("one")
+            self._check_test_key("foo")
 
+    @logcontext_clean
     @defer.inlineCallbacks
     def test_make_deferred_yieldable_with_chained_deferreds(
         self,
     ) -> Generator["defer.Deferred[object]", object, None]:
         sentinel_context = current_context()
 
-        with LoggingContext("one"):
+        with LoggingContext("foo"):
             d1 = make_deferred_yieldable(_chained_deferred_function())
             # make sure that the context was reset by make_deferred_yieldable
             self.assertIs(current_context(), sentinel_context)
@@ -180,8 +371,9 @@ def test_make_deferred_yieldable_with_chained_deferreds(
             yield d1
 
             # now it should be restored
-            self._check_test_key("one")
+            self._check_test_key("foo")
 
+    @logcontext_clean
     def test_nested_logging_context(self) -> None:
         with LoggingContext("foo"):
             nested_context = nested_logging_context(suffix="bar")
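For illustration only, and not part of the change above: a minimal sketch of the reactor-callback wrapping pattern this diff applies inside `call_later`/`looping_call`, built from the same `synapse.logging.context` helpers the diff itself calls (`LoggingContext`, `PreserveLoggingContext`, `run_in_background`). The `schedule_with_logcontext` helper and the `"example_call_later"` context name are hypothetical, invented for the example.

# Hypothetical standalone sketch (not Synapse API): mirrors the wrapping pattern
# the diff adds to `Clock.call_later`.
from typing import Any, Callable

from twisted.internet import reactor
from twisted.internet.interfaces import IDelayedCall

from synapse.logging import context


def schedule_with_logcontext(
    delay: float, callback: Callable[..., Any], *args: Any, **kwargs: Any
) -> IDelayedCall:
    """Hypothetical helper: schedule `callback` the way the patched `call_later` does."""

    def wrapped(*a: Any, **kw: Any) -> None:
        # The reactor invokes us with the sentinel logcontext. Create a named
        # context so the callback's logs are attributable, and wrap it in
        # `PreserveLoggingContext` so we hand the sentinel back to the reactor
        # when this function returns.
        with context.PreserveLoggingContext(
            context.LoggingContext("example_call_later")
        ):
            # `run_in_background` follows the logcontext rules for us: it resets
            # the context once `callback` (or the awaitable it returns) yields,
            # so nothing leaks into the reactor.
            context.run_in_background(callback, *a, **kw)

    return reactor.callLater(delay, wrapped, *args, **kwargs)

The design choice mirrored here is the one the new comments spell out: the named context is created and torn down entirely inside the wrapper, so callers can schedule work from whatever context they like while the reactor itself only ever sees the sentinel.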