Merge pull request #2115 from matrix-org/erikj/dedupe_federation_repl

Reduce federation replication traffic
This commit is contained in:
Erik Johnston 2017-04-12 11:07:13 +01:00 committed by GitHub
commit 247c736b9b
7 changed files with 203 additions and 130 deletions

View file

@ -53,18 +53,19 @@ class FederationRemoteSendQueue(object):
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
self.is_mine_id = hs.is_mine_id
self.presence_map = {}
self.presence_changed = sorteddict()
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
self.presence_changed = sorteddict() # Stream position -> user_id
self.keyed_edu = {}
self.keyed_edu_changed = sorteddict()
self.keyed_edu = {} # (destination, key) -> EDU
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
self.edus = sorteddict()
self.edus = sorteddict() # stream position -> Edu
self.failures = sorteddict()
self.failures = sorteddict() # stream position -> (destination, Failure)
self.device_messages = sorteddict()
self.device_messages = sorteddict() # stream position -> destination
self.pos = 1
self.pos_time = sorteddict()
@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object):
del self.presence_changed[key]
user_ids = set(
user_id for uids in self.presence_changed.values() for _, user_id in uids
user_id
for uids in self.presence_changed.itervalues()
for user_id in uids
)
to_del = [
@ -187,18 +190,20 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
def send_presence(self, destination, states):
"""As per TransactionQueue"""
def send_presence(self, states):
"""As per TransactionQueue
Args:
states (list(UserPresenceState))
"""
pos = self._next_pos()
self.presence_map.update({
state.user_id: state
for state in states
})
# We only want to send presence for our own users, so lets always just
# filter here just in case.
local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
self.presence_changed[pos] = [
(destination, state.user_id) for state in states
]
self.presence_map.update({state.user_id: state for state in local_states})
self.presence_changed[pos] = [state.user_id for state in local_states]
self.notifier.on_new_replication_data()
@ -251,15 +256,14 @@ class FederationRemoteSendQueue(object):
keys = self.presence_changed.keys()
i = keys.bisect_right(from_token)
j = keys.bisect_right(to_token) + 1
dest_user_ids = set(
(pos, dest_user_id)
dest_user_ids = [
(pos, user_id)
for pos in keys[i:j]
for dest_user_id in self.presence_changed[pos]
)
for user_id in self.presence_changed[pos]
]
for (key, (dest, user_id)) in dest_user_ids:
for (key, user_id) in dest_user_ids:
rows.append((key, PresenceRow(
destination=dest,
state=self.presence_map[user_id],
)))
@ -357,7 +361,6 @@ class BaseFederationRow(object):
class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
"destination", # str
"state", # UserPresenceState
))):
TypeId = "p"
@ -365,18 +368,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", (
@staticmethod
def from_data(data):
return PresenceRow(
destination=data["destination"],
state=UserPresenceState.from_dict(data["state"])
state=UserPresenceState.from_dict(data)
)
def to_data(self):
return {
"destination": self.destination,
"state": self.state.as_dict()
}
return self.state.as_dict()
def add_to_buffer(self, buff):
buff.presence.setdefault(self.destination, []).append(self.state)
buff.presence.append(self.state)
class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@ -487,7 +486,7 @@ TypeToRow = {
ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # dict of destination -> [UserPresenceState]
"presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
"failures", # dict of destination -> [failures]
@ -509,7 +508,7 @@ def process_rows_for_federation(transaction_queue, rows):
# them into the appropriate collection and then send them off.
buff = ParsedFederationStreamData(
presence={},
presence=[],
keyed_edus={},
edus={},
failures={},
@ -526,8 +525,8 @@ def process_rows_for_federation(transaction_queue, rows):
parsed_row = RowType.from_data(row.data)
parsed_row.add_to_buffer(buff)
for destination, states in buff.presence.iteritems():
transaction_queue.send_presence(destination, states)
if buff.presence:
transaction_queue.send_presence(buff.presence)
for destination, edu_map in buff.keyed_edus.iteritems():
for key, edu in edu_map.items():

View file

@ -21,11 +21,11 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
import logging
@ -79,8 +79,18 @@ class TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = edus = {}
# Presence needs to be separate as we send single aggragate EDUs
# Map of user_id -> UserPresenceState for all the pending presence
# to be sent out by user_id. Entries here get processed and put in
# pending_presence_by_dest
self.pending_presence = {}
# Map of destination -> user_id -> UserPresenceState of pending presence
# to be sent to each destinations
self.pending_presence_by_dest = presence = {}
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
@ -115,6 +125,8 @@ class TransactionQueue(object):
self._is_processing = False
self._last_poked_id = -1
self._processing_pending_presence = False
def can_send_to(self, destination):
"""Can we send messages to the given server?
@ -226,17 +238,71 @@ class TransactionQueue(object):
self._attempt_new_transaction, destination
)
def send_presence(self, destination, states):
if not self.can_send_to(destination):
return
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states):
"""Send the new presence states to the appropriate destinations.
self.pending_presence_by_dest.setdefault(destination, {}).update({
This actually queues up the presence states ready for sending and
triggers a background task to process them and send out the transactions.
Args:
states (list(UserPresenceState))
"""
# First we queue up the new presence by user ID, so multiple presence
# updates in quick successtion are correctly handled
# We only want to send presence for our own users, so lets always just
# filter here just in case.
self.pending_presence.update({
state.user_id: state for state in states
if self.is_mine_id(state.user_id)
})
preserve_context_over_fn(
self._attempt_new_transaction, destination
)
# We then handle the new pending presence in batches, first figuring
# out the destinations we need to send each state to and then poking it
# to attempt a new transaction. We linearize this so that we don't
# accidentally mess up the ordering and send multiple presence updates
# in the wrong order
if self._processing_pending_presence:
return
self._processing_pending_presence = True
try:
while True:
states_map = self.pending_presence
self.pending_presence = {}
if not states_map:
break
yield self._process_presence_inner(states_map.values())
finally:
self._processing_pending_presence = False
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
Args:
states (list(UserPresenceState))
"""
hosts_and_states = yield get_interested_remotes(self.store, states)
for destinations, states in hosts_and_states:
for destination in destinations:
if not self.can_send_to(destination):
continue
self.pending_presence_by_dest.setdefault(
destination, {}
).update({
state.user_id: state for state in states
})
preserve_fn(self._attempt_new_transaction)(destination)
def send_edu(self, destination, edu_type, content, key=None):
edu = Edu(