From 11ea16777f52264f0a1d6f4db5b7db5c6c147523 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 9 May 2019 12:01:41 +0200 Subject: [PATCH] Limit the number of EDUs in transactions to 100 as expected by receiver (#5138) Fixes #3951. --- changelog.d/5138.bugfix | 1 + .../sender/per_destination_queue.py | 66 ++++++++++--------- synapse/storage/deviceinbox.py | 2 +- 3 files changed, 37 insertions(+), 32 deletions(-) create mode 100644 changelog.d/5138.bugfix diff --git a/changelog.d/5138.bugfix b/changelog.d/5138.bugfix new file mode 100644 index 000000000..c1945a8eb --- /dev/null +++ b/changelog.d/5138.bugfix @@ -0,0 +1 @@ +Limit the number of EDUs in transactions to 100 as expected by synapse. Thanks to @superboum for this work! diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index be9921100..4d96f026c 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -33,6 +33,9 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import UserPresenceState from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter +# This is defined in the Matrix spec and enforced by the receiver. +MAX_EDUS_PER_TRANSACTION = 100 + logger = logging.getLogger(__name__) @@ -197,7 +200,8 @@ class PerDestinationQueue(object): pending_pdus = [] while True: device_message_edus, device_stream_id, dev_list_id = ( - yield self._get_new_device_messages() + # We have to keep 2 free slots for presence and rr_edus + yield self._get_new_device_messages(MAX_EDUS_PER_TRANSACTION - 2) ) # BEGIN CRITICAL SECTION @@ -216,19 +220,9 @@ class PerDestinationQueue(object): pending_edus = [] - pending_edus.extend(self._get_rr_edus(force_flush=False)) - # We can only include at most 100 EDUs per transactions - pending_edus.extend(self._pop_pending_edus(100 - len(pending_edus))) - - pending_edus.extend( - self._pending_edus_keyed.values() - ) - - self._pending_edus_keyed = {} - - pending_edus.extend(device_message_edus) - + # rr_edus and pending_presence take at most one slot each + pending_edus.extend(self._get_rr_edus(force_flush=False)) pending_presence = self._pending_presence self._pending_presence = {} if pending_presence: @@ -248,6 +242,12 @@ class PerDestinationQueue(object): ) ) + pending_edus.extend(device_message_edus) + pending_edus.extend(self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))) + while len(pending_edus) < MAX_EDUS_PER_TRANSACTION and self._pending_edus_keyed: + _, val = self._pending_edus_keyed.popitem() + pending_edus.append(val) + if pending_pdus: logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d", self._destination, len(pending_pdus)) @@ -259,7 +259,7 @@ class PerDestinationQueue(object): # if we've decided to send a transaction anyway, and we have room, we # may as well send any pending RRs - if len(pending_edus) < 100: + if len(pending_edus) < MAX_EDUS_PER_TRANSACTION: pending_edus.extend(self._get_rr_edus(force_flush=True)) # END CRITICAL SECTION @@ -346,27 +346,13 @@ class PerDestinationQueue(object): return pending_edus @defer.inlineCallbacks - def _get_new_device_messages(self): - last_device_stream_id = self._last_device_stream_id - to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( - self._destination, last_device_stream_id, to_device_stream_id - ) - edus = [ - Edu( - origin=self._server_name, - destination=self._destination, - edu_type="m.direct_to_device", - content=content, - ) - for content in contents - ] - + def _get_new_device_messages(self, limit): last_device_list = self._last_device_list_stream_id + # Will return at most 20 entries now_stream_id, results = yield self._store.get_devices_by_remote( self._destination, last_device_list ) - edus.extend( + edus = [ Edu( origin=self._server_name, destination=self._destination, @@ -374,5 +360,23 @@ class PerDestinationQueue(object): content=content, ) for content in results + ] + + assert len(edus) <= limit, "get_devices_by_remote returned too many EDUs" + + last_device_stream_id = self._last_device_stream_id + to_device_stream_id = self._store.get_to_device_stream_token() + contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + self._destination, last_device_stream_id, to_device_stream_id, limit - len(edus) ) + edus.extend( + Edu( + origin=self._server_name, + destination=self._destination, + edu_type="m.direct_to_device", + content=content, + ) + for content in contents + ) + defer.returnValue((edus, stream_id, now_stream_id)) diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index fed4ea361..9b0a99cb4 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -118,7 +118,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): defer.returnValue(count) def get_new_device_msgs_for_remote( - self, destination, last_stream_id, current_stream_id, limit=100 + self, destination, last_stream_id, current_stream_id, limit ): """ Args: