Merge pull request #3710 from matrix-org/rav/logcontext_for_pusher_updates

Fix logcontexts for running pushers
This commit is contained in:
Richard van der Hoff 2018-08-17 16:21:49 +01:00 committed by GitHub
commit c144252a8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 29 additions and 21 deletions

1
changelog.d/3710.bugfix Normal file
View File

@ -0,0 +1 @@
Fix "Starting db txn 'get_all_updated_receipts' from sentinel context" warning

View File

@ -162,11 +162,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else: else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey) yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events": elif stream_name == "events":
yield self.pusher_pool.on_new_notifications( self.pusher_pool.on_new_notifications(
token, token, token, token,
) )
elif stream_name == "receipts": elif stream_name == "receipts":
yield self.pusher_pool.on_new_receipts( self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows) token, token, set(row.room_id for row in rows)
) )
except Exception: except Exception:

View File

@ -2386,8 +2386,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users extra_users=extra_users
) )
logcontext.run_in_background( self.pusher_pool.on_new_notifications(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id, event_stream_id, max_stream_id,
) )

View File

@ -778,11 +778,8 @@ class EventCreationHandler(object):
event, context=context event, context=context
) )
# this intentionally does not yield: we don't care about the result self.pusher_pool.on_new_notifications(
# and don't need to wait for it. event_stream_id, max_stream_id,
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
) )
def _notify(): def _notify():

View File

@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util import logcontext from synapse.util import logcontext
from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler from ._base import BaseHandler
@ -116,16 +115,15 @@ class ReceiptsHandler(BaseHandler):
affected_room_ids = list(set([r["room_id"] for r in receipts])) affected_room_ids = list(set([r["room_id"] for r in receipts]))
with PreserveLoggingContext(): self.notifier.on_new_event(
self.notifier.on_new_event( "receipt_key", max_batch_id, rooms=affected_room_ids
"receipt_key", max_batch_id, rooms=affected_room_ids )
) # Note that the min here shouldn't be relied upon to be accurate.
# Note that the min here shouldn't be relied upon to be accurate. self.hs.get_pusherpool().on_new_receipts(
self.hs.get_pusherpool().on_new_receipts( min_batch_id, max_batch_id, affected_room_ids,
min_batch_id, max_batch_id, affected_room_ids )
)
defer.returnValue(True) defer.returnValue(True)
@logcontext.preserve_fn # caller should not yield on this @logcontext.preserve_fn # caller should not yield on this
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -18,6 +18,7 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@ -122,8 +123,14 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'], p['app_id'], p['pushkey'], p['user_name'],
) )
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_id, max_stream_id): def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)
@defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id):
try: try:
users_affected = yield self.store.get_push_action_users_in_range( users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id min_stream_id, max_stream_id
@ -147,8 +154,14 @@ class PusherPool:
except Exception: except Exception:
logger.exception("Exception in pusher on_new_notifications") logger.exception("Exception in pusher on_new_notifications")
@defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)
@defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try: try:
# Need to subtract 1 from the minimum because the lower bound here # Need to subtract 1 from the minimum because the lower bound here
# is not inclusive # is not inclusive