Clobber EDUs in send queue

This commit is contained in:
Erik Johnston 2016-09-09 15:59:08 +01:00
parent ab80d5e0a9
commit 52b2318777
5 changed files with 58 additions and 20 deletions

View file

@ -26,6 +26,7 @@ from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state
import synapse.metrics
import logging
@ -69,13 +70,20 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
self.pending_presence_by_dest = presence = {}
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
"pending_pdus",
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
lambda: sum(map(len, edus.values())),
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
+ sum(map(len, edus_keyed.values()))
),
)
# destination -> list of tuple(failure, deferred)
@ -130,13 +138,25 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
def enqueue_edu(self, edu):
def enqueue_presence(self, destination, states):
self.pending_presence_by_dest.setdefault(destination, {}).update({
state.user_id: state for state in states
})
preserve_context_over_fn(
self._attempt_new_transaction, destination
)
def enqueue_edu(self, edu, key=None):
destination = edu.destination
if not self.can_send_to(destination):
return
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
if key:
self.pending_edus_keyed_by_dest.setdefault(destination, {})[key] = edu
else:
self.pending_edus_by_dest.setdefault(destination, []).append(edu)
preserve_context_over_fn(
self._attempt_new_transaction, destination
@ -190,8 +210,13 @@ class TransactionQueue(object):
while True:
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
)
limiter = yield get_retry_limiter(
destination,
self.clock,
@ -203,6 +228,23 @@ class TransactionQueue(object):
)
pending_edus.extend(device_message_edus)
logger.info("Sending presence: %r", pending_presence)
if pending_presence:
pending_edus.append(
Edu(
origin=self.server_name,
destination=destination,
edu_type="m.presence",
content={
"push": [
format_user_presence_state(
presence, self.clock.time_msec()
)
for presence in pending_presence.values()
]
},
)
)
if pending_pdus:
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",