Merge pull request #2027 from matrix-org/rav/logcontext_leaks

A few fixes to logcontext things
This commit is contained in:
Richard van der Hoff 2017-03-20 11:53:36 +00:00 committed by GitHub
commit eddce9d74a
12 changed files with 147 additions and 62 deletions

View File

@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -187,7 +187,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners) ps.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -193,7 +193,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -31,7 +31,7 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine from synapse.storage.engines import create_engine
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -184,7 +184,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -193,7 +193,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners) ps.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -52,7 +52,7 @@ from synapse.api.urls import (
) )
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.metrics import register_memory_metrics, get_metrics_for from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
@ -456,7 +456,12 @@ def run(hs):
def in_thread(): def in_thread():
# Uncomment to enable tracing of log context changes. # Uncomment to enable tracing of log context changes.
# sys.settrace(logcontext_tracer) # sys.settrace(logcontext_tracer)
with LoggingContext("run"):
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
change_resource_limit(hs.config.soft_file_limit) change_resource_limit(hs.config.soft_file_limit)
if hs.config.gc_thresholds: if hs.config.gc_thresholds:
gc.set_threshold(*hs.config.gc_thresholds) gc.set_threshold(*hs.config.gc_thresholds)

View File

@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -190,7 +190,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, preserve_fn, \
PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -275,7 +276,11 @@ def start(config_options):
ps.start_listening(config.worker_listeners) ps.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -48,7 +48,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
from synapse.storage.roommember import RoomMemberStore from synapse.storage.roommember import RoomMemberStore
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.httpresourcetree import create_resource_tree from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn from synapse.util.logcontext import LoggingContext, preserve_fn, \
PreserveLoggingContext
from synapse.util.manhole import manhole from synapse.util.manhole import manhole
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string from synapse.util.stringutils import random_string
@ -496,7 +497,11 @@ def start(config_options):
ss.start_listening(config.worker_listeners) ss.start_listening(config.worker_listeners)
def run(): def run():
with LoggingContext("run"): # make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
# and complain when they see the logcontext arbitrarily swapping
# between the sentinel and `run` logcontexts.
with PreserveLoggingContext():
logger.info("Running") logger.info("Running")
change_resource_limit(config.soft_file_limit) change_resource_limit(config.soft_file_limit)
if config.gc_thresholds: if config.gc_thresholds:

View File

@ -933,8 +933,9 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually # lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it. # have. Hence we fire off the deferred, but don't wait for it.
synapse.util.logcontext.reset_context_after_deferred( synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
self._handle_queued_pdus(room_queue)) room_queue
)
defer.returnValue(True) defer.returnValue(True)

View File

@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import synapse.util.async
from ._base import SQLBaseStore from ._base import SQLBaseStore
from . import engines from . import engines
@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_performance = {} self._background_update_performance = {}
self._background_update_queue = [] self._background_update_queue = []
self._background_update_handlers = {} self._background_update_handlers = {}
self._background_update_timer = None
@defer.inlineCallbacks @defer.inlineCallbacks
def start_doing_background_updates(self): def start_doing_background_updates(self):
assert self._background_update_timer is None, \
"background updates already running"
logger.info("Starting background schema updates") logger.info("Starting background schema updates")
while True: while True:
sleep = defer.Deferred() yield synapse.util.async.sleep(
self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
)
try:
yield sleep
finally:
self._background_update_timer = None
try: try:
result = yield self.do_next_background_update( result = yield self.do_next_background_update(

View File

@ -308,46 +308,43 @@ def preserve_context_over_deferred(deferred, context=None):
return d return d
def reset_context_after_deferred(deferred): def preserve_fn(f):
"""If the deferred is incomplete, add a callback which will reset the """Wraps a function, to ensure that the current context is restored after
context. return from the function, and that the sentinel context is set once the
deferred returned by the funtion completes.
This is useful when you want to fire off a deferred, but don't want to Useful for wrapping functions that return a deferred which you don't yield
wait for it to complete. (The deferred will restore the current log context on.
when it completes, so if you don't do anything, it will leak log context.)
(If this feels asymmetric, consider it this way: we are effectively forking
a new thread of execution. We are probably currently within a
``with LoggingContext()`` block, which is supposed to have a single entry
and exit point. But by spawning off another deferred, we are effectively
adding a new exit point.)
Args:
deferred (defer.Deferred): deferred
""" """
def reset_context(result): def reset_context(result):
LoggingContext.set_current_context(LoggingContext.sentinel) LoggingContext.set_current_context(LoggingContext.sentinel)
return result return result
if not deferred.called: # XXX: why is this here rather than inside g? surely we want to preserve
deferred.addBoth(reset_context) # the context from the time the function was called, not when it was
# wrapped?
def preserve_fn(f):
"""Ensures that function is called with correct context and that context is
restored after return. Useful for wrapping functions that return a deferred
which you don't yield on.
"""
current = LoggingContext.current_context() current = LoggingContext.current_context()
def g(*args, **kwargs): def g(*args, **kwargs):
with PreserveLoggingContext(current):
res = f(*args, **kwargs) res = f(*args, **kwargs)
if isinstance(res, defer.Deferred): if isinstance(res, defer.Deferred) and not res.called:
return preserve_context_over_deferred( # The function will have reset the context before returning, so
res, context=LoggingContext.sentinel # we need to restore it now.
) LoggingContext.set_current_context(current)
else:
# The original context will be restored when the deferred
# completes, but there is nothing waiting for it, so it will
# get leaked into the reactor or some other function which
# wasn't expecting it. We therefore need to reset the context
# here.
#
# (If this feels asymmetric, consider it this way: we are
# effectively forking a new thread of execution. We are
# probably currently within a ``with LoggingContext()`` block,
# which is supposed to have a single entry and exit point. But
# by spawning off another deferred, we are effectively
# adding a new exit point.)
res.addBoth(reset_context)
return res return res
return g return g

View File

@ -1,8 +1,10 @@
import twisted.python.failure
from twisted.internet import defer from twisted.internet import defer
from twisted.internet import reactor from twisted.internet import reactor
from .. import unittest from .. import unittest
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util import logcontext
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
@ -33,3 +35,62 @@ class LoggingContextTestCase(unittest.TestCase):
context_one.test_key = "one" context_one.test_key = "one"
yield sleep(0) yield sleep(0)
self._check_test_key("one") self._check_test_key("one")
def _test_preserve_fn(self, function):
sentinel_context = LoggingContext.current_context()
callback_completed = [False]
@defer.inlineCallbacks
def cb():
context_one.test_key = "one"
yield function()
self._check_test_key("one")
callback_completed[0] = True
with LoggingContext() as context_one:
context_one.test_key = "one"
# fire off function, but don't wait on it.
logcontext.preserve_fn(cb)()
self._check_test_key("one")
# now wait for the function under test to have run, and check that
# the logcontext is left in a sane state.
d2 = defer.Deferred()
def check_logcontext():
if not callback_completed[0]:
reactor.callLater(0.01, check_logcontext)
return
# make sure that the context was reset before it got thrown back
# into the reactor
try:
self.assertIs(LoggingContext.current_context(),
sentinel_context)
d2.callback(None)
except BaseException:
d2.errback(twisted.python.failure.Failure())
reactor.callLater(0.01, check_logcontext)
# test is done once d2 finishes
return d2
def test_preserve_fn_with_blocking_fn(self):
@defer.inlineCallbacks
def blocking_function():
yield sleep(0)
return self._test_preserve_fn(blocking_function)
def test_preserve_fn_with_non_blocking_fn(self):
@defer.inlineCallbacks
def nonblocking_function():
with logcontext.PreserveLoggingContext():
yield defer.succeed(None)
return self._test_preserve_fn(nonblocking_function)