From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 10 Apr 2017 16:48:30 +0100
Subject: [PATCH 1/9] Reduce federation presence replication traffic

This is mainly done by moving the calculation of where to send presence
updates from the presence handler to the transaction queue, so we only
need to send the presence event (and not the destinations) across the
replication connection. Previously we were duplicating this work by
sending the full state across once per destination.
---
 synapse/app/federation_sender.py            | 12 +++
 synapse/app/synchrotron.py                  |  6 +-
 synapse/federation/send_queue.py            | 47 +++++-----
 synapse/federation/transaction_queue.py     | 99 +++++++++++++++++++--
 synapse/handlers/presence.py                | 54 +++--------
 synapse/replication/slave/storage/events.py |  1 +
 6 files changed, 139 insertions(+), 80 deletions(-)

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 477e16e0f..49efb602b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -80,6 +81,17 @@ class FederationSenderSlaveStore(
 
         return rows[0][0] if rows else -1
 
+    # XXX: This is a bit broken because we don't persist the accepted list in a
+    # way that can be replicated. This means that we don't have a way to
+    # invalidate the cache correctly.
+    # This is fine since in practice nobody uses the presence list stuff...
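The `PresenceStore.__dict__[...]` assignments just below use an idiom that recurs
throughout Synapse's worker stores: looking a method up via the class `__dict__`
returns the raw attribute (the plain function, or the descriptor produced by
`@cached`) rather than a bound method, so it can be re-attached to a slimmer
slave class without inheriting the full store. A minimal sketch of the mechanics,
with hypothetical class and method names rather than Synapse's real ones:

    class FullStore(object):
        # Imagine this is expensive and wrapped by @cached in the real store;
        # __dict__ access returns the raw attribute, which can be re-bound
        # onto another class.
        def get_thing(self, key):
            return "thing-%s" % (key,)

    class SlaveStore(object):
        # Borrow the implementation without inheriting everything else.
        get_thing = FullStore.__dict__["get_thing"]

    slave = SlaveStore()
    print(slave.get_thing("a"))  # -> thing-a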
+ get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161f..7b6f82abd 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe..a12c18f4d 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + 
buff.presence.append(self.state)
 
 
 class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", (
@@ -469,7 +462,7 @@ TypeToRow = {
 
 
 ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
-    "presence",  # dict of destination -> [UserPresenceState]
+    "presence",  # list(UserPresenceState)
     "keyed_edus",  # dict of destination -> { key -> Edu }
     "edus",  # dict of destination -> [Edu]
     "failures",  # dict of destination -> [failures]
@@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence={},
+        presence=[],
         keyed_edus={},
         edus={},
         failures={},
@@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows):
         parsed_row = RowType.from_data(row.data)
         parsed_row.add_to_buffer(buff)
 
-    for destination, states in buff.presence.iteritems():
-        transaction_queue.send_presence(destination, states)
+    if buff.presence:
+        transaction_queue.send_presence(buff.presence)
 
     for destination, edu_map in buff.keyed_edus.iteritems():
         for key, edu in edu_map.items():
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index c27ce7c5f..fd9e1fa01 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -21,7 +21,7 @@ from .units import Transaction, Edu
 
 from synapse.api.errors import HttpResponseException
 from synapse.util.async import run_on_reactor
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
@@ -78,6 +78,7 @@ class TransactionQueue(object):
         self.pending_edus_by_dest = edus = {}
 
         # Presence needs to be separate as we send single aggregate EDUs
+        self.pending_presence = {}
         self.pending_presence_by_dest = presence = {}
 
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
         metrics.register_callback(
@@ -113,6 +114,8 @@ class TransactionQueue(object):
         self._is_processing = False
         self._last_poked_id = -1
 
+        self._processing_pending_presence = False
+
     def can_send_to(self, destination):
         """Can we send messages to the given server?
 
@@ -224,17 +227,95 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    def send_presence(self, destination, states):
-        if not self.can_send_to(destination):
+    @preserve_fn
+    @defer.inlineCallbacks
+    def send_presence(self, states):
+        """Send the new presence states to the appropriate destinations.
+
+        Args:
+            states (list(UserPresenceState))
+        """
+
+        # First we queue up the new presence by user ID, so multiple presence
+        # updates in quick succession are correctly handled
+        self.pending_presence.update({state.user_id: state for state in states})
+
+        # We then handle the new pending presence in batches, first figuring
+        # out the destinations we need to send each state to and then poking it
+        # to attempt a new transaction. 
We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb..c1c0dd4d3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. 
Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. 
Args:
-            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+            hosts_to_states (list): list(state)
         """
-        for host, states in hosts_to_states.items():
-            self.federation.send_presence(host, states)
+        self.federation.send_presence(states)
 
     @defer.inlineCallbacks
     def incoming_presence(self, origin, content):
@@ -837,14 +812,13 @@ class PresenceHandler(object):
 
         if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
-            hosts = set(get_domain_from_id(u) for u in user_ids)
-            self._push_to_remotes({host: (state,) for host in hosts})
+            self._push_to_remotes([state])
         else:
             user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
-            self._push_to_remotes({user.domain: states.values()})
+            self._push_to_remotes(states.values())
 
     @defer.inlineCallbacks
     def get_presence_list(self, observer_user, accepted=None):
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 5fd47706e..4ca1e5aa8 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore):
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
     get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"]
+    get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"]
     get_users_who_share_room_with_user = (
         RoomMemberStore.__dict__["get_users_who_share_room_with_user"]
     )

From b9b72bc6e2bdb3c6684db3e05e18b632755c7ccc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Apr 2017 15:15:34 +0100
Subject: [PATCH 2/9] Comments

---
 synapse/federation/transaction_queue.py | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index fd9e1fa01..eb361d904 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -78,8 +78,18 @@ class TransactionQueue(object):
         self.pending_edus_by_dest = edus = {}
 
         # Presence needs to be separate as we send single aggregate EDUs
+
+        # Map of user_id -> UserPresenceState for all the pending presence
+        # to be sent out by user_id. Entries here get processed and put in
+        # pending_presence_by_dest
         self.pending_presence = {}
+
+        # Map of destination -> user_id -> UserPresenceState of pending presence
+        # to be sent to each destination
         self.pending_presence_by_dest = presence = {}
+
+        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
+        # based on their key (e.g. typing events by room_id)
+        # Map of destination -> (edu_type, key) -> Edu
         self.pending_edus_keyed_by_dest = edus_keyed = {}
 
         metrics.register_callback(
@@ -227,11 +237,14 @@ class TransactionQueue(object):
             self._attempt_new_transaction, destination
         )
 
-    @preserve_fn
+    @preserve_fn  # the caller should not yield on this
     @defer.inlineCallbacks
     def send_presence(self, states):
         """Send the new presence states to the appropriate destinations.
 
+        This actually queues up the presence states ready for sending and
+        triggers a background task to process them and send out the transactions. 
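The scheme this docstring describes is a small coalescing pattern: new states are
merged into a dict keyed by user ID (so a burst of updates for one user collapses
to the newest state), and a single drain loop, guarded by a flag, works through
batches in order without overlapping runs. A condensed sketch of the idea outside
Synapse, with a hypothetical PresenceBatcher class and plain dicts standing in
for UserPresenceState:

    from twisted.internet import defer

    class PresenceBatcher(object):
        def __init__(self):
            self.pending = {}       # user_id -> latest state, newest wins
            self._draining = False  # plays the role of _processing_pending_presence

        @defer.inlineCallbacks
        def send(self, states):
            # Merge by user ID, mirroring pending_presence.update(...)
            self.pending.update((s["user_id"], s) for s in states)
            if self._draining:
                return  # the running drain loop will pick these up
            self._draining = True
            try:
                while self.pending:
                    batch, self.pending = self.pending, {}
                    yield self._process(batch)
            finally:
                self._draining = False

        def _process(self, batch):
            # Stand-in for _process_presence_inner: work out destinations
            # and poke a transaction for each.
            return defer.succeed(None)

Because the loop re-checks `self.pending` after every batch, states queued while
a batch was in flight are sent on the next iteration, preserving per-user ordering.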
+
         Args:
             states (list(UserPresenceState))
         """

From 6308ac45b08ff5bf7259f09a2a767212f3f97860 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Apr 2017 15:19:26 +0100
Subject: [PATCH 3/9] Move get_interested_remotes back to presence handler

---
 synapse/federation/transaction_queue.py | 41 +++----------------
 synapse/handlers/presence.py            | 52 +++++++++++++++++++++++--
 2 files changed, 55 insertions(+), 38 deletions(-)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index eb361d904..08ceda31a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,7 +25,7 @@ from .units import Transaction, Edu
 from synapse.util.logcontext import preserve_context_over_fn, preserve_fn
 from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
 from synapse.util.metrics import measure_func
 from synapse.types import get_domain_from_id
-from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 import synapse.metrics
 
 import logging
@@ -251,7 +251,10 @@ class TransactionQueue(object):
 
         # First we queue up the new presence by user ID, so multiple presence
         # updates in quick succession are correctly handled
-        self.pending_presence.update({state.user_id: state for state in states})
+        self.pending_presence.update({
+            state.user_id: state for state in states
+            if self.is_mine_id(state.user_id)
+        })
 
         # We then handle the new pending presence in batches, first figuring
         # out the destinations we need to send each state to and then poking it
@@ -283,40 +286,8 @@ class TransactionQueue(object):
         Args:
             states (list(UserPresenceState))
         """
-        # First we look up the rooms each user is in (as well as any explicit
-        # subscriptions), then for each distinct room we look up the remote
-        # hosts in those rooms. 
- room_ids_to_states = {} - users_to_states = {} - for state in states.itervalues(): - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) + hosts_and_states = yield get_interested_remotes(self.store, states) - plist = yield self.store.get_presence_list_observers_accepted( - state.user_id, - ) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - hosts_and_states = [] - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) - - # And now finally queue up new transactions for destinations, states in hosts_and_states: for destination in destinations: if not self.can_send_to(destination): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c1c0dd4d3..685373ff2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -633,9 +633,6 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - # TODO: de-dup hosts_to_states, as a single host might have multiple - # of same presence - defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks @@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): persist_and_notify = True return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_remotes(store, states): + """Given a list of presence states figure out which remote servers + should be sent which. + + All the presence states should be for local users only. + + Args: + store (DataStore) + states (list(UserPresenceState)) + + Returns: + Deferred list of ([destinations], [UserPresenceState]), where for + each row the list of UserPresenceState should be sent to each + destination + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. 
+ room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + if not local_states: + continue + + hosts = yield store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + defer.returnValue(hosts_and_states) From 2be8a281d2aac8e2e3829b9aff6eb366506d22d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:28:24 +0100 Subject: [PATCH 4/9] Comments --- synapse/federation/send_queue.py | 14 +++++++------- synapse/handlers/presence.py | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index a12c18f4d..7e52a55ed 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,17 +55,17 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.presence_map = {} - self.presence_changed = sorteddict() + self.presence_map = {} # Pending presence map user_id -> UserPresenceState + self.presence_changed = sorteddict() # Stream position -> user_id - self.keyed_edu = {} - self.keyed_edu_changed = sorteddict() + self.keyed_edu = {} # (destination, key) -> EDU + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = sorteddict() + self.edus = sorteddict() # stream position -> Edu - self.failures = sorteddict() + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 self.pos_time = sorteddict() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 685373ff2..98e736be5 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -666,7 +666,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list): list(state) + hosts_to_states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1332,6 +1332,8 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ + hosts_and_states = [] # Final result to return + # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. 
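The comment above captures the core fan-out: each state is bucketed by the rooms
its user is in (plus any presence-list observers), and each room is then expanded
to its remote hosts. A toy synchronous version over plain dicts (hypothetical
data and names; the real code goes through deferred storage calls):

    rooms_by_user = {"@a:here": ["!r1"], "@b:here": ["!r1", "!r2"]}
    hosts_by_room = {"!r1": ["remote1", "remote2"], "!r2": ["remote2"]}

    def interested_remotes(states):
        # Bucket states by room, mirroring room_ids_to_states.
        room_ids_to_states = {}
        for state in states:
            for room_id in rooms_by_user.get(state["user_id"], []):
                room_ids_to_states.setdefault(room_id, []).append(state)

        # Expand each room to the remote hosts that should hear about it.
        hosts_and_states = []
        for room_id, room_states in room_ids_to_states.items():
            hosts_and_states.append((hosts_by_room[room_id], room_states))
        return hosts_and_states

    print(interested_remotes([{"user_id": "@a:here", "presence": "online"}]))
    # [(['remote1', 'remote2'], [{'user_id': '@a:here', 'presence': 'online'}])]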
@@ -1348,7 +1350,6 @@ def get_interested_remotes(store, states): for u in plist: users_to_states.setdefault(u, []).append(state) - hosts_and_states = [] for room_id, states in room_ids_to_states.items(): if not local_states: continue From 414522aed59aca9b711253aba98d15e2d59e14f9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:30:02 +0100 Subject: [PATCH 5/9] Move get_interested_parties --- synapse/app/synchrotron.py | 5 ++- synapse/handlers/presence.py | 69 ++++++++++++++++++------------------ 2 files changed, 36 insertions(+), 38 deletions(-) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 7b6f82abd..e3fbf02c9 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -20,7 +20,7 @@ from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.handlers.presence import PresenceHandler +from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -172,7 +172,6 @@ class SynchrotronPresence(object): get_states = PresenceHandler.get_states.__func__ get_state = PresenceHandler.get_state.__func__ - _get_interested_parties = PresenceHandler._get_interested_parties.__func__ current_state_for_users = PresenceHandler.current_state_for_users.__func__ def user_syncing(self, user_id, affect_presence): @@ -206,7 +205,7 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 98e736be5..b9ce997a9 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -610,31 +610,6 @@ class PresenceHandler(object): defer.returnValue(states) - @defer.inlineCallbacks - def _get_interested_parties(self, states): - """Given a list of states return which entities (rooms, users, servers) - are interested in the given states. 
- - Returns: - 2-tuple: `(room_ids_to_states, users_to_states)`, - with each item being a dict of `entity_name` -> `[UserPresenceState]` - """ - room_ids_to_states = {} - users_to_states = {} - for state in states: - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted(state.user_id) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - # Always notify self - users_to_states.setdefault(state.user_id, []).append(state) - - defer.returnValue((room_ids_to_states, users_to_states)) - @defer.inlineCallbacks def _persist_and_notify(self, states): """Persist states in the database, poke the notifier and send to @@ -642,7 +617,7 @@ class PresenceHandler(object): """ stream_id, max_token = yield self.store.update_presence(states) - parties = yield self._get_interested_parties(states) + parties = yield get_interested_parties(self.store, states) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -654,7 +629,7 @@ class PresenceHandler(object): @defer.inlineCallbacks def notify_for_states(self, state, stream_id): - parties = yield self._get_interested_parties([state]) + parties = yield get_interested_parties(self.store, [state]) room_ids_to_states, users_to_states = parties self.notifier.on_new_event( @@ -1316,6 +1291,36 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_parties(store, states): + """Given a list of states return which entities (rooms, users) + are interested in the given states. + + Args: + states (list(UserPresenceState)) + + Returns: + 2-tuple: `(room_ids_to_states, users_to_states)`, + with each item being a dict of `entity_name` -> `[UserPresenceState]` + """ + room_ids_to_states = {} + users_to_states = {} + for state in states: + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted(state.user_id) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + # Always notify self + users_to_states.setdefault(state.user_id, []).append(state) + + defer.returnValue((room_ids_to_states, users_to_states)) + + @defer.inlineCallbacks def get_interested_remotes(store, states): """Given a list of presence states figure out which remote servers @@ -1351,17 +1356,11 @@ def get_interested_remotes(store, states): users_to_states.setdefault(u, []).append(state) for room_id, states in room_ids_to_states.items(): - if not local_states: - continue - hosts = yield store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) + hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.items(): - if not local_states: - continue - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) + hosts_and_states.append(([host], states)) defer.returnValue(hosts_and_states) From a8c8e4efd4055be316c2b38624e7afd77c57b971 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:34:55 +0100 Subject: [PATCH 6/9] Comment --- synapse/federation/send_queue.py | 8 +++++++- synapse/federation/transaction_queue.py | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 
7e52a55ed..19cb757ac 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -191,9 +191,15 @@ class FederationRemoteSendQueue(object):
         self.notifier.on_new_replication_data()
 
     def send_presence(self, states):
-        """As per TransactionQueue"""
+        """As per TransactionQueue
+
+        Args:
+            states (list(UserPresenceState))
+        """
         pos = self._next_pos()
 
+        # We only want to send presence for our own users, so let's always just
+        # filter here just in case.
         local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
 
         self.presence_map.update({state.user_id: state for state in local_states})
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 08ceda31a..260a47225 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -251,6 +251,8 @@ class TransactionQueue(object):
 
         # First we queue up the new presence by user ID, so multiple presence
         # updates in quick succession are correctly handled
+        # We only want to send presence for our own users, so let's always just
+        # filter here just in case.
         self.pending_presence.update({
             state.user_id: state for state in states
             if self.is_mine_id(state.user_id)
         })

From 9c712a366fbbba90dbe18f7246c279d3cbb1cf10 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 11 Apr 2017 16:07:33 +0100
Subject: [PATCH 7/9] Move get_presence_list_* to SlaveStore

---
 synapse/app/federation_sender.py              | 15 ++-------------
 synapse/app/synchrotron.py                    | 12 +-----------
 synapse/replication/slave/storage/presence.py | 10 ++++++++++
 3 files changed, 13 insertions(+), 24 deletions(-)

diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 49efb602b..e51a69074 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,11 +28,11 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.storage.engines import create_engine
-from synapse.storage.presence import PresenceStore
 from synapse.util.async import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
@@ -56,7 +56,7 @@ logger = logging.getLogger("synapse.app.appservice")
 
 class FederationSenderSlaveStore(
     SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore,
+    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
         super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -81,17 +81,6 @@ class FederationSenderSlaveStore(
 
         return rows[0][0] if rows else -1
 
-    # XXX: This is a bit broken because we don't persist the accepted list in a
-    # way that can be replicated. This means that we don't have a way to
-    # invalidate the cache correctly.
-    # This is fine since in practice nobody uses the presence list stuff... 
- get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e3fbf02c9..13c00ef2b 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -44,7 +44,7 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.client_ips import ClientIpStore from synapse.storage.engines import create_engine -from synapse.storage.presence import PresenceStore, UserPresenceState +from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -89,16 +89,6 @@ class SynchrotronSlavedStore( RoomMemberStore.__dict__["did_forget"] ) - # XXX: This is a bit broken because we don't persist the accepted list in a - # way that can be replicated. This means that we don't have a way to - # invalidate the cache correctly. - get_presence_list_accepted = PresenceStore.__dict__[ - "get_presence_list_accepted" - ] - get_presence_list_observers_accepted = PresenceStore.__dict__[ - "get_presence_list_observers_accepted" - ] - UPDATE_SYNCING_USERS_MS = 10 * 1000 diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index dffc80adc..cfb928018 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -39,6 +39,16 @@ class SlavedPresenceStore(BaseSlavedStore): _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. 
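The caveat in the comment above matters because a `@cached` method borrowed via
`__dict__` onto a worker keeps whatever it computed first unless an invalidation
arrives over replication. A self-contained toy (hypothetical `cached` decorator
and data, nothing Synapse-specific) showing how an uninvalidated cache quietly
serves stale results:

    def cached(fn):
        memo = {}
        def wrapper(key):
            # First call computes and stores; later calls never recompute.
            if key not in memo:
                memo[key] = fn(key)
            return memo[key]
        return wrapper

    accepted = {"@a:here": ["@b:elsewhere"]}

    @cached
    def get_accepted(user_id):
        return list(accepted.get(user_id, []))

    print(get_accepted("@a:here"))                # ['@b:elsewhere']
    accepted["@a:here"].append("@c:elsewhere")    # master-side change...
    print(get_accepted("@a:here"))                # ...still ['@b:elsewhere']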
+ get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + def get_current_presence_token(self): return self._presence_id_gen.get_current_token() From c7ddb5ef7ac3dc7370010ee6685497ee73f46fa2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:11:43 +0100 Subject: [PATCH 8/9] Reuse get_interested_parties --- synapse/federation/transaction_queue.py | 6 +++--- synapse/handlers/presence.py | 21 +++++---------------- 2 files changed, 8 insertions(+), 19 deletions(-) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 260a47225..feb160501 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -269,13 +269,13 @@ class TransactionQueue(object): self._processing_pending_presence = True try: while True: - states = self.pending_presence + states_map = self.pending_presence self.pending_presence = {} - if not states: + if not states_map: break - yield self._process_presence_inner(states) + yield self._process_presence_inner(states_map.values()) finally: self._processing_pending_presence = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b9ce997a9..f3707afcd 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -641,7 +641,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list(UserPresenceState)) + states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1337,29 +1337,18 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ - hosts_and_states = [] # Final result to return + hosts_and_states = [] # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. 
-    room_ids_to_states = {}
-    users_to_states = {}
-    for state in states.itervalues():
-        room_ids = yield store.get_rooms_for_user(state.user_id)
-        for room_id in room_ids:
-            room_ids_to_states.setdefault(room_id, []).append(state)
+    room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
 
-        plist = yield store.get_presence_list_observers_accepted(
-            state.user_id,
-        )
-        for u in plist:
-            users_to_states.setdefault(u, []).append(state)
-
-    for room_id, states in room_ids_to_states.items():
+    for room_id, states in room_ids_to_states.iteritems():
         hosts = yield store.get_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
-    for user_id, states in users_to_states.items():
+    for user_id, states in users_to_states.iteritems():
         host = get_domain_from_id(user_id)
         hosts_and_states.append(([host], states))

From 17450695438c0ed59d64e669ef6861316cef43af Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 12 Apr 2017 10:17:10 +0100
Subject: [PATCH 9/9] Comment

---
 synapse/federation/transaction_queue.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index feb160501..2919c2351 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -77,12 +77,11 @@ class TransactionQueue(object):
         # destination -> list of tuple(edu, deferred)
         self.pending_edus_by_dest = edus = {}
 
-        # Presence needs to be separate as we send single aggregate EDUs
-
         # Map of user_id -> UserPresenceState for all the pending presence
         # to be sent out by user_id. Entries here get processed and put in
         # pending_presence_by_dest
         self.pending_presence = {}
+
         # Map of destination -> user_id -> UserPresenceState of pending presence
         # to be sent to each destination
         self.pending_presence_by_dest = presence = {}
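Taken together, the series changes what presence costs on the replication
stream: before, a PresenceRow carried a (destination, state) pair and was queued
once per destination; after, a single row carries just the state, and the
federation sender recomputes the destinations itself. A rough before/after
sketch with hypothetical values:

    state = {"user_id": "@alice:example.com", "presence": "online"}
    destinations = ["remote1.example", "remote2.example", "remote3.example"]

    # Before: one replication row per (destination, state) pair.
    rows_before = [{"destination": d, "state": state} for d in destinations]

    # After: one row per state; fan-out happens in the federation sender.
    rows_after = [state]

    print(len(rows_before), len(rows_after))  # 3 1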