Stop preserve_fn leaking context into the reactor

Fix a bug in ``logcontext.preserve_fn`` which made it leak context into the
reactor, and add a test for it.

Also, get rid of ``logcontext.reset_context_after_deferred``, which tried to do
the same thing but had its own, different, set of bugs.
This commit is contained in:
Richard van der Hoff 2017-03-17 20:56:54 +00:00
parent 067b00d49d
commit f40c2db05a
3 changed files with 93 additions and 34 deletions

View File

@ -933,8 +933,9 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually # lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it. # have. Hence we fire off the deferred, but don't wait for it.
synapse.util.logcontext.reset_context_after_deferred( synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
self._handle_queued_pdus(room_queue)) room_queue
)
defer.returnValue(True) defer.returnValue(True)

View File

@ -308,46 +308,43 @@ def preserve_context_over_deferred(deferred, context=None):
return d return d
def reset_context_after_deferred(deferred): def preserve_fn(f):
"""If the deferred is incomplete, add a callback which will reset the """Wraps a function, to ensure that the current context is restored after
context. return from the function, and that the sentinel context is set once the
deferred returned by the funtion completes.
This is useful when you want to fire off a deferred, but don't want to Useful for wrapping functions that return a deferred which you don't yield
wait for it to complete. (The deferred will restore the current log context on.
when it completes, so if you don't do anything, it will leak log context.)
(If this feels asymmetric, consider it this way: we are effectively forking
a new thread of execution. We are probably currently within a
``with LoggingContext()`` block, which is supposed to have a single entry
and exit point. But by spawning off another deferred, we are effectively
adding a new exit point.)
Args:
deferred (defer.Deferred): deferred
""" """
def reset_context(result): def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel) LoggingContext.set_current_context(LoggingContext.sentinel)
return result return result
if not deferred.called: # XXX: why is this here rather than inside g? surely we want to preserve
deferred.addBoth(reset_context) # the context from the time the function was called, not when it was
# wrapped?
def preserve_fn(f):
"""Ensures that function is called with correct context and that context is
restored after return. Useful for wrapping functions that return a deferred
which you don't yield on.
"""
current = LoggingContext.current_context() current = LoggingContext.current_context()
def g(*args, **kwargs): def g(*args, **kwargs):
with PreserveLoggingContext(current):
res = f(*args, **kwargs) res = f(*args, **kwargs)
if isinstance(res, defer.Deferred): if isinstance(res, defer.Deferred) and not res.called:
return preserve_context_over_deferred( # The function will have reset the context before returning, so
res, context=LoggingContext.sentinel # we need to restore it now.
) LoggingContext.set_current_context(current)
else:
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(reset_context)
return res return res
return g return g

View File

@ -1,8 +1,10 @@
import twisted.python.failure
from twisted.internet import defer from twisted.internet import defer
from twisted.internet import reactor from twisted.internet import reactor
from .. import unittest from .. import unittest
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util import logcontext
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase):
context_one.test_key = "one" context_one.test_key = "one"
yield sleep(0) yield sleep(0)
self._check_test_key("one") self._check_test_key("one")
def _test_preserve_fn(self, function):
sentinel_context = LoggingContext.current_context()
callback_completed = [False]
@defer.inlineCallbacks
def cb():
context_one.test_key = "one"
yield function()
self._check_test_key("one")
callback_completed[0] = True
with LoggingContext() as context_one:
context_one.test_key = "one"
# fire off function, but don't wait on it.
logcontext.preserve_fn(cb)()
self._check_test_key("one")
# now wait for the function under test to have run, and check that
# the logcontext is left in a sane state.
d2 = defer.Deferred()
def check_logcontext():
if not callback_completed[0]:
reactor.callLater(0.01, check_logcontext)
return
# make sure that the context was reset before it got thrown back
# into the reactor
try:
self.assertIs(LoggingContext.current_context(),
sentinel_context)
d2.callback(None)
except BaseException:
d2.errback(twisted.python.failure.Failure())
reactor.callLater(0.01, check_logcontext)
# test is done once d2 finishes
return d2
def test_preserve_fn_with_blocking_fn(self):
@defer.inlineCallbacks
def blocking_function():
yield sleep(0)
return self._test_preserve_fn(blocking_function)
def test_preserve_fn_with_non_blocking_fn(self):
@defer.inlineCallbacks
def nonblocking_function():
with logcontext.PreserveLoggingContext():
yield defer.succeed(None)
return self._test_preserve_fn(nonblocking_function)