From 3406333a58d9887cdb79ef73cae218c705ac11b0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Mar 2017 12:20:46 +0000 Subject: [PATCH 01/25] Factor _get_missing_events_for_pdu out of _handle_new_pdu This should be functionally identical: it just seeks to improve readability by reducing indentation. --- synapse/federation/federation_server.py | 144 ++++++++++++++---------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e922b7ff4..3ef700f7f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -574,68 +574,9 @@ class FederationServer(FederationBase): pdu.room_id, len(prevs - seen), ) - # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) - - if prevs - seen: - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - - # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733. - # The reason is to avoid holding the linearizer lock - # whilst processing inbound /send transactions, causing - # FDs to stack up and block other inbound transactions - # which empirically can currently take up to 30 minutes. - # - # N.B. this explicitly disables retry attempts. - # - # N.B. this also increases our chances of falling back to - # fetching fresh state for the room if the missing event - # can't be found, which slightly reduces our security. - # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. - # However, fetching state doesn't hold the linearizer lock - # apparently. - # - # see https://github.com/matrix-org/synapse/pull/1744 - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - timeout=10000, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) + yield self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) prevs = {e_id for e_id, _ in pdu.prev_events} seen = set(have_seen.keys()) @@ -667,6 +608,85 @@ class FederationServer(FederationBase): auth_chain=auth_chain, ) + @defer.inlineCallbacks + def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): + """ + Args: + origin (str): Origin of the pdu. Will be called to get the missing events + pdu: received pdu + prevs (str[]): List of event ids which we are missing + min_depth (int): Minimum depth of events to return. + + Returns: + Deferred: updated have_seen dictionary + """ + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if not prevs - seen: + # nothing left to do + defer.returnValue(have_seen) + + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + timeout=10000, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + defer.returnValue(have_seen) + def __str__(self): return "" % self.server_name From e8b1721290b196f9190f72517e8136d6af8cad58 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Mar 2017 13:15:27 +0000 Subject: [PATCH 02/25] Move sig check out of _handle_new_pdu When we receive PDUs via `get_missing_events`, we have already checked their sigs, so there is no need to do it again. --- synapse/federation/federation_server.py | 48 ++++++++++++++++++------- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 3ef700f7f..d8df6555d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -165,7 +165,7 @@ class FederationServer(FederationBase): ) try: - yield self._handle_new_pdu(transaction.origin, pdu) + yield self._handle_received_pdu(transaction.origin, pdu) results.append({}) except FederationError as e: self.send_failure(e, transaction.origin) @@ -496,9 +496,44 @@ class FederationServer(FederationBase): destination=None, ) + @defer.inlineCallbacks + def _handle_received_pdu(self, origin, pdu): + """ Process a PDU received in a federation /send/ transaction. + + Args: + origin (str): server which sent the pdu + pdu (FrozenEvent): received pdu + + Returns (Deferred): completes with None + Raises: FederationError if the signatures / hash do not match + """ + # Check signature. + try: + pdu = yield self._check_sigs_and_hash(pdu) + except SynapseError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=pdu.event_id, + ) + + yield self._handle_new_pdu(origin, pdu, get_missing=True) + @defer.inlineCallbacks @log_function def _handle_new_pdu(self, origin, pdu, get_missing=True): + """ Process a PDU received via a federation /send/ transaction, or + via backfill of missing prev_events + + Args: + origin (str): server which initiated the /send/ transaction. Will + be used to fetch missing events or state. + pdu (FrozenEvent): received PDU + get_missing (bool): True if we should fetch missing prev_events + + Returns (Deferred): completes with None + """ # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu( @@ -518,17 +553,6 @@ class FederationServer(FederationBase): logger.debug("Already seen pdu %s", pdu.event_id) return - # Check signature. - try: - pdu = yield self._check_sigs_and_hash(pdu) - except SynapseError as e: - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=pdu.event_id, - ) - state = None auth_chain = [] From 29235901b81b344fc28ff9f59c36257afecf0265 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Mar 2017 16:20:13 +0000 Subject: [PATCH 03/25] Move FederationServer._handle_new_pdu to FederationHandler Unfortunately this significantly increases the size of the already-rather-big FederationHandler, but the code fits more naturally here, and it paves the way for the tighter integration that I need between handling incoming PDUs and doing the join dance. Other than renaming the existing `FederationHandler.on_receive_pdu` to `_process_received_pdu` to make way for it, this just consists of the move, and replacing `self.handler` with `self` and `self` with `self.replication_layer`. --- synapse/federation/federation_server.py | 194 +---------------------- synapse/handlers/federation.py | 202 +++++++++++++++++++++++- 2 files changed, 198 insertions(+), 198 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d8df6555d..510a17682 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -52,7 +52,6 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() - self._room_pdu_linearizer = Linearizer("fed_room_pdu") self._server_linearizer = Linearizer("fed_server") # We cache responses to state queries, as they take a while and often @@ -518,198 +517,7 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - yield self._handle_new_pdu(origin, pdu, get_missing=True) - - @defer.inlineCallbacks - @log_function - def _handle_new_pdu(self, origin, pdu, get_missing=True): - """ Process a PDU received via a federation /send/ transaction, or - via backfill of missing prev_events - - Args: - origin (str): server which initiated the /send/ transaction. Will - be used to fetch missing events or state. - pdu (FrozenEvent): received PDU - get_missing (bool): True if we should fetch missing prev_events - - Returns (Deferred): completes with None - """ - - # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu( - origin, pdu.event_id, do_auth=False - ) - - # FIXME: Currently we fetch an event again when we already have it - # if it has been marked as an outlier. - - already_seen = ( - existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - ) - if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) - return - - state = None - - auth_chain = [] - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - - fetch_state = False - - # Get missing pdus if necessary. - if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = yield self.handler.get_min_depth_for_context( - pdu.room_id - ) - - logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - - if min_depth and pdu.depth < min_depth: - # This is so that we don't notify the user about this - # message, to work around the fact that some events will - # reference really really old events we really don't want to - # send to the clients. - pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], - ) - with (yield self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), - ) - - yield self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - logger.info( - "Still missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - fetch_state = True - - if fetch_state: - # We need to get the state at this event, since we haven't - # processed all the prev events. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - try: - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - except: - logger.exception("Failed to get state for event: %s", pdu.event_id) - - yield self.handler.on_receive_pdu( - origin, - pdu, - state=state, - auth_chain=auth_chain, - ) - - @defer.inlineCallbacks - def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): - """ - Args: - origin (str): Origin of the pdu. Will be called to get the missing events - pdu: received pdu - prevs (str[]): List of event ids which we are missing - min_depth (int): Minimum depth of events to return. - - Returns: - Deferred: updated have_seen dictionary - """ - # We recalculate seen, since it may have changed. - have_seen = yield self.store.have_events(prevs) - seen = set(have_seen.keys()) - - if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) - - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest) - latest |= seen - - logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] - ) - - # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733. - # The reason is to avoid holding the linearizer lock - # whilst processing inbound /send transactions, causing - # FDs to stack up and block other inbound transactions - # which empirically can currently take up to 30 minutes. - # - # N.B. this explicitly disables retry attempts. - # - # N.B. this also increases our chances of falling back to - # fetching fresh state for the room if the missing event - # can't be found, which slightly reduces our security. - # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. - # However, fetching state doesn't hold the linearizer lock - # apparently. - # - # see https://github.com/matrix-org/synapse/pull/1744 - - missing_events = yield self.get_missing_events( - origin, - pdu.room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - timeout=10000, - ) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for e in missing_events: - yield self._handle_new_pdu( - origin, - e, - get_missing=False - ) - - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - defer.returnValue(have_seen) + yield self.handler.on_receive_pdu(origin, pdu, get_missing=True) def __str__(self): return "" % self.server_name diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ed0fa51e7..d0c2b4d6e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -31,7 +31,7 @@ from synapse.util.logcontext import ( ) from synapse.util.metrics import measure_func from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, Linearizer from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, @@ -79,12 +79,204 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} + self._room_pdu_linearizer = Linearizer("fed_room_pdu") + + @defer.inlineCallbacks + @log_function + def on_receive_pdu(self, origin, pdu, get_missing=True): + """ Process a PDU received via a federation /send/ transaction, or + via backfill of missing prev_events + + Args: + origin (str): server which initiated the /send/ transaction. Will + be used to fetch missing events or state. + pdu (FrozenEvent): received PDU + get_missing (bool): True if we should fetch missing prev_events + + Returns (Deferred): completes with None + """ + + # We reprocess pdus when we have seen them only as outliers + existing = yield self.get_persisted_pdu( + origin, pdu.event_id, do_auth=False + ) + + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + + already_seen = ( + existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() + ) + ) + if already_seen: + logger.debug("Already seen pdu %s", pdu.event_id) + return + + state = None + + auth_chain = [] + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + fetch_state = False + + # Get missing pdus if necessary. + if not pdu.internal_metadata.is_outlier(): + # We only backfill backwards to the min depth. + min_depth = yield self.get_min_depth_for_context( + pdu.room_id + ) + + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring lock for room %r to fetch %d missing events: %r...", + pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + ) + with (yield self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired lock for room %r to fetch %d missing events", + pdu.room_id, len(prevs - seen), + ) + + yield self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + logger.info( + "Still missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + try: + state, auth_chain = yield self.replication_layer.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + except: + logger.exception("Failed to get state for event: %s", pdu.event_id) + + yield self._process_received_pdu( + origin, + pdu, + state=state, + auth_chain=auth_chain, + ) + + @defer.inlineCallbacks + def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth): + """ + Args: + origin (str): Origin of the pdu. Will be called to get the missing events + pdu: received pdu + prevs (str[]): List of event ids which we are missing + min_depth (int): Minimum depth of events to return. + + Returns: + Deferred: updated have_seen dictionary + """ + # We recalculate seen, since it may have changed. + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.keys()) + + if not prevs - seen: + # nothing left to do + defer.returnValue(have_seen) + + latest = yield self.store.get_latest_event_ids_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest) + latest |= seen + + logger.info( + "Missing %d events for room %r: %r...", + len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + ) + + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + + missing_events = yield self.replication_layer.get_missing_events( + origin, + pdu.room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + timeout=10000, + ) + + # We want to sort these by depth so we process them and + # tell clients about them in order. + missing_events.sort(key=lambda x: x.depth) + + for e in missing_events: + yield self.on_receive_pdu( + origin, + e, + get_missing=False + ) + + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + defer.returnValue(have_seen) @log_function @defer.inlineCallbacks - def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None): - """ Called by the ReplicationLayer when we have a new pdu. We need to - do auth checks and put it through the StateHandler. + def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): + """ Called when we have a new pdu. We need to do auth checks and put it + through the StateHandler. auth_chain and state are None if we already have the necessary state and prev_events in the db @@ -738,7 +930,7 @@ class FederationHandler(BaseHandler): continue try: - self.on_receive_pdu(origin, p) + self._process_received_pdu(origin, p) except: logger.exception("Couldn't handle pdu") From 9072a8c6279812e50ad1d3129efc564ce531acbb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 9 Mar 2017 22:54:52 +0000 Subject: [PATCH 04/25] Reread log config on SIGHUP When we are using a log_config file, reread it on SIGHUP. --- synapse/config/logger.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 77ded0ad2..e1f060d40 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -153,14 +153,6 @@ def setup_logging(log_config=None, log_file=None, verbosity=None): logger.info("Closing log file due to SIGHUP") handler.doRollover() logger.info("Opened new log file due to SIGHUP") - - # TODO(paul): obviously this is a terrible mechanism for - # stealing SIGHUP, because it means no other part of synapse - # can use it instead. If we want to catch SIGHUP anywhere - # else as well, I'd suggest we find a nicer way to broadcast - # it around. - if getattr(signal, "SIGHUP"): - signal.signal(signal.SIGHUP, sighup) else: handler = logging.StreamHandler() handler.setFormatter(formatter) @@ -169,8 +161,25 @@ def setup_logging(log_config=None, log_file=None, verbosity=None): logger.addHandler(handler) else: - with open(log_config, 'r') as f: - logging.config.dictConfig(yaml.load(f)) + def load_log_config(): + with open(log_config, 'r') as f: + logging.config.dictConfig(yaml.load(f)) + + def sighup(signum, stack): + # it might be better to use a file watcher or something for this. + logging.info("Reloading log config from %s due to SIGHUP", + log_config) + load_log_config() + + load_log_config() + + # TODO(paul): obviously this is a terrible mechanism for + # stealing SIGHUP, because it means no other part of synapse + # can use it instead. If we want to catch SIGHUP anywhere + # else as well, I'd suggest we find a nicer way to broadcast + # it around. + if getattr(signal, "SIGHUP"): + signal.signal(signal.SIGHUP, sighup) # It's critical to point twisted's internal logging somewhere, otherwise it # stacks up and leaks kup to 64K object; From d84bd51e95149f67e0740657d60b568533a6ea72 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Mar 2017 15:16:50 +0000 Subject: [PATCH 05/25] Refactor logger config for workers - to make it easier to add more config options. --- synapse/app/appservice.py | 2 +- synapse/app/client_reader.py | 2 +- synapse/app/federation_reader.py | 2 +- synapse/app/federation_sender.py | 2 +- synapse/app/homeserver.py | 4 +++- synapse/app/media_repository.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/config/logger.py | 21 ++++++++++++++++----- 9 files changed, 26 insertions(+), 13 deletions(-) diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 190093005..83ee3e3ce 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -157,7 +157,7 @@ def start(config_options): assert config.worker_app == "synapse.app.appservice" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 4d081eccd..7ed0de411 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -171,7 +171,7 @@ def start(config_options): assert config.worker_app == "synapse.app.client_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 90a481675..ca742de6b 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -162,7 +162,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_reader" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 411e47d98..0cf5b196e 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -160,7 +160,7 @@ def start(config_options): assert config.worker_app == "synapse.app.federation_sender" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index e0b87468f..0b9d78c13 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -20,6 +20,8 @@ import gc import logging import os import sys + +import synapse.config.logger from synapse.config._base import ConfigError from synapse.python_dependencies import ( @@ -286,7 +288,7 @@ def setup(config_options): # generating config files and shouldn't try to continue. sys.exit(0) - config.setup_logging() + synapse.config.logger.setup_logging(config, use_worker_options=False) # check any extra requirements we have now we have a config check_requirements(config) diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index ef17b158a..c5579d9e3 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -168,7 +168,7 @@ def start(config_options): assert config.worker_app == "synapse.app.media_repository" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 073f2c248..b025db54d 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -245,7 +245,7 @@ def start(config_options): assert config.worker_app == "synapse.app.pusher" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 3f2959525..29f075aa5 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -478,7 +478,7 @@ def start(config_options): assert config.worker_app == "synapse.app.synchrotron" - setup_logging(config.worker_log_config, config.worker_log_file) + setup_logging(config, use_worker_options=True) synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 77ded0ad2..c76fddf11 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -118,11 +118,22 @@ class LoggingConfig(Config): DEFAULT_LOG_CONFIG.substitute(log_file=config["log_file"]) ) - def setup_logging(self): - setup_logging(self.log_config, self.log_file, self.verbosity) +def setup_logging(config, use_worker_options=False): + """ Set up python logging + + Args: + config (LoggingConfig | synapse.config.workers.WorkerConfig): + configuration data + + use_worker_options (bool): True to use 'worker_log_config' and + 'worker_log_file' options instead of 'log_config' and 'log_file'. + """ + log_config = (config.worker_log_config if use_worker_options + else config.log_config) + log_file = (config.worker_log_file if use_worker_options + else config.log_file) -def setup_logging(log_config=None, log_file=None, verbosity=None): log_format = ( "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" " - %(message)s" @@ -131,9 +142,9 @@ def setup_logging(log_config=None, log_file=None, verbosity=None): level = logging.INFO level_for_storage = logging.INFO - if verbosity: + if config.verbosity: level = logging.DEBUG - if verbosity > 1: + if config.verbosity > 1: level_for_storage = logging.DEBUG # FIXME: we need a logging.WARN for a -q quiet option From bcfa5cd00c4d5827a9a525078db32a0c93bc087c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Mar 2017 15:38:29 +0000 Subject: [PATCH 06/25] Add an option to disable stdio redirect This makes it tractable to run synapse under pdb. --- synapse/config/logger.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index c76fddf11..a48f7695e 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -68,6 +68,7 @@ class LoggingConfig(Config): def read_config(self, config): self.verbosity = config.get("verbose", 0) + self.no_redirect_stdio = config.get("no_redirect_stdio", False) self.log_config = self.abspath(config.get("log_config")) self.log_file = self.abspath(config.get("log_file")) @@ -90,6 +91,8 @@ class LoggingConfig(Config): def read_arguments(self, args): if args.verbose is not None: self.verbosity = args.verbose + if args.no_redirect_stdio is not None: + self.no_redirect_stdio = args.no_redirect_stdio if args.log_config is not None: self.log_config = args.log_config if args.log_file is not None: @@ -109,6 +112,11 @@ class LoggingConfig(Config): '--log-config', dest="log_config", default=None, help="Python logging config file" ) + logging_group.add_argument( + '-n', '--no-redirect-stdio', + action='store_true', default=None, + help="Do not redirect stdout/stderr to the log" + ) def generate_files(self, config): log_config = config.get("log_config") @@ -194,4 +202,7 @@ def setup_logging(config, use_worker_options=False): # # However this may not be too much of a problem if we are just writing to a file. observer = STDLibLogObserver() - globalLogBeginner.beginLoggingTo([observer]) + globalLogBeginner.beginLoggingTo( + [observer], + redirectStandardIO=not config.no_redirect_stdio, + ) From 8ffbe43ba11c925322af06f4d12b076754aeac56 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 10 Mar 2017 17:39:35 +0000 Subject: [PATCH 07/25] Get current state by using current_state_events table --- synapse/handlers/device.py | 2 +- synapse/handlers/room_list.py | 47 +++++++++++++++++++++-------------- synapse/push/mailer.py | 2 +- synapse/storage/events.py | 18 ++++++-------- synapse/storage/state.py | 14 ++++++++++- 5 files changed, 52 insertions(+), 31 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index e859b3165..9374c085d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -262,7 +262,7 @@ class DeviceHandler(BaseHandler): # ordering: treat it the same as a new room event_ids = [] - current_state_ids = yield self.state.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids(room_id) # special-case for an empty prev state: include all members # in the changed list diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 19eebbd43..6283caaf7 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -21,6 +21,7 @@ from synapse.api.constants import ( EventTypes, JoinRules, ) from synapse.util.async import concurrently_execute +from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache from synapse.types import ThirdPartyInstanceID @@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler): appservice and network id to use an appservice specific one. Setting to None returns all public rooms across all lists. """ + logger.info( + "Getting public room list: limit=%r, since=%r, search=%r, network=%r", + limit, since_token, bool(search_filter), network_tuple, + ) if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. @@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler): rooms_to_order_value = {} rooms_to_num_joined = {} - rooms_to_latest_event_ids = {} newly_visible = [] newly_unpublished = [] @@ -116,12 +120,9 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): - latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) - if not latest_event_ids: - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) - rooms_to_latest_event_ids[room_id] = latest_event_ids + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) if not latest_event_ids: return @@ -165,19 +166,19 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # Actually generate the entries. _generate_room_entry will append to + # Actually generate the entries. _append_room_entry_to_chunk will append to # chunk but will stop if len(chunk) > limit chunk = [] if limit and not search_filter: step = limit + 1 for i in xrange(0, len(rooms_to_scan), step): # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _generate_room_entry + # at first iteration, but occaisonally _append_room_entry_to_chunk # won't append to the chunk and so we need to loop again. # We don't want to scan over the entire range either as that # would potentially waste a lot of work. yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -187,7 +188,7 @@ class RoomListHandler(BaseHandler): break else: yield concurrently_execute( - lambda r: self._generate_room_entry( + lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), @@ -256,21 +257,30 @@ class RoomListHandler(BaseHandler): defer.returnValue(results) @defer.inlineCallbacks - def _generate_room_entry(self, room_id, num_joined_users, chunk, limit, - search_filter): + def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, + search_filter): if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return + result = yield self._generate_room_entry(room_id, num_joined_users) + + if result and _matches_room_entry(result, search_filter): + chunk.append(result) + + @cachedInlineCallbacks(num_args=1, cache_context=True) + def _generate_room_entry(self, room_id, num_joined_users, cache_context): result = { "room_id": room_id, "num_joined_members": num_joined_users, } - current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + current_state_ids = yield self.store.get_current_state_ids( + room_id, on_invalidate=cache_context.invalidate, + ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.items() + event_id for key, event_id in current_state_ids.iteritems() if key[0] in ( EventTypes.JoinRules, EventTypes.Name, @@ -294,7 +304,9 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - aliases = yield self.store.get_aliases_for_room(room_id) + aliases = yield self.store.get_aliases_for_room( + room_id, on_invalidate=cache_context.invalidate + ) if aliases: result["aliases"] = aliases @@ -334,8 +346,7 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - if _matches_room_entry(result, search_filter): - chunk.append(result) + defer.returnValue(result) @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 62d794f22..3a50c72e0 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -139,7 +139,7 @@ class Mailer(object): @defer.inlineCallbacks def _fetch_room_state(room_id): - room_state = yield self.state_handler.get_current_state_ids(room_id) + room_state = yield self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email diff --git a/synapse/storage/events.py b/synapse/storage/events.py index db01eb6d1..0039c281c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -442,14 +442,9 @@ class EventsStore(SQLBaseStore): else: return - existing_state_rows = yield self._simple_select_list( - table="current_state_events", - keyvalues={"room_id": room_id}, - retcols=["event_id", "type", "state_key"], - desc="_calculate_state_delta", - ) + existing_state = yield self.get_current_state_ids(room_id) - existing_events = set(row["event_id"] for row in existing_state_rows) + existing_events = set(existing_state.itervalues()) new_events = set(ev_id for ev_id in current_state.itervalues()) changed_events = existing_events ^ new_events @@ -457,9 +452,8 @@ class EventsStore(SQLBaseStore): return to_delete = { - (row["type"], row["state_key"]): row["event_id"] - for row in existing_state_rows - if row["event_id"] in changed_events + key: ev_id for key, ev_id in existing_state.iteritems() + if ev_id in changed_events } events_to_insert = (new_events - existing_events) to_insert = { @@ -585,6 +579,10 @@ class EventsStore(SQLBaseStore): txn, self.get_users_in_room, (room_id,) ) + self._invalidate_cache_and_stream( + txn, self.get_current_state_ids, (room_id,) + ) + for room_id, new_extrem in new_forward_extremeties.items(): self._simple_delete_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 84482d828..27f1ec89e 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore -from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks from synapse.util.caches import intern_string from synapse.storage.engines import PostgresEngine @@ -69,6 +69,18 @@ class StateStore(SQLBaseStore): where_clause="type='m.room.member'", ) + @cachedInlineCallbacks(max_entries=100000, iterable=True) + def get_current_state_ids(self, room_id): + rows = yield self._simple_select_list( + table="current_state_events", + keyvalues={"room_id": room_id}, + retcols=["event_id", "type", "state_key"], + desc="_calculate_state_delta", + ) + defer.returnValue({ + (r["type"], r["state_key"]): r["event_id"] for r in rows + }) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: From 79926e016e98d4074aac4803d4d262dfd9c570c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 09:50:10 +0000 Subject: [PATCH 08/25] Assume rooms likely haven't changed --- synapse/handlers/room_list.py | 19 +++++++++++-------- synapse/storage/stream.py | 3 +++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6283caaf7..2f82c520c 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -120,16 +120,19 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): - latest_event_ids = yield self.store.get_forward_extremeties_for_room( - room_id, stream_token - ) + joined_users = yield self.store.get_users_in_room(room_id) + if self.store.has_room_changed_since(room_id, stream_token): + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) - if not latest_event_ids: - return + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) - joined_users = yield self.state_handler.get_current_user_in_room( - room_id, latest_event_ids, - ) num_joined_users = len(joined_users) rooms_to_num_joined[room_id] = num_joined_users diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 200d12463..dddd5fc0e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore): updatevalues={"stream_id": stream_id}, desc="update_federation_out_pos", ) + + def has_room_changed_since(self, room_id, stream_id): + return self._events_stream_cache.has_entity_changed(room_id, stream_id) From 0162994983f2af89b7eed0c2150353f8c5114f1b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 11:53:26 +0000 Subject: [PATCH 09/25] Comments --- synapse/handlers/room_list.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 2f82c520c..516cd9a6a 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -120,6 +120,13 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_order_for_room(room_id): + # Most of the rooms won't have changed between the since token and + # now (especially if the since token is "now"). So, we can ask what + # the current users are in a room (that will hit a cache) and then + # check if the room has changed since the since token. (We have to + # do it in that order to avoid races). + # If things have changed then fall back to getting the current state + # at the since token. joined_users = yield self.store.get_users_in_room(room_id) if self.store.has_room_changed_since(room_id, stream_token): latest_event_ids = yield self.store.get_forward_extremeties_for_room( @@ -262,6 +269,9 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, search_filter): + """Generate the entry for a room in the public room list and append it + to the `chunk` if it matches the search filter + """ if limit and len(chunk) > limit + 1: # We've already got enough, so lets just drop it. return @@ -273,6 +283,8 @@ class RoomListHandler(BaseHandler): @cachedInlineCallbacks(num_args=1, cache_context=True) def _generate_room_entry(self, room_id, num_joined_users, cache_context): + """Returns the entry for a room + """ result = { "room_id": room_id, "num_joined_members": num_joined_users, From 6037a9804ca730b0dcdf21c7fbfd0f75ecad7e6a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 10 Mar 2017 15:23:20 +0000 Subject: [PATCH 10/25] Add helpful texts to logger config options --- synapse/config/logger.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index e5d945e5b..2dbeafa9d 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -45,7 +45,6 @@ handlers: maxBytes: 104857600 backupCount: 10 filters: [context] - level: INFO console: class: logging.StreamHandler formatter: precise @@ -56,6 +55,8 @@ loggers: level: INFO synapse.storage.SQL: + # beware: increasing this to DEBUG will make synapse log sensitive + # information such as access tokens. level: INFO root: @@ -78,10 +79,10 @@ class LoggingConfig(Config): os.path.join(config_dir_path, server_name + ".log.config") ) return """ - # Logging verbosity level. + # Logging verbosity level. Ignored if log_config is specified. verbose: 0 - # File to write logging to + # File to write logging to. Ignored if log_config is specified. log_file: "%(log_file)s" # A yaml python logging config file @@ -102,11 +103,12 @@ class LoggingConfig(Config): logging_group = parser.add_argument_group("logging") logging_group.add_argument( '-v', '--verbose', dest="verbose", action='count', - help="The verbosity level." + help="The verbosity level. Specify multiple times to increase " + "verbosity. (Ignored if --log-config is specified.)" ) logging_group.add_argument( '-f', '--log-file', dest="log_file", - help="File to log to." + help="File to log to. (Ignored if --log-config is specified.)" ) logging_group.add_argument( '--log-config', dest="log_config", default=None, From 8d86d11fdf54b1aca902d128ca89a8614e8c9a27 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 13 Mar 2017 12:39:09 +0000 Subject: [PATCH 11/25] Bring example log config into line with default --- contrib/example_log_config.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/contrib/example_log_config.yaml b/contrib/example_log_config.yaml index 7f7c8ba58..c7aa68abf 100644 --- a/contrib/example_log_config.yaml +++ b/contrib/example_log_config.yaml @@ -39,9 +39,11 @@ loggers: synapse: level: INFO - synapse.storage: + synapse.storage.SQL: + # beware: increasing this to DEBUG will make synapse log sensitive + # information such as access tokens. level: INFO - + # example of enabling debugging for a component: # # synapse.federation.transport.server: From 45c7f12d2a7243b4d273d8af1639c03f3136d2a8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Mar 2017 16:24:54 +0000 Subject: [PATCH 12/25] Add new storage function to slave store --- synapse/replication/slave/storage/events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 622b2d854..518c9ea2e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore): get_recent_event_ids_for_room = ( StreamStore.__dict__["get_recent_event_ids_for_room"] ) + get_current_state_ids = ( + StateStore.__dict__["get_current_state_ids"] + ) + has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ From 170ccc9de5c09a543a60a7d9eada2e02ba9c9980 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 13 Mar 2017 13:50:16 +0000 Subject: [PATCH 13/25] Fix routing loop when fetching remote media When we proxy a media request to a remote server, add a query-param, which will tell the remote server to 404 if it doesn't recognise the server_name. This should fix a routing loop where the server keeps forwarding back to itself. Also improves the error handling on remote media fetches, so that we don't always return a rather obscure 502. --- synapse/api/errors.py | 59 +++++++++++++++++++--- synapse/http/matrixfederationclient.py | 15 ++++-- synapse/rest/media/v1/download_resource.py | 12 +++++ synapse/rest/media/v1/media_repository.py | 30 +++++++++-- 4 files changed, 102 insertions(+), 14 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 921c45773..014bd60b9 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -15,6 +15,7 @@ """Contains exceptions and error codes.""" +import json import logging logger = logging.getLogger(__name__) @@ -50,12 +51,15 @@ class Codes(object): class CodeMessageException(RuntimeError): - """An exception with integer code and message string attributes.""" + """An exception with integer code and message string attributes. - def __init__(self, code, msg): - super(CodeMessageException, self).__init__("%d: %s" % (code, msg)) + Attributes: + code (int): HTTP error code + response_code_message (str): HTTP reason phrase. None for the default. + """ + def __init__(self, code): + super(CodeMessageException, self).__init__("%d" % code) self.code = code - self.msg = msg self.response_code_message = None def error_dict(self): @@ -70,17 +74,44 @@ class SynapseError(CodeMessageException): Args: code (int): The integer error code (an HTTP response code) msg (str): The human-readable error message. - err (str): The error code e.g 'M_FORBIDDEN' + errcode (str): The synapse error code e.g 'M_FORBIDDEN' """ - super(SynapseError, self).__init__(code, msg) + super(SynapseError, self).__init__(code) + self.msg = msg self.errcode = errcode + def __str__(self): + return "%d: %s %s" % (self.code, self.errcode, self.msg) + def error_dict(self): return cs_error( self.msg, self.errcode, ) + @classmethod + def from_http_response_exception(cls, err): + """Make a SynapseError based on an HTTPResponseException + + Args: + err (HttpResponseException): + + Returns: + SynapseError: + """ + # try to parse the body as json, to get better errcode/msg, but + # default to M_UNKNOWN with the HTTP status as the error text + try: + j = json.loads(err.response) + except ValueError: + j = {} + errcode = j.get('errcode', Codes.UNKNOWN) + errmsg = j.get('error', err.response_code_message) + + res = SynapseError(err.code, errmsg, errcode) + res.response_code_message = err.response_code_message + return res + class RegistrationError(SynapseError): """An error raised when a registration event fails.""" @@ -243,6 +274,20 @@ class FederationError(RuntimeError): class HttpResponseException(CodeMessageException): + """ + Represents an HTTP-level failure of an outbound request + + Attributes: + response (str): body of response + """ def __init__(self, code, msg, response): + """ + + Args: + code (int): HTTP status code + msg (str): reason phrase from HTTP response status line + response (str): body of response + """ + super(HttpResponseException, self).__init__(code) + self.response_code_message = msg self.response = response - super(HttpResponseException, self).__init__(code, msg) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 78b92cef3..82586e3de 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -108,6 +108,12 @@ class MatrixFederationHttpClient(object): query_bytes=b"", retry_on_dns_fail=True, timeout=None, long_retries=False): """ Creates and sends a request to the given url + + Returns: + Deferred: resolves with the http response object on success. + + Fails with ``HTTPRequestException``: if we get an HTTP response + code >= 300. """ headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"Host"] = [destination] @@ -408,8 +414,11 @@ class MatrixFederationHttpClient(object): output_stream (file): File to write the response body to. args (dict): Optional dictionary used to create the query string. Returns: - A (int,dict) tuple of the file length and a dict of the response - headers. + Deferred: resolves with an (int,dict) tuple of the file length and + a dict of the response headers. + + Fails with ``HTTPRequestException`` if we get an HTTP response code + >= 300 """ encoded_args = {} @@ -419,7 +428,7 @@ class MatrixFederationHttpClient(object): encoded_args[k] = [v.encode("UTF-8") for v in vs] query_bytes = urllib.urlencode(encoded_args, True) - logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) + logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail) def body_callback(method, url_bytes, headers_dict): self.sign_request(destination, method, url_bytes, headers_dict) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index dfb87ffd1..6788375e8 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import synapse.http.servlet from ._base import parse_media_id, respond_with_file, respond_404 from twisted.web.resource import Resource @@ -81,6 +82,17 @@ class DownloadResource(Resource): @defer.inlineCallbacks def _respond_remote_file(self, request, server_name, media_id, name): + # don't forward requests for remote media if allow_remote is false + allow_remote = synapse.http.servlet.parse_boolean( + request, "allow_remote", default=True) + if not allow_remote: + logger.info( + "Rejecting request for remote media %s/%s due to allow_remote", + server_name, media_id, + ) + respond_404(request) + return + media_info = yield self.media_repo.get_remote_media(server_name, media_id) media_type = media_info["media_type"] diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 481ffee20..66464cfe6 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -12,6 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import twisted.internet.error from .upload_resource import UploadResource from .download_resource import DownloadResource @@ -26,7 +27,8 @@ from .thumbnailer import Thumbnailer from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.util.stringutils import random_string -from synapse.api.errors import SynapseError +from synapse.api.errors import SynapseError, HttpResponseException, \ + NotFoundError from twisted.internet import defer, threads @@ -157,11 +159,31 @@ class MediaRepository(object): try: length, headers = yield self.client.get_file( server_name, request_path, output_stream=f, - max_size=self.max_upload_size, + max_size=self.max_upload_size, args={ + # tell the remote server to 404 if it doesn't + # recognise the server_name, to make sure we don't + # end up with a routing loop. + "allow_remote": "false", + } ) + except twisted.internet.error.DNSLookupError as e: + logger.warn("HTTP error fetching remote media %s/%s: %r", + server_name, media_id, e) + raise NotFoundError() + + except HttpResponseException as e: + logger.warn("HTTP error fetching remote media %s/%s: %s", + server_name, media_id, e.response) + raise SynapseError.from_http_response_exception(e) + except Exception as e: - logger.warn("Failed to fetch remoted media %r", e) - raise SynapseError(502, "Failed to fetch remoted media") + logger.warn("Failed to fetch remote media %s/%s", + server_name, media_id, + exc_info=True) + if isinstance(e, SynapseError): + raise e + else: + raise SynapseError(502, "Failed to fetch remote media") media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() From c81f33f73d37fdf5027a356b50cc6ab0f93da3d9 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 13 Mar 2017 16:33:51 +0000 Subject: [PATCH 14/25] Implement delete_devices API This implements the proposal here https://docs.google.com/document/d/1C-25Gqz3TXy2jIAoeOKxpNtmme0jI4g3yFGqv5GlAAk for deleting multiple devices at once in a single request. --- synapse/rest/client/v2_alpha/devices.py | 47 +++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index a1feaf3d5..2560da141 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -45,6 +45,52 @@ class DevicesRestServlet(servlet.RestServlet): ) defer.returnValue((200, {"devices": devices})) +class DeleteDevicesRestServlet(servlet.RestServlet): + PATTERNS = client_v2_patterns("/delete_devices", releases=[], v2_alpha=False) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(DeleteDevicesRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() + self.auth_handler = hs.get_auth_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + try: + body = servlet.parse_json_object_from_request(request) + + except errors.SynapseError as e: + if e.errcode == errors.Codes.NOT_JSON: + # deal with older clients which didn't pass a J*DELETESON dict + # the same as those that pass an empty dict + body = {} + else: + raise + + if 'devices' not in body: + raise errors.SynapseError( + 400, "No devices supplied", errcode=errors.Codes.MISSING_PARAM + ) + + authed, result, params, _ = yield self.auth_handler.check_auth([ + [constants.LoginType.PASSWORD], + ], body, self.hs.get_ip_from_request(request)) + + if not authed: + defer.returnValue((401, result)) + + requester = yield self.auth.get_user_by_req(request) + for d_id in body['devices']: + yield self.device_handler.delete_device( + requester.user.to_string(), + d_id, + ) + defer.returnValue((200, {})) class DeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$", @@ -111,5 +157,6 @@ class DeviceRestServlet(servlet.RestServlet): def register_servlets(hs, http_server): + DeleteDevicesRestServlet(hs).register(http_server) DevicesRestServlet(hs).register(http_server) DeviceRestServlet(hs).register(http_server) From c077c3277b30968933a394bf8c2675cb4f9bf671 Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 13 Mar 2017 16:45:38 +0000 Subject: [PATCH 15/25] Flake --- synapse/rest/client/v2_alpha/devices.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 2560da141..fd9516a60 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -45,6 +45,7 @@ class DevicesRestServlet(servlet.RestServlet): ) defer.returnValue((200, {"devices": devices})) + class DeleteDevicesRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/delete_devices", releases=[], v2_alpha=False) @@ -92,6 +93,7 @@ class DeleteDevicesRestServlet(servlet.RestServlet): ) defer.returnValue((200, {})) + class DeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$", releases=[], v2_alpha=False) From 73a5f06652c6966eead46eded1d68f6f3522b54a Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 13 Mar 2017 17:27:51 +0000 Subject: [PATCH 16/25] Support registration / login with phone number Changes from https://github.com/matrix-org/synapse/pull/1971 --- synapse/api/constants.py | 2 + synapse/handlers/auth.py | 32 ++++-- synapse/handlers/identity.py | 37 ++++++- synapse/http/servlet.py | 10 ++ synapse/python_dependencies.py | 2 + synapse/rest/client/v1/login.py | 88 +++++++++++++++-- synapse/rest/client/v2_alpha/account.py | 114 +++++++++++++++++---- synapse/rest/client/v2_alpha/register.py | 120 ++++++++++++++++++++--- synapse/util/msisdn.py | 40 ++++++++ 9 files changed, 395 insertions(+), 50 deletions(-) create mode 100644 synapse/util/msisdn.py diff --git a/synapse/api/constants.py b/synapse/api/constants.py index ca23c9c46..489efb7f8 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -44,6 +45,7 @@ class JoinRules(object): class LoginType(object): PASSWORD = u"m.login.password" EMAIL_IDENTITY = u"m.login.email.identity" + MSISDN = u"m.login.msisdn" RECAPTCHA = u"m.login.recaptcha" DUMMY = u"m.login.dummy" diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index fffba3438..e7a1bb724 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014 - 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,6 +48,7 @@ class AuthHandler(BaseHandler): LoginType.PASSWORD: self._check_password_auth, LoginType.RECAPTCHA: self._check_recaptcha, LoginType.EMAIL_IDENTITY: self._check_email_identity, + LoginType.MSISDN: self._check_msisdn, LoginType.DUMMY: self._check_dummy_auth, } self.bcrypt_rounds = hs.config.bcrypt_rounds @@ -307,31 +309,47 @@ class AuthHandler(BaseHandler): defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) - @defer.inlineCallbacks def _check_email_identity(self, authdict, _): + return self._check_threepid('email', authdict) + + def _check_msisdn(self, authdict, _): + return self._check_threepid('msisdn', authdict) + + @defer.inlineCallbacks + def _check_dummy_auth(self, authdict, _): + yield run_on_reactor() + defer.returnValue(True) + + @defer.inlineCallbacks + def _check_threepid(self, medium, authdict): yield run_on_reactor() if 'threepid_creds' not in authdict: raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM) threepid_creds = authdict['threepid_creds'] + identity_handler = self.hs.get_handlers().identity_handler - logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,)) + logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,)) threepid = yield identity_handler.threepid_from_creds(threepid_creds) if not threepid: raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) + if threepid['medium'] != medium: + raise LoginError( + 401, + "Expecting threepid of type '%s', got '%s'" % ( + medium, threepid['medium'], + ), + errcode=Codes.UNAUTHORIZED + ) + threepid['threepid_creds'] = authdict['threepid_creds'] defer.returnValue(threepid) - @defer.inlineCallbacks - def _check_dummy_auth(self, authdict, _): - yield run_on_reactor() - defer.returnValue(True) - def _get_params_recaptcha(self): return {"public_key": self.hs.config.recaptcha_public_key} diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 559e5d5a7..6a53c5eb4 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler): params.update(kwargs) try: - data = yield self.http_client.post_urlencoded_get_json( + data = yield self.http_client.post_json_get_json( "https://%s%s" % ( id_server, "/_matrix/identity/api/v1/validate/email/requestToken" @@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler): except CodeMessageException as e: logger.info("Proxied requestToken failed: %r", e) raise e + + @defer.inlineCallbacks + def requestMsisdnToken( + self, id_server, country, phone_number, + client_secret, send_attempt, **kwargs + ): + yield run_on_reactor() + + if not self._should_trust_id_server(id_server): + raise SynapseError( + 400, "Untrusted ID server '%s'" % id_server, + Codes.SERVER_NOT_TRUSTED + ) + + params = { + 'country': country, + 'phone_number': phone_number, + 'client_secret': client_secret, + 'send_attempt': send_attempt, + } + params.update(kwargs) + + try: + data = yield self.http_client.post_json_get_json( + "https://%s%s" % ( + id_server, + "/_matrix/identity/api/v1/validate/msisdn/requestToken" + ), + params + ) + defer.returnValue(data) + except CodeMessageException as e: + logger.info("Proxied requestToken failed: %r", e) + raise e diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 8c22d6f00..9a4c36ad5 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -192,6 +192,16 @@ def parse_json_object_from_request(request): return content +def assert_params_in_request(body, required): + absent = [] + for k in required: + if k not in body: + absent.append(k) + + if len(absent) > 0: + raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + + class RestServlet(object): """ A Synapse REST Servlet. diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 7817b0cd9..c4777b2a2 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -37,6 +38,7 @@ REQUIREMENTS = { "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], "msgpack-python>=0.3.0": ["msgpack"], + "phonenumbers>=8.2.0": ["phonenumbers"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 72057f1b0..c4bbb7027 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, LoginError, Codes from synapse.types import UserID from synapse.http.server import finish_request from synapse.http.servlet import parse_json_object_from_request +from synapse.util.msisdn import phone_number_to_msisdn from .base import ClientV1RestServlet, client_path_patterns @@ -37,6 +38,49 @@ import xml.etree.ElementTree as ET logger = logging.getLogger(__name__) +def login_submission_legacy_convert(submission): + """ + If the input login submission is an old style object + (ie. with top-level user / medium / address) convert it + to a typed object. + """ + if "user" in submission: + submission["identifier"] = { + "type": "m.id.user", + "user": submission["user"], + } + del submission["user"] + + if "medium" in submission and "address" in submission: + submission["identifier"] = { + "type": "m.id.thirdparty", + "medium": submission["medium"], + "address": submission["address"], + } + del submission["medium"] + del submission["address"] + + +def login_id_thirdparty_from_phone(identifier): + """ + Convert a phone login identifier type to a generic threepid identifier + Args: + identifier(dict): Login identifier dict of type 'm.id.phone' + + Returns: Login identifier dict of type 'm.id.threepid' + """ + if "country" not in identifier or "number" not in identifier: + raise SynapseError(400, "Invalid phone-type identifier") + + msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"]) + + return { + "type": "m.id.thirdparty", + "medium": "msisdn", + "address": msisdn, + } + + class LoginRestServlet(ClientV1RestServlet): PATTERNS = client_path_patterns("/login$") PASS_TYPE = "m.login.password" @@ -117,20 +161,52 @@ class LoginRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def do_password_login(self, login_submission): - if 'medium' in login_submission and 'address' in login_submission: - address = login_submission['address'] - if login_submission['medium'] == 'email': + if "password" not in login_submission: + raise SynapseError(400, "Missing parameter: password") + + login_submission_legacy_convert(login_submission) + + if "identifier" not in login_submission: + raise SynapseError(400, "Missing param: identifier") + + identifier = login_submission["identifier"] + if "type" not in identifier: + raise SynapseError(400, "Login identifier has no type") + + # convert phone type identifiers to generic threepids + if identifier["type"] == "m.id.phone": + identifier = login_id_thirdparty_from_phone(identifier) + + # convert threepid identifiers to user IDs + if identifier["type"] == "m.id.thirdparty": + if 'medium' not in identifier or 'address' not in identifier: + raise SynapseError(400, "Invalid thirdparty identifier") + + address = identifier['address'] + if identifier['medium'] == 'email': # For emails, transform the address to lowercase. # We store all email addreses as lowercase in the DB. # (See add_threepid in synapse/handlers/auth.py) address = address.lower() user_id = yield self.hs.get_datastore().get_user_id_by_threepid( - login_submission['medium'], address + identifier['medium'], address ) if not user_id: raise LoginError(403, "", errcode=Codes.FORBIDDEN) - else: - user_id = login_submission['user'] + + identifier = { + "type": "m.id.user", + "user": user_id, + } + + # by this point, the identifier should be an m.id.user: if it's anything + # else, we haven't understood it. + if identifier["type"] != "m.id.user": + raise SynapseError(400, "Unknown login identifier type") + if "user" not in identifier: + raise SynapseError(400, "User identifier is missing 'user' key") + + user_id = identifier["user"] if not user_id.startswith('@'): user_id = UserID.create( diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 398e7f5eb..aac76edf1 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,8 +18,11 @@ from twisted.internet import defer from synapse.api.constants import LoginType from synapse.api.errors import LoginError, SynapseError, Codes -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, assert_params_in_request +) from synapse.util.async import run_on_reactor +from synapse.util.msisdn import phone_number_to_msisdn from ._base import client_v2_patterns @@ -28,11 +32,11 @@ import logging logger = logging.getLogger(__name__) -class PasswordRequestTokenRestServlet(RestServlet): +class EmailPasswordRequestTokenRestServlet(RestServlet): PATTERNS = client_v2_patterns("/account/password/email/requestToken$") def __init__(self, hs): - super(PasswordRequestTokenRestServlet, self).__init__() + super(EmailPasswordRequestTokenRestServlet, self).__init__() self.hs = hs self.identity_handler = hs.get_handlers().identity_handler @@ -40,14 +44,9 @@ class PasswordRequestTokenRestServlet(RestServlet): def on_POST(self, request): body = parse_json_object_from_request(request) - required = ['id_server', 'client_secret', 'email', 'send_attempt'] - absent = [] - for k in required: - if k not in body: - absent.append(k) - - if absent: - raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + assert_params_in_request(body, [ + 'id_server', 'client_secret', 'email', 'send_attempt' + ]) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] @@ -60,6 +59,37 @@ class PasswordRequestTokenRestServlet(RestServlet): defer.returnValue((200, ret)) +class MsisdnPasswordRequestTokenRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/account/password/msisdn/requestToken$") + + def __init__(self, hs): + super(MsisdnPasswordRequestTokenRestServlet, self).__init__() + self.hs = hs + self.datastore = self.hs.get_datastore() + self.identity_handler = hs.get_handlers().identity_handler + + @defer.inlineCallbacks + def on_POST(self, request): + body = parse_json_object_from_request(request) + + assert_params_in_request(body, [ + 'id_server', 'client_secret', + 'country', 'phone_number', 'send_attempt', + ]) + + msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + + existingUid = yield self.datastore.get_user_id_by_threepid( + 'msisdn', msisdn + ) + + if existingUid is None: + raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND) + + ret = yield self.identity_handler.requestMsisdnToken(**body) + defer.returnValue((200, ret)) + + class PasswordRestServlet(RestServlet): PATTERNS = client_v2_patterns("/account/password$") @@ -68,6 +98,7 @@ class PasswordRestServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() + self.datastore = self.hs.get_datastore() @defer.inlineCallbacks def on_POST(self, request): @@ -77,7 +108,8 @@ class PasswordRestServlet(RestServlet): authed, result, params, _ = yield self.auth_handler.check_auth([ [LoginType.PASSWORD], - [LoginType.EMAIL_IDENTITY] + [LoginType.EMAIL_IDENTITY], + [LoginType.MSISDN], ], body, self.hs.get_ip_from_request(request)) if not authed: @@ -102,7 +134,7 @@ class PasswordRestServlet(RestServlet): # (See add_threepid in synapse/handlers/auth.py) threepid['address'] = threepid['address'].lower() # if using email, we must know about the email they're authing with! - threepid_user_id = yield self.hs.get_datastore().get_user_id_by_threepid( + threepid_user_id = yield self.datastore.get_user_id_by_threepid( threepid['medium'], threepid['address'] ) if not threepid_user_id: @@ -169,13 +201,14 @@ class DeactivateAccountRestServlet(RestServlet): defer.returnValue((200, {})) -class ThreepidRequestTokenRestServlet(RestServlet): +class EmailThreepidRequestTokenRestServlet(RestServlet): PATTERNS = client_v2_patterns("/account/3pid/email/requestToken$") def __init__(self, hs): self.hs = hs - super(ThreepidRequestTokenRestServlet, self).__init__() + super(EmailThreepidRequestTokenRestServlet, self).__init__() self.identity_handler = hs.get_handlers().identity_handler + self.datastore = self.hs.get_datastore() @defer.inlineCallbacks def on_POST(self, request): @@ -190,7 +223,7 @@ class ThreepidRequestTokenRestServlet(RestServlet): if absent: raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) - existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( + existingUid = yield self.datastore.get_user_id_by_threepid( 'email', body['email'] ) @@ -201,6 +234,44 @@ class ThreepidRequestTokenRestServlet(RestServlet): defer.returnValue((200, ret)) +class MsisdnThreepidRequestTokenRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/account/3pid/msisdn/requestToken$") + + def __init__(self, hs): + self.hs = hs + super(MsisdnThreepidRequestTokenRestServlet, self).__init__() + self.identity_handler = hs.get_handlers().identity_handler + self.datastore = self.hs.get_datastore() + + @defer.inlineCallbacks + def on_POST(self, request): + body = parse_json_object_from_request(request) + + required = [ + 'id_server', 'client_secret', + 'country', 'phone_number', 'send_attempt', + ] + absent = [] + for k in required: + if k not in body: + absent.append(k) + + if absent: + raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + + msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + + existingUid = yield self.datastore.get_user_id_by_threepid( + 'msisdn', msisdn + ) + + if existingUid is not None: + raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE) + + ret = yield self.identity_handler.requestEmailToken(**body) + defer.returnValue((200, ret)) + + class ThreepidRestServlet(RestServlet): PATTERNS = client_v2_patterns("/account/3pid$") @@ -210,6 +281,7 @@ class ThreepidRestServlet(RestServlet): self.identity_handler = hs.get_handlers().identity_handler self.auth = hs.get_auth() self.auth_handler = hs.get_auth_handler() + self.datastore = self.hs.get_datastore() @defer.inlineCallbacks def on_GET(self, request): @@ -217,7 +289,7 @@ class ThreepidRestServlet(RestServlet): requester = yield self.auth.get_user_by_req(request) - threepids = yield self.hs.get_datastore().user_get_threepids( + threepids = yield self.datastore.user_get_threepids( requester.user.to_string() ) @@ -258,7 +330,7 @@ class ThreepidRestServlet(RestServlet): if 'bind' in body and body['bind']: logger.debug( - "Binding emails %s to %s", + "Binding threepid %s to %s", threepid, user_id ) yield self.identity_handler.bind_threepid( @@ -302,9 +374,11 @@ class ThreepidDeleteRestServlet(RestServlet): def register_servlets(hs, http_server): - PasswordRequestTokenRestServlet(hs).register(http_server) + EmailPasswordRequestTokenRestServlet(hs).register(http_server) + MsisdnPasswordRequestTokenRestServlet(hs).register(http_server) PasswordRestServlet(hs).register(http_server) DeactivateAccountRestServlet(hs).register(http_server) - ThreepidRequestTokenRestServlet(hs).register(http_server) + EmailThreepidRequestTokenRestServlet(hs).register(http_server) + MsisdnThreepidRequestTokenRestServlet(hs).register(http_server) ThreepidRestServlet(hs).register(http_server) ThreepidDeleteRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index ccca5a12d..7448c1346 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015 - 2016 OpenMarket Ltd +# Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,7 +20,10 @@ import synapse from synapse.api.auth import get_access_token_from_request, has_access_token from synapse.api.constants import LoginType from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import ( + RestServlet, parse_json_object_from_request, assert_params_in_request +) +from synapse.util.msisdn import phone_number_to_msisdn from ._base import client_v2_patterns @@ -43,7 +47,7 @@ else: logger = logging.getLogger(__name__) -class RegisterRequestTokenRestServlet(RestServlet): +class EmailRegisterRequestTokenRestServlet(RestServlet): PATTERNS = client_v2_patterns("/register/email/requestToken$") def __init__(self, hs): @@ -51,7 +55,7 @@ class RegisterRequestTokenRestServlet(RestServlet): Args: hs (synapse.server.HomeServer): server """ - super(RegisterRequestTokenRestServlet, self).__init__() + super(EmailRegisterRequestTokenRestServlet, self).__init__() self.hs = hs self.identity_handler = hs.get_handlers().identity_handler @@ -59,14 +63,9 @@ class RegisterRequestTokenRestServlet(RestServlet): def on_POST(self, request): body = parse_json_object_from_request(request) - required = ['id_server', 'client_secret', 'email', 'send_attempt'] - absent = [] - for k in required: - if k not in body: - absent.append(k) - - if len(absent) > 0: - raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + assert_params_in_request(body, [ + 'id_server', 'client_secret', 'email', 'send_attempt' + ]) existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] @@ -79,6 +78,43 @@ class RegisterRequestTokenRestServlet(RestServlet): defer.returnValue((200, ret)) +class MsisdnRegisterRequestTokenRestServlet(RestServlet): + PATTERNS = client_v2_patterns("/register/msisdn/requestToken$") + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(MsisdnRegisterRequestTokenRestServlet, self).__init__() + self.hs = hs + self.identity_handler = hs.get_handlers().identity_handler + + @defer.inlineCallbacks + def on_POST(self, request): + body = parse_json_object_from_request(request) + + assert_params_in_request(body, [ + 'id_server', 'client_secret', + 'country', 'phone_number', + 'send_attempt', + ]) + + msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( + 'msisdn', msisdn + ) + + if existingUid is not None: + raise SynapseError( + 400, "Phone number is already in use", Codes.THREEPID_IN_USE + ) + + ret = yield self.identity_handler.requestMsisdnToken(**body) + defer.returnValue((200, ret)) + + class RegisterRestServlet(RestServlet): PATTERNS = client_v2_patterns("/register$") @@ -203,12 +239,16 @@ class RegisterRestServlet(RestServlet): if self.hs.config.enable_registration_captcha: flows = [ [LoginType.RECAPTCHA], - [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA] + [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], + [LoginType.MSISDN, LoginType.RECAPTCHA], + [LoginType.EMAIL_IDENTITY, LoginType.MSISDN, LoginType.RECAPTCHA], ] else: flows = [ [LoginType.DUMMY], - [LoginType.EMAIL_IDENTITY] + [LoginType.EMAIL_IDENTITY], + [LoginType.MSISDN], + [LoginType.EMAIL_IDENTITY, LoginType.MSISDN], ] authed, auth_result, params, session_id = yield self.auth_handler.check_auth( @@ -224,8 +264,9 @@ class RegisterRestServlet(RestServlet): "Already registered user ID %r for this session", registered_user_id ) - # don't re-register the email address + # don't re-register the threepids add_email = False + add_msisdn = False else: # NB: This may be from the auth handler and NOT from the POST if 'password' not in params: @@ -250,6 +291,7 @@ class RegisterRestServlet(RestServlet): ) add_email = True + add_msisdn = True return_dict = yield self._create_registration_details( registered_user_id, params @@ -262,6 +304,13 @@ class RegisterRestServlet(RestServlet): params.get("bind_email") ) + if add_msisdn and auth_result and LoginType.MSISDN in auth_result: + threepid = auth_result[LoginType.MSISDN] + yield self._register_msisdn_threepid( + registered_user_id, threepid, return_dict["access_token"], + params.get("bind_msisdn") + ) + defer.returnValue((200, return_dict)) def on_OPTIONS(self, _): @@ -323,8 +372,9 @@ class RegisterRestServlet(RestServlet): """ reqd = ('medium', 'address', 'validated_at') if any(x not in threepid for x in reqd): + # This will only happen if the ID server returns a malformed response logger.info("Can't add incomplete 3pid") - defer.returnValue() + return yield self.auth_handler.add_threepid( user_id, @@ -371,6 +421,43 @@ class RegisterRestServlet(RestServlet): else: logger.info("bind_email not specified: not binding email") + @defer.inlineCallbacks + def _register_msisdn_threepid(self, user_id, threepid, token, bind_msisdn): + """Add a phone number as a 3pid identifier + + Also optionally binds msisdn to the given user_id on the identity server + + Args: + user_id (str): id of user + threepid (object): m.login.msisdn auth response + token (str): access_token for the user + bind_email (bool): true if the client requested the email to be + bound at the identity server + Returns: + defer.Deferred: + """ + reqd = ('medium', 'address', 'validated_at') + if any(x not in threepid for x in reqd): + # This will only happen if the ID server returns a malformed response + logger.info("Can't add incomplete 3pid") + defer.returnValue() + + yield self.auth_handler.add_threepid( + user_id, + threepid['medium'], + threepid['address'], + threepid['validated_at'], + ) + + if bind_msisdn: + logger.info("bind_msisdn specified: binding") + logger.debug("Binding msisdn %s to %s", threepid, user_id) + yield self.identity_handler.bind_threepid( + threepid['threepid_creds'], user_id + ) + else: + logger.info("bind_msisdn not specified: not binding msisdn") + @defer.inlineCallbacks def _create_registration_details(self, user_id, params): """Complete registration of newly-registered user @@ -449,5 +536,6 @@ class RegisterRestServlet(RestServlet): def register_servlets(hs, http_server): - RegisterRequestTokenRestServlet(hs).register(http_server) + EmailRegisterRequestTokenRestServlet(hs).register(http_server) + MsisdnRegisterRequestTokenRestServlet(hs).register(http_server) RegisterRestServlet(hs).register(http_server) diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py new file mode 100644 index 000000000..607161e7f --- /dev/null +++ b/synapse/util/msisdn.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import phonenumbers +from synapse.api.errors import SynapseError + + +def phone_number_to_msisdn(country, number): + """ + Takes an ISO-3166-1 2 letter country code and phone number and + returns an msisdn representing the canonical version of that + phone number. + Args: + country (str): ISO-3166-1 2 letter country code + number (str): Phone number in a national or international format + + Returns: + (str) The canonical form of the phone number, as an msisdn + Raises: + SynapseError if the number could not be parsed. + """ + try: + phoneNumber = phonenumbers.parse(number, country) + except phonenumbers.NumberParseException: + raise SynapseError(400, "Unable to parse phone number") + return phonenumbers.format_number( + phoneNumber, phonenumbers.PhoneNumberFormat.E164 + )[1:] From 0a9945220e3ac8720239df8b7269c183605a732c Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 13 Mar 2017 17:29:38 +0000 Subject: [PATCH 17/25] Fix registration for broken clients Only offer msisdn flows if the x_show_msisdn option is given. --- synapse/rest/client/v2_alpha/register.py | 26 ++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 7448c1346..d3a9b58f2 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -236,20 +236,38 @@ class RegisterRestServlet(RestServlet): assigned_user_id=registered_user_id, ) + # Only give msisdn flows if the x_show_msisdn flag is given: + # this is a hack to work around the fact that clients were shipped + # that use fallback registration if they see any flows that they don't + # recognise, which means we break registration for these clients if we + # advertise msisdn flows. Once usage of Riot iOS <=0.3.9 and Riot + # Android <=0.6.9 have fallen below an acceptable threshold, this + # parameter should go away and we should always advertise msisdn flows. + show_msisdn = False + print body + if 'x_show_msisdn' in body and body['x_show_msisdn']: + show_msisdn = True + if self.hs.config.enable_registration_captcha: flows = [ [LoginType.RECAPTCHA], [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], - [LoginType.MSISDN, LoginType.RECAPTCHA], - [LoginType.EMAIL_IDENTITY, LoginType.MSISDN, LoginType.RECAPTCHA], ] + if show_msisdn: + flows += [ + [LoginType.MSISDN, LoginType.RECAPTCHA], + [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], + ] else: flows = [ [LoginType.DUMMY], [LoginType.EMAIL_IDENTITY], - [LoginType.MSISDN], - [LoginType.EMAIL_IDENTITY, LoginType.MSISDN], ] + if show_msisdn: + flows += [ + [LoginType.MSISDN], + [LoginType.MSISDN, LoginType.EMAIL_IDENTITY], + ] authed, auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) From bbeeb97f753e158e9aadd53aff78b076d756917c Mon Sep 17 00:00:00 2001 From: Luke Barnard Date: Mon, 13 Mar 2017 17:53:23 +0000 Subject: [PATCH 18/25] Implement _simple_delete_many_txn, use it to delete devices (But this doesn't implement the same for deleting access tokens or e2e keys. Also respond to code review. --- synapse/handlers/device.py | 34 ++++++++++++++++++++ synapse/rest/client/v2_alpha/devices.py | 20 ++++++------ synapse/storage/_base.py | 41 +++++++++++++++++++++++++ synapse/storage/devices.py | 17 ++++++++++ 4 files changed, 101 insertions(+), 11 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index e859b3165..efaa0c8d6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -169,6 +169,40 @@ class DeviceHandler(BaseHandler): yield self.notify_device_update(user_id, [device_id]) + @defer.inlineCallbacks + def delete_devices(self, user_id, device_ids): + """ Delete several devices + + Args: + user_id (str): + device_ids (str): The list of device IDs to delete + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_devices(user_id, device_ids) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + # Delete access tokens and e2e keys for each device. Not optimised as it is not + # considered as part of a critical path. + for device_id in device_ids: + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + + yield self.notify_device_update(user_id, device_ids) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index fd9516a60..b57ba95d2 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -47,13 +47,13 @@ class DevicesRestServlet(servlet.RestServlet): class DeleteDevicesRestServlet(servlet.RestServlet): + """ + API for bulk deletion of devices. Accepts a JSON object with a devices + key which lists the device_ids to delete. Requires user interactive auth. + """ PATTERNS = client_v2_patterns("/delete_devices", releases=[], v2_alpha=False) def __init__(self, hs): - """ - Args: - hs (synapse.server.HomeServer): server - """ super(DeleteDevicesRestServlet, self).__init__() self.hs = hs self.auth = hs.get_auth() @@ -64,14 +64,13 @@ class DeleteDevicesRestServlet(servlet.RestServlet): def on_POST(self, request): try: body = servlet.parse_json_object_from_request(request) - except errors.SynapseError as e: if e.errcode == errors.Codes.NOT_JSON: # deal with older clients which didn't pass a J*DELETESON dict # the same as those that pass an empty dict body = {} else: - raise + raise e if 'devices' not in body: raise errors.SynapseError( @@ -86,11 +85,10 @@ class DeleteDevicesRestServlet(servlet.RestServlet): defer.returnValue((401, result)) requester = yield self.auth.get_user_by_req(request) - for d_id in body['devices']: - yield self.device_handler.delete_device( - requester.user.to_string(), - d_id, - ) + yield self.device_handler.delete_devices( + requester.user.to_string(), + body['devices'], + ) defer.returnValue((200, {})) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a7a8ec9b7..13b106bba 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -840,6 +840,47 @@ class SQLBaseStore(object): return txn.execute(sql, keyvalues.values()) + def _simple_delete_many(self, table, column, iterable, keyvalues, desc): + return self.runInteraction( + desc, self._simple_delete_many_txn, table, column, iterable, keyvalues + ) + + @staticmethod + def _simple_delete_many_txn(txn, table, column, iterable, keyvalues): + """Executes a DELETE query on the named table. + + Filters rows by if value of `column` is in `iterable`. + + Args: + txn : Transaction object + table : string giving the table name + column : column name to test for inclusion against `iterable` + iterable : list + keyvalues : dict of column names and values to select the rows with + """ + if not iterable: + return + + sql = "DELETE FROM %s" % table + + clauses = [] + values = [] + clauses.append( + "%s IN (%s)" % (column, ",".join("?" for _ in iterable)) + ) + values.extend(iterable) + + for key, value in keyvalues.items(): + clauses.append("%s = ?" % (key,)) + values.append(value) + + if clauses: + sql = "%s WHERE %s" % ( + sql, + " AND ".join(clauses), + ) + return txn.execute(sql, values) + def _get_cache_dict(self, db_conn, table, entity_column, stream_column, max_value, limit=100000): # Fetch a mapping of room_id -> max stream position for "recent" rooms. diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index bd56ba251..563071b7a 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -108,6 +108,23 @@ class DeviceStore(SQLBaseStore): desc="delete_device", ) + def delete_devices(self, user_id, device_ids): + """Deletes several devices. + + Args: + user_id (str): The ID of the user which owns the devices + device_ids (list): The IDs of the devices to delete + Returns: + defer.Deferred + """ + return self._simple_delete_many( + table="devices", + column="device_id", + iterable=device_ids, + keyvalues={"user_id": user_id}, + desc="delete_devices", + ) + def update_device(self, user_id, device_id, new_display_name=None): """Update a device. From d79a687d85ec15470e2313ee613ca8454a1bf8ec Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 14 Mar 2017 10:40:20 +0000 Subject: [PATCH 19/25] Oops, remove print --- synapse/rest/client/v2_alpha/register.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index d3a9b58f2..526b29c24 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -244,7 +244,6 @@ class RegisterRestServlet(RestServlet): # Android <=0.6.9 have fallen below an acceptable threshold, this # parameter should go away and we should always advertise msisdn flows. show_msisdn = False - print body if 'x_show_msisdn' in body and body['x_show_msisdn']: show_msisdn = True From 7b6ed9871e1f43a9fd809febfd191f3e03ef7212 Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 14 Mar 2017 10:49:55 +0000 Subject: [PATCH 20/25] Use extend instead of += --- synapse/rest/client/v2_alpha/register.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 526b29c24..dcd13b876 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -253,20 +253,20 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], ] if show_msisdn: - flows += [ + flows.extend([ [LoginType.MSISDN, LoginType.RECAPTCHA], [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], - ] + ]) else: flows = [ [LoginType.DUMMY], [LoginType.EMAIL_IDENTITY], ] if show_msisdn: - flows += [ + flows.extend([ [LoginType.MSISDN], [LoginType.MSISDN, LoginType.EMAIL_IDENTITY], - ] + ]) authed, auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) From cc7a294e2ec271126207922ea293e16d2a858943 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Mar 2017 10:54:28 +0000 Subject: [PATCH 21/25] Fix current_state_events table to not lie If we try and persist two state events that have the same ancestor we calculate the wrong current state when persisting those events. --- synapse/storage/events.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index db01eb6d1..03881ea3d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -433,11 +433,36 @@ class EventsStore(SQLBaseStore): if not new_latest_event_ids: current_state = {} elif was_updated: + # We work out the current state by passing the state sets to the + # state resolution algorithm. It may ask for some events, including + # the events we have yet to persist, so we need a slightly more + # complicated event lookup function than simply looking the events + # up in the db. + events_map = {ev.event_id: ev for ev, _ in events_context} + + @defer.inlineCallbacks + def get_events(ev_ids): + # We get the events by first looking at the list of events we + # are trying to persist, and then fetching the rest from the DB. + db = [] + to_return = {} + for ev_id in ev_ids: + ev = events_map.get(ev_id, None) + if ev: + to_return[ev_id] = ev + else: + db.append(ev_id) + + if db: + evs = yield self.get_events( + ev_ids, get_prev_content=False, check_redacted=False, + ) + to_return.update(evs) + defer.returnValue(to_return) + current_state = yield resolve_events( state_sets, - state_map_factory=lambda ev_ids: self.get_events( - ev_ids, get_prev_content=False, check_redacted=False, - ), + state_map_factory=get_events, ) else: return From 7f237800e91c4640b79fceb645e07feb837ee4ef Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Mar 2017 12:36:50 +0000 Subject: [PATCH 22/25] re-refactor exception heirarchy Give CodeMessageException back its `msg` attribute, and use that to hold the HTTP status message for HttpResponseException. --- synapse/api/errors.py | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 014bd60b9..d5391a80c 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -55,34 +55,35 @@ class CodeMessageException(RuntimeError): Attributes: code (int): HTTP error code - response_code_message (str): HTTP reason phrase. None for the default. + msg (str): string describing the error """ - def __init__(self, code): - super(CodeMessageException, self).__init__("%d" % code) + def __init__(self, code, msg): + super(CodeMessageException, self).__init__("%d: %s" % (code, msg)) self.code = code - self.response_code_message = None + self.msg = msg def error_dict(self): return cs_error(self.msg) class SynapseError(CodeMessageException): - """A base error which can be caught for all synapse events.""" + """A base exception type for matrix errors which have an errcode and error + message (as well as an HTTP status code). + + Attributes: + errcode (str): Matrix error code e.g 'M_FORBIDDEN' + """ def __init__(self, code, msg, errcode=Codes.UNKNOWN): """Constructs a synapse error. Args: code (int): The integer error code (an HTTP response code) msg (str): The human-readable error message. - errcode (str): The synapse error code e.g 'M_FORBIDDEN' + errcode (str): The matrix error code e.g 'M_FORBIDDEN' """ - super(SynapseError, self).__init__(code) - self.msg = msg + super(SynapseError, self).__init__(code, msg) self.errcode = errcode - def __str__(self): - return "%d: %s %s" % (self.code, self.errcode, self.msg) - def error_dict(self): return cs_error( self.msg, @@ -106,10 +107,9 @@ class SynapseError(CodeMessageException): except ValueError: j = {} errcode = j.get('errcode', Codes.UNKNOWN) - errmsg = j.get('error', err.response_code_message) + errmsg = j.get('error', err.msg) res = SynapseError(err.code, errmsg, errcode) - res.response_code_message = err.response_code_message return res @@ -204,7 +204,6 @@ class LimitExceededError(SynapseError): errcode=Codes.LIMIT_EXCEEDED): super(LimitExceededError, self).__init__(code, msg, errcode) self.retry_after_ms = retry_after_ms - self.response_code_message = "Too Many Requests" def error_dict(self): return cs_error( @@ -288,6 +287,5 @@ class HttpResponseException(CodeMessageException): msg (str): reason phrase from HTTP response status line response (str): body of response """ - super(HttpResponseException, self).__init__(code) - self.response_code_message = msg + super(HttpResponseException, self).__init__(code, msg) self.response = response From 1d09586599a495e01bfb6b887b1a59419673600a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Mar 2017 13:36:06 +0000 Subject: [PATCH 23/25] Address review comments - don't blindly proxy all HTTPRequestExceptions - log unexpected exceptions at error - avoid `isinstance` - improve docs on `from_http_response_exception` --- synapse/api/errors.py | 19 ++++++++++---- synapse/rest/media/v1/media_repository.py | 30 ++++++++++++----------- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index d5391a80c..6fbd5d687 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -94,6 +94,17 @@ class SynapseError(CodeMessageException): def from_http_response_exception(cls, err): """Make a SynapseError based on an HTTPResponseException + This is useful when a proxied request has failed, and we need to + decide how to map the failure onto a matrix error to send back to the + client. + + An attempt is made to parse the body of the http response as a matrix + error. If that succeeds, the errcode and error message from the body + are used as the errcode and error message in the new synapse error. + + Otherwise, the errcode is set to M_UNKNOWN, and the error message is + set to the reason code from the HTTP response. + Args: err (HttpResponseException): @@ -137,13 +148,11 @@ class UnrecognizedRequestError(SynapseError): class NotFoundError(SynapseError): """An error indicating we can't find the thing you asked for""" - def __init__(self, *args, **kwargs): - if "errcode" not in kwargs: - kwargs["errcode"] = Codes.NOT_FOUND + def __init__(self, msg="Not found", errcode=Codes.NOT_FOUND): super(NotFoundError, self).__init__( 404, - "Not found", - **kwargs + msg, + errcode=errcode ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 66464cfe6..c43b185e0 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -12,7 +12,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from twisted.internet import defer, threads import twisted.internet.error +import twisted.web.http +from twisted.web.resource import Resource from .upload_resource import UploadResource from .download_resource import DownloadResource @@ -20,9 +24,6 @@ from .thumbnail_resource import ThumbnailResource from .identicon_resource import IdenticonResource from .preview_url_resource import PreviewUrlResource from .filepath import MediaFilePaths - -from twisted.web.resource import Resource - from .thumbnailer import Thumbnailer from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -30,8 +31,6 @@ from synapse.util.stringutils import random_string from synapse.api.errors import SynapseError, HttpResponseException, \ NotFoundError -from twisted.internet import defer, threads - from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii from synapse.util.logcontext import preserve_context_over_fn @@ -174,16 +173,19 @@ class MediaRepository(object): except HttpResponseException as e: logger.warn("HTTP error fetching remote media %s/%s: %s", server_name, media_id, e.response) - raise SynapseError.from_http_response_exception(e) + if e.code == twisted.web.http.NOT_FOUND: + raise SynapseError.from_http_response_exception(e) + raise SynapseError(502, "Failed to fetch remote media") - except Exception as e: - logger.warn("Failed to fetch remote media %s/%s", - server_name, media_id, - exc_info=True) - if isinstance(e, SynapseError): - raise e - else: - raise SynapseError(502, "Failed to fetch remote media") + except SynapseError: + logger.exception("Failed to fetch remote media %s/%s", + server_name, media_id) + raise + + except Exception: + logger.exception("Failed to fetch remote media %s/%s", + server_name, media_id) + raise SynapseError(502, "Failed to fetch remote media") media_type = headers["Content-Type"][0] time_now_ms = self.clock.time_msec() From fd2eef49c8a63e12f3a1f724a4543ae96aa55611 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Mar 2017 15:03:46 +0000 Subject: [PATCH 24/25] Reduce spurious calls to generate sync --- synapse/notifier.py | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 8051a7a84..6abb33bb3 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -73,6 +73,13 @@ class _NotifierUserStream(object): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token + + # The last token for which we should wake up any streams that have a + # token that comes before it. This gets updated everytime we get poked. + # We start it at the current token since if we get any streams + # that have a token from before we have no idea whether they should be + # woken up or not, so lets just wake them up. + self.last_notified_token = current_token self.last_notified_ms = time_now_ms with PreserveLoggingContext(): @@ -89,6 +96,7 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) + self.last_notified_token = self.current_token self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred @@ -113,8 +121,14 @@ class _NotifierUserStream(object): def new_listener(self, token): """Returns a deferred that is resolved when there is a new token greater than the given token. + + Args: + token: The token from which we are streaming from, i.e. we shouldn't + notify for things that happened before this. """ - if self.current_token.is_after(token): + # Immediately wake up stream if something has already since happened + # since their last token. + if self.last_notified_token.is_after(token): return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -294,40 +308,44 @@ class Notifier(object): self._register_with_keys(user_stream) result = None + prev_token = from_token if timeout: end_time = self.clock.time_msec() + timeout - prev_token = from_token while not result: try: - current_token = user_stream.current_token - - result = yield callback(prev_token, current_token) - if result: - break - now = self.clock.time_msec() if end_time <= now: break # Now we wait for the _NotifierUserStream to be told there # is a new token. - # We need to supply the token we supplied to callback so - # that we don't miss any current_token updates. - prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): yield self.clock.time_bound_deferred( listener.deferred, time_out=(end_time - now) / 1000. ) + + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + # Update the prev_token to the current_token since nothing + # has happened between the old prev_token and the current_token + prev_token = current_token except DeferredTimedOutError: break except defer.CancelledError: break - else: + + if result is None: + # This happened if there was no timeout or if the timeout had + # already expired. current_token = user_stream.current_token - result = yield callback(from_token, current_token) + result = yield callback(prev_token, current_token) defer.returnValue(result) From ef328b2fc14bd85907f59542887d12cf1c5b3c8f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 14 Mar 2017 20:44:50 +0000 Subject: [PATCH 25/25] kick jenkins