From 941f59101b51e9225dbdc38b22110a01de194242 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 16:56:01 +0000 Subject: [PATCH 01/21] Don't fail an entire request if one of the returned events fails a signature check. If an event does fail a signature check, look in the local database and request it from the originator. --- synapse/federation/federation_client.py | 107 ++++++++++++++++++------ synapse/storage/__init__.py | 21 +++-- 2 files changed, 94 insertions(+), 34 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e1539bd0e..b809e935a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -224,17 +224,17 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) + signed_pdus = yield self._check_sigs_and_hash_and_fetch( + pdus, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth.sort(key=lambda e: e.depth) - # FIXME: We should handle signature failures more gracefully. - - defer.returnValue((pdus, auth_chain)) + defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks @log_function @@ -248,14 +248,13 @@ class FederationClient(object): for p in res["auth_chain"] ] - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth.sort(key=lambda e: e.depth) - auth_chain.sort(key=lambda e: e.depth) - - defer.returnValue(auth_chain) + defer.returnValue(signed_auth) @defer.inlineCallbacks def make_join(self, destination, room_id, user_id): @@ -291,21 +290,19 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] - for i, pdu in enumerate(state): - state[i] = yield self._check_sigs_and_hash(pdu) + signed_state = yield self._check_sigs_and_hash_and_fetch( + state, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. - - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ - "state": state, - "auth_chain": auth_chain, + "state": signed_state, + "auth_chain": signed_auth, }) @defer.inlineCallbacks @@ -353,12 +350,18 @@ class FederationClient(object): ) auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) + + signed_auth.sort(key=lambda e: e.depth) + ret = { - "auth_chain": auth_chain, + "auth_chain": signed_auth, "rejects": content.get("rejects", []), "missing": content.get("missing", []), } @@ -374,6 +377,58 @@ class FederationClient(object): return event + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7c54b1b9d..b4a7a3f06 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -128,16 +128,21 @@ class DataStore(RoomMemberStore, RoomStore, pass @defer.inlineCallbacks - def get_event(self, event_id, allow_none=False): - events = yield self._get_events([event_id]) + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - if not events: - if allow_none: - defer.returnValue(None) - else: - raise RuntimeError("Could not find event %s" % (event_id,)) + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(events[0]) + defer.returnValue(event) @log_function def _persist_event_txn(self, txn, event, context, backfilled, From 40c6fe1b81e4d92cba797b0c966fd774e2a60a28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 17:06:37 +0000 Subject: [PATCH 02/21] Don't bother requesting PDUs with bad signatures from the same server --- synapse/federation/federation_client.py | 31 +++++++++++++------------ 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b809e935a..f87e84db7 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -225,11 +225,11 @@ class FederationClient(object): ] signed_pdus = yield self._check_sigs_and_hash_and_fetch( - pdus, outlier=True + destination, pdus, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -249,7 +249,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -291,11 +291,11 @@ class FederationClient(object): ] signed_state = yield self._check_sigs_and_hash_and_fetch( - state, outlier=True + destination, state, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) auth_chain.sort(key=lambda e: e.depth) @@ -355,7 +355,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -378,7 +378,7 @@ class FederationClient(object): return event @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -414,15 +414,16 @@ class FederationClient(object): continue # Check pdu.origin - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) - if new_pdu: - signed_pdus.append(new_pdu) - continue + if new_pdu: + signed_pdus.append(new_pdu) + continue logger.warn("Failed to find copy of %s with valid signature") From e7ca813dd476c83497d4130ad8efa9424d86e921 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:38:14 +0000 Subject: [PATCH 03/21] Try to ensure we don't persist an event we have already persisted. In persist_event check if we already have the event, if so then update instead of replacing so that we don't cause a bump of the stream_ordering. --- synapse/handlers/federation.py | 42 ++++++++++++++++++++----------- synapse/storage/__init__.py | 40 ++++++++++++++++++++++++++--- tests/handlers/test_federation.py | 5 +++- 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8bf5a4cc1..c384789c2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,6 +112,14 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) + event_ids = set() + if state: + event_ids += {e.event_id for e in state} + if auth_chain: + event_ids += {e.event_id for e in auth_chain} + + seen_ids = (yield self.store.have_events(event_ids)).keys() + # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -124,20 +132,26 @@ class FederationHandler(BaseHandler): current_state = state if state and auth_chain is not None: - for e in state: - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event(origin, e, auth_events=auth) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + for list_of_pdus in [auth_chain, state]: + for e in list_of_pdus: + if e.event_id in seen_ids: + continue + + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index b4a7a3f06..93aefe0c4 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -161,6 +161,39 @@ class DataStore(RoomMemberStore, RoomStore, outlier = event.internal_metadata.is_outlier() + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + event_dict = { k: v for k, v in event.get_dict().items() @@ -170,10 +203,6 @@ class DataStore(RoomMemberStore, RoomStore, ] } - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - self._simple_insert_txn( txn, table="event_json", @@ -482,6 +511,9 @@ class DataStore(RoomMemberStore, RoomStore, the rejected reason string if we rejected the event, else maps to None. """ + if not event_ids: + return defer.succeed({}) + def f(txn): sql = ( "SELECT e.event_id, reason FROM events as e " diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 44dbce6be..427048113 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -91,7 +91,10 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) - self.datastore.have_events.return_value = defer.succeed({}) + + def have_events(event_ids): + return defer.succeed({}) + self.datastore.have_events.side_effect = have_events def annotate(ev, old_state=None): context = Mock() From 51969f9e5f4774e4d0ddcc2e8ebcf96803cbf295 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:40:14 +0000 Subject: [PATCH 04/21] Return rejected events if asked for it over federation. --- synapse/handlers/federation.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c384789c2..0161fbe49 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -632,6 +632,7 @@ class FederationHandler(BaseHandler): event = yield self.store.get_event( event_id, allow_none=True, + allow_rejected=True, ) if event: From 0f48e22ef66ff8a34d4af13c25e20a461c8a8390 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:43:29 +0000 Subject: [PATCH 05/21] PEP8 --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f87e84db7..9ceb66e6f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -429,7 +429,6 @@ class FederationClient(object): defer.returnValue(signed_pdus) - @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct From 4ff2273b300c0a40fcf054a613aac24c51af22f1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:26 +0000 Subject: [PATCH 06/21] Add FIXME note. --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0161fbe49..322c1b407 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -716,6 +716,8 @@ class FederationHandler(BaseHandler): context.rejected = RejectedReason.AUTH_ERROR + # FIXME: Don't store as rejected with AUTH_ERROR if we haven't + # seen all the auth events. yield self.store.persist_event( event, context=context, From 06c34bfbaee553b851e0f02d8e17d5bff7360376 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 11:23:44 +0000 Subject: [PATCH 07/21] Give exception better message --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 322c1b407..9d7557f57 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1000,7 +1000,7 @@ class FederationHandler(BaseHandler): if reason is None: # FIXME: ERRR?! logger.warn("Could not find reason for %s", e.event_id) - raise RuntimeError("") + raise RuntimeError("Could not find reason for %s" % e.event_id) reason_map[e.event_id] = reason From fed29251d70b050ea5799708b6b13be9617d1f6d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:23:58 +0000 Subject: [PATCH 08/21] Spelling --- synapse/handlers/federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9d7557f57..b9b2e25d1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -767,7 +767,7 @@ class FederationHandler(BaseHandler): ) ) - logger.debug("on_query_auth reutrning: %s", ret) + logger.debug("on_query_auth returning: %s", ret) defer.returnValue(ret) From 77a076bd25fc355c49251bcf489c0511c0f0f0af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:35:17 +0000 Subject: [PATCH 09/21] Set combinations is | and not + --- synapse/handlers/federation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b9b2e25d1..653ab0dbf 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -114,9 +114,9 @@ class FederationHandler(BaseHandler): event_ids = set() if state: - event_ids += {e.event_id for e in state} + event_ids |= {e.event_id for e in state} if auth_chain: - event_ids += {e.event_id for e in auth_chain} + event_ids |= {e.event_id for e in auth_chain} seen_ids = (yield self.store.have_events(event_ids)).keys() From 6efd4d1649a539ba4bf1c884ebb90a48c2d1c8df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 13:57:54 +0000 Subject: [PATCH 10/21] Don't completely die if get auth_chain or querying auth_chain requests fail --- synapse/handlers/federation.py | 135 ++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 653ab0dbf..6727155c3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -787,41 +787,45 @@ class FederationHandler(BaseHandler): if missing_auth: logger.debug("Missing auth: %s", missing_auth) # If we don't have all the auth events, we need to get them. - remote_auth_chain = yield self.replication_layer.get_event_auth( - origin, event.room_id, event.event_id - ) + try: + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in remote_auth_chain] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in remote_auth_chain] + ) - for e in remote_auth_chain: - if e.event_id in seen_remotes.keys(): - continue + for e in remote_auth_chain: + if e.event_id in seen_remotes.keys(): + continue - if e.event_id == event.event_id: - continue + if e.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in remote_auth_chain - if e.event_id in auth_ids - } - e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + e.internal_metadata.outlier = True - logger.debug( - "do_auth %s missing_auth: %s", - event.event_id, e.event_id - ) - yield self._handle_new_event( - origin, e, auth_events=auth - ) + logger.debug( + "do_auth %s missing_auth: %s", + event.event_id, e.event_id + ) + yield self._handle_new_event( + origin, e, auth_events=auth + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + except: + # FIXME: + logger.exception("Failed to get auth chain") # FIXME: Assumes we have and stored all the state for all the # prev_events @@ -836,47 +840,52 @@ class FederationHandler(BaseHandler): auth_ids = self.auth.compute_auth_events(event, context) local_auth_chain = yield self.store.get_auth_chain(auth_ids) - # 2. Get remote difference. - result = yield self.replication_layer.query_auth( - origin, - event.room_id, - event.event_id, - local_auth_chain, - ) + try: + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) - seen_remotes = yield self.store.have_events( - [e.event_id for e in result["auth_chain"]] - ) + seen_remotes = yield self.store.have_events( + [e.event_id for e in result["auth_chain"]] + ) - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes.keys(): - continue + # 3. Process any remote auth chain events we haven't seen. + for ev in result["auth_chain"]: + if ev.event_id in seen_remotes.keys(): + continue - if ev.event_id == event.event_id: - continue + if ev.event_id == event.event_id: + continue - try: - auth_ids = [e_id for e_id, _ in ev.auth_events] - auth = { - (e.type, e.state_key): e for e in result["auth_chain"] - if e.event_id in auth_ids - } - ev.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in ev.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + ev.internal_metadata.outlier = True - logger.debug( - "do_auth %s different_auth: %s", - event.event_id, e.event_id - ) + logger.debug( + "do_auth %s different_auth: %s", + event.event_id, e.event_id + ) - yield self._handle_new_event( - origin, ev, auth_events=auth - ) + yield self._handle_new_event( + origin, ev, auth_events=auth + ) - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass + if ev.event_id in event_auth_events: + auth_events[(ev.type, ev.state_key)] = ev + except AuthError: + pass + + except: + # FIXME: + logger.exception("Failed to query auth chain") # 4. Look at rejects and their proofs. # TODO. From 0dd3aea319c13e66eb1d75b5b8a196032ee332b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 14:58:30 +0000 Subject: [PATCH 11/21] Keep around the old (buggy) version of the prune_event function so that we can use it to check signatures for events on old servers --- synapse/api/auth.py | 2 - synapse/events/utils.py | 79 ++++++++++++++++++++ synapse/federation/federation_client.py | 96 +------------------------ synapse/federation/federation_server.py | 52 +++----------- 4 files changed, 92 insertions(+), 137 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 37e31d2b6..3471afd7e 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,8 +102,6 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) - logger.debug("Got curr_state %s", curr_state) - for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1aa952150..65a9f7098 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,6 +94,85 @@ def prune_event(event): ) +def old_prune_event(event): + """This is an old and buggy version of the prune event function. The + difference between this and the new version is that when including dicts + in the content they were included as frozen_dicts rather than dicts. This + caused the JSON encoder to encode as a list of the keys rather than the + dict. + """ + event_type = event.type + + allowed_keys = [ + "event_id", + "sender", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + "origin", + "origin_server_ts", + "membership", + ] + + event_dict = event.get_dict() + + new_content = {} + + def add_fields(*fields): + for field in fields: + if field in event.content: + # This is the line that is buggy: event.content may return + # a frozen_dict which the json encoders encode as lists rather + # than dicts. + new_content[field] = event.content[field] + + if event_type == EventTypes.Member: + add_fields("membership") + elif event_type == EventTypes.Create: + add_fields("creator") + elif event_type == EventTypes.JoinRules: + add_fields("join_rule") + elif event_type == EventTypes.PowerLevels: + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) + elif event_type == EventTypes.Aliases: + add_fields("aliases") + + allowed_fields = { + k: v + for k, v in event_dict.items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content + + allowed_fields["unsigned"] = {} + + if "age_ts" in event.unsigned: + allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] + + return type(event)( + allowed_fields, + internal_metadata_dict=event.internal_metadata.get_dict() + ) + + def format_event_raw(d): return d diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ceb66e6f..5fac62970 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -16,17 +16,11 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError import logging @@ -34,7 +28,7 @@ import logging logger = logging.getLogger(__name__) -class FederationClient(object): +class FederationClient(FederationBase): @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -376,89 +370,3 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): - """Takes a list of PDUs and checks the signatures and hashs of each - one. If a PDU fails its signature check then we check if we have it in - the database and if not then request if from the originating server of - that PDU. - - If a PDU fails its content hash check then it is redacted. - - The given list of PDUs are not modified, instead the function returns - a new list. - - Args: - pdu (list) - outlier (bool) - - Returns: - Deferred : A list of PDUs that have valid signatures and hashes. - """ - signed_pdus = [] - for pdu in pdus: - try: - new_pdu = yield self._check_sigs_and_hash(pdu) - signed_pdus.append(new_pdu) - except SynapseError: - # FIXME: We should handle signature failures more gracefully. - - # Check local db. - new_pdu = yield self.store.get_event( - pdu.event_id, - allow_rejected=True - ) - if new_pdu: - signed_pdus.append(new_pdu) - continue - - # Check pdu.origin - if pdu.origin != origin: - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue - - logger.warn("Failed to find copy of %s with valid signature") - - defer.returnValue(signed_pdus) - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5fbd8b19d..97dca3f84 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -16,6 +16,7 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function @@ -35,7 +36,7 @@ import logging logger = logging.getLogger(__name__) -class FederationServer(object): +class FederationServer(FederationBase): def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -251,17 +252,20 @@ class FederationServer(object): Deferred: Results in `dict` with the same format as `content` """ auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) ret = yield self.handler.on_query_auth( - origin, event_id, auth_chain, content.get("rejects", []), missing + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), ) time_now = self._clock.time_msec() @@ -426,37 +430,3 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) From 7b810e136ef2040fe55a778d4a4a790cdbd0f84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:00:42 +0000 Subject: [PATCH 12/21] Add new FederationBase --- synapse/federation/federation_base.py | 126 ++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 synapse/federation/federation_base.py diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py new file mode 100644 index 000000000..d26a2396a --- /dev/null +++ b/synapse/federation/federation_base.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from synapse.events.utils import prune_event, old_prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationBase(object): + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + old_redacted = old_prune_event(pdu) + old_redacted_pdu_json = old_redacted.get_pdu_json() + + try: + try: + yield self.keyring.verify_json_for_server( + pdu.origin, old_redacted_pdu_json + ) + except SynapseError: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) \ No newline at end of file From 8dae5c81085458a3a0b3dc92278e8c317d5de204 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:01:12 +0000 Subject: [PATCH 13/21] Remove unused imports --- synapse/federation/federation_server.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 97dca3f84..4742ca939 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,11 +22,6 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import FederationError, SynapseError From 9bace3a36751deed141225ccabd5bebecebc25f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:32:17 +0000 Subject: [PATCH 14/21] Actually, the old prune_event function was non-deterministic, so no point keeping it around :( --- synapse/events/utils.py | 79 --------------------------- synapse/federation/federation_base.py | 16 ++---- 2 files changed, 4 insertions(+), 91 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 65a9f7098..1aa952150 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,85 +94,6 @@ def prune_event(event): ) -def old_prune_event(event): - """This is an old and buggy version of the prune event function. The - difference between this and the new version is that when including dicts - in the content they were included as frozen_dicts rather than dicts. This - caused the JSON encoder to encode as a list of the keys rather than the - dict. - """ - event_type = event.type - - allowed_keys = [ - "event_id", - "sender", - "room_id", - "hashes", - "signatures", - "content", - "type", - "state_key", - "depth", - "prev_events", - "prev_state", - "auth_events", - "origin", - "origin_server_ts", - "membership", - ] - - event_dict = event.get_dict() - - new_content = {} - - def add_fields(*fields): - for field in fields: - if field in event.content: - # This is the line that is buggy: event.content may return - # a frozen_dict which the json encoders encode as lists rather - # than dicts. - new_content[field] = event.content[field] - - if event_type == EventTypes.Member: - add_fields("membership") - elif event_type == EventTypes.Create: - add_fields("creator") - elif event_type == EventTypes.JoinRules: - add_fields("join_rule") - elif event_type == EventTypes.PowerLevels: - add_fields( - "users", - "users_default", - "events", - "events_default", - "events_default", - "state_default", - "ban", - "kick", - "redact", - ) - elif event_type == EventTypes.Aliases: - add_fields("aliases") - - allowed_fields = { - k: v - for k, v in event_dict.items() - if k in allowed_keys - } - - allowed_fields["content"] = new_content - - allowed_fields["unsigned"] = {} - - if "age_ts" in event.unsigned: - allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] - - return type(event)( - allowed_fields, - internal_metadata_dict=event.internal_metadata.get_dict() - ) - - def format_event_raw(d): return d diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index d26a2396a..27c5918e0 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,7 @@ from twisted.internet import defer -from synapse.events.utils import prune_event, old_prune_event +from synapse.events.utils import prune_event from syutil.jsonutil import encode_canonical_json @@ -96,18 +96,10 @@ class FederationBase(object): redacted_event = prune_event(pdu) redacted_pdu_json = redacted_event.get_pdu_json() - old_redacted = old_prune_event(pdu) - old_redacted_pdu_json = old_redacted.get_pdu_json() - try: - try: - yield self.keyring.verify_json_for_server( - pdu.origin, old_redacted_pdu_json - ) - except SynapseError: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) except SynapseError: logger.warn( "Signature check failed for %s redacted to %s", From 7dd1c5c542d58464085b11ababe746c6a60515ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:12:04 +0000 Subject: [PATCH 15/21] Neaten the handling of state and auth_chain up a bit --- synapse/handlers/federation.py | 59 ++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6727155c3..86953bf8c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from synapse.types import UserID from twisted.internet import defer +import itertools import logging @@ -112,14 +113,6 @@ class FederationHandler(BaseHandler): logger.debug("Event: %s", event) - event_ids = set() - if state: - event_ids |= {e.event_id for e in state} - if auth_chain: - event_ids |= {e.event_id for e in auth_chain} - - seen_ids = (yield self.store.have_events(event_ids)).keys() - # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None @@ -131,27 +124,37 @@ class FederationHandler(BaseHandler): logger.debug("Got event for room we're not in.") current_state = state - if state and auth_chain is not None: - for list_of_pdus in [auth_chain, state]: - for e in list_of_pdus: - if e.event_id in seen_ids: - continue + event_ids = set() + if state: + event_ids |= {e.event_id for e in state} + if auth_chain: + event_ids |= {e.event_id for e in auth_chain} - e.internal_metadata.outlier = True - try: - auth_ids = [e_id for e_id, _ in e.auth_events] - auth = { - (e.type, e.state_key): e for e in auth_chain - if e.event_id in auth_ids - } - yield self._handle_new_event( - origin, e, auth_events=auth - ) - except: - logger.exception( - "Failed to handle state event %s", - e.event_id, - ) + seen_ids = (yield self.store.have_events(event_ids)).keys() + + if state and auth_chain is not None: + # If we have any state or auth_chain given to us by the replication + # layer, then we should handle them (if we haven't before.) + for e in itertools.chain(auth_chain, state): + if e.event_id in seen_ids: + continue + + e.internal_metadata.outlier = True + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + seen_ids.add(e.event_id) + except: + logger.exception( + "Failed to handle state event %s", + e.event_id, + ) try: yield self._handle_new_event( From 3c39f42a0526186f85e69988504fff6adbf41d91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:14:19 +0000 Subject: [PATCH 16/21] New line --- synapse/federation/federation_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 27c5918e0..a990aec4f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -115,4 +115,4 @@ class FederationBase(object): ) defer.returnValue(redacted_event) - defer.returnValue(pdu) \ No newline at end of file + defer.returnValue(pdu) From 02be8da5e11d9abcfc962f962bbc4e9940b69199 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 17:34:07 +0000 Subject: [PATCH 17/21] Add doc to get_event --- synapse/storage/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 93aefe0c4..93ab26fcd 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -131,6 +131,21 @@ class DataStore(RoomMemberStore, RoomStore, def get_event(self, event_id, check_redacted=True, get_prev_content=False, allow_rejected=False, allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ event = yield self.runInteraction( "get_event", self._get_event_txn, event_id, From c0462dbf1533f285f632dcb0a74c0ef0c3e2475b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 10:16:51 +0000 Subject: [PATCH 18/21] Rearrange persist_event so that do all the queries that need to be done before returning early if we have already persisted that event. --- synapse/events/__init__.py | 2 +- synapse/handlers/federation.py | 2 + synapse/storage/__init__.py | 145 +++++++++++++++++---------------- 3 files changed, 77 insertions(+), 72 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index bf0795102..8f0c6e959 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -77,7 +77,7 @@ class EventBase(object): return self.content["membership"] def is_state(self): - return hasattr(self, "state_key") + return hasattr(self, "state_key") and self.state_key is not None def get_dict(self): d = dict(self._event_dict) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 86953bf8c..0876589e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -515,6 +515,8 @@ class FederationHandler(BaseHandler): "Failed to get destination from event %s", s.event_id ) + destinations.remove(origin) + logger.debug( "on_send_join_request: Sending event: %s, signatures: %s", event.event_id, diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 93ab26fcd..30ce37890 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -163,19 +163,70 @@ class DataStore(RoomMemberStore, RoomStore, def _persist_event_txn(self, txn, event, context, backfilled, stream_ordering=None, is_new_state=True, current_state=None): - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) outlier = event.internal_metadata.is_outlier() + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + have_persisted = self._simple_select_one_onecol_txn( txn, table="event_json", @@ -209,6 +260,17 @@ class DataStore(RoomMemberStore, RoomStore, ) return + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + event_dict = { k: v for k, v in event.get_dict().items() @@ -273,41 +335,10 @@ class DataStore(RoomMemberStore, RoomStore, ) raise _RollbackButIsFineException("_persist_event") - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - if not outlier: - self._store_state_groups_txn(txn, event, context) - if context.rejected: self._store_rejections_txn(txn, event.event_id, context.rejected) - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - is_state = hasattr(event, "state_key") and event.state_key is not None - if is_state: + if event.is_state(): vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -315,6 +346,7 @@ class DataStore(RoomMemberStore, RoomStore, "state_key": event.state_key, } + # TODO: How does this work with backfilling? if hasattr(event, "replaces_state"): vals["prev_state"] = event.replaces_state @@ -351,28 +383,6 @@ class DataStore(RoomMemberStore, RoomStore, or_ignore=True, ) - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - for hash_alg, hash_base64 in event.hashes.items(): hash_bytes = decode_base64(hash_base64) self._store_event_content_hash_txn( @@ -403,13 +413,6 @@ class DataStore(RoomMemberStore, RoomStore, txn, event.event_id, ref_alg, ref_hash_bytes ) - if not outlier: - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - def _store_redaction(self, txn, event): txn.execute( "INSERT OR IGNORE INTO redactions " From f275ba49bbcb86e111ed0d66dd99da617220ae79 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 10:36:28 +0000 Subject: [PATCH 19/21] Fix state resolution to remember join_rules is a type of auth event. --- synapse/state.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/synapse/state.py b/synapse/state.py index 8a056ee95..6a6fb8aea 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -37,7 +37,10 @@ def _get_state_key_from_event(event): KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -AuthEventTypes = (EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels,) +AuthEventTypes = ( + EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels, + EventTypes.JoinRules, +) class StateHandler(object): @@ -258,6 +261,15 @@ class StateHandler(object): auth_events.update(resolved_state) + for key, events in conflicted_state.items(): + if key[0] == EventTypes.JoinRules: + resolved_state[key] = self._resolve_auth_events( + events, + auth_events + ) + + auth_events.update(resolved_state) + for key, events in conflicted_state.items(): if key[0] == EventTypes.Member: resolved_state[key] = self._resolve_auth_events( From 03d415a6a23300e36b5e6c35080ac4dd8ab06815 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 10:40:59 +0000 Subject: [PATCH 20/21] Brief comment on why we do some things on every call to persist_event and not others --- synapse/storage/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 30ce37890..a63c59a8a 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -239,6 +239,12 @@ class DataStore(RoomMemberStore, RoomStore, event.internal_metadata.get_dict() ) + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. if have_persisted: if not outlier: sql = ( From 650e32d45580ddd13826364291bda6760c014df9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 14:06:42 +0000 Subject: [PATCH 21/21] Change context.auth_events to what the auth_events would be bases on context.current_state, rather than based on the auth_events from the event. --- synapse/api/auth.py | 12 ++++++------ synapse/handlers/federation.py | 4 +++- synapse/state.py | 8 ++++++-- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e..7105ee21d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -358,7 +358,7 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - auth_ids = self.compute_auth_events(builder, context) + auth_ids = self.compute_auth_events(builder, context.current_state) auth_events_entries = yield self.store.add_event_hashes( auth_ids @@ -372,26 +372,26 @@ class Auth(object): if v.event_id in auth_ids } - def compute_auth_events(self, event, context): + def compute_auth_events(self, event, current_state): if event.type == EventTypes.Create: return [] auth_ids = [] key = (EventTypes.PowerLevels, "", ) - power_level_event = context.current_state.get(key) + power_level_event = current_state.get(key) if power_level_event: auth_ids.append(power_level_event.event_id) key = (EventTypes.JoinRules, "", ) - join_rule_event = context.current_state.get(key) + join_rule_event = current_state.get(key) key = (EventTypes.Member, event.user_id, ) - member_event = context.current_state.get(key) + member_event = current_state.get(key) key = (EventTypes.Create, "", ) - create_event = context.current_state.get(key) + create_event = current_state.get(key) if create_event: auth_ids.append(create_event.event_id) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e3..2e2c23ef6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -842,7 +842,9 @@ class FederationHandler(BaseHandler): logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. - auth_ids = self.auth.compute_auth_events(event, context) + auth_ids = self.auth.compute_auth_events( + event, context.current_state + ) local_auth_chain = yield self.store.get_auth_chain(auth_ids) try: diff --git a/synapse/state.py b/synapse/state.py index 6a6fb8aea..695a5e7ac 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -103,7 +103,9 @@ class StateHandler(object): context.state_group = None if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items() @@ -149,7 +151,9 @@ class StateHandler(object): event.unsigned["replaces_state"] = replaces.event_id if hasattr(event, "auth_events") and event.auth_events: - auth_ids = zip(*event.auth_events)[0] + auth_ids = self.hs.get_auth().compute_auth_events( + event, context.current_state + ) context.auth_events = { k: v for k, v in context.current_state.items()