diff --git a/changelog.d/18945.misc b/changelog.d/18945.misc new file mode 100644 index 00000000000..e49077c8f2e --- /dev/null +++ b/changelog.d/18945.misc @@ -0,0 +1 @@ +Introduce `Clock.add_system_event_trigger(...)` to wrap system event callback code in a logcontext, ensuring we can identify which server generated the logs. diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index 439a75fc7e2..e170aabdae3 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -74,6 +74,12 @@ category="synapse-reactor-clock", ) +PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER = ErrorCode( + "prefer-synapse-clock-add-system-event-trigger", + "`synapse.util.Clock.add_system_event_trigger` should be used instead of `reactor.addSystemEventTrigger`", + category="synapse-reactor-clock", +) + class Sentinel(enum.Enum): # defining a sentinel in this way allows mypy to correctly handle the @@ -242,6 +248,13 @@ def get_method_signature_hook( ): return check_call_when_running + if fullname in ( + "twisted.internet.interfaces.IReactorCore.addSystemEventTrigger", + "synapse.types.ISynapseThreadlessReactor.addSystemEventTrigger", + "synapse.types.ISynapseReactor.addSystemEventTrigger", + ): + return check_add_system_event_trigger + return None @@ -272,6 +285,33 @@ def check_call_when_running(ctx: MethodSigContext) -> CallableType: return signature +def check_add_system_event_trigger(ctx: MethodSigContext) -> CallableType: + """ + Ensure that the `reactor.addSystemEventTrigger` callsites aren't used. + + `synapse.util.Clock.add_system_event_trigger` should always be used instead of + `reactor.addSystemEventTrigger`. + + Since `reactor.addSystemEventTrigger` is a reactor callback, the callback will start out + with the sentinel logcontext. `synapse.util.Clock` starts a default logcontext as we + want to know which server the logs came from. + + Args: + ctx: The `FunctionSigContext` from mypy. + """ + signature: CallableType = ctx.default_signature + ctx.api.fail( + ( + "Expected all `reactor.addSystemEventTrigger` calls to use `synapse.util.Clock.add_system_event_trigger` instead. " + "This is so all Synapse code runs with a logcontext as we want to know which server the logs came from." + ), + ctx.context, + code=PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER, + ) + + return signature + + def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None: """ Cross-check the list of Prometheus metric classes against the diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 02c56496bfb..1cf76d2a0b4 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -518,7 +518,9 @@ async def start(hs: "HomeServer") -> None: # numbers of DNS requests don't starve out other users of the threadpool. resolver_threadpool = ThreadPool(name="gai_resolver") resolver_threadpool.start() - reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop) + hs.get_clock().add_system_event_trigger( + "during", "shutdown", resolver_threadpool.stop + ) reactor.installNameResolver( GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool) ) @@ -605,7 +607,7 @@ def log_shutdown() -> None: logger.info("Shutting down...") # Log when we start the shut down process. - hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown) + hs.get_clock().add_system_event_trigger("before", "shutdown", log_shutdown) setup_sentry(hs) setup_sdnotify(hs) @@ -720,7 +722,7 @@ def setup_sdnotify(hs: "HomeServer") -> None: # we're not using systemd. sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),)) - hs.get_reactor().addSystemEventTrigger( + hs.get_clock().add_system_event_trigger( "before", "shutdown", sdnotify, b"STOPPING=1" ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index d7de20f8841..c787c847bd9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -541,7 +541,7 @@ def __init__(self, hs: "HomeServer"): self.send_stop_syncing, UPDATE_SYNCING_USERS_MS ) - hs.get_reactor().addSystemEventTrigger( + hs.get_clock().add_system_event_trigger( "before", "shutdown", run_as_background_process, @@ -842,7 +842,7 @@ def __init__(self, hs: "HomeServer"): # have not yet been persisted self.unpersisted_users_changes: Set[str] = set() - hs.get_reactor().addSystemEventTrigger( + hs.get_clock().add_system_event_trigger( "before", "shutdown", run_as_background_process, diff --git a/synapse/server.py b/synapse/server.py index 00862eb1376..118dee70512 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -1007,7 +1007,7 @@ def get_media_sender_thread_pool(self) -> ThreadPool: ) media_threadpool.start() - self.get_reactor().addSystemEventTrigger( + self.get_clock().add_system_event_trigger( "during", "shutdown", media_threadpool.stop ) diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index cf7bc4ac693..c7a330cc83d 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -455,7 +455,7 @@ def __init__( self._client_ip_looper = self._clock.looping_call( self._update_client_ips_batch, 5 * 1000 ) - self.hs.get_reactor().addSystemEventTrigger( + self.hs.get_clock().add_system_event_trigger( "before", "shutdown", self._update_client_ips_batch ) diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 27c3578a316..d0e4a91b595 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -99,7 +99,7 @@ def __init__( # lead to a race, as we may drop the lock while we are still processing. # However, a) it should be a small window, b) the lock is best effort # anyway and c) we want to really avoid leaking locks when we restart. - hs.get_reactor().addSystemEventTrigger( + hs.get_clock().add_system_event_trigger( "before", "shutdown", self._on_shutdown, diff --git a/synapse/util/clock.py b/synapse/util/clock.py index 8d6ab007ba1..043b06a1087 100644 --- a/synapse/util/clock.py +++ b/synapse/util/clock.py @@ -206,3 +206,59 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None: # We can ignore the lint here since this class is the one location # callWhenRunning should be called. self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running] + + def add_system_event_trigger( + self, + phase: str, + event_type: str, + callback: Callable[P, object], + *args: P.args, + **kwargs: P.kwargs, + ) -> None: + """ + Add a function to be called when a system event occurs. + + Equivalent to `reactor.addSystemEventTrigger` (see the that docstring for more + details), but ensures that the callback is run in a logging context. + + Args: + phase: a time to call the event -- either the string 'before', 'after', or + 'during', describing when to call it relative to the event's execution. + eventType: this is a string describing the type of event. + callback: Function to call + *args: Postional arguments to pass to function. + **kwargs: Key arguments to pass to function. + """ + + def wrapped_callback(*args: Any, **kwargs: Any) -> None: + assert context.current_context() is context.SENTINEL_CONTEXT, ( + "Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext " + f"but saw {context.current_context()}. In other words, another task shouldn't have " + "leaked their logcontext to us." + ) + + # Because this is a callback from the reactor, we will be using the + # `sentinel` log context at this point. We want the function to log with + # some logcontext as we want to know which server the logs came from. + # + # We use `PreserveLoggingContext` to prevent our new `system_event` + # logcontext from finishing as soon as we exit this function, in case `f` + # returns an awaitable/deferred which would continue running and may try to + # restore the `loop_call` context when it's done (because it's trying to + # adhere to the Synapse logcontext rules.) + # + # This also ensures that we return to the `sentinel` context when we exit + # this function and yield control back to the reactor to avoid leaking the + # current logcontext to the reactor (which would then get picked up and + # associated with the next thing the reactor does) + with context.PreserveLoggingContext(context.LoggingContext("system_event")): + # We use `run_in_background` to reset the logcontext after `f` (or the + # awaitable returned by `f`) completes to avoid leaking the current + # logcontext to the reactor + context.run_in_background(callback, *args, **kwargs) + + # We can ignore the lint here since this class is the one location + # `addSystemEventTrigger` should be called. + self._reactor.addSystemEventTrigger( + phase, event_type, wrapped_callback, *args, **kwargs + ) # type: ignore[prefer-synapse-clock-add-system-event-trigger]