diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 80de4f43b..84624d490 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -845,35 +845,54 @@ class PresenceEventSource(object): room_ids = room_ids or [] presence = self.hs.get_handlers().presence_handler + stream_change_cache = self.store.presence_stream_cache if not room_ids: rooms = yield self.store.get_rooms_for_user(user_id) room_ids = set(e.room_id for e in rooms) - - user_ids_to_check = set() - for room_id in room_ids: - users = yield self.store.get_users_in_room(room_id) - user_ids_to_check.update(users) + else: + room_ids = set(room_ids) plist = yield self.store.get_presence_list_accepted(user.localpart) - user_ids_to_check.update([row["observed_user_id"] for row in plist]) + friends = set(row["observed_user_id"] for row in plist) + friends.add(user_id) # So that we receive our own presence - # Always include yourself. Only really matters for when the user is - # not in any rooms, but still. - user_ids_to_check.add(user_id) + user_ids_changed = set() + if from_key and from_key < 100: + changed = stream_change_cache.get_all_entities_changed(from_key) - max_token = self.store.get_current_presence_token() - - if from_key: - user_ids_changed = self.store.presence_stream_cache.get_entities_changed( - user_ids_to_check, from_key, - ) + for other_user_id in changed: + if other_user_id in friends: + user_ids_changed.add(other_user_id) + continue + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + continue else: - user_ids_changed = user_ids_to_check + user_ids_to_check = set() + for room_id in room_ids: + users = yield self.store.get_users_in_room(room_id) + user_ids_to_check.update(users) + + plist = yield self.store.get_presence_list_accepted(user.localpart) + user_ids_to_check.update([row["observed_user_id"] for row in plist]) + + # Always include yourself. Only really matters for when the user is + # not in any rooms, but still. + user_ids_to_check.add(user_id) + + if from_key: + user_ids_changed = stream_change_cache.get_entities_changed( + user_ids_to_check, from_key, + ) + else: + user_ids_changed = user_ids_to_check updates = yield presence.current_state_for_users(user_ids_changed) - now = self.clock.time_msec() + max_token = self.store.get_current_presence_token() + now = self.clock.time_msec() defer.returnValue(([ { diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index b37f1c072..970488a19 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -85,6 +85,22 @@ class StreamChangeCache(object): return result + def get_all_entities_changed(self, stream_pos): + """Returns all entites that have had new things since the given + position. If the position is too old it will return None. + """ + assert type(stream_pos) is int + + if stream_pos >= self._earliest_known_stream_pos: + keys = self._cache.keys() + i = keys.bisect_right(stream_pos) + + return ( + self._cache[k] for k in keys[i:] + ) + else: + return None + def entity_has_changed(self, entity, stream_pos): """Informs the cache that the entity has been changed at the given position.