Clear logcontext before starting fed txn queue runner

These processes take a long time compared to the request, so there is lots of
"Entering|Restoring dead context" in the logs. Let's try to shut it up a bit.
This commit is contained in:
Richard van der Hoff 2017-11-28 14:37:11 +00:00
parent 5a4da5bf78
commit d4fb4f7c52
2 changed files with 18 additions and 9 deletions

View File

@ -20,7 +20,7 @@ from .persistence import TransactionActions
from .units import Transaction, Edu from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.util import logcontext from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
@ -146,7 +146,6 @@ class TransactionQueue(object):
else: else:
return not destination.startswith("localhost") return not destination.startswith("localhost")
@defer.inlineCallbacks
def notify_new_events(self, current_id): def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to """This gets called when we have some new events we might want to
send out to other servers. send out to other servers.
@ -156,6 +155,13 @@ class TransactionQueue(object):
if self._is_processing: if self._is_processing:
return return
# fire off a processing loop in the background. It's likely it will
# outlast the current request, so run it in the sentinel logcontext.
with PreserveLoggingContext():
self._process_event_queue_loop()
@defer.inlineCallbacks
def _process_event_queue_loop(self):
try: try:
self._is_processing = True self._is_processing = True
while True: while True:

View File

@ -255,9 +255,7 @@ class Notifier(object):
) )
if self.federation_sender: if self.federation_sender:
preserve_fn(self.federation_sender.notify_new_events)( self.federation_sender.notify_new_events(room_stream_id)
room_stream_id
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN: if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id) self._user_joined_room(event.state_key, event.room_id)
@ -297,8 +295,7 @@ class Notifier(object):
def on_new_replication_data(self): def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend """Used to inform replication listeners that something has happend
without waking up any of the normal user event streams""" without waking up any of the normal user event streams"""
with PreserveLoggingContext(): self.notify_replication()
self.notify_replication()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None, def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@ -516,8 +513,14 @@ class Notifier(object):
self.replication_deferred = ObservableDeferred(defer.Deferred()) self.replication_deferred = ObservableDeferred(defer.Deferred())
deferred.callback(None) deferred.callback(None)
for cb in self.replication_callbacks: # the callbacks may well outlast the current request, so we run
preserve_fn(cb)() # them in the sentinel logcontext.
#
# (ideally it would be up to the callbacks to know if they were
# starting off background processes and drop the logcontext
# accordingly, but that requires more changes)
for cb in self.replication_callbacks:
cb()
@defer.inlineCallbacks @defer.inlineCallbacks
def wait_for_replication(self, callback, timeout): def wait_for_replication(self, callback, timeout):