From 00ab5cd6f2bd3952e1216f13794c9ff392736ad7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 26 Nov 2014 18:04:33 +0000 Subject: [PATCH 1/7] Attempt to fix bug where we 500d an event stream due to trying to cancel a timer twice --- synapse/handlers/events.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index d59221a4f..02202692d 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -53,8 +53,12 @@ class EventStreamHandler(BaseHandler): if auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 if auth_user in self._stop_timer_per_user: - self.clock.cancel_call_later( - self._stop_timer_per_user.pop(auth_user)) + try: + self.clock.cancel_call_later( + self._stop_timer_per_user.pop(auth_user) + ) + except: + logger.exception("Failed to cancel event timer") else: yield self.distributor.fire( "started_user_eventstream", auth_user @@ -95,10 +99,12 @@ class EventStreamHandler(BaseHandler): logger.debug( "_later stopped_user_eventstream %s", auth_user ) + + self._stop_timer_per_user.pop(auth_user, None) + yield self.distributor.fire( "stopped_user_eventstream", auth_user ) - del self._stop_timer_per_user[auth_user] logger.debug("Scheduling _later: for %s", auth_user) self._stop_timer_per_user[auth_user] = ( From b8849c8cbf7666688d26a0503ddd678fba56425c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 13:53:31 +0000 Subject: [PATCH 2/7] Re-sign events when we return them via federation as a temporary hack to work around the problem where we reconstruct events differently than when they were signed --- synapse/handlers/federation.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index fcef60205..7903494e0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -540,6 +540,17 @@ class FederationHandler(BaseHandler): ) if event: + # FIXME: This is a temporary work around where we occasionally + # return events slightly differently than when they were + # originally signed + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + if do_auth: in_room = yield self.auth.check_host_in_room( event.room_id, From 07699b587144d9a9e92294f041db51f2a6621d59 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 14:31:43 +0000 Subject: [PATCH 3/7] Change the way we get missing auth and state events --- synapse/federation/replication.py | 73 ++++++++++++++++++----------- synapse/handlers/federation.py | 77 ++++++++++++++++++++++++------- 2 files changed, 106 insertions(+), 44 deletions(-) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 6bfb30b42..312d69fca 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -281,6 +281,22 @@ class ReplicationLayer(object): defer.returnValue(pdus) + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, context, event_id): + res = yield self.transport_layer.get_event_auth( + destination, context, event_id, + ) + + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in res["auth_chain"] + ] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue(auth_chain) + @defer.inlineCallbacks @log_function def on_backfill_request(self, origin, context, versions, limit): @@ -549,34 +565,34 @@ class ReplicationLayer(object): state = None # We need to make sure we have all the auth events. - for e_id, _ in pdu.auth_events: - exists = yield self._get_persisted_pdu( - origin, - e_id, - do_auth=False - ) - - if not exists: - try: - logger.debug( - "_handle_new_pdu fetch missing auth event %s from %s", - e_id, - origin, - ) - - yield self.get_pdu( - origin, - event_id=e_id, - outlier=True, - ) - - logger.debug("Processed pdu %s", e_id) - except: - logger.warn( - "Failed to get auth event %s from %s", - e_id, - origin - ) + # for e_id, _ in pdu.auth_events: + # exists = yield self._get_persisted_pdu( + # origin, + # e_id, + # do_auth=False + # ) + # + # if not exists: + # try: + # logger.debug( + # "_handle_new_pdu fetch missing auth event %s from %s", + # e_id, + # origin, + # ) + # + # yield self.get_pdu( + # origin, + # event_id=e_id, + # outlier=True, + # ) + # + # logger.debug("Processed pdu %s", e_id) + # except: + # logger.warn( + # "Failed to get auth event %s from %s", + # e_id, + # origin + # ) # Get missing pdus if necessary. if not pdu.outlier: @@ -626,6 +642,7 @@ class ReplicationLayer(object): if not backfilled: ret = yield self.handler.on_receive_pdu( + origin, pdu, backfilled=backfilled, state=state, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 7903494e0..0863fdb13 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -101,7 +101,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def on_receive_pdu(self, pdu, backfilled, state=None): + def on_receive_pdu(self, origin, pdu, backfilled, state=None): """ Called by the ReplicationLayer when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ @@ -149,14 +149,47 @@ class FederationHandler(BaseHandler): # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work current_state = None - if state: - is_in_room = yield self.auth.check_host_in_room( - event.room_id, - self.server_name + is_in_room = yield self.auth.check_host_in_room( + event.room_id, + self.server_name + ) + if not is_in_room: + logger.debug("Got event for room we're not in.") + + replication_layer = self.replication_layer + auth_chain = yield replication_layer.get_event_auth( + origin, + context=event.room_id, + event_id=event.event_id, ) - if not is_in_room: - logger.debug("Got event for room we're not in.") - current_state = state + + current_state = yield replication_layer.get_state_for_context( + origin, + context=event.room_id, + event_id=event.event_id, + ) + + for e in auth_chain: + e.outlier = True + try: + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event(e) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) + + for e in current_state: + e.outlier = True + try: + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event(e) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) try: yield self._handle_new_event( @@ -328,18 +361,30 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.outlier = True - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] - ) + try: + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] + ) + except: + logger.exception( + "Failed to parse auth event %s", + e.event_id, + ) for e in state: # FIXME: Auth these. e.outlier = True - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] - ) + try: + yield self._handle_new_event(e) + yield self.notifier.on_new_room_event( + e, extra_users=[joinee] + ) + except: + logger.exception( + "Failed to parse state event %s", + e.event_id, + ) yield self._handle_new_event( event, From 0294fba0429a789c87f359f58218e2183ef69d96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 14:46:33 +0000 Subject: [PATCH 4/7] on_receive_pdu takes more args --- synapse/handlers/federation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0863fdb13..27ecd35b4 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -112,7 +112,7 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. if event.room_id in self.room_queues: - self.room_queues[event.room_id].append(pdu) + self.room_queues[event.room_id].append((pdu, origin)) return logger.debug("Processing event: %s", event.event_id) @@ -401,9 +401,9 @@ class FederationHandler(BaseHandler): room_queue = self.room_queues[room_id] del self.room_queues[room_id] - for p in room_queue: + for p, origin in room_queue: try: - self.on_receive_pdu(p, backfilled=False) + self.on_receive_pdu(origin, p, backfilled=False) except: logger.exception("Couldn't handle pdu") From 027542e2e5daa94c6517c0283be40834773fb475 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 16:02:26 +0000 Subject: [PATCH 5/7] Fix bugs when joining a remote room that has dodgy event graphs. This should also fix the number of times a HS will trigger a GET /event/ --- synapse/api/auth.py | 10 +++- synapse/handlers/federation.py | 83 ++++++++++++++++++++----------- tests/handlers/test_federation.py | 6 ++- 3 files changed, 68 insertions(+), 31 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index fb911e51a..2b0475543 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -202,7 +202,10 @@ class Auth(object): # Invites are valid iff caller is in the room and target isn't. if not caller_in_room: # caller isn't joined - raise AuthError(403, "You are not in room %s." % event.room_id) + raise AuthError( + 403, + "%s not in room %s." % (event.user_id, event.room_id,) + ) elif target_in_room: # the target is already in the room. raise AuthError(403, "%s is already in the room." % target_user_id) @@ -225,7 +228,10 @@ class Auth(object): # TODO (erikj): Implement kicks. if not caller_in_room: # trying to leave a room you aren't joined - raise AuthError(403, "You are not in room %s." % event.room_id) + raise AuthError( + 403, + "%s not in room %s." % (target_user_id, event.room_id,) + ) elif target_user_id != event.user_id: if kick_level: kick_level = int(kick_level) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 27ecd35b4..925eb5376 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -153,7 +153,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room: + if not is_in_room and not event.outlier: logger.debug("Got event for room we're not in.") replication_layer = self.replication_layer @@ -163,28 +163,30 @@ class FederationHandler(BaseHandler): event_id=event.event_id, ) - current_state = yield replication_layer.get_state_for_context( - origin, - context=event.room_id, - event_id=event.event_id, - ) - for e in auth_chain: e.outlier = True try: - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event(e) + yield self._handle_new_event(e, fetch_missing=False) except: logger.exception( "Failed to parse auth event %s", e.event_id, ) - for e in current_state: + if not state: + state = yield replication_layer.get_state_for_context( + origin, + context=event.room_id, + event_id=event.event_id, + ) + + current_state = state + + if state: + for e in state: e.outlier = True try: yield self._handle_new_event(e) - yield self.notifier.on_new_room_event(e) except: logger.exception( "Failed to parse state event %s", @@ -284,6 +286,16 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def on_event_auth(self, event_id): auth = yield self.store.get_auth_chain(event_id) + + for event in auth: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + defer.returnValue([e for e in auth]) @log_function @@ -343,6 +355,7 @@ class FederationHandler(BaseHandler): state = ret["state"] auth_chain = ret["auth_chain"] + auth_chain.sort(key=lambda e: e.depth) logger.debug("do_invite_join auth_chain: %s", auth_chain) logger.debug("do_invite_join state: %s", state) @@ -362,10 +375,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.outlier = True try: - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] - ) + yield self._handle_new_event(e, fetch_missing=False) except: logger.exception( "Failed to parse auth event %s", @@ -376,9 +386,9 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. e.outlier = True try: - yield self._handle_new_event(e) - yield self.notifier.on_new_room_event( - e, extra_users=[joinee] + yield self._handle_new_event( + e, + fetch_missing=True ) except: logger.exception( @@ -389,7 +399,7 @@ class FederationHandler(BaseHandler): yield self._handle_new_event( event, state=state, - current_state=state + current_state=state, ) yield self.notifier.on_new_room_event( @@ -552,7 +562,17 @@ class FederationHandler(BaseHandler): else: del results[(event.type, event.state_key)] - defer.returnValue(results.values()) + res = results.values() + for event in res: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + defer.returnValue(res) else: defer.returnValue([]) @@ -623,11 +643,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None): - if state: - for s in state: - yield self._handle_new_event(s) - + current_state=None, fetch_missing=True): is_new_state = yield self.state_handler.annotate_event_with_state( event, old_state=state @@ -667,11 +683,22 @@ class FederationHandler(BaseHandler): ) if not e: - raise AuthError( - 403, - "Can't find auth event %s." % (e_id, ) + e = yield self.replication_layer.get_pdu( + event.origin, e_id, outlier=True ) + if e and fetch_missing: + try: + yield self.on_receive_pdu(event.origin, e, False) + except: + logger.exception( + "Failed to parse auth event %s", + e_id, + ) + + if not e: + logger.warn("Can't find auth event %s.", e_id) + auth_events[(e.type, e.state_key)] = e if event.type == RoomMemberEvent.TYPE and not event.auth_events: diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 98cfbe50b..33016c16e 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -42,6 +42,7 @@ class FederationTestCase(unittest.TestCase): self.auth = NonCallableMock(spec_set=[ "check", + "check_host_in_room", ]) self.hostname = "test" @@ -89,13 +90,16 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) + self.auth.check_host_in_room.return_value = defer.succeed(True) def annotate(ev, old_state=None): ev.old_state_events = [] return defer.succeed(False) self.state_handler.annotate_event_with_state.side_effect = annotate - yield self.handlers.federation_handler.on_receive_pdu(pdu, False) + yield self.handlers.federation_handler.on_receive_pdu( + "fo", pdu, False + ) self.datastore.persist_event.assert_called_once_with( ANY, is_new_state=False, backfilled=False, current_state=None From 1505055334ecff6516c0b388efe4c5759e59fad0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 16:38:50 +0000 Subject: [PATCH 6/7] Don't return outliers when we get recent events for rooms. --- synapse/storage/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b84735e61..3405cb365 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -283,7 +283,7 @@ class StreamStore(SQLBaseStore): sql = ( "SELECT *, (%(redacted)s) AS redacted FROM events " - "WHERE room_id = ? AND stream_ordering <= ? " + "WHERE room_id = ? AND stream_ordering <= ? AND outlier = 0 " "ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ? " ) % { "redacted": del_sql, From cce32f8dc54cfbda5670c91d36477faedd3b9cc9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 27 Nov 2014 17:15:32 +0000 Subject: [PATCH 7/7] Bump version and changelog --- CHANGES.rst | 8 ++++++++ VERSION | 2 +- synapse/__init__.py | 2 +- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 207f1e4d7..6779a36f7 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,11 @@ +Changes in synapse 0.5.3 (2014-11-27) +===================================== + + * Fix bug that caused joining a remote room to fail if a single event was not + signed correctly. + * Fix bug which caused servers to continuously try and fetch events from other + servers. + Changes in synapse 0.5.2 (2014-11-26) ===================================== diff --git a/VERSION b/VERSION index cb0c939a9..be14282b7 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.5.2 +0.5.3 diff --git a/synapse/__init__.py b/synapse/__init__.py index d5c2f2548..c99cd9653 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a synapse home server. """ -__version__ = "0.5.2" +__version__ = "0.5.3"