diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index f0430b2cb..299493af9 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -18,8 +18,6 @@ from twisted.internet import defer from synapse.events.utils import prune_event -from syutil.jsonutil import encode_canonical_json - from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError @@ -120,16 +118,15 @@ class FederationBase(object): ) except SynapseError: logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), + "Signature check failed for %s", + pdu.event_id, ) raise if not check_event_content_hash(pdu): logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) + "Event content has been tampered, redacting.", + pdu.event_id, ) defer.returnValue(redacted_event) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 5503d9ae8..b5d882fd6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -247,9 +247,15 @@ class FederationHandler(BaseHandler): if set(e_id for e_id, _ in ev.prev_events) - event_ids ] + logger.info( + "backfill: Got %d events with %d edges", + len(events), len(edges), + ) + # For each edge get the current state. auth_events = {} + state_events = {} events_to_state = {} for e_id in edges: state, auth = yield self.replication_layer.get_state_for_room( @@ -258,12 +264,46 @@ class FederationHandler(BaseHandler): event_id=e_id ) auth_events.update({a.event_id: a for a in auth}) + auth_events.update({s.event_id: s for s in state}) + state_events.update({s.event_id: s for s in state}) events_to_state[e_id] = state + seen_events = yield self.store.have_events( + set(auth_events.keys()) | set(state_events.keys()) + ) + + all_events = events + state_events.values() + auth_events.values() + required_auth = set( + a_id for event in all_events for a_id, _ in event.auth_events + ) + + missing_auth = required_auth - set(auth_events) + results = yield defer.gatherResults( + [ + self.replication_layer.get_pdu( + [dest], + event_id, + outlier=True, + timeout=10000, + ) + for event_id in missing_auth + ], + consumeErrors=True + ).addErrback(unwrapFirstError) + auth_events.update({a.event_id: a for a in results}) + yield defer.gatherResults( [ - self._handle_new_event(dest, a) + self._handle_new_event( + dest, a, + auth_events={ + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in a.auth_events + }, + ) for a in auth_events.values() + if a.event_id not in seen_events ], consumeErrors=True, ).addErrback(unwrapFirstError) @@ -274,6 +314,11 @@ class FederationHandler(BaseHandler): dest, event_map[e_id], state=events_to_state[e_id], backfilled=True, + auth_events={ + (auth_events[a_id].type, auth_events[a_id].state_key): + auth_events[a_id] + for a_id, _ in event_map[e_id].auth_events + }, ) for e_id in events_to_state ],