From 54a79c1d374f09049d3eb8ac531bca45d68b5f2b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 13:07:18 +0000 Subject: [PATCH 1/5] Make presence.get_new_events a bit faster We do this by caching the set of users a user shares rooms with. --- synapse/handlers/presence.py | 44 ++++++++++++----------------------- synapse/handlers/room.py | 1 + synapse/notifier.py | 1 + synapse/storage/roommember.py | 16 +++++++++++++ 4 files changed, 33 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9982ae0fe..fdfce2a88 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1011,7 +1011,7 @@ class PresenceEventSource(object): @defer.inlineCallbacks @log_function def get_new_events(self, user, from_key, room_ids=None, include_offline=True, - **kwargs): + explicit_room_id=None, **kwargs): # The process for getting presence events are: # 1. Get the rooms the user is in. # 2. Get the list of user in the rooms. @@ -1028,22 +1028,24 @@ class PresenceEventSource(object): user_id = user.to_string() if from_key is not None: from_key = int(from_key) - room_ids = room_ids or [] presence = self.get_presence_handler() stream_change_cache = self.store.presence_stream_cache - if not room_ids: - rooms = yield self.store.get_rooms_for_user(user_id) - room_ids = set(e.room_id for e in rooms) - else: - room_ids = set(room_ids) - max_token = self.store.get_current_presence_token() plist = yield self.store.get_presence_list_accepted(user.localpart) - friends = set(row["observed_user_id"] for row in plist) - friends.add(user_id) # So that we receive our own presence + users_interested_in = set(row["observed_user_id"] for row in plist) + users_interested_in.add(user_id) # So that we receive our own presence + + users_who_share_room = yield self.store.get_users_who_share_room_with_user( + user_id + ) + users_interested_in.update(users_who_share_room) + + if explicit_room_id: + user_ids = yield self.store.get_users_in_room(explicit_room_id) + users_interested_in.update(user_ids) user_ids_changed = set() changed = None @@ -1055,35 +1057,19 @@ class PresenceEventSource(object): # work out if we share a room or they're in our presence list get_updates_counter.inc("stream") for other_user_id in changed: - if other_user_id in friends: + if other_user_id in users_interested_in: user_ids_changed.add(other_user_id) - continue - other_rooms = yield self.store.get_rooms_for_user(other_user_id) - if room_ids.intersection(e.room_id for e in other_rooms): - user_ids_changed.add(other_user_id) - continue else: # Too many possible updates. Find all users we can see and check # if any of them have changed. get_updates_counter.inc("full") - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - user_ids_to_check.update(users) - - user_ids_to_check.update(friends) - - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) - if from_key: user_ids_changed = stream_change_cache.get_entities_changed( - user_ids_to_check, from_key, + users_interested_in, from_key, ) else: - user_ids_changed = user_ids_to_check + user_ids_changed = users_interested_in updates = yield presence.current_state_for_users(user_ids_changed) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 5f18007e9..7e7671c9a 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -437,6 +437,7 @@ class RoomEventSource(object): limit, room_ids, is_guest, + explicit_room_id=None, ): # We just ignore the key for now. diff --git a/synapse/notifier.py b/synapse/notifier.py index acbd4bb5a..8051a7a84 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -378,6 +378,7 @@ class Notifier(object): limit=limit, is_guest=is_peeking, room_ids=room_ids, + explicit_room_id=explicit_room_id, ) if name == "room": diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index ee800d074..70718f41e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -280,6 +280,22 @@ class RoomMemberStore(SQLBaseStore): user_id, membership_list=[Membership.JOIN], ) + @cachedInlineCallbacks(max_entries=50000, cache_context=True, iterable=True) + def get_users_who_share_room_with_user(self, user_id, cache_context): + rooms = yield self.get_rooms_for_user( + user_id, on_invalidate=cache_context.invalidate, + ) + + user_who_share_room = set() + for room in rooms: + user_ids = yield self.get_users_in_room( + room.room_id, on_invalidate=cache_context.invalidate, + ) + logger.info("Users in room: %r %r", room.room_id, user_ids) + user_who_share_room.update(user_ids) + + defer.returnValue(user_who_share_room) + def forget(self, user_id, room_id): """Indicate that user_id wishes to discard history for room_id.""" def f(txn): From 832e9c52ca9dd349e28967727ae87a484e6ce557 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 13:09:56 +0000 Subject: [PATCH 2/5] Comment --- synapse/storage/roommember.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 70718f41e..249217e11 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -282,6 +282,8 @@ class RoomMemberStore(SQLBaseStore): @cachedInlineCallbacks(max_entries=50000, cache_context=True, iterable=True) def get_users_who_share_room_with_user(self, user_id, cache_context): + """Returns the set of users who share a room with `user_id` + """ rooms = yield self.get_rooms_for_user( user_id, on_invalidate=cache_context.invalidate, ) @@ -291,7 +293,6 @@ class RoomMemberStore(SQLBaseStore): user_ids = yield self.get_users_in_room( room.room_id, on_invalidate=cache_context.invalidate, ) - logger.info("Users in room: %r %r", room.room_id, user_ids) user_who_share_room.update(user_ids) defer.returnValue(user_who_share_room) From 9efcc3f3be1f56be8ffdb3172d6908d55028cc61 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 13:50:22 +0000 Subject: [PATCH 3/5] Comment --- synapse/util/caches/descriptors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 675bfd5fe..3c6838df1 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -478,6 +478,8 @@ class CacheListDescriptor(object): class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): + # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, + # which namedtuple does for us. def invalidate(self): self.cache.invalidate(self.key) From 6b61060b51970bd170fffd442df0e9f02ddcf678 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 14:47:15 +0000 Subject: [PATCH 4/5] Comment --- synapse/util/caches/descriptors.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 3c6838df1..998de70d2 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -479,7 +479,10 @@ class CacheListDescriptor(object): class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, - # which namedtuple does for us. + # which namedtuple does for us (i.e. two _CacheContext are the same if + # their caches and keys match). This is important in particular to + # dedupe when we add callbacks to lru cache nodes, otherwise the number + # of callbacks would grow. def invalidate(self): self.cache.invalidate(self.key) From 0f3e296cb7d63aa8b4cfcaa54a0b2a63fbe7c943 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Feb 2017 15:02:03 +0000 Subject: [PATCH 5/5] Fix replication --- synapse/replication/slave/storage/events.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 15a025a01..d72ff6055 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -73,6 +73,9 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_users_who_share_room_with_user = ( + RoomMemberStore.__dict__["get_users_who_share_room_with_user"] + ) get_latest_event_ids_in_room = EventFederationStore.__dict__[ "get_latest_event_ids_in_room" ]