1 change: 1 addition & 0 deletions changelog.d/18945.misc
@@ -0,0 +1 @@
Introduce `Clock.add_system_event_trigger(...)` to wrap system event callback code in a logcontext, ensuring we can identify which server generated the logs.
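To illustrate the migration this implies (a minimal sketch based on the callsites changed below; `reactor`, `hs`, and `resolver_threadpool` are the names used in `synapse/app/_base.py`):

# Before: the reactor invokes the callback with the sentinel logcontext,
# so its logs cannot be attributed to a particular server.
reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)

# After: `Clock.add_system_event_trigger` wraps the callback in a
# "system_event" logcontext before registering it with the reactor.
hs.get_clock().add_system_event_trigger(
    "during", "shutdown", resolver_threadpool.stop
)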
40 changes: 40 additions & 0 deletions scripts-dev/mypy_synapse_plugin.py
@@ -74,6 +74,12 @@
category="synapse-reactor-clock",
)

PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER = ErrorCode(
"prefer-synapse-clock-add-system-event-trigger",
"`synapse.util.Clock.add_system_event_trigger` should be used instead of `reactor.addSystemEventTrigger`",
category="synapse-reactor-clock",
)


class Sentinel(enum.Enum):
# defining a sentinel in this way allows mypy to correctly handle the
@@ -242,6 +248,13 @@ def get_method_signature_hook(
):
return check_call_when_running

if fullname in (
"twisted.internet.interfaces.IReactorCore.addSystemEventTrigger",
"synapse.types.ISynapseThreadlessReactor.addSystemEventTrigger",
"synapse.types.ISynapseReactor.addSystemEventTrigger",
):
return check_add_system_event_trigger

return None


@@ -272,6 +285,33 @@ def check_call_when_running(ctx: MethodSigContext) -> CallableType:
return signature


def check_add_system_event_trigger(ctx: MethodSigContext) -> CallableType:
"""
Ensure that `reactor.addSystemEventTrigger` isn't called directly.

`synapse.util.Clock.add_system_event_trigger` should always be used instead of
`reactor.addSystemEventTrigger`.

Since a callback registered with `reactor.addSystemEventTrigger` is invoked directly by the
reactor, it starts out with the sentinel logcontext. `synapse.util.Clock` wraps it in a
default logcontext, as we want to know which server the logs came from.

Args:
ctx: The `MethodSigContext` from mypy.
"""
signature: CallableType = ctx.default_signature
ctx.api.fail(
(
"Expected all `reactor.addSystemEventTrigger` calls to use `synapse.util.Clock.add_system_event_trigger` instead. "
"This is so all Synapse code runs with a logcontext as we want to know which server the logs came from."
),
ctx.context,
code=PREFER_SYNAPSE_CLOCK_ADD_SYSTEM_EVENT_TRIGGER,
)

return signature


def analyze_prometheus_metric_classes(ctx: ClassDefContext) -> None:
"""
Cross-check the list of Prometheus metric classes against the
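Where a direct call is deliberate (inside `synapse.util.Clock` itself), the new error code can be suppressed inline, just like the existing `prefer-synapse-clock-call-when-running` code; a minimal sketch of such a suppression, with an illustrative `_cleanup` callback:

def _cleanup() -> None:
    ...

# Direct reactor use is intentional here, so silence the plugin's check.
reactor.addSystemEventTrigger("before", "shutdown", _cleanup)  # type: ignore[prefer-synapse-clock-add-system-event-trigger]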
8 changes: 5 additions & 3 deletions synapse/app/_base.py
@@ -518,7 +518,9 @@ async def start(hs: "HomeServer") -> None:
# numbers of DNS requests don't starve out other users of the threadpool.
resolver_threadpool = ThreadPool(name="gai_resolver")
resolver_threadpool.start()
reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)
hs.get_clock().add_system_event_trigger(
"during", "shutdown", resolver_threadpool.stop
)
reactor.installNameResolver(
GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
)
@@ -605,7 +607,7 @@ def log_shutdown() -> None:
logger.info("Shutting down...")

# Log when we start the shut down process.
hs.get_reactor().addSystemEventTrigger("before", "shutdown", log_shutdown)
hs.get_clock().add_system_event_trigger("before", "shutdown", log_shutdown)

setup_sentry(hs)
setup_sdnotify(hs)
@@ -720,7 +722,7 @@ def setup_sdnotify(hs: "HomeServer") -> None:
# we're not using systemd.
sdnotify(b"READY=1\nMAINPID=%i" % (os.getpid(),))

hs.get_reactor().addSystemEventTrigger(
hs.get_clock().add_system_event_trigger(
"before", "shutdown", sdnotify, b"STOPPING=1"
)

4 changes: 2 additions & 2 deletions synapse/handlers/presence.py
@@ -541,7 +541,7 @@ def __init__(self, hs: "HomeServer"):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)

hs.get_reactor().addSystemEventTrigger(
hs.get_clock().add_system_event_trigger(
"before",
"shutdown",
run_as_background_process,
@@ -842,7 +842,7 @@ def __init__(self, hs: "HomeServer"):
# have not yet been persisted
self.unpersisted_users_changes: Set[str] = set()

hs.get_reactor().addSystemEventTrigger(
hs.get_clock().add_system_event_trigger(
"before",
"shutdown",
run_as_background_process,
2 changes: 1 addition & 1 deletion synapse/server.py
@@ -1007,7 +1007,7 @@ def get_media_sender_thread_pool(self) -> ThreadPool:
)

media_threadpool.start()
self.get_reactor().addSystemEventTrigger(
self.get_clock().add_system_event_trigger(
"during", "shutdown", media_threadpool.stop
)

2 changes: 1 addition & 1 deletion synapse/storage/databases/main/client_ips.py
@@ -455,7 +455,7 @@ def __init__(
self._client_ip_looper = self._clock.looping_call(
self._update_client_ips_batch, 5 * 1000
)
self.hs.get_reactor().addSystemEventTrigger(
self.hs.get_clock().add_system_event_trigger(
"before", "shutdown", self._update_client_ips_batch
)

2 changes: 1 addition & 1 deletion synapse/storage/databases/main/lock.py
@@ -99,7 +99,7 @@ def __init__(
# lead to a race, as we may drop the lock while we are still processing.
# However, a) it should be a small window, b) the lock is best effort
# anyway and c) we want to really avoid leaking locks when we restart.
hs.get_reactor().addSystemEventTrigger(
hs.get_clock().add_system_event_trigger(
"before",
"shutdown",
self._on_shutdown,
56 changes: 56 additions & 0 deletions synapse/util/clock.py
@@ -206,3 +206,59 @@ def wrapped_callback(*args: Any, **kwargs: Any) -> None:
# We can ignore the lint here since this class is the one location
# callWhenRunning should be called.
self._reactor.callWhenRunning(wrapped_callback, *args, **kwargs) # type: ignore[prefer-synapse-clock-call-when-running]

def add_system_event_trigger(
self,
phase: str,
event_type: str,
callback: Callable[P, object],
*args: P.args,
**kwargs: P.kwargs,
) -> None:
"""
Add a function to be called when a system event occurs.

Equivalent to `reactor.addSystemEventTrigger` (see that method's docstring for more
details), but ensures that the callback is run in a logging context.

Args:
phase: a time to call the event -- either the string 'before', 'after', or
'during', describing when to call it relative to the event's execution.
event_type: a string describing the type of event.
callback: Function to call.
*args: Positional arguments to pass to the function.
**kwargs: Keyword arguments to pass to the function.
"""

def wrapped_callback(*args: Any, **kwargs: Any) -> None:
assert context.current_context() is context.SENTINEL_CONTEXT, (
"Expected `add_system_event_trigger` callback from the reactor to start with the sentinel logcontext "
f"but saw {context.current_context()}. In other words, another task shouldn't have "
"leaked their logcontext to us."
)

# Because this is a callback from the reactor, we will be using the
# `sentinel` log context at this point. We want the function to log with
# some logcontext as we want to know which server the logs came from.
#
# We use `PreserveLoggingContext` to prevent our new `system_event`
# logcontext from finishing as soon as we exit this function, in case `callback`
# returns an awaitable/deferred which would continue running and may try to
# restore the `system_event` context when it's done (because it's trying to
# adhere to the Synapse logcontext rules).
#
# This also ensures that we return to the `sentinel` context when we exit
# this function and yield control back to the reactor to avoid leaking the
# current logcontext to the reactor (which would then get picked up and
# associated with the next thing the reactor does)
with context.PreserveLoggingContext(context.LoggingContext("system_event")):
# We use `run_in_background` to reset the logcontext after `callback` (or the
# awaitable returned by `callback`) completes, to avoid leaking the current
# logcontext to the reactor.
context.run_in_background(callback, *args, **kwargs)

# We can ignore the lint here since this class is the one location
# `addSystemEventTrigger` should be called.
self._reactor.addSystemEventTrigger(
phase, event_type, wrapped_callback, *args, **kwargs
) # type: ignore[prefer-synapse-clock-add-system-event-trigger]
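A minimal usage sketch (the `_on_shutdown` callback and `logger` are illustrative, not part of this change): the reactor still fires the trigger at shutdown, but because of the wrapping above the callback runs under a "system_event" logcontext rather than the sentinel, so its log lines identify the server that produced them.

def _on_shutdown() -> None:
    # Runs inside the "system_event" logcontext set up by `wrapped_callback`,
    # so this log line can be attributed to the right server.
    logger.info("Shutting down...")

hs.get_clock().add_system_event_trigger("before", "shutdown", _on_shutdown)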