From 9e25443db84f16bca36d1ba605e5b5ea09d1f8c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 1 Sep 2016 14:31:26 +0100 Subject: [PATCH 01/22] Move to storing state_groups_state as deltas --- synapse/events/snapshot.py | 16 +++ synapse/state.py | 34 ++++- synapse/storage/prepare_database.py | 2 +- synapse/storage/schema/delta/35/state.sql | 21 +++ synapse/storage/state.py | 161 ++++++++++++++-------- 5 files changed, 172 insertions(+), 62 deletions(-) create mode 100644 synapse/storage/schema/delta/35/state.sql diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e895b1c45..ec32008d5 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -15,9 +15,25 @@ class EventContext(object): + __slots__ = [ + "current_state_ids", + "prev_state_ids", + "state_group", + "rejected", + "push_actions", + "prev_group", + "delta_ids", + "prev_state_events", + ] + def __init__(self): self.current_state_ids = None self.prev_state_ids = None self.state_group = None self.rejected = False self.push_actions = [] + + self.prev_group = None + self.delta_ids = None + + self.prev_state_events = None diff --git a/synapse/state.py b/synapse/state.py index b31bbcdbd..cd428e83c 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -54,12 +54,15 @@ def _gen_state_id(): class _StateCacheEntry(object): - __slots__ = ["state", "state_group", "state_id"] + __slots__ = ["state", "state_group", "state_id", "prev_group", "delta_ids"] - def __init__(self, state, state_group): + def __init__(self, state, state_group, prev_group=None, delta_ids=None): self.state = state self.state_group = state_group + self.prev_group = prev_group + self.delta_ids = delta_ids + # The `state_id` is a unique ID we generate that can be used as ID for # this collection of state. 
Usually this would be the same as the # state group, but on worker instances we can't generate a new state @@ -243,11 +246,20 @@ class StateHandler(object): if key in context.prev_state_ids: replaces = context.prev_state_ids[key] event.unsigned["replaces_state"] = replaces + context.current_state_ids = dict(context.prev_state_ids) context.current_state_ids[key] = event.event_id + + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + if context.delta_ids is not None: + context.delta_ids[key] = event.event_id else: context.current_state_ids = context.prev_state_ids + context.prev_group = entry.prev_group + context.delta_ids = entry.delta_ids + context.prev_state_events = [] defer.returnValue(context) @@ -281,6 +293,8 @@ class StateHandler(object): defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, + prev_group=name, + delta_ids={}, )) if self._state_cache is not None: @@ -330,6 +344,7 @@ class StateHandler(object): if new_state_event_ids == frozenset(e_id for e_id in events): state_group = sg break + if state_group is None: # Worker instances don't have access to this method, but we want # to set the state_group on the main instance to increase cache @@ -337,9 +352,24 @@ class StateHandler(object): if hasattr(self.store, "get_next_state_group"): state_group = self.store.get_next_state_group() + prev_group = None + delta_ids = None + for old_group, old_ids in state_groups_ids.items(): + if not set(new_state.iterkeys()) - set(old_ids.iterkeys()): + n_delta_ids = { + k: v + for k, v in new_state.items() + if old_ids.get(k) != v + } + if not delta_ids or len(n_delta_ids) < len(delta_ids): + prev_group = old_group + delta_ids = n_delta_ids + cache = _StateCacheEntry( state=new_state, state_group=state_group, + prev_group=prev_group, + delta_ids=delta_ids, ) if self._state_cache is not None: diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b94ce7bea..b1fbc4ffa 100644 --- 
a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 34 +SCHEMA_VERSION = 35 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql new file mode 100644 index 000000000..c4c244c16 --- /dev/null +++ b/synapse/storage/schema/delta/35/state.sql @@ -0,0 +1,21 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +CREATE TABLE state_group_edges( + state_group BIGINT NOT NULL, + prev_state_group BIGINT NOT NULL +); + +CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ec551b0b4..73cebc738 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -118,20 +119,45 @@ class StateStore(SQLBaseStore): }, ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + if context.prev_group: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in state_event_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in state_event_ids.items() + ], + ) self._simple_insert_many_txn( txn, @@ -214,26 +240,70 @@ class StateStore(SQLBaseStore): else: where_clause = "" - sql = ( - "SELECT state_group, event_id, type, state_key" - " FROM state_groups_state WHERE" - " state_group IN (%s) %s" % ( - ",".join("?" 
for _ in groups), - where_clause, - ) - ) - - args = list(groups) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - results = {group: {} for group in groups} - for row in rows: - key = (row["type"], row["state_key"]) - results[row["state_group"]][key] = row["event_id"] + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, event_id FROM state_groups_state + WHERE ROW(type, state_key, state_group) IN ( + SELECT type, state_key, max(state_group) FROM state + INNER JOIN state_groups_state USING (state_group) + GROUP BY type, state_key + ) + %s; + """) % (where_clause,) + + for group in groups: + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" 
for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + return results results = {} @@ -504,32 +574,5 @@ class StateStore(SQLBaseStore): defer.returnValue(results) - def get_all_new_state_groups(self, last_id, current_id, limit): - def get_all_new_state_groups_txn(txn): - sql = ( - "SELECT id, room_id, event_id FROM state_groups" - " WHERE ? < id AND id <= ? ORDER BY id LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - groups = txn.fetchall() - - if not groups: - return ([], []) - - lower_bound = groups[0][0] - upper_bound = groups[-1][0] - sql = ( - "SELECT state_group, type, state_key, event_id" - " FROM state_groups_state" - " WHERE ? <= state_group AND state_group <= ?" - ) - - txn.execute(sql, (lower_bound, upper_bound)) - state_group_state = txn.fetchall() - return (groups, state_group_state) - return self.runInteraction( - "get_all_new_state_groups", get_all_new_state_groups_txn - ) - def get_next_state_group(self): return self._state_groups_id_gen.get_next() From 598317927cb8f741528d639f3ce875299fde478e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 2 Sep 2016 10:41:38 +0100 Subject: [PATCH 02/22] Limit the length of state chains --- synapse/storage/events.py | 49 +++++++++++-------- synapse/storage/state.py | 100 +++++++++++++++++++++++++++++--------- 2 files changed, 106 insertions(+), 43 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1a7d4c519..7e9b35151 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -497,7 +497,11 @@ class EventsStore(SQLBaseStore): # insert into the state_group, state_groups_state and # event_to_state_groups tables. 
- self._store_mult_state_groups_txn(txn, ((event, context),)) + try: + self._store_mult_state_groups_txn(txn, ((event, context),)) + except Exception: + logger.exception("") + raise metadata_json = encode_json( event.internal_metadata.get_dict() @@ -1543,6 +1547,9 @@ class EventsStore(SQLBaseStore): ) event_rows = txn.fetchall() + for event_id, state_key in event_rows: + txn.call_after(self._get_state_group_for_event.invalidate, (event_id,)) + # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( @@ -1571,26 +1578,26 @@ class EventsStore(SQLBaseStore): # Get all state groups that are only referenced by events that are # to be deleted. - txn.execute( - "SELECT state_group FROM event_to_state_groups" - " INNER JOIN events USING (event_id)" - " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events" - " INNER JOIN event_to_state_groups USING (event_id)" - " WHERE room_id = ? AND topological_ordering < ?" - " )" - " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (room_id, topological_ordering, topological_ordering) - ) - state_rows = txn.fetchall() - txn.executemany( - "DELETE FROM state_groups_state WHERE state_group = ?", - state_rows - ) - txn.executemany( - "DELETE FROM state_groups WHERE id = ?", - state_rows - ) + # txn.execute( + # "SELECT state_group FROM event_to_state_groups" + # " INNER JOIN events USING (event_id)" + # " WHERE state_group IN (" + # " SELECT DISTINCT state_group FROM events" + # " INNER JOIN event_to_state_groups USING (event_id)" + # " WHERE room_id = ? AND topological_ordering < ?" 
+ # " )" + # " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + # (room_id, topological_ordering, topological_ordering) + # ) + # state_rows = txn.fetchall() + # txn.executemany( + # "DELETE FROM state_groups_state WHERE state_group = ?", + # state_rows + # ) + # txn.executemany( + # "DELETE FROM state_groups WHERE id = ?", + # state_rows + # ) # Delete all non-state txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 73cebc738..7f45c0cd9 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -25,6 +25,9 @@ import logging logger = logging.getLogger(__name__) +MAX_STATE_DELTA_HOPS = 100 + + class StateStore(SQLBaseStore): """ Keeps track of the state at a given event. @@ -104,7 +107,6 @@ class StateStore(SQLBaseStore): state_groups[event.event_id] = context.state_group if self._have_persisted_state_group_txn(txn, context.state_group): - logger.info("Already persisted state_group: %r", context.state_group) continue state_event_ids = dict(context.current_state_ids) @@ -120,29 +122,48 @@ class StateStore(SQLBaseStore): ) if context.prev_group: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, + potential_hops = self._count_state_group_hops_txn( + txn, context.prev_group ) - - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { + if potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.items() - ], - ) + "prev_state_group": context.prev_group, + }, + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + 
"room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) + else: + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.current_state_ids.items() + ], + ) else: self._simple_insert_many_txn( txn, @@ -171,6 +192,41 @@ class StateStore(SQLBaseStore): ], ) + def _count_state_group_hops_txn(self, txn, state_group): + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT count(*) FROM state; + """) + + txn.execute(sql, (state_group,)) + row = txn.fetchone() + if row and row[0]: + return row[0] + else: + return 0 + else: + next_group = state_group + count = 0 + + while next_group: + next_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + count += 1 + + return count + @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key=""): if event_type and state_key is not None: From a99e9335502df3389ff6f16ef52c43ce391b6955 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 09:34:24 +0100 Subject: [PATCH 03/22] Add upgrade script that will slowly prune state_groups_state entries --- synapse/replication/slave/storage/events.py | 3 + .../storage/schema/delta/35/state_dedupe.sql | 17 ++ synapse/storage/state.py | 278 +++++++++++++----- 3 files changed, 223 insertions(+), 75 deletions(-) create mode 100644 synapse/storage/schema/delta/35/state_dedupe.sql diff --git a/synapse/replication/slave/storage/events.py 
b/synapse/replication/slave/storage/events.py index cbebd5b2f..15c52774a 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -86,6 +86,9 @@ class SlavedEventStore(BaseSlavedStore): _get_state_groups_from_groups = ( StateStore.__dict__["_get_state_groups_from_groups"] ) + _get_state_groups_from_groups_txn = ( + DataStore._get_state_groups_from_groups_txn.__func__ + ) _get_state_group_from_group = ( StateStore.__dict__["_get_state_group_from_group"] ) diff --git a/synapse/storage/schema/delta/35/state_dedupe.sql b/synapse/storage/schema/delta/35/state_dedupe.sql new file mode 100644 index 000000000..97e5067ef --- /dev/null +++ b/synapse/storage/schema/delta/35/state_dedupe.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT into background_updates (update_name, progress_json) + VALUES ('state_group_state_deduplication', '{}'); diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 7f45c0cd9..968b68f46 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -47,6 +47,15 @@ class StateStore(SQLBaseStore): * `state_groups_state`: Maps state group to state events. 
""" + STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication" + + def __init__(self, hs): + super(StateStore, self).__init__(hs) + self.register_background_update_handler( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, + self._background_deduplicate_state, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: @@ -288,92 +297,92 @@ class StateStore(SQLBaseStore): def _get_state_groups_from_groups(self, groups, types): """Returns dictionary state_group -> (dict of (type, state_key) -> event id) """ - def f(txn, groups): - if types is not None: - where_clause = "AND (%s)" % ( - " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), - ) - else: - where_clause = "" - - results = {group: {} for group in groups} - if isinstance(self.database_engine, PostgresEngine): - sql = (""" - WITH RECURSIVE state(state_group) AS ( - VALUES(?::bigint) - UNION ALL - SELECT prev_state_group FROM state_group_edges e, state s - WHERE s.state_group = e.state_group - ) - SELECT type, state_key, event_id FROM state_groups_state - WHERE ROW(type, state_key, state_group) IN ( - SELECT type, state_key, max(state_group) FROM state - INNER JOIN state_groups_state USING (state_group) - GROUP BY type, state_key - ) - %s; - """) % (where_clause,) - - for group in groups: - args = [group] - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - else: - for group in groups: - group_tree = [group] - next_group = group - - while next_group: - next_group = self._simple_select_one_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"state_group": next_group}, - retcol="prev_state_group", - allow_none=True, - ) - if next_group: - group_tree.append(next_group) - - sql = (""" - SELECT type, state_key, event_id FROM state_groups_state - INNER JOIN ( - SELECT 
type, state_key, max(state_group) as state_group - FROM state_groups_state - WHERE state_group IN (%s) %s - GROUP BY type, state_key - ) USING (type, state_key, state_group); - """) % (",".join("?" for _ in group_tree), where_clause,) - - args = list(group_tree) - if types is not None: - args.extend([i for typ in types for i in typ]) - - txn.execute(sql, args) - rows = self.cursor_to_dict(txn) - for row in rows: - key = (row["type"], row["state_key"]) - results[group][key] = row["event_id"] - - return results - results = {} chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)] for chunk in chunks: res = yield self.runInteraction( "_get_state_groups_from_groups", - f, chunk + self._get_state_groups_from_groups_txn, chunk, types, ) results.update(res) defer.returnValue(results) + def _get_state_groups_from_groups_txn(self, txn, groups, types=None): + if types is not None: + where_clause = "AND (%s)" % ( + " OR ".join(["(type = ? AND state_key = ?)"] * len(types)), + ) + else: + where_clause = "" + + results = {group: {} for group in groups} + if isinstance(self.database_engine, PostgresEngine): + sql = (""" + WITH RECURSIVE state(state_group) AS ( + VALUES(?::bigint) + UNION ALL + SELECT prev_state_group FROM state_group_edges e, state s + WHERE s.state_group = e.state_group + ) + SELECT type, state_key, event_id FROM state_groups_state + WHERE ROW(type, state_key, state_group) IN ( + SELECT type, state_key, max(state_group) FROM state + INNER JOIN state_groups_state USING (state_group) + GROUP BY type, state_key + ) + %s; + """) % (where_clause,) + + for group in groups: + args = [group] + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + else: + for group in groups: + group_tree = [group] + next_group = group + + while next_group: + next_group = 
self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={"state_group": next_group}, + retcol="prev_state_group", + allow_none=True, + ) + if next_group: + group_tree.append(next_group) + + sql = (""" + SELECT type, state_key, event_id FROM state_groups_state + INNER JOIN ( + SELECT type, state_key, max(state_group) as state_group + FROM state_groups_state + WHERE state_group IN (%s) %s + GROUP BY type, state_key + ) USING (type, state_key, state_group); + """) % (",".join("?" for _ in group_tree), where_clause,) + + args = list(group_tree) + if types is not None: + args.extend([i for typ in types for i in typ]) + + txn.execute(sql, args) + rows = self.cursor_to_dict(txn) + for row in rows: + key = (row["type"], row["state_key"]) + results[group][key] = row["event_id"] + + return results + @defer.inlineCallbacks def get_state_for_events(self, event_ids, types): """Given a list of event_ids and type tuples, return a list of state @@ -632,3 +641,122 @@ class StateStore(SQLBaseStore): def get_next_state_group(self): return self._state_groups_id_gen.get_next() + + @defer.inlineCallbacks + def _background_deduplicate_state(self, progress, batch_size): + last_state_group = progress.get("last_state_group", 0) + rows_inserted = progress.get("rows_inserted", 0) + max_group = progress.get("max_group", None) + + if max_group is None: + rows = yield self._execute( + "_background_deduplicate_state", None, + "SELECT coalesce(max(id), 0) FROM state_groups", + ) + max_group = rows[0][0] + + def reindex_txn(txn): + new_last_state_group = last_state_group + for count in xrange(batch_size): + txn.execute( + "SELECT id, room_id FROM state_groups" + " WHERE ? < id AND id <= ?" + " ORDER BY id ASC" + " LIMIT 1", + (new_last_state_group, max_group,) + ) + row = txn.fetchone() + if row: + state_group, room_id = row + + if not row or not state_group: + return True, count + + txn.execute( + "SELECT coalesce(max(id), 0) FROM state_groups" + " WHERE id < ? 
AND room_id = ?", + (state_group, room_id,) + ) + prev_group, = txn.fetchone() + new_last_state_group = state_group + + if prev_group: + potential_hops = self._count_state_group_hops_txn( + txn, prev_group + ) + if potential_hops >= MAX_STATE_DELTA_HOPS: + # We want to ensure chains are at most this long,# + # otherwise read performance degrades. + continue + + prev_state = self._get_state_groups_from_groups_txn( + txn, [prev_group], types=None + ) + prev_state = prev_state.values()[0] + + curr_state = self._get_state_groups_from_groups_txn( + txn, [state_group], types=None + ) + curr_state = curr_state.values()[0] + + if not set(prev_state.keys()) - set(curr_state.keys()): + # We can only do a delta if the current has a strict super set + # of keys + + delta_state = { + key: value for key, value in curr_state.items() + if prev_state.get(key, None) != value + } + + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": state_group, + "prev_state_group": prev_group, + } + ) + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": state_group, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in delta_state.items() + ], + ) + + progress = { + "last_state_group": state_group, + "rows_inserted": rows_inserted + batch_size, + "max_group": max_group, + } + + self._background_update_progress_txn( + txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress + ) + + return False, batch_size + + finished, result = yield self.runInteraction( + self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, reindex_txn + ) + + if finished: + yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) + + defer.returnValue(result) From 628e65721bdf1fb39e78a833d757a38e614b652d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: 
Mon, 5 Sep 2016 10:41:27 +0100 Subject: [PATCH 04/22] Add comments --- synapse/events/snapshot.py | 5 +++ synapse/storage/state.py | 79 ++++++++++++++++++-------------------- 2 files changed, 43 insertions(+), 41 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index ec32008d5..11605b34a 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -27,12 +27,17 @@ class EventContext(object): ] def __init__(self): + # The current state including the current event self.current_state_ids = None + # The current state excluding the current event self.prev_state_ids = None self.state_group = None + self.rejected = False self.push_actions = [] + # A previously persisted state group and a delta between that + # and this state. self.prev_group = None self.delta_ids = None diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 968b68f46..ee8b76300 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -118,8 +118,6 @@ class StateStore(SQLBaseStore): if self._have_persisted_state_group_txn(txn, context.state_group): continue - state_event_ids = dict(context.current_state_ids) - self._simple_insert_txn( txn, table="state_groups", @@ -130,49 +128,36 @@ class StateStore(SQLBaseStore): }, ) + # We persist as a delta if we can, while also ensuring the chain + # of deltas isn't tooo long, as otherwise read performance degrades. 
if context.prev_group: potential_hops = self._count_state_group_hops_txn( txn, context.prev_group ) - if potential_hops < MAX_STATE_DELTA_HOPS: - self._simple_insert_txn( - txn, - table="state_group_edges", - values={ - "state_group": context.state_group, - "prev_state_group": context.prev_group, - }, - ) + if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS: + self._simple_insert_txn( + txn, + table="state_group_edges", + values={ + "state_group": context.state_group, + "prev_state_group": context.prev_group, + }, + ) - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.delta_ids.items() - ], - ) - else: - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": context.state_group, - "room_id": event.room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in context.current_state_ids.items() - ], - ) + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": context.state_group, + "room_id": event.room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in context.delta_ids.items() + ], + ) else: self._simple_insert_many_txn( txn, @@ -185,7 +170,7 @@ class StateStore(SQLBaseStore): "state_key": key[1], "event_id": state_id, } - for key, state_id in state_event_ids.items() + for key, state_id in context.current_state_ids.items() ], ) @@ -202,6 +187,10 @@ class StateStore(SQLBaseStore): ) def _count_state_group_hops_txn(self, txn, state_group): + """Given a state group, count how many hops there are in the tree. + + This is used to ensure the delta chains don't get too long. 
+ """ if isinstance(self.database_engine, PostgresEngine): sql = (""" WITH RECURSIVE state(state_group) AS ( @@ -319,6 +308,11 @@ class StateStore(SQLBaseStore): results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): + # The below query walks the state_group tree so that the "state" + # table includes all state_groups in the tree. It then joins + # against `state_groups_state` to fetch the latest state. + # It assumes that previous state groups are always numerically + # lesser. sql = (""" WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) @@ -644,6 +638,9 @@ class StateStore(SQLBaseStore): @defer.inlineCallbacks def _background_deduplicate_state(self, progress, batch_size): + """This background update will slowly deduplicate state by reencoding + them as deltas. + """ last_state_group = progress.get("last_state_group", 0) rows_inserted = progress.get("rows_inserted", 0) max_group = progress.get("max_group", None) From 69a2d4e38c9de61969f637182a5e5cb60094fe55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 13:43:31 +0100 Subject: [PATCH 05/22] Use get_joined_users_from_context instead of manually looking up hosts --- synapse/handlers/federation.py | 19 ++++++++------- synapse/handlers/message.py | 44 +++++----------------------------- 2 files changed, 17 insertions(+), 46 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index dc90a5dde..8a1038c44 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -832,11 +832,13 @@ class FederationHandler(BaseHandler): new_pdu = event - message_handler = self.hs.get_handlers().message_handler - destinations = yield message_handler.get_joined_hosts_for_room_from_state( - context + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = set( + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) ) - destinations = 
set(destinations) + destinations.discard(origin) logger.debug( @@ -1055,11 +1057,12 @@ class FederationHandler(BaseHandler): new_pdu = event - message_handler = self.hs.get_handlers().message_handler - destinations = yield message_handler.get_joined_hosts_for_room_from_state( - context + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = set( + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) ) - destinations = set(destinations) destinations.discard(origin) logger.debug( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3577db059..178209a20 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,6 @@ from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLo from synapse.util.caches.snapshot_cache import SnapshotCache from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.metrics import measure_func -from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.visibility import filter_events_for_client from ._base import BaseHandler @@ -945,7 +944,12 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - destinations = yield self.get_joined_hosts_for_room_from_state(context) + users_in_room = yield self.store.get_joined_users_from_context(event, context) + + destinations = [ + get_domain_from_id(user_id) for user_id in users_in_room + if not self.hs.is_mine_id(user_id) + ] @defer.inlineCallbacks def _notify(): @@ -963,39 +967,3 @@ class MessageHandler(BaseHandler): preserve_fn(federation_handler.handle_new_event)( event, destinations=destinations, ) - - def get_joined_hosts_for_room_from_state(self, context): - state_group = context.state_group - if not state_group: - # If state_group is None it means it has yet to be assigned a - # state group, i.e. 
we need to make sure that calls with a state_group - # of None don't hit previous cached calls with a None state_group. - # To do this we set the state_group to a new object as object() != object() - state_group = object() - - return self._get_joined_hosts_for_room_from_state( - state_group, context.current_state_ids - ) - - @cachedInlineCallbacks(num_args=1, cache_context=True) - def _get_joined_hosts_for_room_from_state(self, state_group, current_state_ids, - cache_context): - - # Don't bother getting state for people on the same HS - current_state = yield self.store.get_events([ - e_id for key, e_id in current_state_ids.items() - if key[0] == EventTypes.Member and not self.hs.is_mine_id(key[1]) - ]) - - destinations = set() - for e in current_state.itervalues(): - try: - if e.type == EventTypes.Member: - if e.content["membership"] == Membership.JOIN: - destinations.add(get_domain_from_id(e.state_key)) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", e.event_id - ) - - defer.returnValue(destinations) From 69054e3d4c46d99f877b3242707bbeaa43485f17 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:12:11 +0100 Subject: [PATCH 06/22] Record why we have chosen to notify --- synapse/handlers/presence.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index cf82a2336..7ae05603f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -52,6 +52,8 @@ bump_active_time_counter = metrics.register_counter("bump_active_time") get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) +notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) + # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them # "currently_active" @@ -940,26 +942,32 @@ def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. 
""" if old_state.status_msg != new_state.status_msg: + notify_reason_counter.inc("status_msg_change") return True if old_state.state == PresenceState.ONLINE: if new_state.state != PresenceState.ONLINE: # Always notify for online -> anything + notify_reason_counter.inc("online_to_not") return True if new_state.currently_active != old_state.currently_active: + notify_reason_counter.inc("current_active_change") return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive if not (old_state.currently_active and new_state.currently_active): + notify_reason_counter.inc("last_active_change") return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. + notify_reason_counter.inc("last_active_change") return True if old_state.state != new_state.state: + notify_reason_counter.inc("state_change") return True return False From 485d999c8a95f8fdc6425a00e906e86efc77a917 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:49:08 +0100 Subject: [PATCH 07/22] Correctly delete old state groups in purge history API --- synapse/storage/events.py | 99 ++++++++++++++++++----- synapse/storage/schema/delta/35/state.sql | 1 + 2 files changed, 80 insertions(+), 20 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7e9b35151..bec35ea68 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1578,26 +1578,85 @@ class EventsStore(SQLBaseStore): # Get all state groups that are only referenced by events that are # to be deleted. - # txn.execute( - # "SELECT state_group FROM event_to_state_groups" - # " INNER JOIN events USING (event_id)" - # " WHERE state_group IN (" - # " SELECT DISTINCT state_group FROM events" - # " INNER JOIN event_to_state_groups USING (event_id)" - # " WHERE room_id = ? AND topological_ordering < ?" 
- # " )" - # " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - # (room_id, topological_ordering, topological_ordering) - # ) - # state_rows = txn.fetchall() - # txn.executemany( - # "DELETE FROM state_groups_state WHERE state_group = ?", - # state_rows - # ) - # txn.executemany( - # "DELETE FROM state_groups WHERE id = ?", - # state_rows - # ) + txn.execute( + "SELECT state_group FROM event_to_state_groups" + " INNER JOIN events USING (event_id)" + " WHERE state_group IN (" + " SELECT DISTINCT state_group FROM events" + " INNER JOIN event_to_state_groups USING (event_id)" + " WHERE room_id = ? AND topological_ordering < ?" + " )" + " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + (room_id, topological_ordering, topological_ordering) + ) + + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. 
+ for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state.values()[0] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + + txn.executemany( + "DELETE FROM state_groups_state WHERE state_group = ?", + state_rows + ) + txn.executemany( + "DELETE FROM state_groups WHERE id = ?", + state_rows + ) # Delete all non-state txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql index c4c244c16..0f1fa68a8 100644 --- a/synapse/storage/schema/delta/35/state.sql +++ b/synapse/storage/schema/delta/35/state.sql @@ -19,3 +19,4 @@ CREATE TABLE state_group_edges( ); CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); From 373654c6354c04b08a6f4dcb0ff7fa9ccae02f55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:50:36 +0100 Subject: [PATCH 08/22] Comment about sqlite and WITH RECURSIVE --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index ee8b76300..e79079337 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -209,6 +209,8 @@ class StateStore(SQLBaseStore): else: return 0 else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't 
support it (e.g. wheezy) next_group = state_group count = 0 @@ -340,6 +342,8 @@ class StateStore(SQLBaseStore): key = (row["type"], row["state_key"]) results[group][key] = row["event_id"] else: + # We don't use WITH RECURSIVE on sqlite3 as there are distributions + # that ship with an sqlite3 version that doesn't support it (e.g. wheezy) for group in groups: group_tree = [group] next_group = group From 70332a12dd0a2ea01e1f8f835dcb5ca15526a5f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:57:14 +0100 Subject: [PATCH 09/22] Take value in a better way --- synapse/storage/events.py | 2 +- synapse/storage/state.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bec35ea68..ed182c8d1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1616,7 +1616,7 @@ class EventsStore(SQLBaseStore): curr_state = self._get_state_groups_from_groups_txn( txn, [new_state_edge], types=None ) - curr_state = curr_state.values()[0] + curr_state = curr_state[new_state_edge] self._simple_delete_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e79079337..589a4fec6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -693,12 +693,12 @@ class StateStore(SQLBaseStore): prev_state = self._get_state_groups_from_groups_txn( txn, [prev_group], types=None ) - prev_state = prev_state.values()[0] + prev_state = prev_state[prev_group] curr_state = self._get_state_groups_from_groups_txn( txn, [state_group], types=None ) - curr_state = curr_state.values()[0] + curr_state = curr_state[state_group] if not set(prev_state.keys()) - set(curr_state.keys()): # We can only do a delta if the current has a strict super set From a7032abb2e64f79be5823b770230cb223cc22ff1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 15:07:23 +0100 Subject: [PATCH 10/22] Correctly handle reindexing state groups that already have an edge --- 
synapse/storage/state.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 589a4fec6..af3ddd962 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -673,6 +673,17 @@ class StateStore(SQLBaseStore): if not row or not state_group: return True, count + txn.execute( + "SELECT state_group FROM state_group_edges" + " WHERE state_group = ?", + (state_group,) + ) + + # If we reach a point where we've already started inserting + # edges we should stop. + if txn.fetchall(): + return True, count + txn.execute( "SELECT coalesce(max(id), 0) FROM state_groups" " WHERE id < ? AND room_id = ?", @@ -709,6 +720,14 @@ class StateStore(SQLBaseStore): if prev_state.get(key, None) != value } + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + } + ) + self._simple_insert_txn( txn, table="state_group_edges", From 0595413c0fe51d4f400f597bf57cd13d5e3450e3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 15:49:57 +0100 Subject: [PATCH 11/22] Scale the batch size so that we're not bitten by the minimum --- synapse/storage/state.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index af3ddd962..0730399b8 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -649,6 +649,10 @@ class StateStore(SQLBaseStore): rows_inserted = progress.get("rows_inserted", 0) max_group = progress.get("max_group", None) + BATCH_SIZE_SCALE_FACTOR = 100 + + batch_size = max(1, int(batch_size / BATCH_SIZE_SCALE_FACTOR)) + if max_group is None: rows = yield self._execute( "_background_deduplicate_state", None, @@ -779,4 +783,4 @@ class StateStore(SQLBaseStore): if finished: yield self._end_background_update(self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(result * BATCH_SIZE_SCALE_FACTOR) From 
662b031a30b85811469921e1d61fcf35775917eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 16:43:30 +0100 Subject: [PATCH 12/22] Allow PDF to be rendered from media repo --- synapse/rest/media/v1/download_resource.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index 9f0625a82..e3843c1be 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -45,7 +45,13 @@ class DownloadResource(Resource): @request_handler() @defer.inlineCallbacks def _async_render_GET(self, request): - request.setHeader("Content-Security-Policy", "sandbox") + request.setHeader( + "Content-Security-Policy", + "default-src none;" + " plugin-types application/pdf;" + " style-src 'unsafe-inline';" + " object-src 'self';" + ) server_name, media_id, name = parse_media_id(request) if server_name == self.server_name: yield self._respond_local_file(request, media_id, name) From d51b8a1674974ec0d694f86e6a911f45efba643a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 17:35:01 +0100 Subject: [PATCH 13/22] Add quotes and be explicit about script-src --- synapse/rest/media/v1/download_resource.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index e3843c1be..a45ee9483 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -47,7 +47,8 @@ class DownloadResource(Resource): def _async_render_GET(self, request): request.setHeader( "Content-Security-Policy", - "default-src none;" + "default-src 'none';" + " script-src 'none';" " plugin-types application/pdf;" " style-src 'unsafe-inline';" " object-src 'self';" From 74a3b4a650022a669f528c332f1913161562c7d0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 10:23:38 +0100 Subject: [PATCH
14/22] Fiddle should_notify to better report stats --- synapse/handlers/presence.py | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7ae05603f..af389b590 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -945,29 +945,24 @@ def should_notify(old_state, new_state): notify_reason_counter.inc("status_msg_change") return True - if old_state.state == PresenceState.ONLINE: - if new_state.state != PresenceState.ONLINE: - # Always notify for online -> anything - notify_reason_counter.inc("online_to_not") - return True + if old_state.state != new_state.state: + notify_reason_counter.inc("state_change") + return True + if old_state.state == PresenceState.ONLINE: if new_state.currently_active != old_state.currently_active: notify_reason_counter.inc("current_active_change") return True if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Only notify about last active bumps if we're not currently acive - if not (old_state.currently_active and new_state.currently_active): - notify_reason_counter.inc("last_active_change") + if not new_state.currently_active: + notify_reason_counter.inc("last_active_change_online") return True elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY: # Always notify for a transition where last active gets bumped. 
- notify_reason_counter.inc("last_active_change") - return True - - if old_state.state != new_state.state: - notify_reason_counter.inc("state_change") + notify_reason_counter.inc("last_active_change_not_online") return True return False From 438ef4763704fb90c3aa0b7aa0c688607e60b010 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 10:28:35 +0100 Subject: [PATCH 15/22] Short circuit if presence is the same --- synapse/handlers/presence.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index af389b590..a9f523311 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -941,6 +941,9 @@ class PresenceHandler(object): def should_notify(old_state, new_state): """Decides if a presence state change should be sent to interested parties. """ + if old_state == new_state: + return False + if old_state.status_msg != new_state.status_msg: notify_reason_counter.inc("status_msg_change") return True From 3c4208a0570fb7410a8d12e11999a78ee35700a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Sep 2016 11:31:01 +0100 Subject: [PATCH 16/22] Record counts of state changes --- synapse/handlers/presence.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a9f523311..da9f0da69 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -53,6 +53,9 @@ bump_active_time_counter = metrics.register_counter("bump_active_time") get_updates_counter = metrics.register_counter("get_updates", labels=["type"]) notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"]) +state_transition_counter = metrics.register_counter( + "state_transition", labels=["from", "to"] +) # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them @@ -950,6 +953,7 @@ def should_notify(old_state, new_state): if old_state.state != new_state.state: 
notify_reason_counter.inc("state_change") + state_transition_counter.inc(old_state.state, new_state.state) return True if old_state.state == PresenceState.ONLINE: From d25c20ccbe0f10fe5d6c0cef2156db7e8d76049c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:22:22 +0100 Subject: [PATCH 17/22] Use windowing function to make use of index --- synapse/storage/state.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 0730399b8..26ecad590 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -322,11 +322,11 @@ class StateStore(SQLBaseStore): SELECT prev_state_group FROM state_group_edges e, state s WHERE s.state_group = e.state_group ) - SELECT type, state_key, event_id FROM state_groups_state - WHERE ROW(type, state_key, state_group) IN ( - SELECT type, state_key, max(state_group) FROM state - INNER JOIN state_groups_state USING (state_group) - GROUP BY type, state_key + SELECT type, state_key, last_value(event_id) OVER ( + PARTITION BY type, state_key ORDER BY state_group ASC + ) AS event_id FROM state_groups_state + WHERE state_group IN ( + SELECT state_group FROM state ) %s; """) % (where_clause,) From fadb01551a897fdf1a2cbe43ff463c9616bd11ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:39:01 +0100 Subject: [PATCH 18/22] Add appropriate framing clause --- synapse/storage/state.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 26ecad590..382f308a6 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -324,6 +324,7 @@ class StateStore(SQLBaseStore): ) SELECT type, state_key, last_value(event_id) OVER ( PARTITION BY type, state_key ORDER BY state_group ASC + ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING ) AS event_id FROM state_groups_state WHERE state_group IN ( SELECT state_group FROM state From
513188aa56bc680a54dbdf6d40657da72c5c6877 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 7 Sep 2016 14:53:19 +0100 Subject: [PATCH 19/22] Comment --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 382f308a6..d6643473d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -315,6 +315,10 @@ class StateStore(SQLBaseStore): # against `state_groups_state` to fetch the latest state. # It assumes that previous state groups are always numerically # lesser. + # The PARTITION is used to get the event_id in the greatest state + # group for the given type, state_key. + # This may return multiple rows per (type, state_key), but last_value + # should be the same. sql = (""" WITH RECURSIVE state(state_group) AS ( VALUES(?::bigint) From 85b51fdd6bc16b0b673130da760eb930e414af5c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 7 Sep 2016 17:19:18 +0100 Subject: [PATCH 20/22] Log the types and values when failing to store devices --- synapse/storage/devices.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index afd6530ca..17920d448 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -54,8 +54,12 @@ class DeviceStore(SQLBaseStore): or_ignore=ignore_if_known, ) except Exception as e: - logger.error("store_device with device_id=%s failed: %s", - device_id, e) + logger.error("store_device with device_id=%s(%r) user_id=%s(%r)" + " display_name=%s(%r) failed: %s", + type(device_id).__name__, device_id, + type(user_id).__name__, user_id, + type(initial_device_display_name).__name__, + initial_device_display_name, e) raise StoreError(500, "Problem storing device.") def get_device(self, user_id, device_id): From b568ca309c5724d28b6ebd9c0a3cd8179fa6d6d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 09:38:54 +0100 Subject: [PATCH 21/22] Temporarily 
disable sequential scans for state fetching --- synapse/storage/state.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index d6643473d..fef87834c 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -310,6 +310,10 @@ class StateStore(SQLBaseStore): results = {group: {} for group in groups} if isinstance(self.database_engine, PostgresEngine): + # Temporarily disable sequential scans in this transaction. This is + # a temporary hack until we can add the right indices in + txn.execute("SET LOCAL enable_seqscan=off") + # The below query walks the state_group tree so that the "state" # table includes all state_groups in the tree. It then joins # against `state_groups_state` to fetch the latest state. From 61cd9af09bc66f29d6a740f445047624d48fda8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Sep 2016 13:40:46 +0100 Subject: [PATCH 22/22] Log delta files we're applying --- synapse/storage/prepare_database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index b1fbc4ffa..7efbe51cd 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -242,7 +242,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, module = imp.load_source( module_name, absolute_path, python_file ) - logger.debug("Running script %s", relative_path) + logger.info("Running script %s", relative_path) module.run_create(cur, database_engine) if not is_empty: module.run_upgrade(cur, database_engine, config=config) @@ -253,7 +253,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files, pass elif ext == ".sql": # A plain old .sql file, just read and execute it - logger.debug("Applying schema %s", relative_path) + logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) else: # Not a valid delta file.