-
Notifications
You must be signed in to change notification settings - Fork 395
Fix run_coroutine_in_background(...)
incorrectly handling logcontext
#18964
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
00a4076
29b121e
c1f2431
2feae9a
ba22b73
7fcec49
ade9853
de7d49d
b60cd31
c6e8141
2ae72a6
caaa6a5
c9d5cda
8e5935c
ffbf188
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix `run_coroutine_in_background(...)` incorrectly handling logcontext. | ||
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -802,13 +802,15 @@ def run_in_background( | |||||||||||
deferred returned by the function completes. | ||||||||||||
|
||||||||||||
To explain how the log contexts work here: | ||||||||||||
- When `run_in_background` is called, the current context is stored ("original"), | ||||||||||||
we kick off the background task in the current context, and we restore that | ||||||||||||
original context before returning | ||||||||||||
- When the background task finishes, we don't want to leak our context into the | ||||||||||||
reactor which would erroneously get attached to the next operation picked up by | ||||||||||||
the event loop. We add a callback to the deferred which will clear the logging | ||||||||||||
context after it finishes and yields control back to the reactor. | ||||||||||||
- When `run_in_background` is called, the calling logcontext is stored | ||||||||||||
("original"), we kick off the background task in the current context, and we | ||||||||||||
restore that original context before returning. | ||||||||||||
- For a completed deferred, that's the end of the story. | ||||||||||||
- For an incomplete deferred, when the background task finishes, we don't want to | ||||||||||||
leak our context into the reactor which would erroneously get attached to the | ||||||||||||
next operation picked up by the event loop. We add a callback to the deferred | ||||||||||||
which will clear the logging context after it finishes and yields control back to | ||||||||||||
the reactor. | ||||||||||||
Comment on lines
+805
to
+813
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on the explanation laid out in #18900 (comment) |
||||||||||||
|
||||||||||||
Useful for wrapping functions that return a deferred or coroutine, which you don't | ||||||||||||
yield or await on (for instance because you want to pass it to | ||||||||||||
|
@@ -857,22 +859,36 @@ def run_in_background( | |||||||||||
|
||||||||||||
# The deferred has already completed | ||||||||||||
if d.called and not d.paused: | ||||||||||||
# The function should have maintained the logcontext, so we can | ||||||||||||
# optimise out the messing about | ||||||||||||
# If the function messes with logcontexts, we can assume it follows the Synapse | ||||||||||||
# logcontext rules (Rules for functions returning awaitables: "If the awaitable | ||||||||||||
# is already complete, the function returns with the same logcontext it started | ||||||||||||
# with."). If it function doesn't touch logcontexts at all, we can also assume | ||||||||||||
# the logcontext is unchanged. | ||||||||||||
# | ||||||||||||
# Either way, the function should have maintained the calling logcontext, so we | ||||||||||||
# can avoid messing with it further. Additionally, if the deferred has already | ||||||||||||
# completed, then it would be a mistake to then add a deferred callback (below) | ||||||||||||
# to reset the logcontext to the sentinel logcontext as that would run | ||||||||||||
# immediately (remember our goal is to maintain the calling logcontext when we | ||||||||||||
# return). | ||||||||||||
return d | ||||||||||||
|
||||||||||||
# The function may have reset the context before returning, so we need to restore it | ||||||||||||
# now. | ||||||||||||
# Since the function we called may follow the Synapse logcontext rules (Rules for | ||||||||||||
# functions returning awaitables: "If the awaitable is incomplete, the function | ||||||||||||
# clears the logcontext before returning"), the function may have reset the | ||||||||||||
# logcontext before returning, so we need to restore the calling logcontext now | ||||||||||||
# before we return ourselves. | ||||||||||||
# | ||||||||||||
# Our goal is to have the caller logcontext unchanged after firing off the | ||||||||||||
# background task and returning. | ||||||||||||
set_current_context(calling_context) | ||||||||||||
|
||||||||||||
# The original logcontext will be restored when the deferred completes, but | ||||||||||||
# there is nothing waiting for it, so it will get leaked into the reactor (which | ||||||||||||
# would then get picked up by the next thing the reactor does). We therefore | ||||||||||||
# need to reset the logcontext here (set the `sentinel` logcontext) before | ||||||||||||
# yielding control back to the reactor. | ||||||||||||
# If the function we called is playing nice and following the Synapse logcontext | ||||||||||||
# rules, it will restore original calling logcontext when the deferred completes; | ||||||||||||
# but there is nothing waiting for it, so it will get leaked into the reactor (which | ||||||||||||
# would then get picked up by the next thing the reactor does). We therefore need to | ||||||||||||
# reset the logcontext here (set the `sentinel` logcontext) before yielding control | ||||||||||||
# back to the reactor. | ||||||||||||
# | ||||||||||||
# (If this feels asymmetric, consider it this way: we are | ||||||||||||
# effectively forking a new thread of execution. We are | ||||||||||||
|
@@ -894,10 +910,9 @@ def run_coroutine_in_background( | |||||||||||
Useful for wrapping coroutines that you don't yield or await on (for | ||||||||||||
instance because you want to pass it to deferred.gatherResults()). | ||||||||||||
|
||||||||||||
This is a special case of `run_in_background` where we can accept a | ||||||||||||
coroutine directly rather than a function. We can do this because coroutines | ||||||||||||
do not run until called, and so calling an async function without awaiting | ||||||||||||
cannot change the log contexts. | ||||||||||||
Comment on lines
-899
to
-900
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed this last sentence because this isn't true on multiple levels:
|
||||||||||||
This is a special case of `run_in_background` where we can accept a coroutine | ||||||||||||
directly rather than a function. We can do this because coroutines do not continue | ||||||||||||
running once they have yielded. | ||||||||||||
|
||||||||||||
This is an ergonomic helper so we can do this: | ||||||||||||
```python | ||||||||||||
|
@@ -908,33 +923,7 @@ def run_coroutine_in_background( | |||||||||||
run_in_background(lambda: func1(arg1)) | ||||||||||||
``` | ||||||||||||
""" | ||||||||||||
calling_context = current_context() | ||||||||||||
|
||||||||||||
# Wrap the coroutine in a deferred, which will have the side effect of executing the | ||||||||||||
# coroutine in the background. | ||||||||||||
d = defer.ensureDeferred(coroutine) | ||||||||||||
|
||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To fix the root problem, all we need to do is add this same fix that synapse/synapse/logging/context.py Lines 858 to 862 in 9680804
But instead of duplicating all of this specialty logic and context into See #18900 (comment) for more information on how this shortcut and the logcontext logic works for Related conversation where I asked why we even have |
||||||||||||
# The function may have reset the context before returning, so we need to restore it | ||||||||||||
# now. | ||||||||||||
# | ||||||||||||
# Our goal is to have the caller logcontext unchanged after firing off the | ||||||||||||
# background task and returning. | ||||||||||||
set_current_context(calling_context) | ||||||||||||
|
||||||||||||
# The original logcontext will be restored when the deferred completes, but | ||||||||||||
# there is nothing waiting for it, so it will get leaked into the reactor (which | ||||||||||||
# would then get picked up by the next thing the reactor does). We therefore | ||||||||||||
# need to reset the logcontext here (set the `sentinel` logcontext) before | ||||||||||||
# yielding control back to the reactor. | ||||||||||||
# | ||||||||||||
# (If this feels asymmetric, consider it this way: we are | ||||||||||||
# effectively forking a new thread of execution. We are | ||||||||||||
# probably currently within a ``with LoggingContext()`` block, | ||||||||||||
# which is supposed to have a single entry and exit point. But | ||||||||||||
# by spawning off another deferred, we are effectively | ||||||||||||
# adding a new exit point.) | ||||||||||||
d.addBoth(_set_context_cb, SENTINEL_CONTEXT) | ||||||||||||
return d | ||||||||||||
return run_in_background(lambda: coroutine) | ||||||||||||
|
||||||||||||
|
||||||||||||
T = TypeVar("T") | ||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,7 +22,6 @@ | |
import logging | ||
from typing import Callable, Generator, cast | ||
|
||
import twisted.python.failure | ||
from twisted.internet import defer, reactor as _reactor | ||
|
||
from synapse.logging.context import ( | ||
|
@@ -33,6 +32,7 @@ | |
current_context, | ||
make_deferred_yieldable, | ||
nested_logging_context, | ||
run_coroutine_in_background, | ||
run_in_background, | ||
) | ||
from synapse.types import ISynapseReactor | ||
|
@@ -249,88 +249,192 @@ async def competing_callback() -> None: | |
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred: | ||
sentinel_context = current_context() | ||
async def _test_run_in_background(self, function: Callable[[], object]) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
clock = Clock(reactor) | ||
|
||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_completed = False | ||
callback_finished = False | ||
|
||
with LoggingContext("foo"): | ||
# fire off function, but don't wait on it. | ||
d2 = run_in_background(function) | ||
# Fire off the function, but don't wait on it. | ||
deferred = run_in_background(function) | ||
self._check_test_key("foo") | ||
|
||
def cb(res: object) -> object: | ||
nonlocal callback_completed | ||
callback_completed = True | ||
return res | ||
def callback(result: object) -> object: | ||
nonlocal callback_finished | ||
callback_finished = True | ||
# Pass through the result | ||
return result | ||
|
||
d2.addCallback(cb) | ||
# We `addBoth` because when exceptions happen, we still want to mark the | ||
# callback as finished so that the test can complete and we see the | ||
# underlying error. | ||
deferred.addBoth(callback) | ||
|
||
self._check_test_key("foo") | ||
|
||
# now wait for the function under test to have run, and check that | ||
# the logcontext is left in a sane state. | ||
d2 = defer.Deferred() | ||
|
||
def check_logcontext() -> None: | ||
if not callback_completed: | ||
reactor.callLater(0.01, check_logcontext) | ||
return | ||
# Now wait for the function under test to have run, and check that | ||
# the logcontext is left in a sane state. | ||
while not callback_finished: | ||
await clock.sleep(0) | ||
self._check_test_key("foo") | ||
|
||
# make sure that the context was reset before it got thrown back | ||
# into the reactor | ||
try: | ||
self.assertIs(current_context(), sentinel_context) | ||
d2.callback(None) | ||
except BaseException: | ||
d2.errback(twisted.python.failure.Failure()) | ||
|
||
reactor.callLater(0.01, check_logcontext) | ||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# test is done once d2 finishes | ||
return d2 | ||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_blocking_fn(self) -> defer.Deferred: | ||
async def test_run_in_background_with_blocking_fn(self) -> None: | ||
async def blocking_function() -> None: | ||
await Clock(reactor).sleep(0) | ||
|
||
return self._test_run_in_background(blocking_function) | ||
await self._test_run_in_background(blocking_function) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred: | ||
async def test_run_in_background_with_non_blocking_fn(self) -> None: | ||
@defer.inlineCallbacks | ||
def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]: | ||
with PreserveLoggingContext(): | ||
yield defer.succeed(None) | ||
|
||
return self._test_run_in_background(nonblocking_function) | ||
await self._test_run_in_background(nonblocking_function) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_chained_deferred(self) -> defer.Deferred: | ||
async def test_run_in_background_with_chained_deferred(self) -> None: | ||
# a function which returns a deferred which looks like it has been | ||
# called, but is actually paused | ||
def testfunc() -> defer.Deferred: | ||
return make_deferred_yieldable(_chained_deferred_function()) | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_coroutine(self) -> defer.Deferred: | ||
async def test_run_in_background_with_coroutine(self) -> None: | ||
""" | ||
Test `run_in_background` with a coroutine that yields control back to the | ||
reactor. | ||
|
||
This will stress the logic around incomplete deferreds in `run_in_background`. | ||
""" | ||
|
||
async def testfunc() -> None: | ||
self._check_test_key("foo") | ||
d = defer.ensureDeferred(Clock(reactor).sleep(0)) | ||
self.assertIs(current_context(), SENTINEL_CONTEXT) | ||
await d | ||
self._check_test_key("foo") | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred: | ||
async def test_run_in_background_with_nonblocking_coroutine(self) -> None: | ||
""" | ||
Test `run_in_background` with a "nonblocking" coroutine (never yields control | ||
back to the reactor). | ||
|
||
This will stress the logic around completed deferreds in `run_in_background`. | ||
""" | ||
|
||
async def testfunc() -> None: | ||
self._check_test_key("foo") | ||
|
||
return self._test_run_in_background(testfunc) | ||
await self._test_run_in_background(testfunc) | ||
|
||
@logcontext_clean | ||
async def test_run_coroutine_in_background(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New We could use a similar pattern to |
||
""" | ||
Test `run_coroutine_in_background` with a coroutine that yields control back to the | ||
reactor. | ||
|
||
This will stress the logic around incomplete deferreds in `run_coroutine_in_background`. | ||
""" | ||
clock = Clock(reactor) | ||
|
||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_finished = False | ||
|
||
async def competing_callback() -> None: | ||
nonlocal callback_finished | ||
try: | ||
# The callback should have the same logcontext as the caller | ||
self._check_test_key("foo") | ||
|
||
with LoggingContext("competing"): | ||
await clock.sleep(0) | ||
self._check_test_key("competing") | ||
|
||
self._check_test_key("foo") | ||
finally: | ||
# When exceptions happen, we still want to mark the callback as finished | ||
# so that the test can complete and we see the underlying error. | ||
callback_finished = True | ||
|
||
with LoggingContext("foo"): | ||
run_coroutine_in_background(competing_callback()) | ||
self._check_test_key("foo") | ||
await clock.sleep(0) | ||
self._check_test_key("foo") | ||
|
||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
async def test_run_coroutine_in_background_with_nonblocking_coroutine(self) -> None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not in love with the "nonblocking" terminology used here but I've aligned with the prior art here ( |
||
""" | ||
Test `run_coroutine_in_background` with a "nonblocking" coroutine (never yields control | ||
back to the reactor). | ||
|
||
This will stress the logic around completed deferreds in `run_coroutine_in_background`. | ||
""" | ||
# Sanity check that we start in the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
callback_finished = False | ||
|
||
async def competing_callback() -> None: | ||
nonlocal callback_finished | ||
try: | ||
# The callback should have the same logcontext as the caller | ||
self._check_test_key("foo") | ||
|
||
with LoggingContext("competing"): | ||
# We `await` here but there is nothing to wait for here since the | ||
# deferred is already complete so we should immediately continue | ||
# executing in the same context. | ||
await defer.succeed(None) | ||
|
||
self._check_test_key("competing") | ||
|
||
self._check_test_key("foo") | ||
finally: | ||
# When exceptions happen, we still want to mark the callback as finished | ||
# so that the test can complete and we see the underlying error. | ||
callback_finished = True | ||
|
||
with LoggingContext("foo"): | ||
run_coroutine_in_background(competing_callback()) | ||
self._check_test_key("foo") | ||
|
||
self.assertTrue( | ||
callback_finished, | ||
"Callback never finished which means the test probably didn't wait long enough", | ||
) | ||
|
||
# Back to the sentinel context | ||
self._check_test_key("sentinel") | ||
|
||
@logcontext_clean | ||
@defer.inlineCallbacks | ||
|
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this technically regressed in #18900 which is part of
1.139.0rc1
, we should land another RC with this PR(The PR description here at the top explains the regression)
cc @anoadragon453