Get current state by using current_state_events table

This commit is contained in:
Erik Johnston 2017-03-10 17:39:35 +00:00
parent 3872c7a107
commit 8ffbe43ba1
5 changed files with 52 additions and 31 deletions

View File

@ -262,7 +262,7 @@ class DeviceHandler(BaseHandler):
# ordering: treat it the same as a new room # ordering: treat it the same as a new room
event_ids = [] event_ids = []
current_state_ids = yield self.state.get_current_state_ids(room_id) current_state_ids = yield self.store.get_current_state_ids(room_id)
# special-case for an empty prev state: include all members # special-case for an empty prev state: include all members
# in the changed list # in the changed list

View File

@ -21,6 +21,7 @@ from synapse.api.constants import (
EventTypes, JoinRules, EventTypes, JoinRules,
) )
from synapse.util.async import concurrently_execute from synapse.util.async import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache from synapse.util.caches.response_cache import ResponseCache
from synapse.types import ThirdPartyInstanceID from synapse.types import ThirdPartyInstanceID
@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
appservice and network id to use an appservice specific one. appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists. Setting to None returns all public rooms across all lists.
""" """
logger.info(
"Getting public room list: limit=%r, since=%r, search=%r, network=%r",
limit, since_token, bool(search_filter), network_tuple,
)
if search_filter: if search_filter:
# We explicitly don't bother caching searches or requests for # We explicitly don't bother caching searches or requests for
# appservice specific lists. # appservice specific lists.
@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
rooms_to_order_value = {} rooms_to_order_value = {}
rooms_to_num_joined = {} rooms_to_num_joined = {}
rooms_to_latest_event_ids = {}
newly_visible = [] newly_visible = []
newly_unpublished = [] newly_unpublished = []
@ -116,12 +120,9 @@ class RoomListHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_order_for_room(room_id): def get_order_for_room(room_id):
latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) latest_event_ids = yield self.store.get_forward_extremeties_for_room(
if not latest_event_ids: room_id, stream_token
latest_event_ids = yield self.store.get_forward_extremeties_for_room( )
room_id, stream_token
)
rooms_to_latest_event_ids[room_id] = latest_event_ids
if not latest_event_ids: if not latest_event_ids:
return return
@ -165,19 +166,19 @@ class RoomListHandler(BaseHandler):
rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan = rooms_to_scan[:since_token.current_limit]
rooms_to_scan.reverse() rooms_to_scan.reverse()
# Actually generate the entries. _generate_room_entry will append to # Actually generate the entries. _append_room_entry_to_chunk will append to
# chunk but will stop if len(chunk) > limit # chunk but will stop if len(chunk) > limit
chunk = [] chunk = []
if limit and not search_filter: if limit and not search_filter:
step = limit + 1 step = limit + 1
for i in xrange(0, len(rooms_to_scan), step): for i in xrange(0, len(rooms_to_scan), step):
# We iterate here because the vast majority of cases we'll stop # We iterate here because the vast majority of cases we'll stop
# at first iteration, but occaisonally _generate_room_entry # at first iteration, but occaisonally _append_room_entry_to_chunk
# won't append to the chunk and so we need to loop again. # won't append to the chunk and so we need to loop again.
# We don't want to scan over the entire range either as that # We don't want to scan over the entire range either as that
# would potentially waste a lot of work. # would potentially waste a lot of work.
yield concurrently_execute( yield concurrently_execute(
lambda r: self._generate_room_entry( lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r], r, rooms_to_num_joined[r],
chunk, limit, search_filter chunk, limit, search_filter
), ),
@ -187,7 +188,7 @@ class RoomListHandler(BaseHandler):
break break
else: else:
yield concurrently_execute( yield concurrently_execute(
lambda r: self._generate_room_entry( lambda r: self._append_room_entry_to_chunk(
r, rooms_to_num_joined[r], r, rooms_to_num_joined[r],
chunk, limit, search_filter chunk, limit, search_filter
), ),
@ -256,21 +257,30 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks @defer.inlineCallbacks
def _generate_room_entry(self, room_id, num_joined_users, chunk, limit, def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
search_filter): search_filter):
if limit and len(chunk) > limit + 1: if limit and len(chunk) > limit + 1:
# We've already got enough, so lets just drop it. # We've already got enough, so lets just drop it.
return return
result = yield self._generate_room_entry(room_id, num_joined_users)
if result and _matches_room_entry(result, search_filter):
chunk.append(result)
@cachedInlineCallbacks(num_args=1, cache_context=True)
def _generate_room_entry(self, room_id, num_joined_users, cache_context):
result = { result = {
"room_id": room_id, "room_id": room_id,
"num_joined_members": num_joined_users, "num_joined_members": num_joined_users,
} }
current_state_ids = yield self.state_handler.get_current_state_ids(room_id) current_state_ids = yield self.store.get_current_state_ids(
room_id, on_invalidate=cache_context.invalidate,
)
event_map = yield self.store.get_events([ event_map = yield self.store.get_events([
event_id for key, event_id in current_state_ids.items() event_id for key, event_id in current_state_ids.iteritems()
if key[0] in ( if key[0] in (
EventTypes.JoinRules, EventTypes.JoinRules,
EventTypes.Name, EventTypes.Name,
@ -294,7 +304,9 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC: if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None) defer.returnValue(None)
aliases = yield self.store.get_aliases_for_room(room_id) aliases = yield self.store.get_aliases_for_room(
room_id, on_invalidate=cache_context.invalidate
)
if aliases: if aliases:
result["aliases"] = aliases result["aliases"] = aliases
@ -334,8 +346,7 @@ class RoomListHandler(BaseHandler):
if avatar_url: if avatar_url:
result["avatar_url"] = avatar_url result["avatar_url"] = avatar_url
if _matches_room_entry(result, search_filter): defer.returnValue(result)
chunk.append(result)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_remote_public_room_list(self, server_name, limit=None, since_token=None, def get_remote_public_room_list(self, server_name, limit=None, since_token=None,

View File

@ -139,7 +139,7 @@ class Mailer(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _fetch_room_state(room_id): def _fetch_room_state(room_id):
room_state = yield self.state_handler.get_current_state_ids(room_id) room_state = yield self.store.get_current_state_ids(room_id)
state_by_room[room_id] = room_state state_by_room[room_id] = room_state
# Run at most 3 of these at once: sync does 10 at a time but email # Run at most 3 of these at once: sync does 10 at a time but email

View File

@ -442,14 +442,9 @@ class EventsStore(SQLBaseStore):
else: else:
return return
existing_state_rows = yield self._simple_select_list( existing_state = yield self.get_current_state_ids(room_id)
table="current_state_events",
keyvalues={"room_id": room_id},
retcols=["event_id", "type", "state_key"],
desc="_calculate_state_delta",
)
existing_events = set(row["event_id"] for row in existing_state_rows) existing_events = set(existing_state.itervalues())
new_events = set(ev_id for ev_id in current_state.itervalues()) new_events = set(ev_id for ev_id in current_state.itervalues())
changed_events = existing_events ^ new_events changed_events = existing_events ^ new_events
@ -457,9 +452,8 @@ class EventsStore(SQLBaseStore):
return return
to_delete = { to_delete = {
(row["type"], row["state_key"]): row["event_id"] key: ev_id for key, ev_id in existing_state.iteritems()
for row in existing_state_rows if ev_id in changed_events
if row["event_id"] in changed_events
} }
events_to_insert = (new_events - existing_events) events_to_insert = (new_events - existing_events)
to_insert = { to_insert = {
@ -585,6 +579,10 @@ class EventsStore(SQLBaseStore):
txn, self.get_users_in_room, (room_id,) txn, self.get_users_in_room, (room_id,)
) )
self._invalidate_cache_and_stream(
txn, self.get_current_state_ids, (room_id,)
)
for room_id, new_extrem in new_forward_extremeties.items(): for room_id, new_extrem in new_forward_extremeties.items():
self._simple_delete_txn( self._simple_delete_txn(
txn, txn,

View File

@ -14,7 +14,7 @@
# limitations under the License. # limitations under the License.
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
from synapse.util.caches import intern_string from synapse.util.caches import intern_string
from synapse.storage.engines import PostgresEngine from synapse.storage.engines import PostgresEngine
@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
where_clause="type='m.room.member'", where_clause="type='m.room.member'",
) )
@cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
rows = yield self._simple_select_list(
table="current_state_events",
keyvalues={"room_id": room_id},
retcols=["event_id", "type", "state_key"],
desc="_calculate_state_delta",
)
defer.returnValue({
(r["type"], r["state_key"]): r["event_id"] for r in rows
})
@defer.inlineCallbacks @defer.inlineCallbacks
def get_state_groups_ids(self, room_id, event_ids): def get_state_groups_ids(self, room_id, event_ids):
if not event_ids: if not event_ids: