From c77dae7a1abfef19940efd1378df5e2215a5b48b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Feb 2016 14:48:23 +0000 Subject: [PATCH 1/3] Change the way we figure out presence updates for small deltas --- synapse/handlers/presence.py | 53 +++++++++++++++------- synapse/util/caches/stream_change_cache.py | 16 +++++++ 2 files changed, 52 insertions(+), 17 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 80de4f43b..84624d490 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -845,35 +845,54 @@ class PresenceEventSource(object): room_ids = room_ids or [] presence = self.hs.get_handlers().presence_handler + stream_change_cache = self.store.presence_stream_cache if not room_ids: rooms = yield self.store.get_rooms_for_user(user_id) room_ids = set(e.room_id for e in rooms) - - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - user_ids_to_check.update(users) + else: + room_ids = set(room_ids) plist = yield self.store.get_presence_list_accepted(user.localpart) - user_ids_to_check.update([row["observed_user_id"] for row in plist]) + friends = set(row["observed_user_id"] for row in plist) + friends.add(user_id) # So that we receive our own presence - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) + user_ids_changed = set() + if from_key and from_key < 100: + changed = stream_change_cache.get_all_entities_changed(from_key) - max_token = self.store.get_current_presence_token() - - if from_key: - user_ids_changed = self.store.presence_stream_cache.get_entities_changed( - user_ids_to_check, from_key, - ) + for other_user_id in changed: + if other_user_id in friends: + user_ids_changed.add(other_user_id) + continue + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + continue else: - user_ids_changed = user_ids_to_check + user_ids_to_check = set() + for room_id in room_ids: + users = yield self.store.get_users_in_room(room_id) + user_ids_to_check.update(users) + + plist = yield self.store.get_presence_list_accepted(user.localpart) + user_ids_to_check.update([row["observed_user_id"] for row in plist]) + + # Always include yourself. Only really matters for when the user is + # not in any rooms, but still. + user_ids_to_check.add(user_id) + + if from_key: + user_ids_changed = stream_change_cache.get_entities_changed( + user_ids_to_check, from_key, + ) + else: + user_ids_changed = user_ids_to_check updates = yield presence.current_state_for_users(user_ids_changed) - now = self.clock.time_msec() + max_token = self.store.get_current_presence_token() + now = self.clock.time_msec() defer.returnValue(([ { diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b37f1c072..970488a19 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -85,6 +85,22 @@ class StreamChangeCache(object): return result + def get_all_entities_changed(self, stream_pos): + """Returns all entites that have had new things since the given + position. If the position is too old it will return None. + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + return ( + self._cache[k] for k in keys[i:] + ) + else: + return None + def entity_has_changed(self, entity, stream_pos): """Informs the cache that the entity has been changed at the given position. From 6e0209112bfb7fc2eee958bfc2e425b1cd399505 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Feb 2016 14:57:45 +0000 Subject: [PATCH 2/3] Add comments --- synapse/handlers/presence.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 84624d490..ff62d4767 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -859,6 +859,8 @@ class PresenceEventSource(object): user_ids_changed = set() if from_key and from_key < 100: + # For small deltas, its quicker to get all changes and then + # work out if we share a room or they're in our presence list changed = stream_change_cache.get_all_entities_changed(from_key) for other_user_id in changed: @@ -870,6 +872,8 @@ class PresenceEventSource(object): user_ids_changed.add(other_user_id) continue else: + # Too many possible updates. Find all users we can see and check + # if any of them have changed. user_ids_to_check = set() for room_id in room_ids: users = yield self.store.get_users_in_room(room_id) From 13f86c3489dba5a2df06cf635f220c23e36b662a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Feb 2016 15:05:37 +0000 Subject: [PATCH 3/3] Handle get_all_entities_changed returning None --- synapse/handlers/presence.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ff62d4767..952e48e31 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -858,11 +858,14 @@ class PresenceEventSource(object): friends.add(user_id) # So that we receive our own presence user_ids_changed = set() + changed = None if from_key and from_key < 100: # For small deltas, its quicker to get all changes and then # work out if we share a room or they're in our presence list changed = stream_change_cache.get_all_entities_changed(from_key) + # get_all_entities_changed can return None + if changed is not None: for other_user_id in changed: if other_user_id in friends: user_ids_changed.add(other_user_id)