diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 935c33970..26b036808 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -80,6 +80,9 @@ class ReceiptsHandler(BaseHandler): def _handle_new_receipts(self, receipts): """Takes a list of receipts, stores them and informs the notifier. """ + min_batch_id = None + max_batch_id = None + for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -97,10 +100,20 @@ class ReceiptsHandler(BaseHandler): stream_id, max_persisted_id = res - with PreserveLoggingContext(): - self.notifier.on_new_event( - "receipt_key", max_persisted_id, rooms=[room_id] - ) + if min_batch_id is None or stream_id < min_batch_id: + min_batch_id = stream_id + if max_batch_id is None or max_persisted_id > max_batch_id: + max_batch_id = max_persisted_id + + affected_room_ids = list(set([r["room_id"] for r in receipts])) + + with PreserveLoggingContext(): + self.notifier.on_new_event( + "receipt_key", max_batch_id, rooms=affected_room_ids + ) + self.hs.get_pusherpool().on_new_receipts( + min_batch_id, max_batch_id, affected_room_ids + ) defer.returnValue(True) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index d69588564..0d5450bc0 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -76,15 +76,25 @@ class HttpPusher(object): self.data_minus_url.update(self.data) del self.data_minus_url['url'] + @defer.inlineCallbacks def on_started(self): - self._process() + yield self._process() + @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max_stream_ordering - self._process() + yield self._process() + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id): + # We could check the receipts are actually m.read receipts here, + # but currently that's the only type of receipt anyway... + badge = yield push_tools.get_badge_count(self.hs, self.user_id) + yield self.send_badge(badge) + + @defer.inlineCallbacks def on_timer(self): - self._process() + yield self._process() def on_stop(self): if self.timed_call: @@ -106,22 +116,24 @@ class HttpPusher(object): self.last_stream_ordering, self.clock.time_msec() ) - self.failing_since = None - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if self.failing_since: + self.failing_since = None + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) else: - self.failing_since = self.clock.time_msec() - yield self.store.update_pusher_failing_since( - self.app_id, self.pushkey, self.user_id, - self.failing_since - ) + if not self.failing_since: + self.failing_since = self.clock.time_msec() + yield self.store.update_pusher_failing_since( + self.app_id, self.pushkey, self.user_id, + self.failing_since + ) if ( self.failing_since and self.failing_since < - self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER + self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS ): # we really only give up so that if the URL gets # fixed, we don't suddenly deliver a load @@ -148,7 +160,7 @@ class HttpPusher(object): else: logger.info("Push failed: delaying for %ds", self.backoff_delay) self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) - self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC) + self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break @defer.inlineCallbacks @@ -191,7 +203,8 @@ class HttpPusher(object): d = { 'notification': { - 'id': event.event_id, + 'id': event.event_id, # deprecated: remove soon + 'event_id': event.event_id, 'room_id': event.room_id, 'type': event.type, 'sender': event.user_id, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b67ad455e..7b1ce81e9 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -126,10 +126,28 @@ class PusherPool: for u in users_affected: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_notifications(min_stream_id, max_stream_id) + yield p.on_new_notifications(min_stream_id, max_stream_id) except: logger.exception("Exception in pusher on_new_notifications") + @defer.inlineCallbacks + def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): + yield run_on_reactor() + try: + # Need to subtract 1 from the minimum because the lower bound here + # is not inclusive + updated_receipts = yield self.store.get_all_updated_receipts( + min_stream_id - 1, max_stream_id + ) + # This returns a tuple, user_id is at index 3 + users_affected = set([r[3] for r in updated_receipts]) + for u in users_affected: + if u in self.pushers: + for p in self.pushers[u].values(): + yield p.on_new_receipts(min_stream_id, max_stream_id) + except: + logger.exception("Exception in pusher on_new_receipts") + @defer.inlineCallbacks def _refresh_pusher(self, app_id, pushkey, user_id): resultlist = yield self.store.get_pushers_by_app_id_and_pushkey( diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 4befebc8e..59d1ac031 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -390,16 +390,19 @@ class ReceiptsStore(SQLBaseStore): } ) - def get_all_updated_receipts(self, last_id, current_id, limit): + def get_all_updated_receipts(self, last_id, current_id, limit=None): def get_all_updated_receipts_txn(txn): sql = ( "SELECT stream_id, room_id, receipt_type, user_id, event_id, data" " FROM receipts_linearized" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_id, current_id, limit)) + args = [last_id, current_id] + if limit is not None: + sql += " LIMIT ?" + args.append(limit) + txn.execute(sql, args) return txn.fetchall() return self.runInteraction(