mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-10-01 11:49:51 -04:00
Merge pull request #157 from matrix-org/markjh/presence_performance
Improve presence performance in loadtest
This commit is contained in:
commit
49a2c10279
@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler):
|
|||||||
self._user_cachemap = {}
|
self._user_cachemap = {}
|
||||||
self._user_cachemap_latest_serial = 0
|
self._user_cachemap_latest_serial = 0
|
||||||
|
|
||||||
|
# map room_ids to the latest presence serial for a member of that
|
||||||
|
# room
|
||||||
|
self._room_serials = {}
|
||||||
|
|
||||||
metrics.register_callback(
|
metrics.register_callback(
|
||||||
"userCachemap:size",
|
"userCachemap:size",
|
||||||
lambda: len(self._user_cachemap),
|
lambda: len(self._user_cachemap),
|
||||||
@ -297,13 +301,34 @@ class PresenceHandler(BaseHandler):
|
|||||||
|
|
||||||
self.changed_presencelike_data(user, {"last_active": now})
|
self.changed_presencelike_data(user, {"last_active": now})
|
||||||
|
|
||||||
|
def get_joined_rooms_for_user(self, user):
|
||||||
|
"""Get the list of rooms a user is joined to.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user(UserID): The user.
|
||||||
|
Returns:
|
||||||
|
A Deferred of a list of room id strings.
|
||||||
|
"""
|
||||||
|
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||||
|
return rm_handler.get_joined_rooms_for_user(user)
|
||||||
|
|
||||||
|
def get_joined_users_for_room_id(self, room_id):
|
||||||
|
rm_handler = self.homeserver.get_handlers().room_member_handler
|
||||||
|
return rm_handler.get_room_members(room_id)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
def changed_presencelike_data(self, user, state):
|
def changed_presencelike_data(self, user, state):
|
||||||
statuscache = self._get_or_make_usercache(user)
|
"""Updates the presence state of a local user.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user(UserID): The user being updated.
|
||||||
|
state(dict): The new presence state for the user.
|
||||||
|
Returns:
|
||||||
|
A Deferred
|
||||||
|
"""
|
||||||
self._user_cachemap_latest_serial += 1
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
statuscache = yield self.update_presence_cache(user, state)
|
||||||
|
yield self.push_presence(user, statuscache=statuscache)
|
||||||
return self.push_presence(user, statuscache=statuscache)
|
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
def started_user_eventstream(self, user):
|
def started_user_eventstream(self, user):
|
||||||
@ -326,13 +351,12 @@ class PresenceHandler(BaseHandler):
|
|||||||
room_id(str): The room id the user joined.
|
room_id(str): The room id the user joined.
|
||||||
"""
|
"""
|
||||||
if self.hs.is_mine(user):
|
if self.hs.is_mine(user):
|
||||||
statuscache = self._get_or_make_usercache(user)
|
|
||||||
|
|
||||||
# No actual update but we need to bump the serial anyway for the
|
# No actual update but we need to bump the serial anyway for the
|
||||||
# event source
|
# event source
|
||||||
self._user_cachemap_latest_serial += 1
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update({}, serial=self._user_cachemap_latest_serial)
|
statuscache = yield self.update_presence_cache(
|
||||||
|
user, room_ids=[room_id]
|
||||||
|
)
|
||||||
self.push_update_to_local_and_remote(
|
self.push_update_to_local_and_remote(
|
||||||
observed_user=user,
|
observed_user=user,
|
||||||
room_ids=[room_id],
|
room_ids=[room_id],
|
||||||
@ -340,16 +364,17 @@ class PresenceHandler(BaseHandler):
|
|||||||
)
|
)
|
||||||
|
|
||||||
# We also want to tell them about current presence of people.
|
# We also want to tell them about current presence of people.
|
||||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
curr_users = yield self.get_joined_users_for_room_id(room_id)
|
||||||
curr_users = yield rm_handler.get_room_members(room_id)
|
|
||||||
|
|
||||||
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
|
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
|
||||||
statuscache = self._get_or_offline_usercache(local_user)
|
statuscache = yield self.update_presence_cache(
|
||||||
statuscache.update({}, serial=self._user_cachemap_latest_serial)
|
local_user, room_ids=[room_id], add_to_cache=False
|
||||||
|
)
|
||||||
|
|
||||||
self.push_update_to_local_and_remote(
|
self.push_update_to_local_and_remote(
|
||||||
observed_user=local_user,
|
observed_user=local_user,
|
||||||
users_to_push=[user],
|
users_to_push=[user],
|
||||||
statuscache=self._get_or_offline_usercache(local_user),
|
statuscache=statuscache,
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@ -546,8 +571,7 @@ class PresenceHandler(BaseHandler):
|
|||||||
|
|
||||||
# Also include people in all my rooms
|
# Also include people in all my rooms
|
||||||
|
|
||||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
room_ids = yield self.get_joined_rooms_for_user(user)
|
||||||
room_ids = yield rm_handler.get_joined_rooms_for_user(user)
|
|
||||||
|
|
||||||
if state is None:
|
if state is None:
|
||||||
state = yield self.store.get_presence_state(user.localpart)
|
state = yield self.store.get_presence_state(user.localpart)
|
||||||
@ -747,8 +771,7 @@ class PresenceHandler(BaseHandler):
|
|||||||
# and also user is informed of server-forced pushes
|
# and also user is informed of server-forced pushes
|
||||||
localusers.add(user)
|
localusers.add(user)
|
||||||
|
|
||||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
room_ids = yield self.get_joined_rooms_for_user(user)
|
||||||
room_ids = yield rm_handler.get_joined_rooms_for_user(user)
|
|
||||||
|
|
||||||
if not localusers and not room_ids:
|
if not localusers and not room_ids:
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
@ -793,8 +816,7 @@ class PresenceHandler(BaseHandler):
|
|||||||
" | %d interested local observers %r", len(observers), observers
|
" | %d interested local observers %r", len(observers), observers
|
||||||
)
|
)
|
||||||
|
|
||||||
rm_handler = self.homeserver.get_handlers().room_member_handler
|
room_ids = yield self.get_joined_rooms_for_user(user)
|
||||||
room_ids = yield rm_handler.get_joined_rooms_for_user(user)
|
|
||||||
if room_ids:
|
if room_ids:
|
||||||
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
|
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
|
||||||
|
|
||||||
@ -813,10 +835,8 @@ class PresenceHandler(BaseHandler):
|
|||||||
self.clock.time_msec() - state.pop("last_active_ago")
|
self.clock.time_msec() - state.pop("last_active_ago")
|
||||||
)
|
)
|
||||||
|
|
||||||
statuscache = self._get_or_make_usercache(user)
|
|
||||||
|
|
||||||
self._user_cachemap_latest_serial += 1
|
self._user_cachemap_latest_serial += 1
|
||||||
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
yield self.update_presence_cache(user, state, room_ids=room_ids)
|
||||||
|
|
||||||
if not observers and not room_ids:
|
if not observers and not room_ids:
|
||||||
logger.debug(" | no interested observers or room IDs")
|
logger.debug(" | no interested observers or room IDs")
|
||||||
@ -874,6 +894,35 @@ class PresenceHandler(BaseHandler):
|
|||||||
|
|
||||||
yield defer.DeferredList(deferreds, consumeErrors=True)
|
yield defer.DeferredList(deferreds, consumeErrors=True)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def update_presence_cache(self, user, state={}, room_ids=None,
|
||||||
|
add_to_cache=True):
|
||||||
|
"""Update the presence cache for a user with a new state and bump the
|
||||||
|
serial to the latest value.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user(UserID): The user being updated
|
||||||
|
state(dict): The presence state being updated
|
||||||
|
room_ids(None or list of str): A list of room_ids to update. If
|
||||||
|
room_ids is None then fetch the list of room_ids the user is
|
||||||
|
joined to.
|
||||||
|
add_to_cache: Whether to add an entry to the presence cache if the
|
||||||
|
user isn't already in the cache.
|
||||||
|
Returns:
|
||||||
|
A Deferred UserPresenceCache for the user being updated.
|
||||||
|
"""
|
||||||
|
if room_ids is None:
|
||||||
|
room_ids = yield self.get_joined_rooms_for_user(user)
|
||||||
|
|
||||||
|
for room_id in room_ids:
|
||||||
|
self._room_serials[room_id] = self._user_cachemap_latest_serial
|
||||||
|
if add_to_cache:
|
||||||
|
statuscache = self._get_or_make_usercache(user)
|
||||||
|
else:
|
||||||
|
statuscache = self._get_or_offline_usercache(user)
|
||||||
|
statuscache.update(state, serial=self._user_cachemap_latest_serial)
|
||||||
|
defer.returnValue(statuscache)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def push_update_to_local_and_remote(self, observed_user, statuscache,
|
def push_update_to_local_and_remote(self, observed_user, statuscache,
|
||||||
users_to_push=[], room_ids=[],
|
users_to_push=[], room_ids=[],
|
||||||
@ -996,39 +1045,11 @@ class PresenceEventSource(object):
|
|||||||
self.hs = hs
|
self.hs = hs
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def is_visible(self, observer_user, observed_user):
|
|
||||||
if observer_user == observed_user:
|
|
||||||
defer.returnValue(True)
|
|
||||||
|
|
||||||
presence = self.hs.get_handlers().presence_handler
|
|
||||||
|
|
||||||
if (yield presence.store.user_rooms_intersect(
|
|
||||||
[u.to_string() for u in observer_user, observed_user])):
|
|
||||||
defer.returnValue(True)
|
|
||||||
|
|
||||||
if self.hs.is_mine(observed_user):
|
|
||||||
pushmap = presence._local_pushmap
|
|
||||||
|
|
||||||
defer.returnValue(
|
|
||||||
observed_user.localpart in pushmap and
|
|
||||||
observer_user in pushmap[observed_user.localpart]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
recvmap = presence._remote_recvmap
|
|
||||||
|
|
||||||
defer.returnValue(
|
|
||||||
observed_user in recvmap and
|
|
||||||
observer_user in recvmap[observed_user]
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def get_new_events_for_user(self, user, from_key, limit):
|
def get_new_events_for_user(self, user, from_key, limit):
|
||||||
from_key = int(from_key)
|
from_key = int(from_key)
|
||||||
|
|
||||||
observer_user = user
|
|
||||||
|
|
||||||
presence = self.hs.get_handlers().presence_handler
|
presence = self.hs.get_handlers().presence_handler
|
||||||
cachemap = presence._user_cachemap
|
cachemap = presence._user_cachemap
|
||||||
|
|
||||||
@ -1037,17 +1058,27 @@ class PresenceEventSource(object):
|
|||||||
clock = self.clock
|
clock = self.clock
|
||||||
latest_serial = 0
|
latest_serial = 0
|
||||||
|
|
||||||
|
user_ids_to_check = {user}
|
||||||
|
presence_list = yield presence.store.get_presence_list(
|
||||||
|
user.localpart, accepted=True
|
||||||
|
)
|
||||||
|
if presence_list is not None:
|
||||||
|
user_ids_to_check |= set(
|
||||||
|
UserID.from_string(p["observed_user_id"]) for p in presence_list
|
||||||
|
)
|
||||||
|
room_ids = yield presence.get_joined_rooms_for_user(user)
|
||||||
|
for room_id in set(room_ids) & set(presence._room_serials):
|
||||||
|
if presence._room_serials[room_id] > from_key:
|
||||||
|
joined = yield presence.get_joined_users_for_room_id(room_id)
|
||||||
|
user_ids_to_check |= set(joined)
|
||||||
|
|
||||||
updates = []
|
updates = []
|
||||||
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
for observed_user in user_ids_to_check & set(cachemap):
|
||||||
for observed_user in cachemap.keys():
|
|
||||||
cached = cachemap[observed_user]
|
cached = cachemap[observed_user]
|
||||||
|
|
||||||
if cached.serial <= from_key or cached.serial > max_serial:
|
if cached.serial <= from_key or cached.serial > max_serial:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if not (yield self.is_visible(observer_user, observed_user)):
|
|
||||||
continue
|
|
||||||
|
|
||||||
latest_serial = max(cached.serial, latest_serial)
|
latest_serial = max(cached.serial, latest_serial)
|
||||||
updates.append(cached.make_event(user=observed_user, clock=clock))
|
updates.append(cached.make_event(user=observed_user, clock=clock))
|
||||||
|
|
||||||
@ -1084,8 +1115,6 @@ class PresenceEventSource(object):
|
|||||||
def get_pagination_rows(self, user, pagination_config, key):
|
def get_pagination_rows(self, user, pagination_config, key):
|
||||||
# TODO (erikj): Does this make sense? Ordering?
|
# TODO (erikj): Does this make sense? Ordering?
|
||||||
|
|
||||||
observer_user = user
|
|
||||||
|
|
||||||
from_key = int(pagination_config.from_key)
|
from_key = int(pagination_config.from_key)
|
||||||
|
|
||||||
if pagination_config.to_key:
|
if pagination_config.to_key:
|
||||||
@ -1096,14 +1125,26 @@ class PresenceEventSource(object):
|
|||||||
presence = self.hs.get_handlers().presence_handler
|
presence = self.hs.get_handlers().presence_handler
|
||||||
cachemap = presence._user_cachemap
|
cachemap = presence._user_cachemap
|
||||||
|
|
||||||
|
user_ids_to_check = {user}
|
||||||
|
presence_list = yield presence.store.get_presence_list(
|
||||||
|
user.localpart, accepted=True
|
||||||
|
)
|
||||||
|
if presence_list is not None:
|
||||||
|
user_ids_to_check |= set(
|
||||||
|
UserID.from_string(p["observed_user_id"]) for p in presence_list
|
||||||
|
)
|
||||||
|
room_ids = yield presence.get_joined_rooms_for_user(user)
|
||||||
|
for room_id in set(room_ids) & set(presence._room_serials):
|
||||||
|
if presence._room_serials[room_id] >= from_key:
|
||||||
|
joined = yield presence.get_joined_users_for_room_id(room_id)
|
||||||
|
user_ids_to_check |= set(joined)
|
||||||
|
|
||||||
updates = []
|
updates = []
|
||||||
# TODO(paul): use a DeferredList ? How to limit concurrency.
|
for observed_user in user_ids_to_check & set(cachemap):
|
||||||
for observed_user in cachemap.keys():
|
|
||||||
if not (to_key < cachemap[observed_user].serial <= from_key):
|
if not (to_key < cachemap[observed_user].serial <= from_key):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if (yield self.is_visible(observer_user, observed_user)):
|
updates.append((observed_user, cachemap[observed_user]))
|
||||||
updates.append((observed_user, cachemap[observed_user]))
|
|
||||||
|
|
||||||
# TODO(paul): limit
|
# TODO(paul): limit
|
||||||
|
|
||||||
|
@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
|
|||||||
"""
|
"""
|
||||||
PRESENCE_LIST = {
|
PRESENCE_LIST = {
|
||||||
'apple': [ "@banana:test", "@clementine:test" ],
|
'apple': [ "@banana:test", "@clementine:test" ],
|
||||||
|
'banana': [ "@apple:test" ],
|
||||||
}
|
}
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
|
|||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_recv_remote(self):
|
def test_recv_remote(self):
|
||||||
# TODO(paul): Gut-wrenching
|
self.room_members = [self.u_apple, self.u_banana, self.u_potato]
|
||||||
potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
|
|
||||||
set())
|
|
||||||
potato_set.add(self.u_apple)
|
|
||||||
|
|
||||||
self.room_members = [self.u_banana, self.u_potato]
|
|
||||||
|
|
||||||
self.assertEquals(self.event_source.get_current_key(), 0)
|
self.assertEquals(self.event_source.get_current_key(), 0)
|
||||||
|
|
||||||
@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
|
|||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_recv_remote_offline(self):
|
def test_recv_remote_offline(self):
|
||||||
""" Various tests relating to SYN-261 """
|
""" Various tests relating to SYN-261 """
|
||||||
potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
|
|
||||||
set())
|
|
||||||
potato_set.add(self.u_apple)
|
|
||||||
|
|
||||||
self.room_members = [self.u_banana, self.u_potato]
|
self.room_members = [self.u_apple, self.u_banana, self.u_potato]
|
||||||
|
|
||||||
self.assertEquals(self.event_source.get_current_key(), 0)
|
self.assertEquals(self.event_source.get_current_key(), 0)
|
||||||
|
|
||||||
|
@ -297,6 +297,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
|
|||||||
else:
|
else:
|
||||||
return []
|
return []
|
||||||
hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
|
hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
|
||||||
|
hs.handlers.room_member_handler.get_room_members = (
|
||||||
|
lambda r: self.room_members if r == "a-room" else []
|
||||||
|
)
|
||||||
|
|
||||||
self.mock_datastore = hs.get_datastore()
|
self.mock_datastore = hs.get_datastore()
|
||||||
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|
||||||
|
Loading…
Reference in New Issue
Block a user