mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-10-01 11:49:51 -04:00
Merge pull request #602 from matrix-org/erikj/presence
Change the way we figure out presence updates for small deltas
This commit is contained in:
commit
e3e0ac6ec7
@ -846,35 +846,61 @@ class PresenceEventSource(object):
|
|||||||
room_ids = room_ids or []
|
room_ids = room_ids or []
|
||||||
|
|
||||||
presence = self.hs.get_handlers().presence_handler
|
presence = self.hs.get_handlers().presence_handler
|
||||||
|
stream_change_cache = self.store.presence_stream_cache
|
||||||
|
|
||||||
if not room_ids:
|
if not room_ids:
|
||||||
rooms = yield self.store.get_rooms_for_user(user_id)
|
rooms = yield self.store.get_rooms_for_user(user_id)
|
||||||
room_ids = set(e.room_id for e in rooms)
|
room_ids = set(e.room_id for e in rooms)
|
||||||
|
else:
|
||||||
user_ids_to_check = set()
|
room_ids = set(room_ids)
|
||||||
for room_id in room_ids:
|
|
||||||
users = yield self.store.get_users_in_room(room_id)
|
|
||||||
user_ids_to_check.update(users)
|
|
||||||
|
|
||||||
plist = yield self.store.get_presence_list_accepted(user.localpart)
|
plist = yield self.store.get_presence_list_accepted(user.localpart)
|
||||||
user_ids_to_check.update([row["observed_user_id"] for row in plist])
|
friends = set(row["observed_user_id"] for row in plist)
|
||||||
|
friends.add(user_id) # So that we receive our own presence
|
||||||
|
|
||||||
# Always include yourself. Only really matters for when the user is
|
user_ids_changed = set()
|
||||||
# not in any rooms, but still.
|
changed = None
|
||||||
user_ids_to_check.add(user_id)
|
if from_key and from_key < 100:
|
||||||
|
# For small deltas, its quicker to get all changes and then
|
||||||
|
# work out if we share a room or they're in our presence list
|
||||||
|
changed = stream_change_cache.get_all_entities_changed(from_key)
|
||||||
|
|
||||||
max_token = self.store.get_current_presence_token()
|
# get_all_entities_changed can return None
|
||||||
|
if changed is not None:
|
||||||
if from_key:
|
for other_user_id in changed:
|
||||||
user_ids_changed = self.store.presence_stream_cache.get_entities_changed(
|
if other_user_id in friends:
|
||||||
user_ids_to_check, from_key,
|
user_ids_changed.add(other_user_id)
|
||||||
)
|
continue
|
||||||
|
other_rooms = yield self.store.get_rooms_for_user(other_user_id)
|
||||||
|
if room_ids.intersection(e.room_id for e in other_rooms):
|
||||||
|
user_ids_changed.add(other_user_id)
|
||||||
|
continue
|
||||||
else:
|
else:
|
||||||
user_ids_changed = user_ids_to_check
|
# Too many possible updates. Find all users we can see and check
|
||||||
|
# if any of them have changed.
|
||||||
|
user_ids_to_check = set()
|
||||||
|
for room_id in room_ids:
|
||||||
|
users = yield self.store.get_users_in_room(room_id)
|
||||||
|
user_ids_to_check.update(users)
|
||||||
|
|
||||||
|
plist = yield self.store.get_presence_list_accepted(user.localpart)
|
||||||
|
user_ids_to_check.update([row["observed_user_id"] for row in plist])
|
||||||
|
|
||||||
|
# Always include yourself. Only really matters for when the user is
|
||||||
|
# not in any rooms, but still.
|
||||||
|
user_ids_to_check.add(user_id)
|
||||||
|
|
||||||
|
if from_key:
|
||||||
|
user_ids_changed = stream_change_cache.get_entities_changed(
|
||||||
|
user_ids_to_check, from_key,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
user_ids_changed = user_ids_to_check
|
||||||
|
|
||||||
updates = yield presence.current_state_for_users(user_ids_changed)
|
updates = yield presence.current_state_for_users(user_ids_changed)
|
||||||
|
|
||||||
now = self.clock.time_msec()
|
max_token = self.store.get_current_presence_token()
|
||||||
|
now = self.clock.time_msec()
|
||||||
|
|
||||||
defer.returnValue(([
|
defer.returnValue(([
|
||||||
{
|
{
|
||||||
|
@ -85,6 +85,22 @@ class StreamChangeCache(object):
|
|||||||
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
def get_all_entities_changed(self, stream_pos):
|
||||||
|
"""Returns all entites that have had new things since the given
|
||||||
|
position. If the position is too old it will return None.
|
||||||
|
"""
|
||||||
|
assert type(stream_pos) is int
|
||||||
|
|
||||||
|
if stream_pos >= self._earliest_known_stream_pos:
|
||||||
|
keys = self._cache.keys()
|
||||||
|
i = keys.bisect_right(stream_pos)
|
||||||
|
|
||||||
|
return (
|
||||||
|
self._cache[k] for k in keys[i:]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
return None
|
||||||
|
|
||||||
def entity_has_changed(self, entity, stream_pos):
|
def entity_has_changed(self, entity, stream_pos):
|
||||||
"""Informs the cache that the entity has been changed at the given
|
"""Informs the cache that the entity has been changed at the given
|
||||||
position.
|
position.
|
||||||
|
Loading…
Reference in New Issue
Block a user