Merge pull request #1989 from matrix-org/erikj/public_list_speed

Speed up public room list
Erik Johnston 2017-03-14 16:58:26 +00:00 committed by GitHub
commit 1bf84c4b6b
7 changed files with 76 additions and 33 deletions

synapse/handlers/device.py

@@ -296,7 +296,7 @@ class DeviceHandler(BaseHandler):
             # ordering: treat it the same as a new room
             event_ids = []
-            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
             # special-case for an empty prev state: include all members
             # in the changed list

synapse/handlers/room_list.py

@@ -21,6 +21,7 @@ from synapse.api.constants import (
     EventTypes, JoinRules,
 )
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        logger.info(
+            "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+            limit, since_token, bool(search_filter), network_tuple,
+        )
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -91,7 +96,6 @@
         rooms_to_order_value = {}
         rooms_to_num_joined = {}
-        rooms_to_latest_event_ids = {}

         newly_visible = []
         newly_unpublished = []
@@ -116,19 +120,26 @@
         @defer.inlineCallbacks
         def get_order_for_room(room_id):
-            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
-            if not latest_event_ids:
-                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_token
-                )
-                rooms_to_latest_event_ids[room_id] = latest_event_ids
-
-            if not latest_event_ids:
-                return
-
-            joined_users = yield self.state_handler.get_current_user_in_room(
-                room_id, latest_event_ids,
-            )
+            # Most of the rooms won't have changed between the since token and
+            # now (especially if the since token is "now"). So, we can ask what
+            # the current users are in a room (that will hit a cache) and then
+            # check if the room has changed since the since token. (We have to
+            # do it in that order to avoid races).
+            # If things have changed then fall back to getting the current state
+            # at the since token.
+            joined_users = yield self.store.get_users_in_room(room_id)
+            if self.store.has_room_changed_since(room_id, stream_token):
+                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_token
+                )
+
+                if not latest_event_ids:
+                    return
+
+                joined_users = yield self.state_handler.get_current_user_in_room(
+                    room_id, latest_event_ids,
+                )

             num_joined_users = len(joined_users)
             rooms_to_num_joined[room_id] = num_joined_users
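
The comment above is the heart of the optimisation, and the ordering it insists on is subtle: read the cheap, cached current membership first, then ask whether the room has changed since the token. If the check came first, a write landing between the check and the read could hand back post-token membership while we believe nothing changed. A minimal synchronous sketch of the pattern; get_users_in_room and has_room_changed_since are the methods this commit uses, while get_users_in_room_at_token is an illustrative stand-in for the slow path:

    # Sketch only: `store` stands in for any object offering a cached
    # current-value read plus a "changed since" check.
    def get_order_for_room(store, room_id, stream_token):
        joined_users = store.get_users_in_room(room_id)   # cheap, usually cached
        if store.has_room_changed_since(room_id, stream_token):
            # Slow path: recompute membership as of the since token.
            joined_users = store.get_users_in_room_at_token(room_id, stream_token)
        return len(joined_users)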
@@ -165,19 +176,19 @@
             rooms_to_scan = rooms_to_scan[:since_token.current_limit]
             rooms_to_scan.reverse()

-        # Actually generate the entries. _generate_room_entry will append to
+        # Actually generate the entries. _append_room_entry_to_chunk will append to
         # chunk but will stop if len(chunk) > limit
         chunk = []
         if limit and not search_filter:
             step = limit + 1
             for i in xrange(0, len(rooms_to_scan), step):
                 # We iterate here because in the vast majority of cases we'll stop
-                # at the first iteration, but occasionally _generate_room_entry
+                # at the first iteration, but occasionally _append_room_entry_to_chunk
                 # won't append to the chunk and so we need to loop again.
                 # We don't want to scan over the entire range either as that
                 # would potentially waste a lot of work.
                 yield concurrently_execute(
-                    lambda r: self._generate_room_entry(
+                    lambda r: self._append_room_entry_to_chunk(
                         r, rooms_to_num_joined[r],
                         chunk, limit, search_filter
                     ),
@@ -187,7 +198,7 @@
                     break
         else:
             yield concurrently_execute(
-                lambda r: self._generate_room_entry(
+                lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
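
Both call sites now feed _append_room_entry_to_chunk, and the step = limit + 1 batching mirrors its len(chunk) > limit + 1 guard: each pass issues at most limit + 1 concurrent lookups, and the loop only continues when some rooms were filtered out and the chunk is still short. A rough standalone sketch of that shape (serial here where synapse uses concurrently_execute; the helper name is hypothetical):

    def fill_chunk(rooms_to_scan, limit, build_entry):
        # Scan in slices of limit + 1; almost always one slice suffices.
        chunk = []
        step = limit + 1
        for i in xrange(0, len(rooms_to_scan), step):
            for room_id in rooms_to_scan[i:i + step]:
                entry = build_entry(room_id)   # may return None (filtered out)
                if entry is not None:
                    chunk.append(entry)
            if len(chunk) > limit:
                break
        return chunk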
@@ -256,21 +267,35 @@
         defer.returnValue(results)

     @defer.inlineCallbacks
-    def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
-                             search_filter):
+    def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+                                    search_filter):
+        """Generate the entry for a room in the public room list and append it
+        to the `chunk` if it matches the search filter
+        """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so let's just drop it.
             return

+        result = yield self._generate_room_entry(room_id, num_joined_users)
+
+        if result and _matches_room_entry(result, search_filter):
+            chunk.append(result)
+
+    @cachedInlineCallbacks(num_args=1, cache_context=True)
+    def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+        """Returns the entry for a room
+        """
         result = {
             "room_id": room_id,
             "num_joined_members": num_joined_users,
         }

-        current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+        current_state_ids = yield self.store.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate,
+        )

         event_map = yield self.store.get_events([
-            event_id for key, event_id in current_state_ids.items()
+            event_id for key, event_id in current_state_ids.iteritems()
             if key[0] in (
                 EventTypes.JoinRules,
                 EventTypes.Name,
@@ -294,7 +319,9 @@
         if join_rule and join_rule != JoinRules.PUBLIC:
             defer.returnValue(None)

-        aliases = yield self.store.get_aliases_for_room(room_id)
+        aliases = yield self.store.get_aliases_for_room(
+            room_id, on_invalidate=cache_context.invalidate
+        )
         if aliases:
             result["aliases"] = aliases
@@ -334,8 +361,7 @@
         if avatar_url:
             result["avatar_url"] = avatar_url

-        if _matches_room_entry(result, search_filter):
-            chunk.append(result)
+        defer.returnValue(result)

     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
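
The cache_context plumbing is what keeps the new per-room cache safe: @cachedInlineCallbacks(cache_context=True) hands the method a cache_context whose .invalidate callback is passed as on_invalidate to the nested cached getters, so evicting get_current_state_ids or get_aliases_for_room for a room also evicts that room's cached entry rather than leaving it stale. Distilled to its shape (a sketch with a hypothetical method, not the full decorator contract):

    from twisted.internet import defer
    from synapse.util.caches.descriptors import cachedInlineCallbacks

    class SomeHandler(object):
        @cachedInlineCallbacks(num_args=1, cache_context=True)
        def get_room_summary(self, room_id, cache_context):
            # Dependencies get our invalidation callback: when their entry
            # for room_id is dropped, this cached summary is dropped too.
            state_ids = yield self.store.get_current_state_ids(
                room_id, on_invalidate=cache_context.invalidate,
            )
            defer.returnValue(len(state_ids))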

synapse/push/mailer.py

@@ -139,7 +139,7 @@ class Mailer(object):
         @defer.inlineCallbacks
         def _fetch_room_state(room_id):
-            room_state = yield self.state_handler.get_current_state_ids(room_id)
+            room_state = yield self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state

         # Run at most 3 of these at once: sync does 10 at a time but email

synapse/replication/slave/storage/events.py

@@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore):
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
+    get_current_state_ids = (
+        StateStore.__dict__["get_current_state_ids"]
+    )
+    has_room_changed_since = DataStore.has_room_changed_since.__func__

     get_unread_push_actions_for_user_in_range_for_http = (
         DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
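
Two different reuse idioms sit side by side here, and the distinction matters: StateStore.__dict__["get_current_state_ids"] grabs the raw class attribute, preserving the @cached descriptor (and with it the slave's own cache, which replication can invalidate), whereas DataStore.has_room_changed_since.__func__ unwraps a plain Python 2 unbound method that needs no cache of its own. A toy illustration of the latter, with hypothetical classes:

    class Master(object):
        def ping(self):
            return "pong"

    class Slave(object):
        # Python 2: Master.ping is an unbound method; .__func__ is the raw
        # function, which binds to Slave instances when assigned here.
        ping = Master.ping.__func__

    assert Slave().ping() == "pong"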

synapse/storage/events.py

@@ -467,14 +467,9 @@ class EventsStore(SQLBaseStore):
             else:
                 return

-        existing_state_rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
-        )
+        existing_state = yield self.get_current_state_ids(room_id)

-        existing_events = set(row["event_id"] for row in existing_state_rows)
+        existing_events = set(existing_state.itervalues())
         new_events = set(ev_id for ev_id in current_state.itervalues())
         changed_events = existing_events ^ new_events

@@ -482,9 +477,8 @@ class EventsStore(SQLBaseStore):
             return

         to_delete = {
-            (row["type"], row["state_key"]): row["event_id"]
-            for row in existing_state_rows
-            if row["event_id"] in changed_events
+            key: ev_id for key, ev_id in existing_state.iteritems()
+            if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
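
Since each state event ID is unique, the symmetric difference picks out exactly the events present on only one side, and filtering by it splits existing state into deletions and new state into insertions. A toy run of the same computation with hypothetical event IDs:

    existing_state = {("m.room.name", ""): "$a", ("m.room.topic", ""): "$b"}
    current_state = {("m.room.name", ""): "$a", ("m.room.topic", ""): "$c"}

    existing_events = set(existing_state.itervalues())   # {"$a", "$b"}
    new_events = set(current_state.itervalues())         # {"$a", "$c"}
    changed_events = existing_events ^ new_events        # {"$b", "$c"}

    to_delete = {
        key: ev_id for key, ev_id in existing_state.iteritems()
        if ev_id in changed_events
    }                                                    # drops the old m.room.topic
    to_insert = {
        key: ev_id for key, ev_id in current_state.iteritems()
        if ev_id in (new_events - existing_events)
    }                                                    # inserts the new m.room.topic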
@@ -610,6 +604,10 @@ class EventsStore(SQLBaseStore):
                 txn, self.get_users_in_room, (room_id,)
             )

+            self._invalidate_cache_and_stream(
+                txn, self.get_current_state_ids, (room_id,)
+            )
+
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,

synapse/storage/state.py

@@ -14,7 +14,7 @@
 # limitations under the License.

 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 from synapse.util.caches import intern_string
 from synapse.storage.engines import PostgresEngine
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )

+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    def get_current_state_ids(self, room_id):
+        rows = yield self._simple_select_list(
+            table="current_state_events",
+            keyvalues={"room_id": room_id},
+            retcols=["event_id", "type", "state_key"],
+            desc="_calculate_state_delta",
+        )
+        defer.returnValue({
+            (r["type"], r["state_key"]): r["event_id"] for r in rows
+        })
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
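
The mapping this returns is keyed on (event type, state key), which is what lets _calculate_state_delta above diff two state dicts directly and lets the room list handler pick out specific event types. For example, a cached value might look like this (illustrative IDs):

    {
        ("m.room.create", ""): "$event1:example.com",
        ("m.room.join_rules", ""): "$event2:example.com",
        ("m.room.member", "@alice:example.com"): "$event3:example.com",
    }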

synapse/storage/stream.py

@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
             updatevalues={"stream_id": stream_id},
             desc="update_federation_out_pos",
         )
+
+    def has_room_changed_since(self, room_id, stream_id):
+        return self._events_stream_cache.has_entity_changed(room_id, stream_id)
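
_events_stream_cache is synapse's per-room stream-change cache, so this check is an in-memory lookup rather than a database hit. The idea, sketched under assumed semantics (simplified from synapse's StreamChangeCache):

    class StreamChangeCacheSketch(object):
        """Tracks the latest stream position at which each entity changed."""

        def __init__(self, earliest_known_stream_pos):
            self._latest_change = {}   # entity -> stream pos of last change
            self._earliest = earliest_known_stream_pos

        def entity_has_changed(self, entity, stream_pos):
            self._latest_change[entity] = stream_pos

        def has_entity_changed(self, entity, stream_pos):
            if stream_pos <= self._earliest:
                return True   # too old to know; conservatively say "changed"
            return self._latest_change.get(entity, self._earliest) > stream_pos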