From 986e15c6e402bc16b8aca3b483378dab45c372bf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 22:53:43 -0500 Subject: [PATCH 01/21] Wrap the Rust HTTP client with `make_deferred_yieldable` So downstream usage doesn't need to use `PreserveLoggingContext()` or `make_deferred_yieldable` Spawning from https://github.com/element-hq/synapse/pull/18870 and https://github.com/element-hq/synapse/pull/18357#discussion_r2294941827 --- synapse/api/auth/mas.py | 16 ++++----- synapse/api/auth/msc3861_delegated.py | 16 ++++----- synapse/synapse_rust/http_client.pyi | 7 ++++ synapse/synapse_rust_wrapper/__init__.py | 11 ++++++ synapse/synapse_rust_wrapper/http_client.py | 39 +++++++++++++++++++++ 5 files changed, 71 insertions(+), 18 deletions(-) create mode 100644 synapse/synapse_rust_wrapper/__init__.py create mode 100644 synapse/synapse_rust_wrapper/http_client.py diff --git a/synapse/api/auth/mas.py b/synapse/api/auth/mas.py index 40b4a5bd34b..8d66c21faac 100644 --- a/synapse/api/auth/mas.py +++ b/synapse/api/auth/mas.py @@ -33,7 +33,6 @@ UnrecognizedRequestError, ) from synapse.http.site import SynapseRequest -from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import ( active_span, force_tracing, @@ -41,7 +40,7 @@ start_active_span, ) from synapse.metrics import SERVER_NAME_LABEL -from synapse.synapse_rust.http_client import HttpClient +from synapse.synapse_rust_wrapper.http_client import HttpClient from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util import json_decoder from synapse.util.caches.cached_call import RetryOnExceptionCachedCall @@ -229,13 +228,12 @@ async def _introspect_token( try: with start_active_span("mas-introspect-token"): inject_request_headers(raw_headers) - with PreserveLoggingContext(): - resp_body = await self._rust_http_client.post( - url=self._introspection_endpoint, - response_limit=1 * 1024 * 1024, - headers=raw_headers, - request_body=body, - ) + resp_body = await self._rust_http_client.post( + url=self._introspection_endpoint, + response_limit=1 * 1024 * 1024, + headers=raw_headers, + request_body=body, + ) except HttpResponseException as e: end_time = self._clock.time() introspection_response_timer.labels( diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index c406c683e71..bb84ed2c78a 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -38,7 +38,6 @@ UnrecognizedRequestError, ) from synapse.http.site import SynapseRequest -from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import ( active_span, force_tracing, @@ -46,7 +45,7 @@ start_active_span, ) from synapse.metrics import SERVER_NAME_LABEL -from synapse.synapse_rust.http_client import HttpClient +from synapse.synapse_rust_wrapper.http_client import HttpClient from synapse.types import Requester, UserID, create_requester from synapse.util import json_decoder from synapse.util.caches.cached_call import RetryOnExceptionCachedCall @@ -327,13 +326,12 @@ async def _introspect_token( try: with start_active_span("mas-introspect-token"): inject_request_headers(raw_headers) - with PreserveLoggingContext(): - resp_body = await self._rust_http_client.post( - url=uri, - response_limit=1 * 1024 * 1024, - headers=raw_headers, - request_body=body, - ) + resp_body = await self._rust_http_client.post( + url=uri, + response_limit=1 * 1024 * 1024, + headers=raw_headers, + request_body=body, + ) except HttpResponseException as e: end_time = self._clock.time() introspection_response_timer.labels( diff --git a/synapse/synapse_rust/http_client.pyi b/synapse/synapse_rust/http_client.pyi index 9fb7831e6b6..1635c9afa12 100644 --- a/synapse/synapse_rust/http_client.pyi +++ b/synapse/synapse_rust/http_client.pyi @@ -17,6 +17,13 @@ from twisted.internet.defer import Deferred from synapse.types import ISynapseReactor class HttpClient: + """ + Since the returned deferreds don't follow Synapse logcontext rules, + this is not meant to be used by Synapse code directly. + + Use `synapse.synapse_rust_wrapper.http_client.HttpClient` instead. + """ + def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ... def get(self, url: str, response_limit: int) -> Deferred[bytes]: ... def post( diff --git a/synapse/synapse_rust_wrapper/__init__.py b/synapse/synapse_rust_wrapper/__init__.py new file mode 100644 index 00000000000..e056679fd55 --- /dev/null +++ b/synapse/synapse_rust_wrapper/__init__.py @@ -0,0 +1,11 @@ +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . diff --git a/synapse/synapse_rust_wrapper/http_client.py b/synapse/synapse_rust_wrapper/http_client.py new file mode 100644 index 00000000000..ed004a81a9a --- /dev/null +++ b/synapse/synapse_rust_wrapper/http_client.py @@ -0,0 +1,39 @@ +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . + + +from typing import Mapping + +from twisted.internet.defer import Deferred + +from synapse.logging.context import make_deferred_yieldable +from synapse.synapse_rust.http_client import HttpClient as RustHttpClient +from synapse.types import ISynapseReactor + + +class HttpClient: + def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: + self._http_client = RustHttpClient(reactor, user_agent) + + def get(self, url: str, response_limit: int) -> Deferred[bytes]: + deferred = self._http_client.get(url, response_limit) + return make_deferred_yieldable(deferred) + + def post( + self, + url: str, + response_limit: int, + headers: Mapping[str, str], + request_body: str, + ) -> Deferred[bytes]: + deferred = self._http_client.post(url, response_limit, headers, request_body) + return make_deferred_yieldable(deferred) From 5e555f3abb6602160a511b9f2ba01b1bb43d92c5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 22:59:01 -0500 Subject: [PATCH 02/21] Add chagnelog --- changelog.d/18903.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/18903.misc diff --git a/changelog.d/18903.misc b/changelog.d/18903.misc new file mode 100644 index 00000000000..bafa7dad5cf --- /dev/null +++ b/changelog.d/18903.misc @@ -0,0 +1 @@ +Wrap the Rust HTTP client with `make_deferred_yieldable` so it follows Synapse logcontext rules. From 4e0085c2213cc74d18fcd5383dc87aaa15e93ee2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 8 Sep 2025 23:00:29 -0500 Subject: [PATCH 03/21] Docstring for why we have a wrapper --- synapse/synapse_rust_wrapper/http_client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/synapse_rust_wrapper/http_client.py b/synapse/synapse_rust_wrapper/http_client.py index ed004a81a9a..4ac5aa9acdf 100644 --- a/synapse/synapse_rust_wrapper/http_client.py +++ b/synapse/synapse_rust_wrapper/http_client.py @@ -21,6 +21,11 @@ class HttpClient: + """ + Wrap `synapse.synapse_rust.http_client.HttpClient` to ensure the returned + deferreds follow Synapse logcontext rules. + """ + def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: self._http_client = RustHttpClient(reactor, user_agent) From ce6f16deaa3de2884744faa5ab50535df7570229 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 9 Sep 2025 14:12:32 -0500 Subject: [PATCH 04/21] WIP: Add test --- tests/synapse_rust/__init__.py | 11 ++++ tests/synapse_rust/test_http_client.py | 87 ++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 tests/synapse_rust/__init__.py create mode 100644 tests/synapse_rust/test_http_client.py diff --git a/tests/synapse_rust/__init__.py b/tests/synapse_rust/__init__.py new file mode 100644 index 00000000000..e056679fd55 --- /dev/null +++ b/tests/synapse_rust/__init__.py @@ -0,0 +1,11 @@ +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py new file mode 100644 index 00000000000..c6da4d6cbaa --- /dev/null +++ b/tests/synapse_rust/test_http_client.py @@ -0,0 +1,87 @@ +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2025 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# . + +import time +from typing import Any, Coroutine, Generator, TypeVar, Union + +from twisted.internet.defer import Deferred, ensureDeferred +from twisted.internet.testing import MemoryReactor + +from synapse.logging.context import LoggingContext +from synapse.server import HomeServer +from synapse.synapse_rust.http_client import HttpClient +from synapse.util import Clock + +from tests.unittest import HomeserverTestCase + +T = TypeVar("T") + + +class HttpClientTestCase(HomeserverTestCase): + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + hs = self.setup_test_homeserver() + # This triggers the server startup hooks, which starts the Tokio thread pool + reactor.run() + return hs + + def tearDown(self) -> None: + # MemoryReactor doesn't trigger the shutdown phases, and we want the + # Tokio thread pool to be stopped + # XXX: This logic should probably get moved somewhere else + shutdown_triggers = self.reactor.triggers.get("shutdown", {}) + for phase in ["before", "during", "after"]: + triggers = shutdown_triggers.get(phase, []) + for callbable, args, kwargs in triggers: + callbable(*args, **kwargs) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self._http_client = hs.get_proxied_http_client() + self._rust_http_client = HttpClient( + reactor=hs.get_reactor(), + user_agent=self._http_client.user_agent.decode("utf8"), + ) + + def till_deferred_has_result( + self, + awaitable: Union[ + "Coroutine[Deferred[Any], Any, T]", + "Generator[Deferred[Any], Any, T]", + "Deferred[T]", + ], + ) -> "Deferred[T]": + """Wait until a deferred has a result. + + This is useful because the Rust HTTP client will resolve the deferred + using reactor.callFromThread, which are only run when we call + reactor.advance. + """ + deferred = ensureDeferred(awaitable) + tries = 0 + while not deferred.called: + time.sleep(0.1) + self.reactor.advance(0) + tries += 1 + if tries > 100: + raise Exception("Timed out waiting for deferred to resolve") + + return deferred + + def test_logging_context(self) -> None: + async def asdf() -> None: + with LoggingContext("test"): + # TODO: Test logging context before/after this call + await self._rust_http_client.get( + url="http://localhost", + response_limit=1 * 1024 * 1024, + ) + + self.get_success(self.till_deferred_has_result(asdf())) From 77970eb501b570cbf943fb675adf26ec5f588f09 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 14:01:07 -0500 Subject: [PATCH 05/21] Maintenance from merging in the latest `develop` changes --- tests/synapse_rust/test_http_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index c6da4d6cbaa..d5aa529c919 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -19,7 +19,7 @@ from synapse.logging.context import LoggingContext from synapse.server import HomeServer from synapse.synapse_rust.http_client import HttpClient -from synapse.util import Clock +from synapse.util.clock import Clock from tests.unittest import HomeserverTestCase @@ -77,7 +77,7 @@ def till_deferred_has_result( def test_logging_context(self) -> None: async def asdf() -> None: - with LoggingContext("test"): + with LoggingContext(name="test", server_name="test_server"): # TODO: Test logging context before/after this call await self._rust_http_client.get( url="http://localhost", From b89fb2e3fe63745a3cadea7434e66d680d88ca3b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 15:38:57 -0500 Subject: [PATCH 06/21] Fix `Tokio runtime is not running` See https://github.com/element-hq/synapse/pull/18903#discussion_r2334546936 --- Related to https://github.com/twisted/twisted/pull/12514 --- tests/synapse_rust/test_http_client.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index d5aa529c919..07009f6446a 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -29,8 +29,19 @@ class HttpClientTestCase(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver() + + # XXX: We must create the Rust HTTP client before we call `reactor.run()` below. + # Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's + # already running and we rely on that to start the Tokio thread pool in Rust. + self._http_client = hs.get_proxied_http_client() + self._rust_http_client = HttpClient( + reactor=hs.get_reactor(), + user_agent=self._http_client.user_agent.decode("utf8"), + ) + # This triggers the server startup hooks, which starts the Tokio thread pool reactor.run() + return hs def tearDown(self) -> None: @@ -43,13 +54,6 @@ def tearDown(self) -> None: for callbable, args, kwargs in triggers: callbable(*args, **kwargs) - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self._http_client = hs.get_proxied_http_client() - self._rust_http_client = HttpClient( - reactor=hs.get_reactor(), - user_agent=self._http_client.user_agent.decode("utf8"), - ) - def till_deferred_has_result( self, awaitable: Union[ From 34b6f2ad2db6551390a99608d38a562725bdbd3d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 15:57:10 -0500 Subject: [PATCH 07/21] Add working test --- tests/synapse_rust/test_http_client.py | 91 ++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 6 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 07009f6446a..1cb7cd12f54 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -10,15 +10,19 @@ # See the GNU Affero General Public License for more details: # . +import json +import threading import time -from typing import Any, Coroutine, Generator, TypeVar, Union +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any, Coroutine, Generator, Optional, TypeVar, Union from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor -from synapse.logging.context import LoggingContext +from synapse.logging.context import LoggingContext, current_context, _Sentinel from synapse.server import HomeServer from synapse.synapse_rust.http_client import HttpClient +from synapse.types import JsonDict from synapse.util.clock import Clock from tests.unittest import HomeserverTestCase @@ -26,6 +30,53 @@ T = TypeVar("T") +class StubRequestHandler(BaseHTTPRequestHandler): + server: "StubServer" + + def do_GET(self) -> None: + self.server.calls += 1 + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"ok": True}).encode("utf-8")) + + def log_message(self, format: str, *args: Any) -> None: + # Don't log anything; by default, the server logs to stderr + pass + + +class StubServer(HTTPServer): + """A stub HTTP server that we can send requests to for testing. + + This opens a real HTTP server on a random port, on a separate thread. + """ + + calls: int = 0 + """How many times has the endpoint been requested.""" + + _thread: threading.Thread + + def __init__(self) -> None: + super().__init__(("127.0.0.1", 0), StubRequestHandler) + + self._thread = threading.Thread( + target=self.serve_forever, + name="StubServer", + kwargs={"poll_interval": 0.01}, + daemon=True, + ) + self._thread.start() + + def shutdown(self) -> None: + super().shutdown() + self._thread.join() + + @property + def endpoint(self) -> str: + return f"http://127.0.0.1:{self.server_port}/" + + class HttpClientTestCase(HomeserverTestCase): def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver() @@ -44,6 +95,9 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: return hs + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.server = StubServer() + def tearDown(self) -> None: # MemoryReactor doesn't trigger the shutdown phases, and we want the # Tokio thread pool to be stopped @@ -79,13 +133,38 @@ def till_deferred_has_result( return deferred + def _check_current_logcontext(self, expected_logcontext_string: str) -> None: + context = current_context() + assert isinstance(context, LoggingContext) or isinstance(context, _Sentinel), ( + f"Expected LoggingContext({expected_logcontext_string}) but saw {context}" + ) + self.assertEqual( + str(context), + expected_logcontext_string, + f"Expected LoggingContext({expected_logcontext_string}) but saw {context}", + ) + def test_logging_context(self) -> None: - async def asdf() -> None: + # Sanity check that we start in the sentinel context + self._check_current_logcontext("sentinel") + + async def do_request() -> None: + # Should have the same logcontext as the caller + self._check_current_logcontext("sentinel") + with LoggingContext(name="test", server_name="test_server"): - # TODO: Test logging context before/after this call + self._check_current_logcontext("test") + # Make the actual request await self._rust_http_client.get( - url="http://localhost", + url=self.server.endpoint, response_limit=1 * 1024 * 1024, ) + self._check_current_logcontext("test") + + # Back to the caller's context outside of the `LoggingContext` block + self._check_current_logcontext("sentinel") + + self.get_success(self.till_deferred_has_result(do_request())) - self.get_success(self.till_deferred_has_result(asdf())) + # Back to the sentinel context + self._check_current_logcontext("sentinel") From 8a9d6822a09e13bae1c22e7b10b2bc9e1cbb0379 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 16:04:36 -0500 Subject: [PATCH 08/21] Add basic request/response test --- tests/synapse_rust/test_http_client.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 1cb7cd12f54..f575540de0a 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -11,22 +11,25 @@ # . import json +import logging import threading import time from http.server import BaseHTTPRequestHandler, HTTPServer -from typing import Any, Coroutine, Generator, Optional, TypeVar, Union +from typing import Any, Coroutine, Generator, TypeVar, Union from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor -from synapse.logging.context import LoggingContext, current_context, _Sentinel +from synapse.logging.context import LoggingContext, _Sentinel, current_context from synapse.server import HomeServer from synapse.synapse_rust.http_client import HttpClient -from synapse.types import JsonDict from synapse.util.clock import Clock +from synapse.util.json import json_decoder from tests.unittest import HomeserverTestCase +logger = logging.getLogger(__name__) + T = TypeVar("T") @@ -144,6 +147,23 @@ def _check_current_logcontext(self, expected_logcontext_string: str) -> None: f"Expected LoggingContext({expected_logcontext_string}) but saw {context}", ) + def test_request_response(self) -> None: + """ + Test to make sure we can make a basic request and get the expected + response. + """ + + async def do_request() -> None: + resp_body = await self._rust_http_client.get( + url=self.server.endpoint, + response_limit=1 * 1024 * 1024, + ) + raw_response = json_decoder.decode(resp_body.decode("utf-8")) + self.assertEqual(raw_response, {"ok": True}) + + self.get_success(self.till_deferred_has_result(do_request())) + self.assertEqual(self.server.calls, 1) + def test_logging_context(self) -> None: # Sanity check that we start in the sentinel context self._check_current_logcontext("sentinel") From 2d341cb5ca3599a3989517778894b25ff3796501 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 16:05:47 -0500 Subject: [PATCH 09/21] Add docstring --- tests/synapse_rust/test_http_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index f575540de0a..6934a27bb02 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -165,6 +165,10 @@ async def do_request() -> None: self.assertEqual(self.server.calls, 1) def test_logging_context(self) -> None: + """ + Test to make sure the `LoggingContext` (logcontext) is handled correctly + when making requests. + """ # Sanity check that we start in the sentinel context self._check_current_logcontext("sentinel") From 58c1209c41121fe96f438049772b987894730ff7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 16:09:57 -0500 Subject: [PATCH 10/21] Better test that fails --- tests/synapse_rust/test_http_client.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index 6934a27bb02..d1d32fa1662 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -174,21 +174,24 @@ def test_logging_context(self) -> None: async def do_request() -> None: # Should have the same logcontext as the caller - self._check_current_logcontext("sentinel") + self._check_current_logcontext("foo") - with LoggingContext(name="test", server_name="test_server"): - self._check_current_logcontext("test") + with LoggingContext(name="competing", server_name="test_server"): + logger.info("asdf1") # Make the actual request await self._rust_http_client.get( url=self.server.endpoint, response_limit=1 * 1024 * 1024, ) - self._check_current_logcontext("test") + logger.info("asdf2") + self._check_current_logcontext("competing") # Back to the caller's context outside of the `LoggingContext` block - self._check_current_logcontext("sentinel") + self._check_current_logcontext("foo") - self.get_success(self.till_deferred_has_result(do_request())) + with LoggingContext(name="foo", server_name="test_server"): + self.get_success(self.till_deferred_has_result(do_request())) + self._check_current_logcontext("foo") # Back to the sentinel context self._check_current_logcontext("sentinel") From 9a4e67de9fdb0b9ca7ce59f9addf8bfb1872b563 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 16:49:47 -0500 Subject: [PATCH 11/21] Cross-link Twisted PR --- tests/synapse_rust/test_http_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index d1d32fa1662..a8f13656fb2 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -86,7 +86,8 @@ def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: # XXX: We must create the Rust HTTP client before we call `reactor.run()` below. # Twisted's `MemoryReactor` doesn't invoke `callWhenRunning` callbacks if it's - # already running and we rely on that to start the Tokio thread pool in Rust. + # already running and we rely on that to start the Tokio thread pool in Rust. In + # the future, this may not matter, see https://github.com/twisted/twisted/pull/12514 self._http_client = hs.get_proxied_http_client() self._rust_http_client = HttpClient( reactor=hs.get_reactor(), From 59516a4a537e990f9991f7191a7b82da37d5dd56 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:01:59 -0500 Subject: [PATCH 12/21] WIP: Failing test --- tests/synapse_rust/test_http_client.py | 71 +++++++++++++++++++------- 1 file changed, 52 insertions(+), 19 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index a8f13656fb2..e4bbda27b2a 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -20,7 +20,13 @@ from twisted.internet.defer import Deferred, ensureDeferred from twisted.internet.testing import MemoryReactor -from synapse.logging.context import LoggingContext, _Sentinel, current_context +from synapse.logging.context import ( + LoggingContext, + _Sentinel, + current_context, + run_in_background, + PreserveLoggingContext, +) from synapse.server import HomeServer from synapse.synapse_rust.http_client import HttpClient from synapse.util.clock import Clock @@ -165,7 +171,7 @@ async def do_request() -> None: self.get_success(self.till_deferred_has_result(do_request())) self.assertEqual(self.server.calls, 1) - def test_logging_context(self) -> None: + async def test_logging_context(self) -> None: """ Test to make sure the `LoggingContext` (logcontext) is handled correctly when making requests. @@ -173,26 +179,53 @@ def test_logging_context(self) -> None: # Sanity check that we start in the sentinel context self._check_current_logcontext("sentinel") - async def do_request() -> None: - # Should have the same logcontext as the caller - self._check_current_logcontext("foo") - - with LoggingContext(name="competing", server_name="test_server"): - logger.info("asdf1") - # Make the actual request - await self._rust_http_client.get( - url=self.server.endpoint, - response_limit=1 * 1024 * 1024, - ) - logger.info("asdf2") - self._check_current_logcontext("competing") - - # Back to the caller's context outside of the `LoggingContext` block - self._check_current_logcontext("foo") + callback_finished = False + async def do_request() -> None: + nonlocal callback_finished + try: + # Should have the same logcontext as the caller + self._check_current_logcontext("foo") + + with LoggingContext(name="competing", server_name="test_server"): + logger.info("asdf3") + # with PreserveLoggingContext(): + # Make the actual request + await self._rust_http_client.get( + url=self.server.endpoint, + response_limit=1 * 1024 * 1024, + ) + logger.info("asdf4") + self._check_current_logcontext("competing") + + # Back to the caller's context outside of the `LoggingContext` block + self._check_current_logcontext("foo") + finally: + # When exceptions happen, we still want to mark the callback as finished + # so that the test can complete and we see the underlying error. + callback_finished = True + + logger.info("asdf1") with LoggingContext(name="foo", server_name="test_server"): - self.get_success(self.till_deferred_has_result(do_request())) + # Fire off the function, but don't wait on it. + run_in_background(do_request) + logger.info("asdf2") + + # Now wait for the function under test to have run + with PreserveLoggingContext(): + while not callback_finished: + # await self.hs.get_clock().sleep(0) + time.sleep(0.1) + self.reactor.advance(0) + + logger.info("asdf5") + # check that the logcontext is left in a sane state. self._check_current_logcontext("foo") + self.assertTrue( + callback_finished, + "Callback never finished which means the test probably didn't wait long enough", + ) + # Back to the sentinel context self._check_current_logcontext("sentinel") From 74f01a94833c5fcb8983f7db9535a4ddb9132893 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:20:12 -0500 Subject: [PATCH 13/21] Use `make_deferred_yieldable` in Rust See https://github.com/element-hq/synapse/pull/18903#discussion_r2331948132 --- rust/src/http_client.rs | 58 +++++++++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 17 deletions(-) diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index b6cdf98f552..5f9cf21830b 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -19,6 +19,7 @@ use futures::TryStreamExt; use once_cell::sync::OnceCell; use pyo3::{create_exception, exceptions::PyException, prelude::*}; use reqwest::RequestBuilder; +use std::sync::OnceLock; use tokio::runtime::Runtime; use crate::errors::HttpResponseException; @@ -218,29 +219,32 @@ impl HttpClient { builder: RequestBuilder, response_limit: usize, ) -> PyResult> { - create_deferred(py, self.reactor.bind(py), async move { - let response = builder.send().await.context("sending request")?; + Ok(make_deferred_yieldable( + py, + &create_deferred(py, self.reactor.bind(py), async move { + let response = builder.send().await.context("sending request")?; - let status = response.status(); + let status = response.status(); - let mut stream = response.bytes_stream(); - let mut buffer = Vec::new(); - while let Some(chunk) = stream.try_next().await.context("reading body")? { - if buffer.len() + chunk.len() > response_limit { - Err(anyhow::anyhow!("Response size too large"))?; - } + let mut stream = response.bytes_stream(); + let mut buffer = Vec::new(); + while let Some(chunk) = stream.try_next().await.context("reading body")? { + if buffer.len() + chunk.len() > response_limit { + Err(anyhow::anyhow!("Response size too large"))?; + } - buffer.extend_from_slice(&chunk); - } + buffer.extend_from_slice(&chunk); + } - if !status.is_success() { - return Err(HttpResponseException::new(status, buffer)); - } + if !status.is_success() { + return Err(HttpResponseException::new(status, buffer)); + } - let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; + let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; - Ok(r) - }) + Ok(r) + })?, + )) } } @@ -301,3 +305,23 @@ where Ok(deferred) } + +static MAKE_DEFERRED_YIELDABLE: OnceLock> = OnceLock::new(); + +/// Given a deferred, make it follow the Synapse logcontext rules +fn make_deferred_yieldable<'py>( + py: Python<'py>, + deferred: &Bound<'py, PyAny>, +) -> Bound<'py, PyAny> { + let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| { + let sys = PyModule::import(py, "synapse.logging.context").unwrap(); + let func = sys.getattr("make_deferred_yieldable").unwrap().unbind(); + func + }); + + make_deferred_yieldable + .call1(py, (deferred,)) + .unwrap() + .extract(py) + .unwrap() +} From 4424a88d2c8577e9455324b393a7bcc9a2e30840 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:21:38 -0500 Subject: [PATCH 14/21] Clean up test --- tests/synapse_rust/test_http_client.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index e4bbda27b2a..df5fb750be4 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -188,14 +188,11 @@ async def do_request() -> None: self._check_current_logcontext("foo") with LoggingContext(name="competing", server_name="test_server"): - logger.info("asdf3") - # with PreserveLoggingContext(): # Make the actual request await self._rust_http_client.get( url=self.server.endpoint, response_limit=1 * 1024 * 1024, ) - logger.info("asdf4") self._check_current_logcontext("competing") # Back to the caller's context outside of the `LoggingContext` block @@ -205,11 +202,9 @@ async def do_request() -> None: # so that the test can complete and we see the underlying error. callback_finished = True - logger.info("asdf1") with LoggingContext(name="foo", server_name="test_server"): # Fire off the function, but don't wait on it. run_in_background(do_request) - logger.info("asdf2") # Now wait for the function under test to have run with PreserveLoggingContext(): @@ -218,7 +213,6 @@ async def do_request() -> None: time.sleep(0.1) self.reactor.advance(0) - logger.info("asdf5") # check that the logcontext is left in a sane state. self._check_current_logcontext("foo") From e6df6ee4b164a80d51fe112b7d4ca79eb910ae20 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:24:35 -0500 Subject: [PATCH 15/21] Remove Rust wrapper (no longer necessary) --- synapse/synapse_rust_wrapper/__init__.py | 11 ------ synapse/synapse_rust_wrapper/http_client.py | 44 --------------------- 2 files changed, 55 deletions(-) delete mode 100644 synapse/synapse_rust_wrapper/__init__.py delete mode 100644 synapse/synapse_rust_wrapper/http_client.py diff --git a/synapse/synapse_rust_wrapper/__init__.py b/synapse/synapse_rust_wrapper/__init__.py deleted file mode 100644 index e056679fd55..00000000000 --- a/synapse/synapse_rust_wrapper/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright (C) 2025 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# . diff --git a/synapse/synapse_rust_wrapper/http_client.py b/synapse/synapse_rust_wrapper/http_client.py deleted file mode 100644 index 4ac5aa9acdf..00000000000 --- a/synapse/synapse_rust_wrapper/http_client.py +++ /dev/null @@ -1,44 +0,0 @@ -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright (C) 2025 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# . - - -from typing import Mapping - -from twisted.internet.defer import Deferred - -from synapse.logging.context import make_deferred_yieldable -from synapse.synapse_rust.http_client import HttpClient as RustHttpClient -from synapse.types import ISynapseReactor - - -class HttpClient: - """ - Wrap `synapse.synapse_rust.http_client.HttpClient` to ensure the returned - deferreds follow Synapse logcontext rules. - """ - - def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: - self._http_client = RustHttpClient(reactor, user_agent) - - def get(self, url: str, response_limit: int) -> Deferred[bytes]: - deferred = self._http_client.get(url, response_limit) - return make_deferred_yieldable(deferred) - - def post( - self, - url: str, - response_limit: int, - headers: Mapping[str, str], - request_body: str, - ) -> Deferred[bytes]: - deferred = self._http_client.post(url, response_limit, headers, request_body) - return make_deferred_yieldable(deferred) From 92ea10a80b41b81fcc9f5f0445fda6911bea15da Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:25:08 -0500 Subject: [PATCH 16/21] Note that the returned deferred follow Synapse logcontext rules now --- rust/src/http_client.rs | 2 ++ synapse/synapse_rust/http_client.pyi | 5 +---- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index 5f9cf21830b..8adfac5219d 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -219,6 +219,8 @@ impl HttpClient { builder: RequestBuilder, response_limit: usize, ) -> PyResult> { + // We use `make_deferred_yieldable` to make the returned deferred follow Synapse + // logcontext rules. Ok(make_deferred_yieldable( py, &create_deferred(py, self.reactor.bind(py), async move { diff --git a/synapse/synapse_rust/http_client.pyi b/synapse/synapse_rust/http_client.pyi index 1635c9afa12..530d2be8e38 100644 --- a/synapse/synapse_rust/http_client.pyi +++ b/synapse/synapse_rust/http_client.pyi @@ -18,10 +18,7 @@ from synapse.types import ISynapseReactor class HttpClient: """ - Since the returned deferreds don't follow Synapse logcontext rules, - this is not meant to be used by Synapse code directly. - - Use `synapse.synapse_rust_wrapper.http_client.HttpClient` instead. + The returned deferreds follow Synapse logcontext rules. """ def __init__(self, reactor: ISynapseReactor, user_agent: str) -> None: ... From 5c324bccf8f6af8f2d3236d77e2da1e9496b947c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:27:47 -0500 Subject: [PATCH 17/21] No more Rust wrapper (switch back to Rust imports) --- synapse/api/auth/mas.py | 2 +- synapse/api/auth/msc3861_delegated.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/api/auth/mas.py b/synapse/api/auth/mas.py index 15c54908ad2..baa6b27336f 100644 --- a/synapse/api/auth/mas.py +++ b/synapse/api/auth/mas.py @@ -40,7 +40,7 @@ start_active_span, ) from synapse.metrics import SERVER_NAME_LABEL -from synapse.synapse_rust_wrapper.http_client import HttpClient +from synapse.synapse_rust.http_client import HttpClient from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util.caches.cached_call import RetryOnExceptionCachedCall from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 3b5fbdcf9ad..b6adcc83dca 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -45,7 +45,7 @@ start_active_span, ) from synapse.metrics import SERVER_NAME_LABEL -from synapse.synapse_rust_wrapper.http_client import HttpClient +from synapse.synapse_rust.http_client import HttpClient from synapse.types import Requester, UserID, create_requester from synapse.util.caches.cached_call import RetryOnExceptionCachedCall from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext From e243b0f07ced1c4f6e81f1b2faf4a858f88d7603 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:29:50 -0500 Subject: [PATCH 18/21] Fix lints --- tests/synapse_rust/test_http_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/synapse_rust/test_http_client.py b/tests/synapse_rust/test_http_client.py index df5fb750be4..032eab77e80 100644 --- a/tests/synapse_rust/test_http_client.py +++ b/tests/synapse_rust/test_http_client.py @@ -22,10 +22,10 @@ from synapse.logging.context import ( LoggingContext, + PreserveLoggingContext, _Sentinel, current_context, run_in_background, - PreserveLoggingContext, ) from synapse.server import HomeServer from synapse.synapse_rust.http_client import HttpClient From ab453b7e2971f65d34512fce1d0b91580a6fabc0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 30 Sep 2025 17:39:57 -0500 Subject: [PATCH 19/21] Move imports (try fix lints) --- rust/src/http_client.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index 8adfac5219d..51f6d739d18 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -12,14 +12,13 @@ * . */ -use std::{collections::HashMap, future::Future}; +use std::{collections::HashMap, future::Future, sync::OnceLock}; use anyhow::Context; use futures::TryStreamExt; use once_cell::sync::OnceCell; use pyo3::{create_exception, exceptions::PyException, prelude::*}; use reqwest::RequestBuilder; -use std::sync::OnceLock; use tokio::runtime::Runtime; use crate::errors::HttpResponseException; From 1d9ae7fee378184ce0f88ca3cd5cd2e3326fc33a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Oct 2025 11:40:25 -0500 Subject: [PATCH 20/21] Avoid `unwrap` and return fallible See https://github.com/element-hq/synapse/pull/18903#discussion_r2393691853 --- rust/src/http_client.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index 51f6d739d18..0ae7b3713fd 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -313,16 +313,12 @@ static MAKE_DEFERRED_YIELDABLE: OnceLock> = OnceLock::new( fn make_deferred_yieldable<'py>( py: Python<'py>, deferred: &Bound<'py, PyAny>, -) -> Bound<'py, PyAny> { +) -> PyResult> { let make_deferred_yieldable = MAKE_DEFERRED_YIELDABLE.get_or_init(|| { let sys = PyModule::import(py, "synapse.logging.context").unwrap(); let func = sys.getattr("make_deferred_yieldable").unwrap().unbind(); func }); - make_deferred_yieldable - .call1(py, (deferred,)) - .unwrap() - .extract(py) - .unwrap() + make_deferred_yieldable.call1(py, (deferred,))?.extract(py) } From 311969f8e67ffecab4e7beb0f418fe22ca0e1a38 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Oct 2025 11:41:07 -0500 Subject: [PATCH 21/21] Always `make_deferred_yieldable(...)` when `create_deferred(...)` See https://github.com/element-hq/synapse/pull/18903#discussion_r2393695005 --- rust/src/http_client.rs | 46 +++++++++++++++++++---------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/rust/src/http_client.rs b/rust/src/http_client.rs index 0ae7b3713fd..e67dae169f6 100644 --- a/rust/src/http_client.rs +++ b/rust/src/http_client.rs @@ -218,34 +218,29 @@ impl HttpClient { builder: RequestBuilder, response_limit: usize, ) -> PyResult> { - // We use `make_deferred_yieldable` to make the returned deferred follow Synapse - // logcontext rules. - Ok(make_deferred_yieldable( - py, - &create_deferred(py, self.reactor.bind(py), async move { - let response = builder.send().await.context("sending request")?; - - let status = response.status(); - - let mut stream = response.bytes_stream(); - let mut buffer = Vec::new(); - while let Some(chunk) = stream.try_next().await.context("reading body")? { - if buffer.len() + chunk.len() > response_limit { - Err(anyhow::anyhow!("Response size too large"))?; - } - - buffer.extend_from_slice(&chunk); - } + create_deferred(py, self.reactor.bind(py), async move { + let response = builder.send().await.context("sending request")?; + + let status = response.status(); - if !status.is_success() { - return Err(HttpResponseException::new(status, buffer)); + let mut stream = response.bytes_stream(); + let mut buffer = Vec::new(); + while let Some(chunk) = stream.try_next().await.context("reading body")? { + if buffer.len() + chunk.len() > response_limit { + Err(anyhow::anyhow!("Response size too large"))?; } - let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; + buffer.extend_from_slice(&chunk); + } - Ok(r) - })?, - )) + if !status.is_success() { + return Err(HttpResponseException::new(status, buffer)); + } + + let r = Python::with_gil(|py| buffer.into_pyobject(py).map(|o| o.unbind()))?; + + Ok(r) + }) } } @@ -304,7 +299,8 @@ where }); }); - Ok(deferred) + // Make the deferred follow the Synapse logcontext rules + make_deferred_yieldable(py, &deferred) } static MAKE_DEFERRED_YIELDABLE: OnceLock> = OnceLock::new();