Change the way we figure out presence updates for small deltas

This commit is contained in:
Erik Johnston 2016-02-23 14:48:23 +00:00
parent 02e928cf9b
commit c77dae7a1a
2 changed files with 52 additions and 17 deletions

View File

@ -845,35 +845,54 @@ class PresenceEventSource(object):
room_ids = room_ids or [] room_ids = room_ids or []
presence = self.hs.get_handlers().presence_handler presence = self.hs.get_handlers().presence_handler
stream_change_cache = self.store.presence_stream_cache
if not room_ids: if not room_ids:
rooms = yield self.store.get_rooms_for_user(user_id) rooms = yield self.store.get_rooms_for_user(user_id)
room_ids = set(e.room_id for e in rooms) room_ids = set(e.room_id for e in rooms)
else:
user_ids_to_check = set() room_ids = set(room_ids)
for room_id in room_ids:
users = yield self.store.get_users_in_room(room_id)
user_ids_to_check.update(users)
plist = yield self.store.get_presence_list_accepted(user.localpart) plist = yield self.store.get_presence_list_accepted(user.localpart)
user_ids_to_check.update([row["observed_user_id"] for row in plist]) friends = set(row["observed_user_id"] for row in plist)
friends.add(user_id) # So that we receive our own presence
# Always include yourself. Only really matters for when the user is user_ids_changed = set()
# not in any rooms, but still. if from_key and from_key < 100:
user_ids_to_check.add(user_id) changed = stream_change_cache.get_all_entities_changed(from_key)
max_token = self.store.get_current_presence_token() for other_user_id in changed:
if other_user_id in friends:
if from_key: user_ids_changed.add(other_user_id)
user_ids_changed = self.store.presence_stream_cache.get_entities_changed( continue
user_ids_to_check, from_key, other_rooms = yield self.store.get_rooms_for_user(other_user_id)
) if room_ids.intersection(e.room_id for e in other_rooms):
user_ids_changed.add(other_user_id)
continue
else: else:
user_ids_changed = user_ids_to_check user_ids_to_check = set()
for room_id in room_ids:
users = yield self.store.get_users_in_room(room_id)
user_ids_to_check.update(users)
plist = yield self.store.get_presence_list_accepted(user.localpart)
user_ids_to_check.update([row["observed_user_id"] for row in plist])
# Always include yourself. Only really matters for when the user is
# not in any rooms, but still.
user_ids_to_check.add(user_id)
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
user_ids_to_check, from_key,
)
else:
user_ids_changed = user_ids_to_check
updates = yield presence.current_state_for_users(user_ids_changed) updates = yield presence.current_state_for_users(user_ids_changed)
now = self.clock.time_msec() max_token = self.store.get_current_presence_token()
now = self.clock.time_msec()
defer.returnValue(([ defer.returnValue(([
{ {

View File

@ -85,6 +85,22 @@ class StreamChangeCache(object):
return result return result
def get_all_entities_changed(self, stream_pos):
"""Returns all entites that have had new things since the given
position. If the position is too old it will return None.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
keys = self._cache.keys()
i = keys.bisect_right(stream_pos)
return (
self._cache[k] for k in keys[i:]
)
else:
return None
def entity_has_changed(self, entity, stream_pos): def entity_has_changed(self, entity, stream_pos):
"""Informs the cache that the entity has been changed at the given """Informs the cache that the entity has been changed at the given
position. position.