From 8ffbe43ba11c925322af06f4d12b076754aeac56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 10 Mar 2017 17:39:35 +0000 Subject: [PATCH 1/4] Get current state by using current_state_events table --- synapse/handlers/device.py | 2 +- synapse/handlers/room_list.py | 47 +++++++++++++++++++++-------------- synapse/push/mailer.py | 2 +- synapse/storage/events.py | 18 ++++++-------- synapse/storage/state.py | 14 ++++++++++- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index e859b3165..9374c085d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -262,7 +262,7 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids(room_id) # special-case for an empty prev state: include all members # in the changed list diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 19eebbd43..6283caaf7 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -21,6 +21,7 @@ from synapse.api.constants import ( EventTypes, JoinRules, ) from synapse.util.async import concurrently_execute +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache from synapse.types import ThirdPartyInstanceID @@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler): appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. """ + logger.info( + "Getting public room list: limit=%r, since=%r, search=%r, network=%r", + limit, since_token, bool(search_filter), network_tuple, + ) if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. @@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler): rooms_to_order_value = {} rooms_to_num_joined = {} - rooms_to_latest_event_ids = {} newly_visible = [] newly_unpublished = [] @@ -116,12 +120,9 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): - latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) - if not latest_event_ids: - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) - rooms_to_latest_event_ids[room_id] = latest_event_ids + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) if not latest_event_ids: return @@ -165,19 +166,19 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # Actually generate the entries. _generate_room_entry will append to + # Actually generate the entries. _append_room_entry_to_chunk will append to # chunk but will stop if len(chunk) > limit chunk = [] if limit and not search_filter: step = limit + 1 for i in xrange(0, len(rooms_to_scan), step): # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _generate_room_entry + # at first iteration, but occaisonally _append_room_entry_to_chunk # won't append to the chunk and so we need to loop again. # We don't want to scan over the entire range either as that # would potentially waste a lot of work. yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -187,7 +188,7 @@ class RoomListHandler(BaseHandler): break else: yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -256,21 +257,30 @@ class RoomListHandler(BaseHandler): defer.returnValue(results) @defer.inlineCallbacks - def _generate_room_entry(self, room_id, num_joined_users, chunk, limit, - search_filter): + def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, + search_filter): if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return + result = yield self._generate_room_entry(room_id, num_joined_users) + + if result and _matches_room_entry(result, search_filter): + chunk.append(result) + + @cachedInlineCallbacks(num_args=1, cache_context=True) + def _generate_room_entry(self, room_id, num_joined_users, cache_context): result = { "room_id": room_id, "num_joined_members": num_joined_users, } - current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate, + ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.items() + event_id for key, event_id in current_state_ids.iteritems() if key[0] in ( EventTypes.JoinRules, EventTypes.Name, @@ -294,7 +304,9 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - aliases = yield self.store.get_aliases_for_room(room_id) + aliases = yield self.store.get_aliases_for_room( + room_id, on_invalidate=cache_context.invalidate + ) if aliases: result["aliases"] = aliases @@ -334,8 +346,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - if _matches_room_entry(result, search_filter): - chunk.append(result) + defer.returnValue(result) @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 62d794f22..3a50c72e0 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -139,7 +139,7 @@ class Mailer(object): @defer.inlineCallbacks def _fetch_room_state(room_id): - room_state = yield self.state_handler.get_current_state_ids(room_id) + room_state = yield self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email diff --git a/synapse/storage/events.py b/synapse/storage/events.py index db01eb6d1..0039c281c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -442,14 +442,9 @@ class EventsStore(SQLBaseStore): else: return - existing_state_rows = yield self._simple_select_list( - table="current_state_events", - keyvalues={"room_id": room_id}, - retcols=["event_id", "type", "state_key"], - desc="_calculate_state_delta", - ) + existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(row["event_id"] for row in existing_state_rows) + existing_events = set(existing_state.itervalues()) new_events = set(ev_id for ev_id in current_state.itervalues()) changed_events = existing_events ^ new_events @@ -457,9 +452,8 @@ class EventsStore(SQLBaseStore): return to_delete = { - (row["type"], row["state_key"]): row["event_id"] - for row in existing_state_rows - if row["event_id"] in changed_events + key: ev_id for key, ev_id in existing_state.iteritems() + if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { @@ -585,6 +579,10 @@ class EventsStore(SQLBaseStore): txn, self.get_users_in_room, (room_id,) ) + self._invalidate_cache_and_stream( + txn, self.get_current_state_ids, (room_id,) + ) + for room_id, new_extrem in new_forward_extremeties.items(): self._simple_delete_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 84482d828..27f1ec89e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks from synapse.util.caches import intern_string from synapse.storage.engines import PostgresEngine @@ -69,6 +69,18 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) + @cachedInlineCallbacks(max_entries=100000, iterable=True) + def get_current_state_ids(self, room_id): + rows = yield self._simple_select_list( + table="current_state_events", + keyvalues={"room_id": room_id}, + retcols=["event_id", "type", "state_key"], + desc="_calculate_state_delta", + ) + defer.returnValue({ + (r["type"], r["state_key"]): r["event_id"] for r in rows + }) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: From 79926e016e98d4074aac4803d4d262dfd9c570c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 09:50:10 +0000 Subject: [PATCH 2/4] Assume rooms likely haven't changed --- synapse/handlers/room_list.py | 19 +++++++++++-------- synapse/storage/stream.py | 3 +++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6283caaf7..2f82c520c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -120,16 +120,19 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) + joined_users = yield self.store.get_users_in_room(room_id) + if self.store.has_room_changed_since(room_id, stream_token): + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) - if not latest_event_ids: - return + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) - joined_users = yield self.state_handler.get_current_user_in_room( - room_id, latest_event_ids, - ) num_joined_users = len(joined_users) rooms_to_num_joined[room_id] = num_joined_users diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 200d12463..dddd5fc0e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore): updatevalues={"stream_id": stream_id}, desc="update_federation_out_pos", ) + + def has_room_changed_since(self, room_id, stream_id): + return self._events_stream_cache.has_entity_changed(room_id, stream_id) From 0162994983f2af89b7eed0c2150353f8c5114f1b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 11:53:26 +0000 Subject: [PATCH 3/4] Comments --- synapse/handlers/room_list.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2f82c520c..516cd9a6a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -120,6 +120,13 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): + # Most of the rooms won't have changed between the since token and + # now (especially if the since token is "now"). So, we can ask what + # the current users are in a room (that will hit a cache) and then + # check if the room has changed since the since token. (We have to + # do it in that order to avoid races). + # If things have changed then fall back to getting the current state + # at the since token. joined_users = yield self.store.get_users_in_room(room_id) if self.store.has_room_changed_since(room_id, stream_token): latest_event_ids = yield self.store.get_forward_extremeties_for_room( @@ -262,6 +269,9 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, search_filter): + """Generate the entry for a room in the public room list and append it + to the `chunk` if it matches the search filter + """ if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return @@ -273,6 +283,8 @@ class RoomListHandler(BaseHandler): @cachedInlineCallbacks(num_args=1, cache_context=True) def _generate_room_entry(self, room_id, num_joined_users, cache_context): + """Returns the entry for a room + """ result = { "room_id": room_id, "num_joined_members": num_joined_users, From 45c7f12d2a7243b4d273d8af1639c03f3136d2a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 16:24:54 +0000 Subject: [PATCH 4/4] Add new storage function to slave store --- synapse/replication/slave/storage/events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 622b2d854..518c9ea2e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore): get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) + get_current_state_ids = ( + StateStore.__dict__["get_current_state_ids"] + ) + has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__