From 5797f5542b687b73ebdd8613f64ce0f38e637b55 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 12 Jul 2018 01:32:39 +0100 Subject: [PATCH 01/22] WIP to announce deleted devices over federation Previously we queued up the poke correctly when the device was deleted, but then the actual EDU wouldn't get sent, as the device was no longer known. Instead, we now send EDUs for deleted devices too if there's a poke for them. --- synapse/notifier.py | 2 +- synapse/storage/devices.py | 40 +++++++++++++++++++++--------- synapse/storage/end_to_end_keys.py | 16 +++++++++++- 3 files changed, 44 insertions(+), 14 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 51cbd66f0..e650c3e49 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -274,7 +274,7 @@ class Notifier(object): logger.exception("Error notifying application services of event") def on_new_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend event wise. + """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. """ diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index ec68e39f1..0c797f9f3 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -239,6 +239,7 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): """Updates a single user's device in the cache. + If the content is null, delete the device from the cache. """ return self.runInteraction( "update_remote_device_list_cache_entry", @@ -248,17 +249,32 @@ class DeviceStore(SQLBaseStore): def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, content, stream_id): - self._simple_upsert_txn( - txn, - table="device_lists_remote_cache", - keyvalues={ - "user_id": user_id, - "device_id": device_id, - }, - values={ - "content": json.dumps(content), - } - ) + if content is None: + self._simple_delete_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + ) + + # Do we need this? + txn.call_after( + self.device_id_exists_cache.invalidate, (user_id, device_id,) + ) + else: + self._simple_upsert_txn( + txn, + table="device_lists_remote_cache", + keyvalues={ + "user_id": user_id, + "device_id": device_id, + }, + values={ + "content": json.dumps(content), + } + ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,)) txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,)) @@ -366,7 +382,7 @@ class DeviceStore(SQLBaseStore): now_stream_id = max(stream_id for stream_id in itervalues(query_map)) devices = self._get_e2e_device_keys_txn( - txn, query_map.keys(), include_all_devices=True + txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True ) prev_sent_id_sql = """ diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 7ae5c6548..f61553cec 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -64,12 +64,17 @@ class EndToEndKeyStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_e2e_device_keys(self, query_list, include_all_devices=False): + def get_e2e_device_keys( + self, query_list, include_all_devices=False, + include_deleted_devices=False + ): """Fetch a list of device keys. Args: query_list(list): List of pairs of user_ids and device_ids. include_all_devices (bool): whether to include entries for devices that don't have device keys + include_deleted_devices (bool): whether to include null entries for + devices which no longer exist (but where in the query_list) Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". @@ -82,10 +87,19 @@ class EndToEndKeyStore(SQLBaseStore): query_list, include_all_devices, ) + if include_deleted_devices: + deleted_devices = set(query_list) + for user_id, device_keys in iteritems(results): for device_id, device_info in iteritems(device_keys): + if include_deleted_devices: + deleted_devices -= (user_id, device_id) device_info["keys"] = json.loads(device_info.pop("key_json")) + if include_deleted_devices: + for user_id, device_id in deleted_devices: + results.setdefault(user_id, {})[device_id] = None + defer.returnValue(results) def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): From 12ec58301f946ced9702afbf6dfbfbc8c3dfd3dd Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 12 Jul 2018 11:39:43 +0100 Subject: [PATCH 02/22] shift to using an explicit deleted flag on m.device_list_update EDUs and generally make it work. --- synapse/storage/devices.py | 18 ++++++++++-------- synapse/storage/end_to_end_keys.py | 27 +++++++++++++++------------ 2 files changed, 25 insertions(+), 20 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 0c797f9f3..203f50f07 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -239,7 +239,6 @@ class DeviceStore(SQLBaseStore): def update_remote_device_list_cache_entry(self, user_id, device_id, content, stream_id): """Updates a single user's device in the cache. - If the content is null, delete the device from the cache. """ return self.runInteraction( "update_remote_device_list_cache_entry", @@ -249,7 +248,7 @@ class DeviceStore(SQLBaseStore): def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id, content, stream_id): - if content is None: + if content.get("deleted"): self._simple_delete_txn( txn, table="device_lists_remote_cache", @@ -409,12 +408,15 @@ class DeviceStore(SQLBaseStore): prev_id = stream_id - key_json = device.get("key_json", None) - if key_json: - result["keys"] = json.loads(key_json) - device_display_name = device.get("device_display_name", None) - if device_display_name: - result["device_display_name"] = device_display_name + if device is not None: + key_json = device.get("key_json", None) + if key_json: + result["keys"] = json.loads(key_json) + device_display_name = device.get("device_display_name", None) + if device_display_name: + result["device_display_name"] = device_display_name + else: + result["deleted"] = True results.append(result) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index f61553cec..6c2871942 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -74,7 +74,7 @@ class EndToEndKeyStore(SQLBaseStore): include_all_devices (bool): whether to include entries for devices that don't have device keys include_deleted_devices (bool): whether to include null entries for - devices which no longer exist (but where in the query_list) + devices which no longer exist (but were in the query_list) Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". @@ -84,28 +84,25 @@ class EndToEndKeyStore(SQLBaseStore): results = yield self.runInteraction( "get_e2e_device_keys", self._get_e2e_device_keys_txn, - query_list, include_all_devices, + query_list, include_all_devices, include_deleted_devices, ) - if include_deleted_devices: - deleted_devices = set(query_list) - for user_id, device_keys in iteritems(results): for device_id, device_info in iteritems(device_keys): - if include_deleted_devices: - deleted_devices -= (user_id, device_id) device_info["keys"] = json.loads(device_info.pop("key_json")) - if include_deleted_devices: - for user_id, device_id in deleted_devices: - results.setdefault(user_id, {})[device_id] = None - defer.returnValue(results) - def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices): + def _get_e2e_device_keys_txn( + self, txn, query_list, include_all_devices=False, + include_deleted_devices=False, + ): query_clauses = [] query_params = [] + if include_deleted_devices: + deleted_devices = set(query_list) + for (user_id, device_id) in query_list: query_clause = "user_id = ?" query_params.append(user_id) @@ -133,8 +130,14 @@ class EndToEndKeyStore(SQLBaseStore): result = {} for row in rows: + if include_deleted_devices: + deleted_devices.remove((row["user_id"], row["device_id"])) result.setdefault(row["user_id"], {})[row["device_id"]] = row + if include_deleted_devices: + for user_id, device_id in deleted_devices: + result.setdefault(user_id, {})[device_id] = None + return result @defer.inlineCallbacks From 37c4fba0ac95e0425193b1eaeaf96bc348e094d9 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 12 Jul 2018 11:45:33 +0100 Subject: [PATCH 03/22] changelog --- changelog.d/3520.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3520.bugfix diff --git a/changelog.d/3520.bugfix b/changelog.d/3520.bugfix new file mode 100644 index 000000000..9278cb370 --- /dev/null +++ b/changelog.d/3520.bugfix @@ -0,0 +1 @@ +Correctly announce deleted devices over federation From c0685f67c001ab156fb6922877c35f70536100dc Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 10:59:02 +0100 Subject: [PATCH 04/22] spell out that include_deleted_devices requires include_all_devices --- synapse/storage/end_to_end_keys.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 6c2871942..ffe4d7235 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -74,7 +74,8 @@ class EndToEndKeyStore(SQLBaseStore): include_all_devices (bool): whether to include entries for devices that don't have device keys include_deleted_devices (bool): whether to include null entries for - devices which no longer exist (but were in the query_list) + devices which no longer exist (but were in the query_list). + This option only takes effect if include_all_devices is true. Returns: Dict mapping from user-id to dict mapping from device_id to dict containing "key_json", "device_display_name". @@ -100,6 +101,9 @@ class EndToEndKeyStore(SQLBaseStore): query_clauses = [] query_params = [] + if include_all_devices is False: + include_deleted_devices = False + if include_deleted_devices: deleted_devices = set(query_list) From 9e40834f742d95c324183f3e71ae73aafe3c6a99 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Jul 2018 11:15:10 +0100 Subject: [PATCH 05/22] yes, we do need to invalidate the device_id_exists_cache when deleting a remote device --- synapse/storage/devices.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 203f50f07..cc3cdf2eb 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -258,7 +258,6 @@ class DeviceStore(SQLBaseStore): }, ) - # Do we need this? txn.call_after( self.device_id_exists_cache.invalidate, (user_id, device_id,) ) From 3d6df846580ce6ef8769945e6990af2f44251e40 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 13:59:55 +0100 Subject: [PATCH 06/22] Test and fix support for cancellation in Linearizer --- synapse/util/async.py | 28 ++++++++++++++++++++++------ tests/util/test_linearizer.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/synapse/util/async.py b/synapse/util/async.py index 5a50d9700..a7094e2fb 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -184,13 +184,13 @@ class Linearizer(object): # key_to_defer is a map from the key to a 2 element list where # the first element is the number of things executing, and - # the second element is a deque of deferreds for the things blocked from - # executing. + # the second element is an OrderedDict, where the keys are deferreds for the + # things blocked from executing. self.key_to_defer = {} @defer.inlineCallbacks def queue(self, key): - entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) + entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items @@ -198,12 +198,28 @@ class Linearizer(object): # this item so that it can continue executing. if entry[0] >= self.max_count: new_defer = defer.Deferred() - entry[1].append(new_defer) + entry[1][new_defer] = 1 logger.info( "Waiting to acquire linearizer lock %r for key %r", self.name, key, ) - yield make_deferred_yieldable(new_defer) + try: + yield make_deferred_yieldable(new_defer) + except Exception as e: + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + raise logger.info("Acquired linearizer lock %r for key %r", self.name, key) entry[0] += 1 @@ -238,7 +254,7 @@ class Linearizer(object): entry[0] -= 1 if entry[1]: - next_def = entry[1].popleft() + (next_def, _) = entry[1].popitem(last=False) # we need to run the next thing in the sentinel context. with PreserveLoggingContext(): diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index c9563124f..4729bd5a0 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -17,6 +17,7 @@ from six.moves import range from twisted.internet import defer, reactor +from twisted.internet.defer import CancelledError from synapse.util import Clock, logcontext from synapse.util.async import Linearizer @@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase): d6 = limiter.queue(key) with (yield d6): pass + + @defer.inlineCallbacks + def test_cancellation(self): + linearizer = Linearizer() + + key = object() + + d1 = linearizer.queue(key) + cm1 = yield d1 + + d2 = linearizer.queue(key) + self.assertFalse(d2.called) + + d3 = linearizer.queue(key) + self.assertFalse(d3.called) + + d2.cancel() + + with cm1: + pass + + self.assertTrue(d2.called) + try: + yield d2 + self.fail("Expected d2 to raise CancelledError") + except CancelledError: + pass + + with (yield d3): + pass From 5c30cb709af1295b2285d802d1e40b91382e7af0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 20 Jul 2018 14:01:36 +0100 Subject: [PATCH 07/22] Changelog --- changelog.d/3572.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3572.misc diff --git a/changelog.d/3572.misc b/changelog.d/3572.misc new file mode 100644 index 000000000..8908324e6 --- /dev/null +++ b/changelog.d/3572.misc @@ -0,0 +1 @@ +Merge Linearizer and Limiter From 3132b89f12f0386558045683ad198f090b0e2c90 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 21 Jul 2018 15:47:18 +1000 Subject: [PATCH 08/22] Make the rest of the .iterwhatever go away (#3562) --- changelog.d/3562.misc | 0 synapse/app/homeserver.py | 6 ++++-- synapse/app/synctl.py | 4 +++- synapse/events/snapshot.py | 4 +++- synapse/handlers/federation.py | 18 +++++++++--------- synapse/state.py | 6 +++--- synapse/visibility.py | 19 ++++++++++--------- tests/test_federation.py | 3 +-- 8 files changed, 33 insertions(+), 27 deletions(-) create mode 100644 changelog.d/3562.misc diff --git a/changelog.d/3562.misc b/changelog.d/3562.misc new file mode 100644 index 000000000..e69de29bb diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 14e6dca52..2ad1beb8d 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -18,6 +18,8 @@ import logging import os import sys +from six import iteritems + from twisted.application import service from twisted.internet import defer, reactor from twisted.web.resource import EncodingResourceWrapper, NoResource @@ -442,7 +444,7 @@ def run(hs): stats["total_nonbridged_users"] = total_nonbridged_users daily_user_type_results = yield hs.get_datastore().count_daily_user_type() - for name, count in daily_user_type_results.iteritems(): + for name, count in iteritems(daily_user_type_results): stats["daily_user_type_" + name] = count room_count = yield hs.get_datastore().get_room_count() @@ -453,7 +455,7 @@ def run(hs): stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() r30_results = yield hs.get_datastore().count_r30_users() - for name, count in r30_results.iteritems(): + for name, count in iteritems(r30_results): stats["r30_users_" + name] = count daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 68acc15a9..d658f967b 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -25,6 +25,8 @@ import subprocess import sys import time +from six import iteritems + import yaml SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] @@ -173,7 +175,7 @@ def main(): os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) cache_factors = config.get("synctl_cache_factors", {}) - for cache_name, factor in cache_factors.iteritems(): + for cache_name, factor in iteritems(cache_factors): os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) worker_configfiles = [] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index bcd9bb594..f83a1581a 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from six import iteritems + from frozendict import frozendict from twisted.internet import defer @@ -159,7 +161,7 @@ def _encode_state_dict(state_dict): return [ (etype, state_key, v) - for (etype, state_key), v in state_dict.iteritems() + for (etype, state_key), v in iteritems(state_dict) ] diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 65f6041b1..a6d391c4e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,8 +21,8 @@ import logging import sys import six -from six import iteritems -from six.moves import http_client +from six import iteritems, itervalues +from six.moves import http_client, zip from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json @@ -731,7 +731,7 @@ class FederationHandler(BaseHandler): """ joined_users = [ (state_key, int(event.depth)) - for (e_type, state_key), event in state.iteritems() + for (e_type, state_key), event in iteritems(state) if e_type == EventTypes.Member and event.membership == Membership.JOIN ] @@ -748,7 +748,7 @@ class FederationHandler(BaseHandler): except Exception: pass - return sorted(joined_domains.iteritems(), key=lambda d: d[1]) + return sorted(joined_domains.items(), key=lambda d: d[1]) curr_domains = get_domains_from_state(curr_state) @@ -811,7 +811,7 @@ class FederationHandler(BaseHandler): tried_domains = set(likely_domains) tried_domains.add(self.server_name) - event_ids = list(extremities.iterkeys()) + event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") resolve = logcontext.preserve_fn( @@ -827,15 +827,15 @@ class FederationHandler(BaseHandler): states = dict(zip(event_ids, [s.state for s in states])) state_map = yield self.store.get_events( - [e_id for ids in states.itervalues() for e_id in ids.itervalues()], + [e_id for ids in itervalues(states) for e_id in itervalues(ids)], get_prev_content=False ) states = { key: { k: state_map[e_id] - for k, e_id in state_dict.iteritems() + for k, e_id in iteritems(state_dict) if e_id in state_map - } for key, state_dict in states.iteritems() + } for key, state_dict in iteritems(states) } for e_id, _ in sorted_extremeties_tuple: @@ -1515,7 +1515,7 @@ class FederationHandler(BaseHandler): yield self.store.persist_events( [ (ev_info["event"], context) - for ev_info, context in itertools.izip(event_infos, contexts) + for ev_info, context in zip(event_infos, contexts) ], backfilled=backfilled, ) diff --git a/synapse/state.py b/synapse/state.py index 15a593d41..504caae2f 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -18,7 +18,7 @@ import hashlib import logging from collections import namedtuple -from six import iteritems, itervalues +from six import iteritems, iterkeys, itervalues from frozendict import frozendict @@ -647,7 +647,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): for event_id in event_ids ) if event_map is not None: - needed_events -= set(event_map.iterkeys()) + needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d conflicted events", len(needed_events)) @@ -668,7 +668,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory): new_needed_events = set(itervalues(auth_events)) new_needed_events -= needed_events if event_map is not None: - new_needed_events -= set(event_map.iterkeys()) + new_needed_events -= set(iterkeys(event_map)) logger.info("Asking for %d auth events", len(new_needed_events)) diff --git a/synapse/visibility.py b/synapse/visibility.py index 9b97ea2b8..ba0499a02 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -12,11 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import itertools + import logging import operator -import six +from six import iteritems, itervalues +from six.moves import map from twisted.internet import defer @@ -221,7 +222,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False, return event # check each event: gives an iterable[None|EventBase] - filtered_events = itertools.imap(allowed, events) + filtered_events = map(allowed, events) # remove the None entries filtered_events = filter(operator.truth, filtered_events) @@ -261,7 +262,7 @@ def filter_events_for_server(store, server_name, events): # membership states for the requesting server to determine # if the server is either in the room or has been invited # into the room. - for ev in state.itervalues(): + for ev in itervalues(state): if ev.type != EventTypes.Member: continue try: @@ -295,7 +296,7 @@ def filter_events_for_server(store, server_name, events): ) visibility_ids = set() - for sids in event_to_state_ids.itervalues(): + for sids in itervalues(event_to_state_ids): hist = sids.get((EventTypes.RoomHistoryVisibility, "")) if hist: visibility_ids.add(hist) @@ -308,7 +309,7 @@ def filter_events_for_server(store, server_name, events): event_map = yield store.get_events(visibility_ids) all_open = all( e.content.get("history_visibility") in (None, "shared", "world_readable") - for e in event_map.itervalues() + for e in itervalues(event_map) ) if all_open: @@ -346,7 +347,7 @@ def filter_events_for_server(store, server_name, events): # state_key_to_event_id_set = { e - for key_to_eid in six.itervalues(event_to_state_ids) + for key_to_eid in itervalues(event_to_state_ids) for e in key_to_eid.items() } @@ -369,10 +370,10 @@ def filter_events_for_server(store, server_name, events): event_to_state = { e_id: { key: event_map[inner_e_id] - for key, inner_e_id in key_to_eid.iteritems() + for key, inner_e_id in iteritems(key_to_eid) if inner_e_id in event_map } - for e_id, key_to_eid in event_to_state_ids.iteritems() + for e_id, key_to_eid in iteritems(event_to_state_ids) } defer.returnValue([ diff --git a/tests/test_federation.py b/tests/test_federation.py index 159a13697..f40ff29b5 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -137,7 +137,6 @@ class MessageAcceptTests(unittest.TestCase): ) self.assertEqual(self.successResultOf(extrem)[0], "$join:test.serv") - @unittest.DEBUG def test_cant_hide_past_history(self): """ If you send a message, you must be able to provide the direct @@ -178,7 +177,7 @@ class MessageAcceptTests(unittest.TestCase): for x, y in d.items() if x == ("m.room.member", "@us:test") ], - "auth_chain_ids": d.values(), + "auth_chain_ids": list(d.values()), } ) From c1bf2b587eaa718e28a33f76a7b5f6e288255fca Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 23 Jul 2018 09:56:23 +0100 Subject: [PATCH 09/22] add trailing comma --- synapse/storage/end_to_end_keys.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index ffe4d7235..523b4360c 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -66,7 +66,7 @@ class EndToEndKeyStore(SQLBaseStore): @defer.inlineCallbacks def get_e2e_device_keys( self, query_list, include_all_devices=False, - include_deleted_devices=False + include_deleted_devices=False, ): """Fetch a list of device keys. Args: From acbfdc3442f706150c6312e534ca4ecec8548582 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:17:16 +0100 Subject: [PATCH 10/22] Refcator EventContext to accept state during init --- synapse/events/snapshot.py | 48 ++++++++++--------- synapse/state.py | 97 +++++++++++++++++++++----------------- 2 files changed, 82 insertions(+), 63 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f83a1581a..fbbe8dd49 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -60,22 +60,22 @@ class EventContext(object): "app_service", ] - def __init__(self): + def __init__(self, state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): # The current state including the current event - self.current_state_ids = None + self.current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = None - self.state_group = None - - self.rejected = False + self.prev_state_ids = prev_state_ids + self.state_group = state_group # A previously persisted state group and a delta between that # and this state. - self.prev_group = None - self.delta_ids = None + self.prev_group = prev_group + self.delta_ids = delta_ids - self.prev_state_events = None + self.prev_state_events = [] + self.rejected = False self.app_service = None def serialize(self, event): @@ -123,27 +123,33 @@ class EventContext(object): Returns: EventContext """ - context = EventContext() - context.state_group = input["state_group"] - context.rejected = input["rejected"] - context.prev_group = input["prev_group"] - context.delta_ids = _decode_state_dict(input["delta_ids"]) - context.prev_state_events = input["prev_state_events"] - # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. prev_state_id = input["prev_state_id"] event_type = input["event_type"] event_state_key = input["event_state_key"] - context.current_state_ids = yield store.get_state_ids_for_group( - context.state_group, + state_group = input["state_group"] + + current_state_ids = yield store.get_state_ids_for_group( + state_group, ) if prev_state_id and event_state_key: - context.prev_state_ids = dict(context.current_state_ids) - context.prev_state_ids[(event_type, event_state_key)] = prev_state_id + prev_state_ids = dict(current_state_ids) + prev_state_ids[(event_type, event_state_key)] = prev_state_id else: - context.prev_state_ids = context.current_state_ids + prev_state_ids = current_state_ids + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=input["prev_group"], + delta_ids = _decode_state_dict(input["delta_ids"]), + ) + + context.rejected = input["rejected"] + context.prev_state_events = input["prev_state_events"] app_service_id = input["app_service_id"] if app_service_id: diff --git a/synapse/state.py b/synapse/state.py index 504caae2f..a70869500 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -203,25 +203,27 @@ class StateHandler(object): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and # persisting the event won't store the state group. - context = EventContext() if old_state: - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): - context.current_state_ids = dict(context.prev_state_ids) + current_state_ids = dict(prev_state_ids) key = (event.type, event.state_key) - context.current_state_ids[key] = event.event_id + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids else: - context.current_state_ids = {} - context.prev_state_ids = {} - context.prev_state_events = [] + current_state_ids = {} + prev_state_ids = {} # We don't store state for outliers, so we don't generate a state - # froup for it. - context.state_group = None + # group for it. + context = EventContext( + state_group=None, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + ) defer.returnValue(context) @@ -230,31 +232,35 @@ class StateHandler(object): # Let's just correctly fill out the context and create a # new state group for it. - context = EventContext() - context.prev_state_ids = { + prev_state_ids = { (s.type, s.state_key): s.event_id for s in old_state } if event.is_state(): key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] if replaces != event.event_id: # Paranoia check event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id else: - context.current_state_ids = context.prev_state_ids + current_state_ids = prev_state_ids - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=None, delta_ids=None, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, + ) + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, ) - context.prev_state_events = [] defer.returnValue(context) logger.debug("calling resolve_state_groups from compute_event_context") @@ -262,47 +268,47 @@ class StateHandler(object): event.room_id, [e for e, _ in event.prev_events], ) - curr_state = entry.state + prev_state_ids = entry.state + prev_group = None + delta_ids = None - context = EventContext() - context.prev_state_ids = curr_state if event.is_state(): # If this is a state event then we need to create a new state # group for the state after this event. key = (event.type, event.state_key) - if key in context.prev_state_ids: - replaces = context.prev_state_ids[key] + if key in prev_state_ids: + replaces = prev_state_ids[key] event.unsigned["replaces_state"] = replaces - context.current_state_ids = dict(context.prev_state_ids) - context.current_state_ids[key] = event.event_id + current_state_ids = dict(prev_state_ids) + current_state_ids[key] = event.event_id if entry.state_group: # If the state at the event has a state group assigned then # we can use that as the prev group - context.prev_group = entry.state_group - context.delta_ids = { + prev_group = entry.state_group + delta_ids = { key: event.event_id } elif entry.prev_group: # If the state at the event only has a prev group, then we can # use that as a prev group too. - context.prev_group = entry.prev_group - context.delta_ids = dict(entry.delta_ids) - context.delta_ids[key] = event.event_id + prev_group = entry.prev_group + delta_ids = dict(entry.delta_ids) + delta_ids[key] = event.event_id - context.state_group = yield self.store.store_state_group( + state_group = yield self.store.store_state_group( event.event_id, event.room_id, - prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + current_state_ids=current_state_ids, ) else: - context.current_state_ids = context.prev_state_ids - context.prev_group = entry.prev_group - context.delta_ids = entry.delta_ids + current_state_ids = prev_state_ids + prev_group = entry.prev_group + delta_ids = entry.delta_ids if entry.state_group is None: entry.state_group = yield self.store.store_state_group( @@ -310,13 +316,20 @@ class StateHandler(object): event.room_id, prev_group=entry.prev_group, delta_ids=entry.delta_ids, - current_state_ids=context.current_state_ids, + current_state_ids=current_state_ids, ) entry.state_id = entry.state_group - context.state_group = entry.state_group + state_group = entry.state_group + + context = EventContext( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=delta_ids, + ) - context.prev_state_events = [] defer.returnValue(context) @defer.inlineCallbacks From 959f4b9074f6e893dc3c8e622e8c17fd229ad319 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:22:59 +0100 Subject: [PATCH 11/22] Newsfile --- changelog.d/3577.misc | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 changelog.d/3577.misc diff --git a/changelog.d/3577.misc b/changelog.d/3577.misc new file mode 100644 index 000000000..e69de29bb From 842cdece42e59a4181a496761447f0cb00053c05 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:31:35 +0100 Subject: [PATCH 12/22] pep8 --- synapse/events/snapshot.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index fbbe8dd49..5e02ef1a5 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -145,7 +145,7 @@ class EventContext(object): current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, prev_group=input["prev_group"], - delta_ids = _decode_state_dict(input["delta_ids"]), + delta_ids=_decode_state_dict(input["delta_ids"]), ) context.rejected = input["rejected"] From 440b8845b531db05e7c4f646e48dee7635cf1f0a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 12:38:46 +0100 Subject: [PATCH 13/22] Make EventContext lazy load state --- synapse/events/snapshot.py | 153 +++++++++++++++++++++++++++---------- synapse/state.py | 6 +- 2 files changed, 115 insertions(+), 44 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 5e02ef1a5..f9568638a 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,18 +19,12 @@ from frozendict import frozendict from twisted.internet import defer +from synapse.util.logcontext import make_deferred_yieldable, run_in_background + class EventContext(object): """ Attributes: - current_state_ids (dict[(str, str), str]): - The current state map including the current event. - (type, state_key) -> event_id - - prev_state_ids (dict[(str, str), str]): - The current state map excluding the current event. - (type, state_key) -> event_id - state_group (int|None): state group id, if the state has been stored as a state group. This is usually only None if e.g. the event is an outlier. @@ -47,36 +41,71 @@ class EventContext(object): prev_state_events (?): XXX: is this ever set to anything other than the empty list? + + _current_state_ids (dict[(str, str), str]|None): + The current state map including the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _prev_state_ids (dict[(str, str), str]|None): + The current state map excluding the current event. None if outlier + or we haven't fetched the state from DB yet. + (type, state_key) -> event_id + + _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have + been calculated. None if we haven't started calculating yet + + _prev_state_id (str|None): If set then the event associated with the + context overrode the _prev_state_id + + _event_type (str): The type of the event the context is associated with + + _event_state_key (str|None): The state_key of the event the context is + associated with """ __slots__ = [ - "current_state_ids", - "prev_state_ids", "state_group", "rejected", "prev_group", "delta_ids", "prev_state_events", "app_service", + "_current_state_ids", + "_prev_state_ids", + "_prev_state_id", + "_event_type", + "_event_state_key", + "_fetching_state_deferred", ] - def __init__(self, state_group, current_state_ids, prev_state_ids, - prev_group=None, delta_ids=None): + @staticmethod + def with_state(state_group, current_state_ids, prev_state_ids, + prev_group=None, delta_ids=None): + context = EventContext() + # The current state including the current event - self.current_state_ids = current_state_ids + context._current_state_ids = current_state_ids # The current state excluding the current event - self.prev_state_ids = prev_state_ids - self.state_group = state_group + context._prev_state_ids = prev_state_ids + context.state_group = state_group + + context._prev_state_id = None + context._event_type = None + context._event_state_key = None + context._fetching_state_deferred = defer.succeed(None) # A previously persisted state group and a delta between that # and this state. - self.prev_group = prev_group - self.delta_ids = delta_ids + context.prev_group = prev_group + context.delta_ids = delta_ids - self.prev_state_events = [] + context.prev_state_events = [] - self.rejected = False - self.app_service = None + context.rejected = False + context.app_service = None + + return context def serialize(self, event): """Converts self to a type that can be serialized as JSON, and then @@ -123,30 +152,17 @@ class EventContext(object): Returns: EventContext """ + context = EventContext() + # We use the state_group and prev_state_id stuff to pull the # current_state_ids out of the DB and construct prev_state_ids. - prev_state_id = input["prev_state_id"] - event_type = input["event_type"] - event_state_key = input["event_state_key"] + context._prev_state_id = input["prev_state_id"] + context._event_type = input["event_type"] + context._event_state_key = input["event_state_key"] - state_group = input["state_group"] - - current_state_ids = yield store.get_state_ids_for_group( - state_group, - ) - if prev_state_id and event_state_key: - prev_state_ids = dict(current_state_ids) - prev_state_ids[(event_type, event_state_key)] = prev_state_id - else: - prev_state_ids = current_state_ids - - context = EventContext( - state_group=state_group, - current_state_ids=current_state_ids, - prev_state_ids=prev_state_ids, - prev_group=input["prev_group"], - delta_ids=_decode_state_dict(input["delta_ids"]), - ) + context.state_group = input["state_group"] + context.prev_group = input["prev_group"] + context.delta_ids = _decode_state_dict(input["delta_ids"]) context.rejected = input["rejected"] context.prev_state_events = input["prev_state_events"] @@ -157,6 +173,61 @@ class EventContext(object): defer.returnValue(context) + @defer.inlineCallbacks + def get_current_state_ids(self, store): + """Gets the current state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._current_state_ids) + + @defer.inlineCallbacks + def get_prev_state_ids(self, store): + """Gets the prev state IDs + + Returns: + Deferred[dict[(str, str), str]|None]: Returns None if state_group + is None, which happens when the associated event is an outlier. + """ + + if not self._fetching_state_deferred: + self._fetching_state_deferred = run_in_background( + self._fill_out_state, store, + ) + + yield make_deferred_yieldable(self._fetching_state_deferred) + + defer.returnValue(self._prev_state_ids) + + @defer.inlineCallbacks + def _fill_out_state(self, store): + """Called to populate the _current_state_ids and _prev_state_ids + attributes by loading from the database. + """ + if self.state_group is None: + return + + self._current_state_ids = yield store.get_state_ids_for_group( + self.state_group, + ) + if self._prev_state_id and self._event_state_key is not None: + self._prev_state_ids = dict(self._current_state_ids) + + key = (self._event_type, self._event_state_key) + self._prev_state_ids[key] = self._prev_state_id + else: + self._prev_state_ids = self._current_state_ids + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/state.py b/synapse/state.py index a70869500..32125c95d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -219,7 +219,7 @@ class StateHandler(object): # We don't store state for outliers, so we don't generate a state # group for it. - context = EventContext( + context = EventContext.with_state( state_group=None, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -255,7 +255,7 @@ class StateHandler(object): current_state_ids=current_state_ids, ) - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, @@ -322,7 +322,7 @@ class StateHandler(object): state_group = entry.state_group - context = EventContext( + context = EventContext.with_state( state_group=state_group, current_state_ids=current_state_ids, prev_state_ids=prev_state_ids, From e42510ba635b3e4d83215e4f5634ca51411996e0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:00:22 +0100 Subject: [PATCH 14/22] Use new getters --- synapse/api/auth.py | 6 ++++-- synapse/handlers/_base.py | 3 ++- synapse/handlers/federation.py | 23 ++++++++++++++------- synapse/handlers/message.py | 26 +++++++++++++++--------- synapse/handlers/room_member.py | 9 +++++--- synapse/push/bulk_push_rule_evaluator.py | 7 ++++--- synapse/storage/events.py | 2 +- synapse/storage/push_rule.py | 7 +++++-- synapse/storage/roommember.py | 7 +++++-- 9 files changed, 59 insertions(+), 31 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index bc629832d..535bdb449 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -65,8 +65,9 @@ class Auth(object): @defer.inlineCallbacks def check_from_context(self, event, context, do_sig_check=True): + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -544,7 +545,8 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): - auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids) + prev_state_ids = yield context.get_prev_state_ids(self.store) + auth_ids = yield self.compute_auth_events(builder, prev_state_ids) auth_events_entries = yield self.store.add_event_hashes( auth_ids diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index b6a8b3aa3..704181d2d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -112,8 +112,9 @@ class BaseHandler(object): guest_access = event.content.get("guest_access", "forbidden") if guest_access != "can_join": if context: + current_state_ids = yield context.get_current_state_ids(self.store) current_state = yield self.store.get_events( - list(context.current_state_ids.values()) + list(current_state_ids.values()) ) else: current_state = yield self.state_handler.get_current_state( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a6d391c4e..98dd4a7fd 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -486,7 +486,10 @@ class FederationHandler(BaseHandler): # joined the room. Don't bother if the user is just # changing their profile info. newly_joined = True - prev_state_id = context.prev_state_ids.get( + + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_state_id = prev_state_ids.get( (event.type, event.state_key) ) if prev_state_id: @@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - state_ids = list(context.prev_state_ids.values()) + prev_state_ids = yield context.get_prev_state_ids(self.store) + + state_ids = list(prev_state_ids.values()) auth_chain = yield self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(context.prev_state_ids.values())) + state = yield self.store.get_events(list(prev_state_ids.values())) defer.returnValue({ "state": list(state.values()), @@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler): ) if not auth_events: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler): break if do_resolution: + prev_state_ids = yield context.get_prev_state_ids(self.store) # 1. Get what we think is the auth chain. auth_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids + event, prev_state_ids ) local_auth_chain = yield self.store.get_auth_chain( auth_ids, include_given=True @@ -2222,7 +2229,8 @@ class FederationHandler(BaseHandler): event.content["third_party_invite"]["signed"]["token"] ) original_invite = None - original_invite_id = context.prev_state_ids.get(key) + prev_state_ids = yield context.get_prev_state_ids(self.store) + original_invite_id = prev_state_ids.get(key) if original_invite_id: original_invite = yield self.store.get_event( original_invite_id, allow_none=True @@ -2264,7 +2272,8 @@ class FederationHandler(BaseHandler): signed = event.content["third_party_invite"]["signed"] token = signed["token"] - invite_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + invite_event_id = prev_state_ids.get( (EventTypes.ThirdPartyInvite, token,) ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index abc07ea87..c4bcd9018 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -630,7 +630,8 @@ class EventCreationHandler(object): If so, returns the version of the event in context. Otherwise, returns None. """ - prev_event_id = context.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_event_id = prev_state_ids.get((event.type, event.state_key)) prev_event = yield self.store.get_event(prev_event_id, allow_none=True) if not prev_event: return @@ -752,8 +753,8 @@ class EventCreationHandler(object): event = builder.build() logger.debug( - "Created event %s with state: %s", - event.event_id, context.prev_state_ids, + "Created event %s", + event.event_id, ) defer.returnValue( @@ -884,9 +885,11 @@ class EventCreationHandler(object): e.sender == event.sender ) + current_state_ids = yield context.get_current_state_ids(self.store) + state_to_include_ids = [ e_id - for k, e_id in iteritems(context.current_state_ids) + for k, e_id in iteritems(current_state_ids) if k[0] in self.hs.config.room_invite_state_types or k == (EventTypes.Member, event.sender) ] @@ -922,8 +925,9 @@ class EventCreationHandler(object): ) if event.type == EventTypes.Redaction: + prev_state_ids = yield context.get_prev_state_ids(self.store) auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=True, + event, prev_state_ids, for_verification=True, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -943,11 +947,13 @@ class EventCreationHandler(object): "You don't have permission to redact events" ) - if event.type == EventTypes.Create and context.prev_state_ids: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) + if event.type == EventTypes.Create: + prev_state_ids = yield context.get_prev_state_ids(self.store) + if prev_state_ids: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) (event_stream_id, max_stream_id) = yield self.store.persist_event( event, context=context diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 00f2e279b..a832d9180 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -201,7 +201,9 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_state_ids = yield context.get_prev_state_ids(self.store) + + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, target.to_string()), None ) @@ -496,9 +498,10 @@ class RoomMemberHandler(object): if prev_event is not None: return + prev_state_ids = yield context.get_prev_state_ids(self.store) if event.membership == Membership.JOIN: if requester.is_guest: - guest_can_join = yield self._can_guest_join(context.prev_state_ids) + guest_can_join = yield self._can_guest_join(prev_state_ids) if not guest_can_join: # This should be an auth check, but guests are a local concept, # so don't really fit into the general auth process. @@ -517,7 +520,7 @@ class RoomMemberHandler(object): ratelimit=ratelimit, ) - prev_member_event_id = context.prev_state_ids.get( + prev_member_event_id = prev_state_ids.get( (EventTypes.Member, event.state_key), None ) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index bb181d94e..1d14d3639 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def _get_power_levels_and_sender_level(self, event, context): - pl_event_id = context.prev_state_ids.get(POWER_KEY) + prev_state_ids = yield context.get_prev_state_ids(self.store) + pl_event_id = prev_state_ids.get(POWER_KEY) if pl_event_id: # fastpath: if there's a power level event, that's all we need, and # not having a power level event is an extreme edge case @@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object): auth_events = {POWER_KEY: pl_event} else: auth_events_ids = yield self.auth.compute_auth_events( - event, context.prev_state_ids, for_verification=False, + event, prev_state_ids, for_verification=False, ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = { @@ -304,7 +305,7 @@ class RulesForRoom(object): push_rules_delta_state_cache_metric.inc_hits() else: - current_state_ids = context.current_state_ids + current_state_ids = yield context.get_current_state_ids(self.store) push_rules_delta_state_cache_metric.inc_misses() push_rules_state_size_counter.inc(len(current_state_ids)) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4ff0fdc4a..bf4f3ee92 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -549,7 +549,7 @@ class EventsStore(EventsWorkerStore): if ctx.state_group in state_groups_map: continue - state_groups_map[ctx.state_group] = ctx.current_state_ids + state_groups_map[ctx.state_group] = yield ctx.get_current_state_ids(self) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index be655d287..af564b1b4 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -186,6 +186,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, defer.returnValue(results) + @defer.inlineCallbacks def bulk_get_push_rules_for_room(self, event, context): state_group = context.state_group if not state_group: @@ -195,9 +196,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore, # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._bulk_get_push_rules_for_room( - event.room_id, state_group, context.current_state_ids, event=event + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._bulk_get_push_rules_for_room( + event.room_id, state_group, current_state_ids, event=event ) + defer.returnValue(result) @cachedInlineCallbacks(num_args=2, cache_context=True) def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids, diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 02a802bed..a27702a7a 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): defer.returnValue(user_who_share_room) + @defer.inlineCallbacks def get_joined_users_from_context(self, event, context): state_group = context.state_group if not state_group: @@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore): # To do this we set the state_group to a new object as object() != object() state_group = object() - return self._get_joined_users_from_context( - event.room_id, state_group, context.current_state_ids, + current_state_ids = yield context.get_current_state_ids(self) + result = yield self._get_joined_users_from_context( + event.room_id, state_group, current_state_ids, event=event, context=context, ) + defer.returnValue(result) def get_joined_users_from_state(self, room_id, state_entry): state_group = state_entry.state_group From 027bc01a1bc254fe08140c6e91a9fb945b08486f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:02:09 +0100 Subject: [PATCH 15/22] Add support for updating state --- synapse/events/snapshot.py | 19 +++++++++++++++++++ synapse/handlers/federation.py | 32 +++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 9 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f9568638a..b090751bf 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -228,6 +228,25 @@ class EventContext(object): else: self._prev_state_ids = self._current_state_ids + @defer.inlineCallbacks + def update_state(self, state_group, prev_state_ids, current_state_ids, + delta_ids): + """Replace the state in the context + """ + + # We need to make sure we wait for any ongoing fetching of state + # to complete so that the updated state doesn't get clobbered + if self._fetching_state_deferred: + yield make_deferred_yieldable(self._fetching_state_deferred) + + self.state_group = state_group + self._prev_state_ids = prev_state_ids + self._current_state_ids = current_state_ids + self.delta_ids = delta_ids + + # We need to ensure that that we've marked as having fetched the state + self._fetching_state_deferred = defer.succeed(None) + def _encode_state_dict(state_dict): """Since dicts of (type, state_key) -> event_id cannot be serialized in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 98dd4a7fd..14654d59f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1975,21 +1975,35 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - context.current_state_ids = dict(context.current_state_ids) - context.current_state_ids.update(state_updates) + current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = dict(current_state_ids) + + current_state_ids.update(state_updates) + if context.delta_ids is not None: - context.delta_ids = dict(context.delta_ids) - context.delta_ids.update(state_updates) - context.prev_state_ids = dict(context.prev_state_ids) - context.prev_state_ids.update({ + delta_ids = dict(context.delta_ids) + delta_ids.update(state_updates) + + prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({ k: a.event_id for k, a in iteritems(auth_events) }) - context.state_group = yield self.store.store_state_group( + + state_group = yield self.store.store_state_group( event.event_id, event.room_id, prev_group=context.prev_group, - delta_ids=context.delta_ids, - current_state_ids=context.current_state_ids, + delta_ids=delta_ids, + current_state_ids=current_state_ids, + ) + + yield context.update_state( + state_group=state_group, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + delta_ids=delta_ids, ) @defer.inlineCallbacks From f3182bb1d0ba496bef710a529c4cd7a99da72061 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:19:24 +0100 Subject: [PATCH 16/22] Newsfile --- changelog.d/3579.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3579.misc diff --git a/changelog.d/3579.misc b/changelog.d/3579.misc new file mode 100644 index 000000000..2374dc0c4 --- /dev/null +++ b/changelog.d/3579.misc @@ -0,0 +1 @@ +Lazily load state on master process when using workers to reduce DB consumption From 8fbe418777a62ea6dd5fd811d63e30c42589650b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 13:33:49 +0100 Subject: [PATCH 17/22] Fix unit tests --- .../replication/slave/storage/test_events.py | 8 ++-- tests/test_state.py | 47 ++++++++++++++----- 2 files changed, 40 insertions(+), 15 deletions(-) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index cea01d93e..f5b47f5ec 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -222,9 +222,11 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): state_ids = { key: e.event_id for key, e in state.items() } - context = EventContext() - context.current_state_ids = state_ids - context.prev_state_ids = state_ids + context = EventContext.with_state( + state_group=None, + current_state_ids=state_ids, + prev_state_ids=state_ids + ) else: state_handler = self.hs.get_state_handler() context = yield state_handler.compute_event_context(event) diff --git a/tests/test_state.py b/tests/test_state.py index c0f2d1152..429a18cbf 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -204,7 +204,8 @@ class StateTestCase(unittest.TestCase): self.store.register_event_context(event, context) context_store[event.event_id] = context - self.assertEqual(2, len(context_store["D"].prev_state_ids)) + prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store) + self.assertEqual(2, len(prev_state_ids)) @defer.inlineCallbacks def test_branch_basic_conflict(self): @@ -255,9 +256,11 @@ class StateTestCase(unittest.TestCase): self.store.register_event_context(event, context) context_store[event.event_id] = context + prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store) + self.assertSetEqual( {"START", "A", "C"}, - {e_id for e_id in context_store["D"].prev_state_ids.values()} + {e_id for e_id in prev_state_ids.values()} ) @defer.inlineCallbacks @@ -318,9 +321,11 @@ class StateTestCase(unittest.TestCase): self.store.register_event_context(event, context) context_store[event.event_id] = context + prev_state_ids = yield context_store["E"].get_prev_state_ids(self.store) + self.assertSetEqual( {"START", "A", "B", "C"}, - {e for e in context_store["E"].prev_state_ids.values()} + {e for e in prev_state_ids.values()} ) @defer.inlineCallbacks @@ -398,9 +403,11 @@ class StateTestCase(unittest.TestCase): self.store.register_event_context(event, context) context_store[event.event_id] = context + prev_state_ids = yield context_store["D"].get_prev_state_ids(self.store) + self.assertSetEqual( {"A1", "A2", "A3", "A5", "B"}, - {e for e in context_store["D"].prev_state_ids.values()} + {e for e in prev_state_ids.values()} ) def _add_depths(self, nodes, edges): @@ -429,8 +436,10 @@ class StateTestCase(unittest.TestCase): event, old_state=old_state ) + current_state_ids = yield context.get_current_state_ids(self.store) + self.assertEqual( - set(e.event_id for e in old_state), set(context.current_state_ids.values()) + set(e.event_id for e in old_state), set(current_state_ids.values()) ) self.assertIsNotNone(context.state_group) @@ -449,8 +458,10 @@ class StateTestCase(unittest.TestCase): event, old_state=old_state ) + prev_state_ids = yield context.get_prev_state_ids(self.store) + self.assertEqual( - set(e.event_id for e in old_state), set(context.prev_state_ids.values()) + set(e.event_id for e in old_state), set(prev_state_ids.values()) ) @defer.inlineCallbacks @@ -475,9 +486,11 @@ class StateTestCase(unittest.TestCase): context = yield self.state.compute_event_context(event) + current_state_ids = yield context.get_current_state_ids(self.store) + self.assertEqual( set([e.event_id for e in old_state]), - set(context.current_state_ids.values()) + set(current_state_ids.values()) ) self.assertEqual(group_name, context.state_group) @@ -504,9 +517,11 @@ class StateTestCase(unittest.TestCase): context = yield self.state.compute_event_context(event) + prev_state_ids = yield context.get_prev_state_ids(self.store) + self.assertEqual( set([e.event_id for e in old_state]), - set(context.prev_state_ids.values()) + set(prev_state_ids.values()) ) self.assertIsNotNone(context.state_group) @@ -545,7 +560,9 @@ class StateTestCase(unittest.TestCase): event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, ) - self.assertEqual(len(context.current_state_ids), 6) + current_state_ids = yield context.get_current_state_ids(self.store) + + self.assertEqual(len(current_state_ids), 6) self.assertIsNotNone(context.state_group) @@ -585,7 +602,9 @@ class StateTestCase(unittest.TestCase): event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, ) - self.assertEqual(len(context.current_state_ids), 6) + current_state_ids = yield context.get_current_state_ids(self.store) + + self.assertEqual(len(current_state_ids), 6) self.assertIsNotNone(context.state_group) @@ -642,8 +661,10 @@ class StateTestCase(unittest.TestCase): event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, ) + current_state_ids = yield context.get_current_state_ids(self.store) + self.assertEqual( - old_state_2[3].event_id, context.current_state_ids[("test1", "1")] + old_state_2[3].event_id, current_state_ids[("test1", "1")] ) # Reverse the depth to make sure we are actually using the depths @@ -670,8 +691,10 @@ class StateTestCase(unittest.TestCase): event, prev_event_id1, old_state_1, prev_event_id2, old_state_2, ) + current_state_ids = yield context.get_current_state_ids(self.store) + self.assertEqual( - old_state_1[3].event_id, context.current_state_ids[("test1", "1")] + old_state_1[3].event_id, current_state_ids[("test1", "1")] ) def _get_context(self, event, prev_event_id_1, old_state_1, prev_event_id_2, From 4797ed000e612b68c418ce8c342bdd7ecc16b198 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:05:40 +0100 Subject: [PATCH 18/22] Update docstrings to make sense --- synapse/events/snapshot.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index b090751bf..a6d7bf570 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -55,13 +55,16 @@ class EventContext(object): _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have been calculated. None if we haven't started calculating yet - _prev_state_id (str|None): If set then the event associated with the - context overrode the _prev_state_id - - _event_type (str): The type of the event the context is associated with + _event_type (str): The type of the event the context is associated with. + Only set when state has not been fetched yet. _event_state_key (str|None): The state_key of the event the context is - associated with + associated with. Only set when state has not been fetched yet. + + _prev_state_id (str|None): If the event associated with the context is + a state event, then `_prev_state_id` is the event_id of the state + that was replaced. + Only set when state has not been fetched yet. """ __slots__ = [ From 999bcf9d016fb7fd9ad5a9daf4f0ec6d25a10717 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:24:21 +0100 Subject: [PATCH 19/22] Fix EventContext when using workers We were: 1. Not correctly setting all attributes 2. Using defer.inlineCallbacks in a non-generator --- synapse/events/snapshot.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a6d7bf570..e31eceb92 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -82,6 +82,11 @@ class EventContext(object): "_fetching_state_deferred", ] + def __init__(self): + self.prev_state_events = [] + self.rejected = False + self.app_service = None + @staticmethod def with_state(state_group, current_state_ids, prev_state_ids, prev_group=None, delta_ids=None): @@ -103,11 +108,6 @@ class EventContext(object): context.prev_group = prev_group context.delta_ids = delta_ids - context.prev_state_events = [] - - context.rejected = False - context.app_service = None - return context def serialize(self, event): @@ -143,7 +143,6 @@ class EventContext(object): } @staticmethod - @defer.inlineCallbacks def deserialize(store, input): """Converts a dict that was produced by `serialize` back into a EventContext. @@ -162,6 +161,7 @@ class EventContext(object): context._prev_state_id = input["prev_state_id"] context._event_type = input["event_type"] context._event_state_key = input["event_state_key"] + context._fetching_state_deferred = None context.state_group = input["state_group"] context.prev_group = input["prev_group"] @@ -174,7 +174,7 @@ class EventContext(object): if app_service_id: context.app_service = store.get_app_service_by_id(app_service_id) - defer.returnValue(context) + return context @defer.inlineCallbacks def get_current_state_ids(self, store): From a4d24781bf2290311af8f901862d83134bbd6229 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 15:28:51 +0100 Subject: [PATCH 20/22] Newsfile --- changelog.d/3581.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3581.misc diff --git a/changelog.d/3581.misc b/changelog.d/3581.misc new file mode 100644 index 000000000..2374dc0c4 --- /dev/null +++ b/changelog.d/3581.misc @@ -0,0 +1 @@ +Lazily load state on master process when using workers to reduce DB consumption From 0faa3223cdf996aa18376a7420a43061a6691638 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 16:28:00 +0100 Subject: [PATCH 21/22] Fix missing attributes on workers. This was missed during the transition from attribute to getter for getting state from context. --- synapse/events/snapshot.py | 10 ++++++---- synapse/handlers/message.py | 5 +++-- synapse/replication/http/send_event.py | 7 +++++-- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index e31eceb92..a59064b41 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -110,7 +110,8 @@ class EventContext(object): return context - def serialize(self, event): + @defer.inlineCallbacks + def serialize(self, event, store): """Converts self to a type that can be serialized as JSON, and then deserialized by `deserialize` @@ -126,11 +127,12 @@ class EventContext(object): # the prev_state_ids, so if we're a state event we include the event # id that we replaced in the state. if event.is_state(): - prev_state_id = self.prev_state_ids.get((event.type, event.state_key)) + prev_state_ids = yield self.get_prev_state_ids(store) + prev_state_id = prev_state_ids.get((event.type, event.state_key)) else: prev_state_id = None - return { + defer.returnValue({ "prev_state_id": prev_state_id, "event_type": event.type, "event_state_key": event.state_key if event.is_state() else None, @@ -140,7 +142,7 @@ class EventContext(object): "delta_ids": _encode_state_dict(self.delta_ids), "prev_state_events": self.prev_state_events, "app_service_id": self.app_service.id if self.app_service else None - } + }) @staticmethod def deserialize(store, input): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c4bcd9018..7571975c2 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -807,8 +807,9 @@ class EventCreationHandler(object): # If we're a worker we need to hit out to the master. if self.config.worker_app: yield send_event_to_master( - self.hs.get_clock(), - self.http_client, + clock=self.hs.get_clock(), + store=self.store, + client=self.http_client, host=self.config.worker_replication_host, port=self.config.worker_replication_http_port, requester=requester, diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 2eede5479..5227bc333 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -34,12 +34,13 @@ logger = logging.getLogger(__name__) @defer.inlineCallbacks -def send_event_to_master(clock, client, host, port, requester, event, context, +def send_event_to_master(clock, store, client, host, port, requester, event, context, ratelimit, extra_users): """Send event to be handled on the master Args: clock (synapse.util.Clock) + store (DataStore) client (SimpleHttpClient) host (str): host of master port (int): port on master listening for HTTP replication @@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context, host, port, event.event_id, ) + serialized_context = yield context.serialize(event, store) + payload = { "event": event.get_pdu_json(), "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, - "context": context.serialize(event), + "context": serialized_context, "requester": requester.serialize(), "ratelimit": ratelimit, "extra_users": [u.to_string() for u in extra_users], From 9f41ad491dd950058562e25045bc6ae3539ddc79 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Jul 2018 16:31:46 +0100 Subject: [PATCH 22/22] Newsfile --- changelog.d/3582.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/3582.misc diff --git a/changelog.d/3582.misc b/changelog.d/3582.misc new file mode 100644 index 000000000..2374dc0c4 --- /dev/null +++ b/changelog.d/3582.misc @@ -0,0 +1 @@ +Lazily load state on master process when using workers to reduce DB consumption