Remove redundant run_as_background_process() from pusherpool

`on_new_notifications` and `on_new_receipts` in `HttpPusher` and `EmailPusher`
now always return synchronously, so we can remove the `defer.gatherResults` on
their results, and the `run_as_background_process` wrappers can be removed too
because the PusherPool methods will now complete quickly enough.
This commit is contained in:
Richard van der Hoff 2018-10-22 16:12:11 +01:00
parent c7273c11bc
commit e7a16c6210
7 changed files with 12 additions and 48 deletions

View File

@ -161,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else: else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey) yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events": elif stream_name == "events":
self.pusher_pool.on_new_notifications( yield self.pusher_pool.on_new_notifications(
token, token, token, token,
) )
elif stream_name == "receipts": elif stream_name == "receipts":
self.pusher_pool.on_new_receipts( yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows) token, token, set(row.room_id for row in rows)
) )
except Exception: except Exception:

View File

@ -2520,7 +2520,7 @@ class FederationHandler(BaseHandler):
if not backfilled: # Never notify for backfilled events if not backfilled: # Never notify for backfilled events
for event, _ in event_and_contexts: for event, _ in event_and_contexts:
self._notify_persisted_event(event, max_stream_id) yield self._notify_persisted_event(event, max_stream_id)
def _notify_persisted_event(self, event, max_stream_id): def _notify_persisted_event(self, event, max_stream_id):
"""Checks to see if notifier/pushers should be notified about the """Checks to see if notifier/pushers should be notified about the
@ -2553,7 +2553,7 @@ class FederationHandler(BaseHandler):
extra_users=extra_users extra_users=extra_users
) )
self.pusher_pool.on_new_notifications( return self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id, event_stream_id, max_stream_id,
) )

View File

@ -779,7 +779,7 @@ class EventCreationHandler(object):
event, context=context event, context=context
) )
self.pusher_pool.on_new_notifications( yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id, event_stream_id, max_stream_id,
) )

View File

@ -119,7 +119,7 @@ class ReceiptsHandler(BaseHandler):
"receipt_key", max_batch_id, rooms=affected_room_ids "receipt_key", max_batch_id, rooms=affected_room_ids
) )
# Note that the min here shouldn't be relied upon to be accurate. # Note that the min here shouldn't be relied upon to be accurate.
self.hs.get_pusherpool().on_new_receipts( yield self.hs.get_pusherpool().on_new_receipts(
min_batch_id, max_batch_id, affected_room_ids, min_batch_id, max_batch_id, affected_room_ids,
) )

View File

@ -94,13 +94,12 @@ class EmailPusher(object):
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
self._start_processing() self._start_processing()
return defer.succeed(None)
def on_new_receipts(self, min_stream_id, max_stream_id): def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a # We could wake up and cancel the timer but there tend to be quite a
# lot of read receipts so it's probably less work to just let the # lot of read receipts so it's probably less work to just let the
# timer fire # timer fire
return defer.succeed(None) pass
def on_timer(self): def on_timer(self):
self.timed_call = None self.timed_call = None

View File

@ -98,7 +98,6 @@ class HttpPusher(object):
def on_new_notifications(self, min_stream_ordering, max_stream_ordering): def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
self._start_processing() self._start_processing()
return defer.suceed(None)
def on_new_receipts(self, min_stream_id, max_stream_id): def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate. # Note that the min here shouldn't be relied upon to be accurate.
@ -106,7 +105,6 @@ class HttpPusher(object):
# We could check the receipts are actually m.read receipts here, # We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway... # but currently that's the only type of receipt anyway...
run_as_background_process("http_pusher.on_new_receipts", self._update_badge) run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
return defer.succeed(None)
@defer.inlineCallbacks @defer.inlineCallbacks
def _update_badge(self): def _update_badge(self):

View File

@ -18,9 +18,8 @@ import logging
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push.pusher import PusherFactory from synapse.push.pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -122,45 +121,23 @@ class PusherPool:
p['app_id'], p['pushkey'], p['user_name'], p['app_id'], p['pushkey'], p['user_name'],
) )
def on_new_notifications(self, min_stream_id, max_stream_id):
run_as_background_process(
"on_new_notifications",
self._on_new_notifications, min_stream_id, max_stream_id,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _on_new_notifications(self, min_stream_id, max_stream_id): def on_new_notifications(self, min_stream_id, max_stream_id):
try: try:
users_affected = yield self.store.get_push_action_users_in_range( users_affected = yield self.store.get_push_action_users_in_range(
min_stream_id, max_stream_id min_stream_id, max_stream_id
) )
deferreds = []
for u in users_affected: for u in users_affected:
if u in self.pushers: if u in self.pushers:
for p in self.pushers[u].values(): for p in self.pushers[u].values():
deferreds.append( p.on_new_notifications(min_stream_id, max_stream_id)
run_in_background(
p.on_new_notifications,
min_stream_id, max_stream_id,
)
)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception: except Exception:
logger.exception("Exception in pusher on_new_notifications") logger.exception("Exception in pusher on_new_notifications")
def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
run_as_background_process(
"on_new_receipts",
self._on_new_receipts, min_stream_id, max_stream_id, affected_room_ids,
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
try: try:
# Need to subtract 1 from the minimum because the lower bound here # Need to subtract 1 from the minimum because the lower bound here
# is not inclusive # is not inclusive
@ -170,21 +147,11 @@ class PusherPool:
# This returns a tuple, user_id is at index 3 # This returns a tuple, user_id is at index 3
users_affected = set([r[3] for r in updated_receipts]) users_affected = set([r[3] for r in updated_receipts])
deferreds = []
for u in users_affected: for u in users_affected:
if u in self.pushers: if u in self.pushers:
for p in self.pushers[u].values(): for p in self.pushers[u].values():
deferreds.append( p.on_new_receipts(min_stream_id, max_stream_id)
run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
)
yield make_deferred_yieldable(
defer.gatherResults(deferreds, consumeErrors=True),
)
except Exception: except Exception:
logger.exception("Exception in pusher on_new_receipts") logger.exception("Exception in pusher on_new_receipts")