Batch up outgoing read-receipts to reduce federation traffic. (#4890)

Rate-limit outgoing read-receipts as per #4730.
This commit is contained in:
Richard van der Hoff 2019-03-20 16:02:25 +00:00 committed by GitHub
parent 11f2125885
commit a902d13180
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 308 additions and 22 deletions

View file

@ -80,6 +80,10 @@ class PerDestinationQueue(object):
# destination
self._pending_presence = {} # type: dict[str, UserPresenceState]
# room_id -> receipt_type -> user_id -> receipt_dict
self._pending_rrs = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self._last_device_stream_id = 0
@ -87,6 +91,9 @@ class PerDestinationQueue(object):
# stream_id of last successfully sent device list update.
self._last_device_list_stream_id = 0
def __str__(self):
return "PerDestinationQueue[%s]" % self._destination
def pending_pdu_count(self):
return len(self._pending_pdus)
@ -118,6 +125,30 @@ class PerDestinationQueue(object):
})
self.attempt_new_transaction()
def queue_read_receipt(self, receipt):
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
(see flush_read_receipts_for_room)
Args:
receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued
"""
self._pending_rrs.setdefault(
receipt.room_id, {},
).setdefault(
receipt.receipt_type, {}
)[receipt.user_id] = {
"event_ids": receipt.event_ids,
"data": receipt.data,
}
def flush_read_receipts_for_room(self, room_id):
# if we don't have any read-receipts for this room, it may be that we've already
# sent them out, so we don't need to flush.
if room_id not in self._pending_rrs:
return
self._rrs_pending_flush = True
self.attempt_new_transaction()
def send_keyed_edu(self, edu, key):
self._pending_edus_keyed[(edu.edu_type, key)] = edu
self.attempt_new_transaction()
@ -183,10 +214,12 @@ class PerDestinationQueue(object):
# We can only include at most 50 PDUs per transactions
pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
pending_edus = self._pending_edus
pending_edus = []
pending_edus.extend(self._get_rr_edus(force_flush=False))
# We can only include at most 100 EDUs per transactions
pending_edus, self._pending_edus = pending_edus[:100], pending_edus[100:]
pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus)))
pending_edus.extend(
self._pending_edus_keyed.values()
@ -224,6 +257,11 @@ class PerDestinationQueue(object):
self._last_device_stream_id = device_stream_id
return
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if len(pending_edus) < 100:
pending_edus.extend(self._get_rr_edus(force_flush=True))
# END CRITICAL SECTION
success = yield self._transaction_manager.send_new_transaction(
@ -285,6 +323,28 @@ class PerDestinationQueue(object):
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
def _get_rr_edus(self, force_flush):
if not self._pending_rrs:
return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
edu = Edu(
origin=self._server_name,
destination=self._destination,
edu_type="m.receipt",
content=self._pending_rrs,
)
self._pending_rrs = {}
self._rrs_pending_flush = False
yield edu
def _pop_pending_edus(self, limit):
pending_edus = self._pending_edus
pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:]
return pending_edus
@defer.inlineCallbacks
def _get_new_device_messages(self):
last_device_stream_id = self._last_device_stream_id