Fix a class of logcontext leaks

So, it turns out that if you have a first `Deferred` `D1`, you can add a
callback which returns another `Deferred` `D2`, and `D2` must then complete
before any further callbacks on `D1` will execute (and later callbacks on `D1`
get the *result* of `D2` rather than `D2` itself).

So, `D1` might have `called=True` (as in, it has started running its
callbacks), but any new callbacks added to `D1` won't get run until `D2`
completes - so if you `yield D1` in an `inlineCallbacks` function, your `yield`
will 'block'.

In conclusion: some of our assumptions in `logcontext` were invalid. We need to
make sure that we don't optimise out the logcontext juggling when this
situation happens. Fortunately, it is easy to detect by checking `D1.paused`.
This commit is contained in:
Richard van der Hoff 2018-05-02 11:46:23 +01:00
parent ca7211104e
commit f22e7cda2c
2 changed files with 93 additions and 32 deletions

View File

@ -302,7 +302,7 @@ def preserve_fn(f):
def run_in_background(f, *args, **kwargs): def run_in_background(f, *args, **kwargs):
"""Calls a function, ensuring that the current context is restored after """Calls a function, ensuring that the current context is restored after
return from the function, and that the sentinel context is set once the return from the function, and that the sentinel context is set once the
deferred returned by the funtion completes. deferred returned by the function completes.
Useful for wrapping functions that return a deferred which you don't yield Useful for wrapping functions that return a deferred which you don't yield
on (for instance because you want to pass it to deferred.gatherResults()). on (for instance because you want to pass it to deferred.gatherResults()).
@ -320,10 +320,17 @@ def run_in_background(f, *args, **kwargs):
# by synchronous exceptions, so let's turn them into Failures. # by synchronous exceptions, so let's turn them into Failures.
return defer.fail() return defer.fail()
if isinstance(res, defer.Deferred) and not res.called: if not isinstance(res, defer.Deferred):
# The function will have reset the context before returning, so return res
if res.called and not res.paused:
# The function should have maintained the logcontext, so we can
# optimise out the messing about
return res
# The function may have reset the context before returning, so
# we need to restore it now. # we need to restore it now.
LoggingContext.set_current_context(current) ctx = LoggingContext.set_current_context(current)
# The original context will be restored when the deferred # The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will # completes, but there is nothing waiting for it, so it will
@ -337,7 +344,7 @@ def run_in_background(f, *args, **kwargs):
# which is supposed to have a single entry and exit point. But # which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively # by spawning off another deferred, we are effectively
# adding a new exit point.) # adding a new exit point.)
res.addBoth(_set_context_cb, LoggingContext.sentinel) res.addBoth(_set_context_cb, ctx)
return res return res
@ -354,7 +361,16 @@ def make_deferred_yieldable(deferred):
(This is more-or-less the opposite operation to run_in_background.) (This is more-or-less the opposite operation to run_in_background.)
""" """
if isinstance(deferred, defer.Deferred) and not deferred.called: if not isinstance(deferred, defer.Deferred):
return deferred
if deferred.called and not deferred.paused:
# it looks like this deferred is ready to run any callbacks we give it
# immediately. We may as well optimise out the logcontext faffery.
return deferred
# ok, we can't be sure that a yield won't block, so let's reset the
# logcontext, and add a callback to the deferred to restore it.
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
deferred.addBoth(_set_context_cb, prev_context) deferred.addBoth(_set_context_cb, prev_context)
return deferred return deferred

View File

@ -36,24 +36,28 @@ class LoggingContextTestCase(unittest.TestCase):
yield sleep(0) yield sleep(0)
self._check_test_key("one") self._check_test_key("one")
def _test_preserve_fn(self, function): def _test_run_in_background(self, function):
sentinel_context = LoggingContext.current_context() sentinel_context = LoggingContext.current_context()
callback_completed = [False] callback_completed = [False]
@defer.inlineCallbacks def test():
def cb():
context_one.request = "one" context_one.request = "one"
yield function() d = function()
self._check_test_key("one")
def cb(res):
self._check_test_key("one")
callback_completed[0] = True callback_completed[0] = True
return res
d.addCallback(cb)
return d
with LoggingContext() as context_one: with LoggingContext() as context_one:
context_one.request = "one" context_one.request = "one"
# fire off function, but don't wait on it. # fire off function, but don't wait on it.
logcontext.preserve_fn(cb)() logcontext.run_in_background(test)
self._check_test_key("one") self._check_test_key("one")
@ -80,20 +84,31 @@ class LoggingContextTestCase(unittest.TestCase):
# test is done once d2 finishes # test is done once d2 finishes
return d2 return d2
def test_preserve_fn_with_blocking_fn(self): def test_run_in_background_with_blocking_fn(self):
@defer.inlineCallbacks @defer.inlineCallbacks
def blocking_function(): def blocking_function():
yield sleep(0) yield sleep(0)
return self._test_preserve_fn(blocking_function) return self._test_run_in_background(blocking_function)
def test_preserve_fn_with_non_blocking_fn(self): def test_run_in_background_with_non_blocking_fn(self):
@defer.inlineCallbacks @defer.inlineCallbacks
def nonblocking_function(): def nonblocking_function():
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
yield defer.succeed(None) yield defer.succeed(None)
return self._test_preserve_fn(nonblocking_function) return self._test_run_in_background(nonblocking_function)
@unittest.DEBUG
def test_run_in_background_with_chained_deferred(self):
# a function which returns a deferred which looks like it has been
# called, but is actually paused
def testfunc():
return logcontext.make_deferred_yieldable(
_chained_deferred_function()
)
return self._test_run_in_background(testfunc)
@defer.inlineCallbacks @defer.inlineCallbacks
def test_make_deferred_yieldable(self): def test_make_deferred_yieldable(self):
@ -118,6 +133,22 @@ class LoggingContextTestCase(unittest.TestCase):
# now it should be restored # now it should be restored
self._check_test_key("one") self._check_test_key("one")
@defer.inlineCallbacks
def test_make_deferred_yieldable_with_chained_deferreds(self):
sentinel_context = LoggingContext.current_context()
with LoggingContext() as context_one:
context_one.request = "one"
d1 = logcontext.make_deferred_yieldable(_chained_deferred_function())
# make sure that the context was reset by make_deferred_yieldable
self.assertIs(LoggingContext.current_context(), sentinel_context)
yield d1
# now it should be restored
self._check_test_key("one")
@defer.inlineCallbacks @defer.inlineCallbacks
def test_make_deferred_yieldable_on_non_deferred(self): def test_make_deferred_yieldable_on_non_deferred(self):
"""Check that make_deferred_yieldable does the right thing when its """Check that make_deferred_yieldable does the right thing when its
@ -132,3 +163,17 @@ class LoggingContextTestCase(unittest.TestCase):
r = yield d1 r = yield d1
self.assertEqual(r, "bum") self.assertEqual(r, "bum")
self._check_test_key("one") self._check_test_key("one")
# a function which returns a deferred which has been "called", but
# which had a function which returned another incomplete deferred on
# its callback list, so won't yet call any other new callbacks.
def _chained_deferred_function():
d = defer.succeed(None)
def cb(res):
d2 = defer.Deferred()
reactor.callLater(0, d2.callback, res)
return d2
d.addCallback(cb)
return d