From 081e5d55e68b1a55d4f52ef062084d9126ce2231 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 11:14:54 +0100 Subject: [PATCH 01/51] Send the correct host header when fetching keys --- synapse/crypto/keyclient.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 54b83da9d..4fca215c9 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -79,8 +79,7 @@ class SynapseKeyClientProtocol(HTTPClient): self.host = None def connectionMade(self): - self.host = self.transport.getHost() - logger.debug("Connected to %s", self.host) + logger.debug("Connected to %s", self.transport.getPeer()) self.sendCommand(b"GET", self.path) if self.host: self.sendHeader(b"Host", self.host) @@ -124,7 +123,10 @@ class SynapseKeyClientProtocol(HTTPClient): self.timer.cancel() def on_timeout(self): - logger.debug("Timeout waiting for response from %s", self.host) + logger.debug( + "Timeout waiting for response from %s: %s", + self.host, self.transport.getPeer(), + ) self.errback(IOError("Timeout waiting for response")) self.transport.abortConnection() @@ -133,4 +135,5 @@ class SynapseKeyClientFactory(Factory): def protocol(self): protocol = SynapseKeyClientProtocol() protocol.path = self.path + protocol.path = self.host return protocol From cf94a78872397fd97465b4704465a2d03d27d41e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 11:45:53 +0100 Subject: [PATCH 02/51] Set host not path --- synapse/crypto/keyclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 4fca215c9..1d8599036 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -135,5 +135,5 @@ class SynapseKeyClientFactory(Factory): def protocol(self): protocol = SynapseKeyClientProtocol() protocol.path = self.path - protocol.path = self.host + protocol.host = self.host return protocol From 55abbe1850efff95efe9935873b666e5fc4bf0e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jul 2016 15:55:13 +0100 Subject: [PATCH 03/51] make /devices return a list Turns out I specced this to return a list of devices rather than a dict of them --- synapse/handlers/device.py | 10 +++++----- tests/handlers/test_device.py | 11 +++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 110f5fbb5..1f9e15c33 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -79,17 +79,17 @@ class DeviceHandler(BaseHandler): Args: user_id (str): Returns: - defer.Deferred: dict[str, dict[str, X]]: map from device_id to - info on the device + defer.Deferred: list[dict[str, X]]: info on each device """ - devices = yield self.store.get_devices_by_user(user_id) + device_map = yield self.store.get_devices_by_user(user_id) ips = yield self.store.get_last_client_ip_by_device( - devices=((user_id, device_id) for device_id in devices.keys()) + devices=((user_id, device_id) for device_id in device_map.keys()) ) - for device in devices.values(): + devices = device_map.values() + for device in devices: _update_device_from_client_ips(device, ips) defer.returnValue(devices) diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 87c3c75ae..331aa13fe 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -84,28 +84,31 @@ class DeviceTestCase(unittest.TestCase): yield self._record_users() res = yield self.handler.get_devices_by_user(user1) - self.assertEqual(3, len(res.keys())) + self.assertEqual(3, len(res)) + device_map = { + d["device_id"]: d for d in res + } self.assertDictContainsSubset({ "user_id": user1, "device_id": "xyz", "display_name": "display 0", "last_seen_ip": None, "last_seen_ts": None, - }, res["xyz"]) + }, device_map["xyz"]) self.assertDictContainsSubset({ "user_id": user1, "device_id": "fco", "display_name": "display 1", "last_seen_ip": "ip1", "last_seen_ts": 1000000, - }, res["fco"]) + }, device_map["fco"]) self.assertDictContainsSubset({ "user_id": user1, "device_id": "abc", "display_name": "display 2", "last_seen_ip": "ip3", "last_seen_ts": 3000000, - }, res["abc"]) + }, device_map["abc"]) @defer.inlineCallbacks def test_get_device(self): From d26b660aa6580b1947f04f7efd598d34a259b970 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 17:38:51 +0100 Subject: [PATCH 04/51] Cache getPeer --- synapse/crypto/keyclient.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 1d8599036..c2bd64d6c 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -77,9 +77,12 @@ class SynapseKeyClientProtocol(HTTPClient): def __init__(self): self.remote_key = defer.Deferred() self.host = None + self._peer = None def connectionMade(self): - logger.debug("Connected to %s", self.transport.getPeer()) + self._peer = self.transport.getPeer() + logger.debug("Connected to %s", self._peer) + self.sendCommand(b"GET", self.path) if self.host: self.sendHeader(b"Host", self.host) @@ -125,7 +128,7 @@ class SynapseKeyClientProtocol(HTTPClient): def on_timeout(self): logger.debug( "Timeout waiting for response from %s: %s", - self.host, self.transport.getPeer(), + self.host, self._peer, ) self.errback(IOError("Timeout waiting for response")) self.transport.abortConnection() From ec5717caf59eb72caf6f82f1643f492f328a4be5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:14:03 +0100 Subject: [PATCH 05/51] Create index on user_ips in the background user_ips is kinda big, so really we want to add the index in the background once we're running. Replace the schema delta with one which will do that. I've done this in a way that's reasonably easy to reuse as there a few other indexes I need, and I don't suppose they will be the last. --- synapse/storage/background_updates.py | 73 +++++++++++++++++-- synapse/storage/client_ips.py | 16 +++- .../schema/delta/33/user_ips_index.sql | 3 +- 3 files changed, 80 insertions(+), 12 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 66a995157..75951d017 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from . import engines from twisted.internet import defer @@ -106,13 +107,13 @@ class BackgroundUpdateStore(SQLBaseStore): ) except: logger.exception("Error doing update") - - if result is None: - logger.info( - "No more background updates to do." - " Unscheduling background update task." - ) - return + else: + if result is None: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + return @defer.inlineCallbacks def do_background_update(self, desired_duration_ms): @@ -202,6 +203,64 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_background_index_update(self, update_name, index_name, + table, columns): + """Helper for store classes to do a background index addition + + To use: + + 1. use a schema delta file to add a background update. Example: + INSERT INTO background_updates (update_name, progress_json) VALUES + ('my_new_index', '{}'); + + 2. In the Store constructor, call this method + + Args: + update_name (str): update_name to register for + index_name (str): name of index to add + table (str): table to add index to + columns (list[str]): columns/expressions to include in index + """ + + # if this is postgres, we add the indexes concurrently. Otherwise + # we fall back to doing it inline + if isinstance(self.database_engine, engines.PostgresEngine): + conc = True + else: + conc = False + + sql = "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" \ + % { + "conc": "CONCURRENTLY" if conc else "", + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + + def create_index_concurrently(conn): + conn.rollback() + # postgres insists on autocommit for the index + conn.set_session(autocommit=True) + c = conn.cursor() + c.execute(sql) + conn.set_session(autocommit=False) + + def create_index(conn): + c = conn.cursor() + c.execute(sql) + + @defer.inlineCallbacks + def updater(progress, batch_size): + logger.info("Adding index %s to %s", index_name, table) + if conc: + yield self.runWithConnection(create_index_concurrently) + else: + yield self.runWithConnection(create_index) + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, updater) + def start_background_update(self, update_name, progress): """Starts a background update running. diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index e31fa53c3..20eb9ac15 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -15,10 +15,11 @@ import logging -from ._base import SQLBaseStore, Cache - from twisted.internet import defer +from ._base import Cache +from . import background_updates + logger = logging.getLogger(__name__) # Number of msec of granularity to store the user IP 'last seen' time. Smaller @@ -27,8 +28,7 @@ logger = logging.getLogger(__name__) LAST_SEEN_GRANULARITY = 120 * 1000 -class ClientIpStore(SQLBaseStore): - +class ClientIpStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): self.client_ip_last_seen = Cache( name="client_ip_last_seen", @@ -37,6 +37,14 @@ class ClientIpStore(SQLBaseStore): super(ClientIpStore, self).__init__(hs) + self.register_background_index_update( + "user_ips_device_index", + index_name="user_ips_device_id", + table="user_ips", + columns=["user_id", "device_id", "last_seen"], + ) + + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) diff --git a/synapse/storage/schema/delta/33/user_ips_index.sql b/synapse/storage/schema/delta/33/user_ips_index.sql index 8a05677d4..473f75a78 100644 --- a/synapse/storage/schema/delta/33/user_ips_index.sql +++ b/synapse/storage/schema/delta/33/user_ips_index.sql @@ -13,4 +13,5 @@ * limitations under the License. */ -CREATE INDEX user_ips_device_id ON user_ips(user_id, device_id, last_seen); +INSERT INTO background_updates (update_name, progress_json) VALUES + ('user_ips_device_index', '{}'); From 363786845b728bcd7146b3d949a86021a96eb2d2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 13:21:07 +0100 Subject: [PATCH 06/51] PEP8 --- synapse/storage/client_ips.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 20eb9ac15..71e5ea112 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -44,7 +44,6 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): columns=["user_id", "device_id", "last_seen"], ) - @defer.inlineCallbacks def insert_client_ip(self, user, access_token, ip, user_agent, device_id): now = int(self._clock.time_msec()) From dad2da7e54a4f0e92185e4f8553fb51b037c0bd3 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Jul 2016 17:00:56 +0100 Subject: [PATCH 07/51] Log the hostname the reCAPTCHA was completed on This could be useful information to have in the logs. Also comment about how & why we don't verify the hostname. --- synapse/handlers/auth.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 8f83923dd..6fff7e7d0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -279,8 +279,17 @@ class AuthHandler(BaseHandler): data = pde.response resp_body = simplejson.loads(data) - if 'success' in resp_body and resp_body['success']: - defer.returnValue(True) + if 'success' in resp_body: + # Note that we do NOT check the hostname here: we explicitly + # intend the CAPTCHA to be presented by whatever client the + # user is using, we just care that they have completed a CAPTCHA. + logger.info( + "%s reCAPTCHA from hostname %s", + "Successful" if resp_body['success'] else "Failed", + resp_body['hostname'] + ) + if resp_body['success']: + defer.returnValue(True) raise LoginError(401, "", errcode=Codes.UNAUTHORIZED) @defer.inlineCallbacks From 7ed58bb3476c4a18a9af97b8ee3358dac00098eb Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 22 Jul 2016 17:18:50 +0100 Subject: [PATCH 08/51] Use get to avoid KeyErrors --- synapse/handlers/auth.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 6fff7e7d0..d5d207243 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -286,7 +286,7 @@ class AuthHandler(BaseHandler): logger.info( "%s reCAPTCHA from hostname %s", "Successful" if resp_body['success'] else "Failed", - resp_body['hostname'] + resp_body.get('hostname') ) if resp_body['success']: defer.returnValue(True) From 465117d7ca40ba9906697aa023897798f7833830 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:10:42 +0100 Subject: [PATCH 09/51] Fix background_update tests A bit of a cleanup for background_updates, and make sure that the real background updates have run before we start the unit tests, so that they don't interfere with the tests. --- synapse/storage/background_updates.py | 27 ++++++++++++++++++------- tests/storage/test_background_update.py | 22 ++++++++++++++------ 2 files changed, 36 insertions(+), 13 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 75951d017..2771f7c3c 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - while True: - if self._background_update_timer is not None: - return + assert(self._background_update_timer is not None, + "background updates already running") + logger.info("Starting background schema updates") + + while True: sleep = defer.Deferred() self._background_update_timer = self._clock.call_later( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None @@ -102,7 +104,7 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_timer = None try: - result = yield self.do_background_update( + result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except: @@ -113,11 +115,12 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) - return + defer.returnValue() @defer.inlineCallbacks - def do_background_update(self, desired_duration_ms): - """Does some amount of work on a background update + def do_next_background_update(self, desired_duration_ms): + """Does some amount of work on the next queued background update + Args: desired_duration_ms(float): How long we want to spend updating. @@ -136,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore): self._background_update_queue.append(update['update_name']) if not self._background_update_queue: + # no work left to do defer.returnValue(None) + # pop from the front, and add back to the back update_name = self._background_update_queue.pop(0) self._background_update_queue.append(update_name) + res = yield self._do_background_update(update_name, desired_duration_ms) + defer.returnValue(res) + + @defer.inlineCallbacks + def _do_background_update(self, update_name, desired_duration_ms): + logger.info("Starting update batch on background update '%s'", + update_name) + update_handler = self._background_update_handlers[update_name] performance = self._background_update_performance.get(update_name) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 6e4d9b137..4944cb0d2 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): @defer.inlineCallbacks def setUp(self): - hs = yield setup_test_homeserver() + hs = yield setup_test_homeserver() # type: synapse.server.HomeServer self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase): "test_update", self.update_handler ) + # run the real background updates, to get them out the way + # (perhaps we should run them as part of the test HS setup, since we + # run all of the other schema setup stuff there?) + while True: + res = yield self.store.do_next_background_update(1000) + if res is None: + break + @defer.inlineCallbacks def test_do_background_update(self): desired_count = 1000 duration_ms = 42 + # first step: make a bit of progress @defer.inlineCallbacks def update(progress, count): self.clock.advance_time_msec(count * duration_ms) @@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): yield self.store.start_background_update("test_update", {"my_key": 1}) self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNotNone(result) @@ -50,24 +59,25 @@ class BackgroundUpdateTestCase(unittest.TestCase): {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE ) + # second step: complete the update @defer.inlineCallbacks def update(progress, count): yield self.store._end_background_update("test_update") defer.returnValue(count) self.update_handler.side_effect = update - self.update_handler.reset_mock() - result = yield self.store.do_background_update( - duration_ms * desired_count + result = yield self.store.do_next_background_update( + duration_ms * desired_count ) self.assertIsNotNone(result) self.update_handler.assert_called_once_with( {"my_key": 2}, desired_count ) + # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = yield self.store.do_background_update( + result = yield self.store.do_next_background_update( duration_ms * desired_count ) self.assertIsNone(result) From f16f0e169d30e6920b892ee772693199b16713fd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:12:47 +0100 Subject: [PATCH 10/51] Slightly saner logging for unittests 1. Give the handler used for logging in unit tests a formatter, so that the output is slightly more meaningful 2. Log some synapse.storage stuff, because it's useful. --- tests/unittest.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/unittest.py b/tests/unittest.py index 5b22abfe7..38715972d 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -17,13 +17,18 @@ from twisted.trial import unittest import logging - # logging doesn't have a "don't log anything at all EVARRRR setting, # but since the highest value is 50, 1000000 should do ;) NEVER = 1000000 -logging.getLogger().addHandler(logging.StreamHandler()) +handler = logging.StreamHandler() +handler.setFormatter(logging.Formatter( + "%(levelname)s:%(name)s:%(message)s [%(pathname)s:%(lineno)d]" +)) +logging.getLogger().addHandler(handler) logging.getLogger().setLevel(NEVER) +logging.getLogger("synapse.storage.SQL").setLevel(NEVER) +logging.getLogger("synapse.storage.txn").setLevel(NEVER) def around(target): @@ -70,8 +75,6 @@ class TestCase(unittest.TestCase): return ret logging.getLogger().setLevel(level) - # Don't set SQL logging - logging.getLogger("synapse.storage").setLevel(old_level) return orig() def assertObjectHasAttributes(self, attrs, obj): From 42f4feb2b709671bb2dbbabfe1aad7e951479652 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 12:25:06 +0100 Subject: [PATCH 11/51] PEP8 --- tests/storage/test_background_update.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index 4944cb0d2..1286b4ce2 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -68,7 +68,7 @@ class BackgroundUpdateTestCase(unittest.TestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() result = yield self.store.do_next_background_update( - duration_ms * desired_count + duration_ms * desired_count ) self.assertIsNotNone(result) self.update_handler.assert_called_once_with( From 9dbd903f4108c81499205ff80d9d420911fd0f54 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 14:05:23 +0100 Subject: [PATCH 12/51] background updates: Fix assertion to do something --- synapse/storage/background_updates.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 2771f7c3c..321c889b2 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,8 +88,8 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - assert(self._background_update_timer is not None, - "background updates already running") + assert self._background_update_timer is not None, \ + "background updates already running" logger.info("Starting background schema updates") From 2ee4c9ee023a50ae7c0800c34c609886fb27298f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 16:01:46 +0100 Subject: [PATCH 13/51] background updates: fix assert again --- synapse/storage/background_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 321c889b2..af9bfbbe4 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -88,7 +88,7 @@ class BackgroundUpdateStore(SQLBaseStore): @defer.inlineCallbacks def start_doing_background_updates(self): - assert self._background_update_timer is not None, \ + assert self._background_update_timer is None, \ "background updates already running" logger.info("Starting background schema updates") From 955ef1f06caee7385cb5ef21477b4d0490889c3c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 16:04:45 +0100 Subject: [PATCH 14/51] fix: defer.returnValue takes one argument --- synapse/storage/background_updates.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index af9bfbbe4..30d0e4c5d 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -115,7 +115,7 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) - defer.returnValue() + defer.returnValue(None) @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): From 2623cec8746392067e781164e8ed4f2236b15bec Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 16:12:16 +0100 Subject: [PATCH 15/51] Don't add rejections to the state_group, persist all rejections --- synapse/storage/events.py | 9 +++++---- synapse/storage/state.py | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 661054928..41c9b17d1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -591,10 +591,11 @@ class EventsStore(SQLBaseStore): ], ) - if context.rejected: - self._store_rejections_txn( - txn, event.event_id, context.rejected - ) + for event, context in events_and_contexts: + if context.rejected: + self._store_rejections_txn( + txn, event.event_id, context.rejected + ) self._simple_insert_many_txn( txn, diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 5b743db67..cc1c7ec6a 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -79,7 +79,7 @@ class StateStore(SQLBaseStore): state_events = dict(context.current_state) - if event.is_state(): + if event.is_state() and not context.rejected: state_events[(event.type, event.state_key)] = event state_group = context.new_state_group_id From 8f7f4cb92baa1eb8c772644e2567fe56d563b4b9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 17:13:37 +0100 Subject: [PATCH 16/51] Don't add the events to forward extremities if the event is rejected --- synapse/storage/events.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 41c9b17d1..201a4455f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -498,8 +498,8 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - - self._update_extremeties(txn, [event]) + if not context.rejected: + self._update_extremeties(txn, [event]) events_and_contexts = [ ec for ec in events_and_contexts if ec[0] not in to_remove @@ -512,7 +512,10 @@ class EventsStore(SQLBaseStore): self._handle_mult_prev_events( txn, - events=[event for event, _ in events_and_contexts], + events=[ + event for event, context in events_and_contexts + if not context.rejected + ], ) for event, _ in events_and_contexts: From 33d08e843368a9caf01835ec4d56160fdc0f9469 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 21 Jul 2016 15:56:57 +0100 Subject: [PATCH 17/51] Log when adding listeners --- synapse/http/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/http/server.py b/synapse/http/server.py index f705abab9..2b3c05a74 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -205,6 +205,7 @@ class JsonResource(HttpServer, resource.Resource): def register_paths(self, method, path_patterns, callback): for path_pattern in path_patterns: + logger.debug("Registering for %s %s", method, path_pattern.pattern) self.path_regexs.setdefault(method, []).append( self._PathEntry(path_pattern, callback) ) From 1b3c3e6d68bf503bf09e046ecf57bb652669e637 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Mon, 25 Jul 2016 18:44:30 +0100 Subject: [PATCH 18/51] Only update the events and event_json tables for rejected events --- synapse/storage/events.py | 113 +++++++++++++++++++++----------------- synapse/storage/state.py | 2 +- 2 files changed, 63 insertions(+), 52 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 201a4455f..c38a63108 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -407,21 +407,11 @@ class EventsStore(SQLBaseStore): event.room_id, event.internal_metadata.stream_ordering, ) - if not event.internal_metadata.is_outlier(): + if not event.internal_metadata.is_outlier() and not context.rejected: depth_updates[event.room_id] = max( event.depth, depth_updates.get(event.room_id, event.depth) ) - if context.push_actions: - self._set_push_actions_for_event_and_users_txn( - txn, event, context.push_actions - ) - - if event.type == EventTypes.Redaction and event.redacts is not None: - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) - for room_id, depth in depth_updates.items(): self._update_min_depth_for_room_txn(txn, room_id, depth) @@ -431,6 +421,7 @@ class EventsStore(SQLBaseStore): ), [event.event_id for event, _ in events_and_contexts] ) + have_persisted = { event_id: outlier for event_id, outlier in txn.fetchall() @@ -442,6 +433,9 @@ class EventsStore(SQLBaseStore): # Handle the case of the list including the same event multiple # times. The tricky thing here is when they differ by whether # they are an outlier. + if context.rejected: + continue + if event.event_id in event_map: other = event_map[event.event_id] @@ -498,8 +492,8 @@ class EventsStore(SQLBaseStore): sql, (False, event.event_id,) ) - if not context.rejected: - self._update_extremeties(txn, [event]) + + self._update_extremeties(txn, [event]) events_and_contexts = [ ec for ec in events_and_contexts if ec[0] not in to_remove @@ -508,39 +502,8 @@ class EventsStore(SQLBaseStore): if not events_and_contexts: return - self._store_mult_state_groups_txn(txn, events_and_contexts) - - self._handle_mult_prev_events( - txn, - events=[ - event for event, context in events_and_contexts - if not context.rejected - ], - ) - - for event, _ in events_and_contexts: - if event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Message: - self._store_room_message_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - self._store_guest_access_txn(txn, event) - - self._store_room_members_txn( - txn, - [ - event - for event, _ in events_and_contexts - if event.type == EventTypes.Member - ], - backfilled=backfilled, - ) + # From this point onwards the events are only events that we haven't + # seen before. def event_dict(event): return { @@ -594,11 +557,28 @@ class EventsStore(SQLBaseStore): ], ) + to_remove = set() for event, context in events_and_contexts: if context.rejected: self._store_rejections_txn( txn, event.event_id, context.rejected ) + to_remove.add(event.event_id) + + events_and_contexts = [ + ec for ec in events_and_contexts if ec[0].event_id not in to_remove + ] + + if not events_and_contexts: + return + + # From this point onwards the events are only ones that weren't rejected. + + for event, context in events_and_contexts: + if context.push_actions: + self._set_push_actions_for_event_and_users_txn( + txn, event, context.push_actions + ) self._simple_insert_many_txn( txn, @@ -614,6 +594,42 @@ class EventsStore(SQLBaseStore): ], ) + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts + ) + + self._store_mult_state_groups_txn(txn, events_and_contexts) + + self._handle_mult_prev_events( + txn, + events=[event for event, _ in events_and_contexts], + ) + + for event, _ in events_and_contexts: + if event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Message: + self._store_room_message_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + elif event.type == EventTypes.RoomHistoryVisibility: + self._store_history_visibility_txn(txn, event) + elif event.type == EventTypes.GuestAccess: + self._store_guest_access_txn(txn, event) + + self._store_room_members_txn( + txn, + [ + event + for event, _ in events_and_contexts + if event.type == EventTypes.Member + ], + backfilled=backfilled, + ) + self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -670,11 +686,6 @@ class EventsStore(SQLBaseStore): # Outlier events shouldn't clobber the current state. continue - if context.rejected: - # If the event failed it's auth checks then it shouldn't - # clobbler the current state. - continue - txn.call_after( self._get_current_state_for_key.invalidate, (event.room_id, event.type, event.state_key,) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index cc1c7ec6a..5b743db67 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -79,7 +79,7 @@ class StateStore(SQLBaseStore): state_events = dict(context.current_state) - if event.is_state() and not context.rejected: + if event.is_state(): state_events[(event.type, event.state_key)] = event state_group = context.new_state_group_id From 436bffd15fb8382a0d2dddd3c6f7a077ba751da2 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 Jul 2016 14:52:53 +0100 Subject: [PATCH 19/51] Implement deleting devices --- synapse/handlers/auth.py | 22 +++++++++++++-- synapse/handlers/device.py | 27 ++++++++++++++++++- synapse/rest/client/v1/login.py | 13 ++++++--- synapse/rest/client/v2_alpha/devices.py | 14 ++++++++++ synapse/rest/client/v2_alpha/register.py | 10 +++---- synapse/storage/devices.py | 15 +++++++++++ synapse/storage/registration.py | 26 +++++++++++++++--- .../delta/33/access_tokens_device_index.sql | 17 ++++++++++++ .../delta/33/refreshtoken_device_index.sql | 17 ++++++++++++ tests/handlers/test_device.py | 22 +++++++++++++-- tests/rest/client/v2_alpha/test_register.py | 14 +++++++--- 11 files changed, 176 insertions(+), 21 deletions(-) create mode 100644 synapse/storage/schema/delta/33/access_tokens_device_index.sql create mode 100644 synapse/storage/schema/delta/33/refreshtoken_device_index.sql diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d5d207243..2e138f328 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -77,6 +77,7 @@ class AuthHandler(BaseHandler): self.ldap_bind_password = hs.config.ldap_bind_password self.hs = hs # FIXME better possibility to access registrationHandler later? + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def check_auth(self, flows, clientdict, clientip): @@ -374,7 +375,8 @@ class AuthHandler(BaseHandler): return self._check_password(user_id, password) @defer.inlineCallbacks - def get_login_tuple_for_user_id(self, user_id, device_id=None): + def get_login_tuple_for_user_id(self, user_id, device_id=None, + initial_display_name=None): """ Gets login tuple for the user with the given user ID. @@ -383,9 +385,15 @@ class AuthHandler(BaseHandler): The user is assumed to have been authenticated by some other machanism (e.g. CAS), and the user_id converted to the canonical case. + The device will be recorded in the table if it is not there already. + Args: user_id (str): canonical User ID - device_id (str): the device ID to associate with the access token + device_id (str|None): the device ID to associate with the tokens. + None to leave the tokens unassociated with a device (deprecated: + we should always have a device ID) + initial_display_name (str): display name to associate with the + device if it needs re-registering Returns: A tuple of: The access token for the user's session. @@ -397,6 +405,16 @@ class AuthHandler(BaseHandler): logger.info("Logging in user %s on device %s", user_id, device_id) access_token = yield self.issue_access_token(user_id, device_id) refresh_token = yield self.issue_refresh_token(user_id, device_id) + + # the device *should* have been registered before we got here; however, + # it's possible we raced against a DELETE operation. The thing we + # really don't want is active access_tokens without a record of the + # device, so we double-check it here. + if device_id is not None: + yield self.device_handler.check_device_registered( + user_id, device_id, initial_display_name + ) + defer.returnValue((access_token, refresh_token)) @defer.inlineCallbacks diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 1f9e15c33..a7a192e1c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -100,7 +100,7 @@ class DeviceHandler(BaseHandler): Args: user_id (str): - device_id (str) + device_id (str): Returns: defer.Deferred: dict[str, X]: info on the device @@ -117,6 +117,31 @@ class DeviceHandler(BaseHandler): _update_device_from_client_ips(device, ips) defer.returnValue(device) + @defer.inlineCallbacks + def delete_device(self, user_id, device_id): + """ Delete the given device + + Args: + user_id (str): + device_id (str): + + Returns: + defer.Deferred: + """ + + try: + yield self.store.delete_device(user_id, device_id) + except errors.StoreError, e: + if e.code == 404: + # no match + pass + else: + raise + + yield self.store.user_delete_access_tokens(user_id, + device_id=device_id) + + def _update_device_from_client_ips(device, client_ips): ip = client_ips.get((device["user_id"], device["device_id"]), {}) diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index e8b791519..92fcae674 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -152,7 +152,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -173,7 +176,10 @@ class LoginRestServlet(ClientV1RestServlet): ) device_id = yield self._register_device(user_id, login_submission) access_token, refresh_token = ( - yield auth_handler.get_login_tuple_for_user_id(user_id, device_id) + yield auth_handler.get_login_tuple_for_user_id( + user_id, device_id, + login_submission.get("initial_device_display_name") + ) ) result = { "user_id": user_id, # may have changed @@ -262,7 +268,8 @@ class LoginRestServlet(ClientV1RestServlet): ) access_token, refresh_token = ( yield auth_handler.get_login_tuple_for_user_id( - registered_user_id, device_id + registered_user_id, device_id, + login_submission.get("initial_device_display_name") ) ) result = { diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 8b9ab4f67..30ef8b3da 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -70,6 +70,20 @@ class DeviceRestServlet(RestServlet): ) defer.returnValue((200, device)) + @defer.inlineCallbacks + def on_DELETE(self, request, device_id): + # XXX: it's not completely obvious we want to expose this endpoint. + # It allows the client to delete access tokens, which feels like a + # thing which merits extra auth. But if we want to do the interactive- + # auth dance, we should really make it possible to delete more than one + # device at a time. + requester = yield self.auth.get_user_by_req(request) + yield self.device_handler.delete_device( + requester.user.to_string(), + device_id, + ) + defer.returnValue((200, {})) + def register_servlets(hs, http_server): DevicesRestServlet(hs).register(http_server) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index c8c9395fc..9f599ea8b 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -374,13 +374,13 @@ class RegisterRestServlet(RestServlet): """ device_id = yield self._register_device(user_id, params) - access_token = yield self.auth_handler.issue_access_token( - user_id, device_id=device_id + access_token, refresh_token = ( + yield self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, + initial_display_name=params.get("initial_device_display_name") + ) ) - refresh_token = yield self.auth_handler.issue_refresh_token( - user_id, device_id=device_id - ) defer.returnValue({ "user_id": user_id, "access_token": access_token, diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 1cc6e07f2..4689980f8 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -76,6 +76,21 @@ class DeviceStore(SQLBaseStore): desc="get_device", ) + def delete_device(self, user_id, device_id): + """Delete a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to retrieve + Returns: + defer.Deferred + """ + return self._simple_delete_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_device", + ) + @defer.inlineCallbacks def get_devices_by_user(self, user_id): """Retrieve all of a user's registered devices. diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9a92b3536..935e82bf7 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -18,18 +18,31 @@ import re from twisted.internet import defer from synapse.api.errors import StoreError, Codes - -from ._base import SQLBaseStore +from synapse.storage import background_updates from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -class RegistrationStore(SQLBaseStore): +class RegistrationStore(background_updates.BackgroundUpdateStore): def __init__(self, hs): super(RegistrationStore, self).__init__(hs) self.clock = hs.get_clock() + self.register_background_index_update( + "access_tokens_device_index", + index_name="access_tokens_device_id", + table="access_tokens", + columns=["user_id", "device_id"], + ) + + self.register_background_index_update( + "refresh_tokens_device_index", + index_name="refresh_tokens_device_id", + table="refresh_tokens", + columns=["user_id", "device_id"], + ) + @defer.inlineCallbacks def add_access_token_to_user(self, user_id, token, device_id=None): """Adds an access token for the given user. @@ -238,11 +251,16 @@ class RegistrationStore(SQLBaseStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[]): + def user_delete_access_tokens(self, user_id, except_token_ids=[], + device_id=None): def f(txn): sql = "SELECT token FROM access_tokens WHERE user_id = ?" clauses = [user_id] + if device_id is not None: + sql += " AND device_id = ?" + clauses.append(device_id) + if except_token_ids: sql += " AND id NOT IN (%s)" % ( ",".join(["?" for _ in except_token_ids]), diff --git a/synapse/storage/schema/delta/33/access_tokens_device_index.sql b/synapse/storage/schema/delta/33/access_tokens_device_index.sql new file mode 100644 index 000000000..61ad3fe3e --- /dev/null +++ b/synapse/storage/schema/delta/33/access_tokens_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('access_tokens_device_index', '{}'); diff --git a/synapse/storage/schema/delta/33/refreshtoken_device_index.sql b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql new file mode 100644 index 000000000..bb225dafb --- /dev/null +++ b/synapse/storage/schema/delta/33/refreshtoken_device_index.sql @@ -0,0 +1,17 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('refresh_tokens_device_index', '{}'); diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 331aa13fe..214e722eb 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -12,11 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from synapse import types + from twisted.internet import defer +import synapse.api.errors import synapse.handlers.device + import synapse.storage +from synapse import types from tests import unittest, utils user1 = "@boris:aaa" @@ -27,7 +30,7 @@ class DeviceTestCase(unittest.TestCase): def __init__(self, *args, **kwargs): super(DeviceTestCase, self).__init__(*args, **kwargs) self.store = None # type: synapse.storage.DataStore - self.handler = None # type: device.DeviceHandler + self.handler = None # type: synapse.handlers.device.DeviceHandler self.clock = None # type: utils.MockClock @defer.inlineCallbacks @@ -123,6 +126,21 @@ class DeviceTestCase(unittest.TestCase): "last_seen_ts": 3000000, }, res) + @defer.inlineCallbacks + def test_delete_device(self): + yield self._record_users() + + # delete the device + yield self.handler.delete_device(user1, "abc") + + # check the device was deleted + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.get_device(user1, "abc") + + # we'd like to check the access token was invalidated, but that's a + # bit of a PITA. + + @defer.inlineCallbacks def _record_users(self): # check this works for both devices which have a recorded client_ip, diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 3bd7065e3..8ac56a1fb 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -65,13 +65,16 @@ class RegisterRestServletTestCase(unittest.TestCase): self.registration_handler.appservice_register = Mock( return_value=user_id ) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) (code, result) = yield self.servlet.on_POST(self.request) self.assertEquals(code, 200) det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname } self.assertDictContainsSubset(det_data, result) @@ -121,7 +124,9 @@ class RegisterRestServletTestCase(unittest.TestCase): "password": "monkey" }, None) self.registration_handler.register = Mock(return_value=(user_id, None)) - self.auth_handler.issue_access_token = Mock(return_value=token) + self.auth_handler.get_login_tuple_for_user_id = Mock( + return_value=(token, "kermits_refresh_token") + ) self.device_handler.check_device_registered = \ Mock(return_value=device_id) @@ -130,13 +135,14 @@ class RegisterRestServletTestCase(unittest.TestCase): det_data = { "user_id": user_id, "access_token": token, + "refresh_token": "kermits_refresh_token", "home_server": self.hs.hostname, "device_id": device_id, } self.assertDictContainsSubset(det_data, result) self.assertIn("refresh_token", result) - self.auth_handler.issue_access_token.assert_called_once_with( - user_id, device_id=device_id) + self.auth_handler.get_login_tuple_for_user_id( + user_id, device_id=device_id, initial_device_display_name=None) def test_POST_disabled_registration(self): self.hs.config.enable_registration = False From 012b4c19132d57fdbc1b6b0e304eb60eaf19200f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 25 Jul 2016 17:51:24 +0100 Subject: [PATCH 20/51] Implement updating devices You can update the displayname of devices now. --- synapse/handlers/device.py | 24 +++++++++++++++++ synapse/rest/client/v2_alpha/devices.py | 26 ++++++++++++------ synapse/storage/devices.py | 27 ++++++++++++++++++- tests/handlers/test_device.py | 16 +++++++++++ tests/storage/test_devices.py | 36 +++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index a7a192e1c..9e65d85e6 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -141,6 +141,30 @@ class DeviceHandler(BaseHandler): yield self.store.user_delete_access_tokens(user_id, device_id=device_id) + @defer.inlineCallbacks + def update_device(self, user_id, device_id, content): + """ Update the given device + + Args: + user_id (str): + device_id (str): + content (dict): body of update request + + Returns: + defer.Deferred: + """ + + try: + yield self.store.update_device( + user_id, + device_id, + new_display_name=content.get("display_name") + ) + except errors.StoreError, e: + if e.code == 404: + raise errors.NotFoundError() + else: + raise def _update_device_from_client_ips(device, client_ips): diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py index 30ef8b3da..8fbd3d3df 100644 --- a/synapse/rest/client/v2_alpha/devices.py +++ b/synapse/rest/client/v2_alpha/devices.py @@ -13,19 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from synapse.http.servlet import RestServlet - -from ._base import client_v2_patterns - import logging +from twisted.internet import defer + +from synapse.http import servlet +from ._base import client_v2_patterns logger = logging.getLogger(__name__) -class DevicesRestServlet(RestServlet): +class DevicesRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices$", releases=[], v2_alpha=False) def __init__(self, hs): @@ -47,7 +45,7 @@ class DevicesRestServlet(RestServlet): defer.returnValue((200, {"devices": devices})) -class DeviceRestServlet(RestServlet): +class DeviceRestServlet(servlet.RestServlet): PATTERNS = client_v2_patterns("/devices/(?P[^/]*)$", releases=[], v2_alpha=False) @@ -84,6 +82,18 @@ class DeviceRestServlet(RestServlet): ) defer.returnValue((200, {})) + @defer.inlineCallbacks + def on_PUT(self, request, device_id): + requester = yield self.auth.get_user_by_req(request) + + body = servlet.parse_json_object_from_request(request) + yield self.device_handler.update_device( + requester.user.to_string(), + device_id, + body + ) + defer.returnValue((200, {})) + def register_servlets(hs, http_server): DevicesRestServlet(hs).register(http_server) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 4689980f8..afd6530ca 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -81,7 +81,7 @@ class DeviceStore(SQLBaseStore): Args: user_id (str): The ID of the user which owns the device - device_id (str): The ID of the device to retrieve + device_id (str): The ID of the device to delete Returns: defer.Deferred """ @@ -91,6 +91,31 @@ class DeviceStore(SQLBaseStore): desc="delete_device", ) + def update_device(self, user_id, device_id, new_display_name=None): + """Update a device. + + Args: + user_id (str): The ID of the user which owns the device + device_id (str): The ID of the device to update + new_display_name (str|None): new displayname for device; None + to leave unchanged + Raises: + StoreError: if the device is not found + Returns: + defer.Deferred + """ + updates = {} + if new_display_name is not None: + updates["display_name"] = new_display_name + if not updates: + return defer.succeed(None) + return self._simple_update_one( + table="devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + updatevalues=updates, + desc="update_device", + ) + @defer.inlineCallbacks def get_devices_by_user(self, user_id): """Retrieve all of a user's registered devices. diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 214e722eb..85a970a6c 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -140,6 +140,22 @@ class DeviceTestCase(unittest.TestCase): # we'd like to check the access token was invalidated, but that's a # bit of a PITA. + @defer.inlineCallbacks + def test_update_device(self): + yield self._record_users() + + update = {"display_name": "new display"} + yield self.handler.update_device(user1, "abc", update) + + res = yield self.handler.get_device(user1, "abc") + self.assertEqual(res["display_name"], "new display") + + @defer.inlineCallbacks + def test_update_unknown_device(self): + update = {"display_name": "new_display"} + with self.assertRaises(synapse.api.errors.NotFoundError): + yield self.handler.update_device("user_id", "unknown_device_id", + update) @defer.inlineCallbacks def _record_users(self): diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index a6ce99337..f8725acea 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -15,6 +15,7 @@ from twisted.internet import defer +import synapse.api.errors import tests.unittest import tests.utils @@ -67,3 +68,38 @@ class DeviceStoreTestCase(tests.unittest.TestCase): "device_id": "device2", "display_name": "display_name 2", }, res["device2"]) + + @defer.inlineCallbacks + def test_update_device(self): + yield self.store.store_device( + "user_id", "device_id", "display_name 1" + ) + + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do a no-op first + yield self.store.update_device( + "user_id", "device_id", + ) + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 1", res["display_name"]) + + # do the update + yield self.store.update_device( + "user_id", "device_id", + new_display_name="display_name 2", + ) + + # check it worked + res = yield self.store.get_device("user_id", "device_id") + self.assertEqual("display_name 2", res["display_name"]) + + @defer.inlineCallbacks + def test_update_unknown_device(self): + with self.assertRaises(synapse.api.errors.StoreError) as cm: + yield self.store.update_device( + "user_id", "unknown_device_id", + new_display_name="display_name 2", + ) + self.assertEqual(404, cm.exception.code) From 242c52d607da68f48b3a4bce980663e0e5f103c6 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jul 2016 10:09:25 +0200 Subject: [PATCH 21/51] typo --- synapse/util/metrics.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index e1f374807..0b944d3e6 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -84,7 +84,7 @@ class Measure(object): if context != self.start_context: logger.warn( - "Context have unexpectedly changed from '%s' to '%s'. (%r)", + "Context has unexpectedly changed from '%s' to '%s'. (%r)", context, self.start_context, self.name ) return From 1a54513cf124f5796654c990c469a1a1b893909d Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Tue, 26 Jul 2016 10:09:37 +0200 Subject: [PATCH 22/51] federation doesn't work over ipv6 yet thanks to twisted --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index ebcb15a97..89458badc 100644 --- a/README.rst +++ b/README.rst @@ -445,7 +445,7 @@ You have two choices here, which will influence the form of your Matrix user IDs: 1) Use the machine's own hostname as available on public DNS in the form of - its A or AAAA records. This is easier to set up initially, perhaps for + its A records. This is easier to set up initially, perhaps for testing, but lacks the flexibility of SRV. 2) Set up a SRV record for your domain name. This requires you create a SRV From efeb6176c169835465eeb6184ead940a89b93b4e Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 10:49:52 +0100 Subject: [PATCH 23/51] Don't add rejected events if we've seen them befrore. Add some comments to explain what the code is doing mechanically --- synapse/storage/events.py | 53 +++++++++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 5 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c38a63108..25a2be279 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -397,6 +397,12 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled): + """Insert some number of room events into the necessary database tables. + + Rejected events are only inserted into the events table, the events_json table, + and the rejections table. Things reading from those table will need to check + whether the event was rejected. + """ depth_updates = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids @@ -427,15 +433,21 @@ class EventsStore(SQLBaseStore): for event_id, outlier in txn.fetchall() } + # Remove the events that we've seen before. event_map = {} to_remove = set() for event, context in events_and_contexts: + if context.rejected: + # If the event is rejected then we don't care if the event + # was an outlier or not. + if event.event_id in have_persisted: + # If we have already seen the event then ignore it. + to_remove.add(event) + continue + # Handle the case of the list including the same event multiple # times. The tricky thing here is when they differ by whether # they are an outlier. - if context.rejected: - continue - if event.event_id in event_map: other = event_map[event.event_id] @@ -457,6 +469,12 @@ class EventsStore(SQLBaseStore): outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: + # We received a copy of an event that we had already stored as + # an outlier in the database. We now have some state at that + # so we need to update the state_groups table with that state. + + # insert into the state_group, state_groups_state and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, ((event, context),)) metadata_json = encode_json( @@ -472,6 +490,8 @@ class EventsStore(SQLBaseStore): (metadata_json, event.event_id,) ) + # Add an entry to the ex_outlier_stream table to replicate the + # change in outlier status to our workers. stream_order = event.internal_metadata.stream_ordering state_group_id = context.state_group or context.new_state_group_id self._simple_insert_txn( @@ -493,6 +513,8 @@ class EventsStore(SQLBaseStore): (False, event.event_id,) ) + # Update the event_backward_extremities table now that this + # event isn't an outlier any more. self._update_extremeties(txn, [event]) events_and_contexts = [ @@ -557,24 +579,30 @@ class EventsStore(SQLBaseStore): ], ) + # Remove the rejected events from the list now that we've added them + # to the events table and the events_json table. to_remove = set() for event, context in events_and_contexts: if context.rejected: + # Insert the event_id into the rejections table self._store_rejections_txn( txn, event.event_id, context.rejected ) - to_remove.add(event.event_id) + to_remove.add(event) events_and_contexts = [ - ec for ec in events_and_contexts if ec[0].event_id not in to_remove + ec for ec in events_and_contexts if ec[0] not in to_remove ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return # From this point onwards the events are only ones that weren't rejected. for event, context in events_and_contexts: + # Insert all the push actions into the event_push_actions table. if context.push_actions: self._set_push_actions_for_event_and_users_txn( txn, event, context.push_actions @@ -595,12 +623,18 @@ class EventsStore(SQLBaseStore): ) if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. self._remove_push_actions_for_event_id_txn( txn, event.room_id, event.redacts ) + # Insert into the state_groups, state_groups_state, and + # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, events_and_contexts) + # Update the event_forward_extremities, event_backward_extremities and + # event_edges tables. self._handle_mult_prev_events( txn, events=[event for event, _ in events_and_contexts], @@ -608,18 +642,25 @@ class EventsStore(SQLBaseStore): for event, _ in events_and_contexts: if event.type == EventTypes.Name: + # Insert into the room_names and event_search tables. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: + # Insert into the topics table and event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: + # Insert into the event_search table. self._store_room_message_txn(txn, event) elif event.type == EventTypes.Redaction: + # Insert into the redactions table. self._store_redaction(txn, event) elif event.type == EventTypes.RoomHistoryVisibility: + # Insert into the event_search table. self._store_history_visibility_txn(txn, event) elif event.type == EventTypes.GuestAccess: + # Insert into the event_search table. self._store_guest_access_txn(txn, event) + # Insert into the room_memberships table. self._store_room_members_txn( txn, [ @@ -630,6 +671,7 @@ class EventsStore(SQLBaseStore): backfilled=backfilled, ) + # Insert event_reference_hashes table. self._store_event_reference_hashes_txn( txn, [event for event, _ in events_and_contexts] ) @@ -674,6 +716,7 @@ class EventsStore(SQLBaseStore): ], ) + # Prefil the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: From a6f06ce3e280cfa18f51748a7d4327001658db40 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 11:05:39 +0100 Subject: [PATCH 24/51] Fix how push_actions are redacted. --- synapse/storage/events.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 25a2be279..c63ca36df 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -522,6 +522,8 @@ class EventsStore(SQLBaseStore): ] if not events_and_contexts: + # Make sure we don't pass an empty list to functions that expect to + # be storing at least one element. return # From this point onwards the events are only events that we haven't @@ -608,6 +610,13 @@ class EventsStore(SQLBaseStore): txn, event, context.push_actions ) + if event.type == EventTypes.Redaction and event.redacts is not None: + # Remove the entries in the event_push_actions table for the + # redacted event. + self._remove_push_actions_for_event_id_txn( + txn, event.room_id, event.redacts + ) + self._simple_insert_many_txn( txn, table="event_auth", @@ -622,13 +631,6 @@ class EventsStore(SQLBaseStore): ], ) - if event.type == EventTypes.Redaction and event.redacts is not None: - # Remove the entries in the event_push_actions table for the - # redacted event. - self._remove_push_actions_for_event_id_txn( - txn, event.room_id, event.redacts - ) - # Insert into the state_groups, state_groups_state, and # event_to_state_groups tables. self._store_mult_state_groups_txn(txn, events_and_contexts) @@ -716,7 +718,7 @@ class EventsStore(SQLBaseStore): ], ) - # Prefil the event cache + # Prefill the event cache self._add_to_cache(txn, events_and_contexts) if backfilled: From 8e0249416643f20f0c4cd8f2e19cf45ea63289d3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:09:47 +0100 Subject: [PATCH 25/51] Delete refresh tokens when deleting devices --- synapse/handlers/device.py | 6 ++-- synapse/storage/registration.py | 58 +++++++++++++++++++++++------- tests/storage/test_registration.py | 34 ++++++++++++++++++ 3 files changed, 83 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e65d85e6..eaead5080 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -138,8 +138,10 @@ class DeviceHandler(BaseHandler): else: raise - yield self.store.user_delete_access_tokens(user_id, - device_id=device_id) + yield self.store.user_delete_access_tokens( + user_id, device_id=device_id, + delete_refresh_tokens=True, + ) @defer.inlineCallbacks def update_device(self, user_id, device_id, content): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 935e82bf7..d9555e073 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -252,20 +252,36 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): @defer.inlineCallbacks def user_delete_access_tokens(self, user_id, except_token_ids=[], - device_id=None): - def f(txn): - sql = "SELECT token FROM access_tokens WHERE user_id = ?" + device_id=None, + delete_refresh_tokens=False): + """ + Invalidate access/refresh tokens belonging to a user + + Args: + user_id (str): ID of user the tokens belong to + except_token_ids (list[str]): list of access_tokens which should + *not* be deleted + device_id (str|None): ID of device the tokens are associated with. + If None, tokens associated with any device (or no device) will + be deleted + delete_refresh_tokens (bool): True to delete refresh tokens as + well as access tokens. + Returns: + defer.Deferred: + """ + def f(txn, table, except_tokens, call_after_delete): + sql = "SELECT token FROM %s WHERE user_id = ?" % table clauses = [user_id] if device_id is not None: sql += " AND device_id = ?" clauses.append(device_id) - if except_token_ids: + if except_tokens: sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_token_ids]), + ",".join(["?" for _ in except_tokens]), ) - clauses += except_token_ids + clauses += except_tokens txn.execute(sql, clauses) @@ -274,16 +290,33 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): n = 100 chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] for chunk in chunks: - for row in chunk: - txn.call_after(self.get_user_by_access_token.invalidate, (row[0],)) + if call_after_delete: + for row in chunk: + txn.call_after(call_after_delete, (row[0],)) txn.execute( - "DELETE FROM access_tokens WHERE token in (%s)" % ( + "DELETE FROM %s WHERE token in (%s)" % ( + table, ",".join(["?" for _ in chunk]), ), [r[0] for r in chunk] ) - yield self.runInteraction("user_delete_access_tokens", f) + # delete refresh tokens first, to stop new access tokens being + # allocated while our backs are turned + if delete_refresh_tokens: + yield self.runInteraction( + "user_delete_access_tokens", f, + table="refresh_tokens", + except_tokens=[], + call_after_delete=None, + ) + + yield self.runInteraction( + "user_delete_access_tokens", f, + table="access_tokens", + except_tokens=except_token_ids, + call_after_delete=self.get_user_by_access_token.invalidate, + ) def delete_access_token(self, access_token): def f(txn): @@ -306,9 +339,8 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: token (str): The access token of a user. Returns: - dict: Including the name (user_id) and the ID of their access token. - Raises: - StoreError if no user was found. + defer.Deferred: None, if the token did not match, ootherwise dict + including the keys `name`, `is_guest`, `device_id`, `token_id`. """ return self.runInteraction( "get_user_by_access_token", diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py index b03ca303a..f7d74dea8 100644 --- a/tests/storage/test_registration.py +++ b/tests/storage/test_registration.py @@ -128,6 +128,40 @@ class RegistrationStoreTestCase(unittest.TestCase): with self.assertRaises(StoreError): yield self.store.exchange_refresh_token(last_token, generator.generate) + @defer.inlineCallbacks + def test_user_delete_access_tokens(self): + # add some tokens + generator = TokenGenerator() + refresh_token = generator.generate(self.user_id) + yield self.store.register(self.user_id, self.tokens[0], self.pwhash) + yield self.store.add_access_token_to_user(self.user_id, self.tokens[1], + self.device_id) + yield self.store.add_refresh_token_to_user(self.user_id, refresh_token, + self.device_id) + + # now delete some + yield self.store.user_delete_access_tokens( + self.user_id, device_id=self.device_id, delete_refresh_tokens=True) + + # check they were deleted + user = yield self.store.get_user_by_access_token(self.tokens[1]) + self.assertIsNone(user, "access token was not deleted by device_id") + with self.assertRaises(StoreError): + yield self.store.exchange_refresh_token(refresh_token, + generator.generate) + + # check the one not associated with the device was not deleted + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertEqual(self.user_id, user["name"]) + + # now delete the rest + yield self.store.user_delete_access_tokens( + self.user_id, delete_refresh_tokens=True) + + user = yield self.store.get_user_by_access_token(self.tokens[0]) + self.assertIsNone(user, + "access token was not deleted without device_id") + class TokenGenerator: def __init__(self): From db4f823d34c6ebe30ce8f4f957c20f6e0a627ecc Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:49:40 +0100 Subject: [PATCH 26/51] Fix flake8 configuration Apparently flake8 v3 doesn't like trailing comments on config settings. Also remove the pep8 config, which didn't work (because it was missing W503) and duplicated the flake8 config. We don't use pep8 on its own, so the config was duplicative. --- setup.cfg | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/setup.cfg b/setup.cfg index 5ebce1c56..da8eafbb3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,5 @@ ignore = [flake8] max-line-length = 90 -ignore = W503 ; W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it. - -[pep8] -max-line-length = 90 +# W503 requires that binary operators be at the end, not start, of lines. Erik doesn't like it. +ignore = W503 From 05e7e5e972446b639997f0ea461c2eea39617342 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 11:59:08 +0100 Subject: [PATCH 27/51] Fix flake8 violation Apparently flake8 v3 puts the error on a different line to v2. Easiest way to make sure that happens is by putting the whole statement on one line :) --- synapse/app/__init__.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 1bc427980..9c2b62759 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -16,13 +16,11 @@ import sys sys.dont_write_bytecode = True -from synapse.python_dependencies import ( - check_requirements, MissingRequirementError -) # NOQA +from synapse import python_dependencies # noqa: E402 try: - check_requirements() -except MissingRequirementError as e: + python_dependencies.check_requirements() +except python_dependencies.MissingRequirementError as e: message = "\n".join([ "Missing Requirement: %s" % (e.message,), "To install run:", From 33d777647325501d2a1d18d95efc5f9f64eeb46e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 13:32:15 +0100 Subject: [PATCH 28/51] Fix typo --- synapse/storage/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index d9555e073..7e7d32eb6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -339,7 +339,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: token (str): The access token of a user. Returns: - defer.Deferred: None, if the token did not match, ootherwise dict + defer.Deferred: None, if the token did not match, otherwise dict including the keys `name`, `is_guest`, `device_id`, `token_id`. """ return self.runInteraction( From c824b29e77cd1745f8ac14f2a73c3b8590acaac9 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 16:39:14 +0100 Subject: [PATCH 29/51] Check if the user is banned when handling 3pid invites --- synapse/api/auth.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eca851390..f399aa8c7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -376,6 +376,10 @@ class Auth(object): if Membership.INVITE == membership and "third_party_invite" in event.content: if not self._verify_third_party_invite(event, auth_events): raise AuthError(403, "You are not invited to this room.") + if target_banned: + raise AuthError( + 403, "%s is banned from the room" % (target_user_id,) + ) return True if Membership.JOIN != membership: From eb359eced44407b1ee9648f10fdf3df63c8d40ad Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 16:46:53 +0100 Subject: [PATCH 30/51] Add `create_requester` function Wrap the `Requester` constructor with a function which provides sensible defaults, and use it throughout --- synapse/api/auth.py | 30 ++++++++++++------------- synapse/handlers/_base.py | 13 ++++++----- synapse/handlers/profile.py | 12 +++++----- synapse/handlers/register.py | 16 ++++++++------ synapse/handlers/room_member.py | 20 ++++++++--------- synapse/rest/client/v2_alpha/keys.py | 10 ++++----- synapse/types.py | 33 +++++++++++++++++++++++++++- tests/handlers/test_profile.py | 10 +++++---- tests/replication/test_resource.py | 24 ++++++++++---------- tests/rest/client/v1/test_profile.py | 13 +++++------ tests/utils.py | 5 ----- 11 files changed, 106 insertions(+), 80 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index eca851390..eecf3b0b2 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -13,22 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import pymacaroons from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json, SignatureVerifyException - from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership, JoinRules -from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError -from synapse.types import Requester, UserID, get_domain_from_id -from synapse.util.logutils import log_function -from synapse.util.logcontext import preserve_context_over_fn -from synapse.util.metrics import Measure from unpaddedbase64 import decode_base64 -import logging -import pymacaroons +import synapse.types +from synapse.api.constants import EventTypes, Membership, JoinRules +from synapse.api.errors import AuthError, Codes, SynapseError, EventSizeError +from synapse.types import UserID, get_domain_from_id +from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logutils import log_function +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -566,8 +566,7 @@ class Auth(object): Args: request - An HTTP request with an access_token query parameter. Returns: - defer.Deferred: resolves to a namedtuple including "user" (UserID) - "access_token_id" (int), "is_guest" (bool) + defer.Deferred: resolves to a ``synapse.types.Requester`` object Raises: AuthError if no user by that token exists or the token is invalid. """ @@ -576,9 +575,7 @@ class Auth(object): user_id = yield self._get_appservice_user_id(request.args) if user_id: request.authenticated_entity = user_id - defer.returnValue( - Requester(UserID.from_string(user_id), "", False) - ) + defer.returnValue(synapse.types.create_requester(user_id)) access_token = request.args["access_token"][0] user_info = yield self.get_user_by_access_token(access_token, rights) @@ -612,7 +609,8 @@ class Auth(object): request.authenticated_entity = user.to_string() - defer.returnValue(Requester(user, token_id, is_guest)) + defer.returnValue(synapse.types.create_requester( + user, token_id, is_guest, device_id)) except KeyError: raise AuthError( self.TOKEN_NOT_FOUND_HTTP_STATUS, "Missing access token.", diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6264aa0d9..11081a0cd 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -13,14 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from synapse.api.errors import LimitExceededError +import synapse.types from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, Requester - - -import logging +from synapse.api.errors import LimitExceededError +from synapse.types import UserID logger = logging.getLogger(__name__) @@ -124,7 +124,8 @@ class BaseHandler(object): # and having homeservers have their own users leave keeps more # of that decision-making and control local to the guest-having # homeserver. - requester = Requester(target_user, "", True) + requester = synapse.types.create_requester( + target_user, is_guest=True) handler = self.hs.get_handlers().room_member_handler yield handler.update_membership( requester, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 711a6a567..d9ac09078 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -13,15 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer +import synapse.types from synapse.api.errors import SynapseError, AuthError, CodeMessageException -from synapse.types import UserID, Requester - +from synapse.types import UserID from ._base import BaseHandler -import logging - logger = logging.getLogger(__name__) @@ -165,7 +165,9 @@ class ProfileHandler(BaseHandler): try: # Assume the user isn't a guest because we don't let guests set # profile or avatar data. - requester = Requester(user, "", False) + # XXX why are we recreating `requester` here for each room? + # what was wrong with the `requester` we were passed? + requester = synapse.types.create_requester(user) yield handler.update_membership( requester, user, diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 94b19d0cb..b9b5880d6 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -14,18 +14,19 @@ # limitations under the License. """Contains functions for registering clients.""" +import logging +import urllib + from twisted.internet import defer -from synapse.types import UserID, Requester +import synapse.types from synapse.api.errors import ( AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError ) -from ._base import BaseHandler -from synapse.util.async import run_on_reactor from synapse.http.client import CaptchaServerHttpClient - -import logging -import urllib +from synapse.types import UserID +from synapse.util.async import run_on_reactor +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -410,8 +411,9 @@ class RegistrationHandler(BaseHandler): if displayname is not None: logger.info("setting user display name: %s -> %s", user_id, displayname) profile_handler = self.hs.get_handlers().profile_handler + requester = synapse.types.create_requester(user) yield profile_handler.set_displayname( - user, Requester(user, token, False), displayname + user, requester, displayname ) defer.returnValue((user_id, token)) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 7e616f44f..8cec8fc4e 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -14,24 +14,22 @@ # limitations under the License. +import logging + +from signedjson.key import decode_verify_key_bytes +from signedjson.sign import verify_signed_json from twisted.internet import defer +from unpaddedbase64 import decode_base64 -from ._base import BaseHandler - -from synapse.types import UserID, RoomID, Requester +import synapse.types from synapse.api.constants import ( EventTypes, Membership, ) from synapse.api.errors import AuthError, SynapseError, Codes +from synapse.types import UserID, RoomID from synapse.util.async import Linearizer from synapse.util.distributor import user_left_room, user_joined_room - -from signedjson.sign import verify_signed_json -from signedjson.key import decode_verify_key_bytes - -from unpaddedbase64 import decode_base64 - -import logging +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -315,7 +313,7 @@ class RoomMemberHandler(BaseHandler): ) assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,) else: - requester = Requester(target_user, None, False) + requester = synapse.types.create_requester(target_user) message_handler = self.hs.get_handlers().message_handler prev_event = message_handler.deduplicate_state_event(event, context) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 89ab39491..56364af33 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -13,18 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + +import simplejson as json +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID - -from canonicaljson import encode_canonical_json - from ._base import client_v2_patterns -import logging -import simplejson as json - logger = logging.getLogger(__name__) diff --git a/synapse/types.py b/synapse/types.py index f639651a7..5349b0c45 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -18,7 +18,38 @@ from synapse.api.errors import SynapseError from collections import namedtuple -Requester = namedtuple("Requester", ["user", "access_token_id", "is_guest"]) +Requester = namedtuple("Requester", + ["user", "access_token_id", "is_guest", "device_id"]) +""" +Represents the user making a request + +Attributes: + user (UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time +""" + + +def create_requester(user_id, access_token_id=None, is_guest=False, + device_id=None): + """ + Create a new ``Requester`` object + + Args: + user_id (str|UserID): id of the user making the request + access_token_id (int|None): *ID* of the access token used for this + request, or None if it came via the appservice API or similar + is_guest (bool): True if the user making this request is a guest user + device_id (str|None): device_id which was set at authentication time + + Returns: + Requester + """ + if not isinstance(user_id, UserID): + user_id = UserID.from_string(user_id) + return Requester(user_id, access_token_id, is_guest, device_id) def get_domain_from_id(string): diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 4f2c14e4f..f1f664275 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -19,11 +19,12 @@ from twisted.internet import defer from mock import Mock, NonCallableMock +import synapse.types from synapse.api.errors import AuthError from synapse.handlers.profile import ProfileHandler from synapse.types import UserID -from tests.utils import setup_test_homeserver, requester_for_user +from tests.utils import setup_test_homeserver class ProfileHandlers(object): @@ -86,7 +87,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name(self): yield self.handler.set_displayname( self.frank, - requester_for_user(self.frank), + synapse.types.create_requester(self.frank), "Frank Jr." ) @@ -99,7 +100,7 @@ class ProfileTestCase(unittest.TestCase): def test_set_my_name_noauth(self): d = self.handler.set_displayname( self.frank, - requester_for_user(self.bob), + synapse.types.create_requester(self.bob), "Frank Jr." ) @@ -144,7 +145,8 @@ class ProfileTestCase(unittest.TestCase): @defer.inlineCallbacks def test_set_my_avatar(self): yield self.handler.set_avatar_url( - self.frank, requester_for_user(self.frank), "http://my.server/pic.gif" + self.frank, synapse.types.create_requester(self.frank), + "http://my.server/pic.gif" ) self.assertEquals( diff --git a/tests/replication/test_resource.py b/tests/replication/test_resource.py index 842e3d29d..e70ac6f14 100644 --- a/tests/replication/test_resource.py +++ b/tests/replication/test_resource.py @@ -13,15 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.replication.resource import ReplicationResource -from synapse.types import Requester, UserID - -from twisted.internet import defer -from tests import unittest -from tests.utils import setup_test_homeserver, requester_for_user -from mock import Mock, NonCallableMock -import json import contextlib +import json + +from mock import Mock, NonCallableMock +from twisted.internet import defer + +import synapse.types +from synapse.replication.resource import ReplicationResource +from synapse.types import UserID +from tests import unittest +from tests.utils import setup_test_homeserver class ReplicationResourceCase(unittest.TestCase): @@ -61,7 +63,7 @@ class ReplicationResourceCase(unittest.TestCase): def test_events_and_state(self): get = self.get(events="-1", state="-1", timeout="0") yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) code, body = yield get self.assertEquals(code, 200) @@ -144,7 +146,7 @@ class ReplicationResourceCase(unittest.TestCase): def send_text_message(self, room_id, message): handler = self.hs.get_handlers().message_handler event = yield handler.create_and_send_nonmember_event( - requester_for_user(self.user), + synapse.types.create_requester(self.user), { "type": "m.room.message", "content": {"body": "message", "msgtype": "m.text"}, @@ -157,7 +159,7 @@ class ReplicationResourceCase(unittest.TestCase): @defer.inlineCallbacks def create_room(self): result = yield self.hs.get_handlers().room_creation_handler.create_room( - Requester(self.user, "", False), {} + synapse.types.create_requester(self.user), {} ) defer.returnValue(result["room_id"]) diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index af02fce8f..1e95e9753 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -14,17 +14,14 @@ # limitations under the License. """Tests REST events for /profile paths.""" -from tests import unittest +from mock import Mock from twisted.internet import defer -from mock import Mock - -from ....utils import MockHttpResource, setup_test_homeserver - +import synapse.types from synapse.api.errors import SynapseError, AuthError -from synapse.types import Requester, UserID - from synapse.rest.client.v1 import profile +from tests import unittest +from ....utils import MockHttpResource, setup_test_homeserver myid = "@1234ABCD:test" PATH_PREFIX = "/_matrix/client/api/v1" @@ -52,7 +49,7 @@ class ProfileTestCase(unittest.TestCase): ) def _get_user_by_req(request=None, allow_guest=False): - return Requester(UserID.from_string(myid), "", False) + return synapse.types.create_requester(myid) hs.get_v1auth().get_user_by_req = _get_user_by_req diff --git a/tests/utils.py b/tests/utils.py index ed547bc39..915b934e9 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,7 +20,6 @@ from synapse.storage.prepare_database import prepare_database from synapse.storage.engines import create_engine from synapse.server import HomeServer from synapse.federation.transport import server -from synapse.types import Requester from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.logcontext import LoggingContext @@ -512,7 +511,3 @@ class DeferredMockCallable(object): "call(%s)" % _format_call(c[0], c[1]) for c in calls ]) ) - - -def requester_for_user(user): - return Requester(user, None, False) From 87ffd21b291a503fd47ba938b32658c9f475aed5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 19:19:08 +0100 Subject: [PATCH 31/51] Fix a couple of bugs in the transaction and keyring code --- synapse/crypto/keyring.py | 17 +++++++++-------- synapse/storage/transactions.py | 3 ++- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa9..826845f69 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -275,14 +275,15 @@ class Keyring(object): for server_name, groups in missing_groups.items() } - for group in missing_groups.values(): - group_id_to_deferred[group.group_id].errback(SynapseError( - 401, - "No key for %s with id %s" % ( - group.server_name, group.key_ids, - ), - Codes.UNAUTHORIZED, - )) + for groups in missing_groups.values(): + for group in groups: + group_id_to_deferred[group.group_id].errback(SynapseError( + 401, + "No key for %s with id %s" % ( + group.server_name, group.key_ids, + ), + Codes.UNAUTHORIZED, + )) def on_err(err): for deferred in group_id_to_deferred.values(): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 6c7481a72..6258ff172 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -24,6 +24,7 @@ from collections import namedtuple import itertools import logging +import ujson as json logger = logging.getLogger(__name__) @@ -101,7 +102,7 @@ class TransactionStore(SQLBaseStore): ) if result and result["response_code"]: - return result["response_code"], result["response_json"] + return result["response_code"], json.loads(str(result["response_json"])) else: return None From a4b06b619c81f4a212323cc02565c7c893d5c2e5 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 26 Jul 2016 19:50:11 +0100 Subject: [PATCH 32/51] Add a couple more checks to the keyring --- synapse/crypto/keyring.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa9..627bd0d22 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -447,7 +447,7 @@ class Keyring(object): ) processed_response = yield self.process_v2_response( - perspective_name, response + perspective_name, response, only_from_server=False ) for server_name, response_keys in processed_response.items(): @@ -527,7 +527,7 @@ class Keyring(object): @defer.inlineCallbacks def process_v2_response(self, from_server, response_json, - requested_ids=[]): + requested_ids=[], only_from_server=True): time_now_ms = self.clock.time_msec() response_keys = {} verify_keys = {} @@ -551,6 +551,13 @@ class Keyring(object): results = {} server_name = response_json["server_name"] + if only_from_server: + if server_name != from_server: + raise ValueError( + "Expected a response for server %r not %r" % ( + from_server, server_name + ) + ) for key_id in response_json["signatures"].get(server_name, {}): if key_id not in response_json["verify_keys"]: raise ValueError( From 2e3d90d67c8255300b226d6d2fdc2acef80e58ba Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 26 Jul 2016 23:38:12 +0100 Subject: [PATCH 33/51] Make the device id on e2e key upload optional We should now be able to get our device_id from the access_token, so the device_id on the upload request is optional. Where it is supplied, we should check that it matches. For active access_tokens without an associated device_id, we ought to register the device in the devices table. Also update the table on upgrade so that all of the existing e2e keys are associated with real devices. --- synapse/rest/client/v2_alpha/keys.py | 47 ++++++++++++++----- .../schema/delta/33/devices_for_e2e_keys.sql | 19 ++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) create mode 100644 synapse/storage/schema/delta/33/devices_for_e2e_keys.sql diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 56364af33..0bf32a089 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -19,6 +19,9 @@ import simplejson as json from canonicaljson import encode_canonical_json from twisted.internet import defer +import synapse.api.errors +import synapse.server +import synapse.types from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.types import UserID from ._base import client_v2_patterns @@ -28,7 +31,7 @@ logger = logging.getLogger(__name__) class KeyUploadServlet(RestServlet): """ - POST /keys/upload/ HTTP/1.1 + POST /keys/upload HTTP/1.1 Content-Type: application/json { @@ -51,23 +54,51 @@ class KeyUploadServlet(RestServlet): }, } """ - PATTERNS = client_v2_patterns("/keys/upload/(?P[^/]*)", releases=()) + PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$", + releases=(), v2_alpha=False) def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ super(KeyUploadServlet, self).__init__() self.store = hs.get_datastore() self.clock = hs.get_clock() self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() @defer.inlineCallbacks def on_POST(self, request, device_id): requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() - # TODO: Check that the device_id matches that in the authentication - # or derive the device_id from the authentication instead. body = parse_json_object_from_request(request) + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. But if a device_id + # was given here and in the auth, they must match. + + if (requester.device_id is not None and + device_id != requester.device_id): + raise synapse.api.errors.SynapseError( + 400, "Can only upload keys for current device" + ) + + self.device_handler.check_device_registered( + user_id, device_id, "unknown device" + ) + else: + device_id = requester.device_id + + if device_id is None: + raise synapse.api.errors.SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + time_now = self.clock.time_msec() # TODO: Validate the JSON to make sure it has the right keys. @@ -103,14 +134,6 @@ class KeyUploadServlet(RestServlet): result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) - @defer.inlineCallbacks - def on_GET(self, request, device_id): - requester = yield self.auth.get_user_by_req(request) - user_id = requester.user.to_string() - - result = yield self.store.count_e2e_one_time_keys(user_id, device_id) - defer.returnValue((200, {"one_time_key_counts": result})) - class KeyQueryServlet(RestServlet): """ diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql new file mode 100644 index 000000000..2908c4d23 --- /dev/null +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -0,0 +1,19 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- make sure that we have a device record for each set of E2E keys, so that the +-- user can delete them if they like. +INSERT INTO devices + SELECT user_id, device_id, "unknown device" FROM e2e_device_keys_json; From d47115ff8bf3ab5952f053db578a519e8e3f930c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 12:18:03 +0100 Subject: [PATCH 34/51] Delete e2e keys on device delete --- synapse/handlers/device.py | 4 ++++ synapse/rest/client/v2_alpha/keys.py | 13 +++++++++---- synapse/storage/end_to_end_keys.py | 15 +++++++++++++++ 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index eaead5080..f4bf159bb 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -143,6 +143,10 @@ class DeviceHandler(BaseHandler): delete_refresh_tokens=True, ) + yield self.store.delete_e2e_keys_by_device( + user_id=user_id, device_id=device_id + ) + @defer.inlineCallbacks def update_device(self, user_id, device_id, content): """ Update the given device diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 0bf32a089..4629f4bfd 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -86,10 +86,6 @@ class KeyUploadServlet(RestServlet): raise synapse.api.errors.SynapseError( 400, "Can only upload keys for current device" ) - - self.device_handler.check_device_registered( - user_id, device_id, "unknown device" - ) else: device_id = requester.device_id @@ -131,6 +127,15 @@ class KeyUploadServlet(RestServlet): user_id, device_id, time_now, key_list ) + # the device should have been registered already, but it may have been + # deleted due to a race with a DELETE request. Or we may be using an + # old access_token without an associated device_id. Either way, we + # need to double-check the device is registered to avoid ending up with + # keys without a corresponding device. + self.device_handler.check_device_registered( + user_id, device_id, "unknown device" + ) + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) defer.returnValue((200, {"one_time_key_counts": result})) diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py index 2e8906651..62b7790e9 100644 --- a/synapse/storage/end_to_end_keys.py +++ b/synapse/storage/end_to_end_keys.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import twisted.internet.defer + from ._base import SQLBaseStore @@ -123,3 +125,16 @@ class EndToEndKeyStore(SQLBaseStore): return self.runInteraction( "claim_e2e_one_time_keys", _claim_e2e_one_time_keys ) + + @twisted.internet.defer.inlineCallbacks + def delete_e2e_keys_by_device(self, user_id, device_id): + yield self._simple_delete( + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_device_keys_by_device" + ) + yield self._simple_delete( + table="e2e_one_time_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + desc="delete_e2e_one_time_keys_by_device" + ) From 26cb0efa88c2fa84089c74e3de02fa2ce832f47a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 12:30:22 +0100 Subject: [PATCH 35/51] SQL syntax fix --- synapse/storage/schema/delta/33/devices_for_e2e_keys.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql index 2908c4d23..140f2b63e 100644 --- a/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql +++ b/synapse/storage/schema/delta/33/devices_for_e2e_keys.sql @@ -16,4 +16,4 @@ -- make sure that we have a device record for each set of E2E keys, so that the -- user can delete them if they like. INSERT INTO devices - SELECT user_id, device_id, "unknown device" FROM e2e_device_keys_json; + SELECT user_id, device_id, 'unknown device' FROM e2e_device_keys_json; From fe1b36994643ed57b511d9caf834e3e131cd404c Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 27 Jul 2016 14:10:43 +0100 Subject: [PATCH 36/51] Clean up verify_json_objects_for_server --- synapse/crypto/keyring.py | 141 ++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 67 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index d08ee0aa9..f3924e23d 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -44,7 +44,21 @@ import logging logger = logging.getLogger(__name__) -KeyGroup = namedtuple("KeyGroup", ("server_name", "group_id", "key_ids")) +VerifyKeyRequest = namedtuple("VerifyRequest", ( + "server_name", "key_ids", "json_object", "deferred" +)) +""" +A request for a verify key to verify a JSON object. + +Attributes: + server_name(str): The name of the server to verify against. + key_ids(set(str)): The set of key_ids to that could be used to verify the + JSON object + json_object(dict): The JSON object to verify. + deferred(twisted.internet.defer.Deferred): + A deferred (server_name, key_id, verify_key) tuple that resolves when + a verify key has been fetched +""" class Keyring(object): @@ -74,39 +88,32 @@ class Keyring(object): list of deferreds indicating success or failure to verify each json object's signature for the given server_name. """ - group_id_to_json = {} - group_id_to_group = {} - group_ids = [] - - next_group_id = 0 - deferreds = {} + verify_requests = [] for server_name, json_object in server_and_json: logger.debug("Verifying for %s", server_name) - group_id = next_group_id - next_group_id += 1 - group_ids.append(group_id) key_ids = signature_ids(json_object, server_name) if not key_ids: - deferreds[group_id] = defer.fail(SynapseError( + deferred = defer.fail(SynapseError( 400, "Not signed with a supported algorithm", Codes.UNAUTHORIZED, )) else: - deferreds[group_id] = defer.Deferred() + deferred = defer.Deferred() - group = KeyGroup(server_name, group_id, key_ids) + verify_request = VerifyKeyRequest( + server_name, key_ids, json_object, deferred + ) - group_id_to_group[group_id] = group - group_id_to_json[group_id] = json_object + verify_requests.append(verify_request) @defer.inlineCallbacks - def handle_key_deferred(group, deferred): - server_name = group.server_name + def handle_key_deferred(verify_request): + server_name = verify_request.server_name try: - _, _, key_id, verify_key = yield deferred + _, key_id, verify_key = yield verify_request.deferred except IOError as e: logger.warn( "Got IOError when downloading keys for %s: %s %s", @@ -128,7 +135,7 @@ class Keyring(object): Codes.UNAUTHORIZED, ) - json_object = group_id_to_json[group.group_id] + json_object = verify_request.json_object try: verify_signed_json(json_object, server_name, verify_key) @@ -157,36 +164,34 @@ class Keyring(object): # Actually start fetching keys. wait_on_deferred.addBoth( - lambda _: self.get_server_verify_keys(group_id_to_group, deferreds) + lambda _: self.get_server_verify_keys(verify_requests) ) # When we've finished fetching all the keys for a given server_name, # resolve the deferred passed to `wait_for_previous_lookups` so that # any lookups waiting will proceed. - server_to_gids = {} + server_to_request_ids = {} - def remove_deferreds(res, server_name, group_id): - server_to_gids[server_name].discard(group_id) - if not server_to_gids[server_name]: + def remove_deferreds(res, server_name, verify_request): + request_id = id(verify_request) + server_to_request_ids[server_name].discard(request_id) + if not server_to_request_ids[server_name]: d = server_to_deferred.pop(server_name, None) if d: d.callback(None) return res - for g_id, deferred in deferreds.items(): - server_name = group_id_to_group[g_id].server_name - server_to_gids.setdefault(server_name, set()).add(g_id) - deferred.addBoth(remove_deferreds, server_name, g_id) + for verify_request in verify_requests: + server_name = verify_request.server_name + request_id = id(verify_request) + server_to_request_ids.setdefault(server_name, set()).add(request_id) + deferred.addBoth(remove_deferreds, server_name, verify_request) # Pass those keys to handle_key_deferred so that the json object # signatures can be verified return [ - preserve_context_over_fn( - handle_key_deferred, - group_id_to_group[g_id], - deferreds[g_id], - ) - for g_id in group_ids + preserve_context_over_fn(handle_key_deferred, verify_request) + for verify_request in verify_requests ] @defer.inlineCallbacks @@ -220,7 +225,7 @@ class Keyring(object): d.addBoth(rm, server_name) - def get_server_verify_keys(self, group_id_to_group, group_id_to_deferred): + def get_server_verify_keys(self, verify_requests): """Takes a dict of KeyGroups and tries to find at least one key for each group. """ @@ -237,62 +242,64 @@ class Keyring(object): merged_results = {} missing_keys = {} - for group in group_id_to_group.values(): - missing_keys.setdefault(group.server_name, set()).update( - group.key_ids + for verify_request in verify_requests: + missing_keys.setdefault(verify_request.server_name, set()).update( + verify_request.key_ids ) for fn in key_fetch_fns: results = yield fn(missing_keys.items()) merged_results.update(results) - # We now need to figure out which groups we have keys for - # and which we don't - missing_groups = {} - for group in group_id_to_group.values(): - for key_id in group.key_ids: - if key_id in merged_results[group.server_name]: + # We now need to figure out which verify requests we have keys + # for and which we don't + missing_keys = {} + requests_missing_keys = [] + for verify_request in verify_requests: + server_name = verify_request.server_name + result_keys = merged_results[server_name] + + if verify_request.deferred.called: + # We've already called this deferred, which probably + # means that we've already found a key for it. + continue + + for key_id in verify_request.key_ids: + if key_id in result_keys: with PreserveLoggingContext(): - group_id_to_deferred[group.group_id].callback(( - group.group_id, - group.server_name, + verify_request.deferred.callback(( + server_name, key_id, - merged_results[group.server_name][key_id], + result_keys[key_id], )) break else: - missing_groups.setdefault( - group.server_name, [] - ).append(group) + # The else block is only reached if the loop above + # doesn't break. + missing_keys.setdefault(server_name, set()).update( + verify_request.key_ids + ) + requests_missing_keys.append(verify_request) - if not missing_groups: + if not missing_keys: break - missing_keys = { - server_name: set( - key_id for group in groups for key_id in group.key_ids - ) - for server_name, groups in missing_groups.items() - } - - for group in missing_groups.values(): - group_id_to_deferred[group.group_id].errback(SynapseError( + for verify_request in requests_missing_keys.values(): + verify_request.deferred.errback(SynapseError( 401, "No key for %s with id %s" % ( - group.server_name, group.key_ids, + verify_request.server_name, verify_request.key_ids, ), Codes.UNAUTHORIZED, )) def on_err(err): - for deferred in group_id_to_deferred.values(): - if not deferred.called: - deferred.errback(err) + for verify_request in verify_requests: + if not verify_request.deferred.called: + verify_request.deferred.errback(err) do_iterations().addErrback(on_err) - return group_id_to_deferred - @defer.inlineCallbacks def get_keys_from_store(self, server_name_and_key_ids): res = yield defer.gatherResults( From ccec25e2c6270c1cae916b8ca8a775a166ea7e7f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Jul 2016 16:41:06 +0100 Subject: [PATCH 37/51] key upload tweaks 1. Add v2_alpha URL back in, since things seem to be using it. 2. Don't reject the request if the device_id in the upload request fails to match that in the access_token. --- synapse/rest/client/v2_alpha/keys.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 4629f4bfd..dc1d4d8fc 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -55,7 +55,7 @@ class KeyUploadServlet(RestServlet): } """ PATTERNS = client_v2_patterns("/keys/upload(/(?P[^/]+))?$", - releases=(), v2_alpha=False) + releases=()) def __init__(self, hs): """ @@ -78,14 +78,12 @@ class KeyUploadServlet(RestServlet): if device_id is not None: # passing the device_id here is deprecated; however, we allow it - # for now for compatibility with older clients. But if a device_id - # was given here and in the auth, they must match. - + # for now for compatibility with older clients. if (requester.device_id is not None and device_id != requester.device_id): - raise synapse.api.errors.SynapseError( - 400, "Can only upload keys for current device" - ) + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) else: device_id = requester.device_id From 5238960850b4aa4b318f7c794fdadaf12dfe3841 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jul 2016 17:33:09 +0100 Subject: [PATCH 38/51] Bump CHANGES and version --- CHANGES.rst | 56 +++++++++++++++++++++++++++++++++++++++++++++ synapse/__init__.py | 2 +- 2 files changed, 57 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index e1d5e876d..799c14575 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,59 @@ +Changes in synapse v0.17.0-r1 (2016-07-27) +========================================== + +This release changes the LDAP configuration format in a backwards incompatible +way, see PR #843 for details. + + +Features: + +* Add purge_media_cache admin API (PR #902) +* Add deactivate account admin API (PR #903) +* Add optional pepper to password hashing (PR #907, #910) +* Add an admin option to shared secret registration (breaks backwards compat) + (PR #909) +* Add purge local room history API (PR #911, #923, #924) +* Add requestToken endpoints (PR #915) +* Add an /account/deactivate endpoint (PR #921) +* Add filter param to /messages. Add 'contains_url' to filter. (PR #922) +* Add device_id support to /login (PR #929) +* Add device_id support to /v2/register flow. (PR #937, #942) +* Add GET /devices endpoint (PR #939, #944) +* Add GET /device/{deviceId} (PR #943) +* Add update and delete APIs for devices (PR #949) + + +Changes: + +* Rewrite LDAP Authentication against ldap3 (PR #843) +* Linearize some federation endpoints based on (origin, room_id) (PR #879) +* Remove the legacy v0 content upload API. (PR #888) +* Use similar naming we use in email notifs for push (PR #894) +* Optionally include password hash in createUser endpoint (PR #905) +* Use a query that postgresql optimises better for get_events_around (PR #906) +* Fall back to 'username' if 'user' is not given for appservice registration. + (PR #927) +* Add metrics for psutil derived memory usage (PR #936) +* Record device_id in client_ips (PR #938) +* Log the hostname the reCAPTCHA was completed on (PR #946) + + +Bug fixes: + +* Fix substitution failure in mail template (PR #887) +* Put most recent 20 messages in email notif (PR #892) +* Ensure that the guest user is in the database when upgrading accounts + (PR #914) +* Fix various edge cases in auth handling (PR #919) +* Fix 500 ISE when sending alias event without a state_key (PR #925) +* Fix bug where we stored rejections in the state_group, persist all + rejections (PR #948) +* Fix lack of check of if the user is banned when handling 3pid invites + (PR #952) +* Fix a couple of bugs in the transaction and keyring code (PR #954, #955) + + + Changes in synapse v0.16.1-r1 (2016-07-08) ========================================== diff --git a/synapse/__init__.py b/synapse/__init__.py index 2750ad3f7..b0bd7254c 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.1-r1" +__version__ = "0.16.17" From fda078f995265adb0ecee5734c516eb55adc9355 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Jul 2016 09:14:21 +0100 Subject: [PATCH 39/51] Add r0.2.0 to the "supported versions" list --- synapse/rest/client/versions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index ca5468c40..1fe31abb4 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -26,7 +26,10 @@ class VersionsRestServlet(RestServlet): def on_GET(self, request): return (200, { - "versions": ["r0.0.1"] + "versions": [ + "r0.0.1", + "r0.2.0", + ] }) From ecd5e6bfa4b84b6beb47b27d476f0bdba66f7a23 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2016 10:04:37 +0100 Subject: [PATCH 40/51] Typo --- synapse/push/push_tools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 6f2d1ad57..d555a33e9 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -54,7 +54,7 @@ def get_context_for_event(state_handler, ev, user_id): room_state = yield state_handler.get_current_state(ev.room_id) # we no longer bother setting room_alias, and make room_name the - # human-readable name instead, be that m.room.namer, an alias or + # human-readable name instead, be that m.room.name, an alias or # a list of people in the room name = calculate_room_name( room_state, user_id, fallback_to_single_member=False From f6f8f81a4800cae83684cd1d75eb9a132c5bde6e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 28 Jul 2016 10:14:07 +0100 Subject: [PATCH 41/51] Add r0.1.0 to the "supported versions" list --- synapse/rest/client/versions.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 1fe31abb4..e984ea47d 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -28,6 +28,7 @@ class VersionsRestServlet(RestServlet): return (200, { "versions": [ "r0.0.1", + "r0.1.0", "r0.2.0", ] }) From 389c890f14c456a157d973fd29b49d64e5fa9226 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2016 10:20:47 +0100 Subject: [PATCH 42/51] Don't include name of room for invites in push Avoids insane pushes like, "Bob invited you to invite from Bob" --- synapse/util/presentable_names.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/util/presentable_names.py b/synapse/util/presentable_names.py index 4c54812e6..f68676e9e 100644 --- a/synapse/util/presentable_names.py +++ b/synapse/util/presentable_names.py @@ -83,7 +83,10 @@ def calculate_room_name(room_state, user_id, fallback_to_members=True, ): if ("m.room.member", my_member_event.sender) in room_state: inviter_member_event = room_state[("m.room.member", my_member_event.sender)] - return "Invite from %s" % (name_from_member_event(inviter_member_event),) + if fallback_to_single_member: + return "Invite from %s" % (name_from_member_event(inviter_member_event),) + else: + return None else: return "Room Invite" From 7871790db1b38d10783d88ebfc9bd4e0356195c7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 10:38:56 +0100 Subject: [PATCH 43/51] Bump version and changelog --- CHANGES.rst | 7 +++++-- synapse/__init__.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 799c14575..65566adda 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,5 +1,5 @@ -Changes in synapse v0.17.0-r1 (2016-07-27) -========================================== +Changes in synapse v0.17.0-rc1 (2016-07-28) +=========================================== This release changes the LDAP configuration format in a backwards incompatible way, see PR #843 for details. @@ -36,6 +36,9 @@ Changes: * Add metrics for psutil derived memory usage (PR #936) * Record device_id in client_ips (PR #938) * Log the hostname the reCAPTCHA was completed on (PR #946) +* Make the device id on e2e key upload optional (PR #956) +* Add r0.2.0 to the "supported versions" list (PR #960) +* Don't include name of room for invites in push (PR #961) Bug fixes: diff --git a/synapse/__init__.py b/synapse/__init__.py index b0bd7254c..8f0176e18 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.16.17" +__version__ = "0.17.0-rc1" From bf81e38d365b79130b5e04053de0eaff94b0d472 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 10:29:12 +0100 Subject: [PATCH 44/51] Fix retry utils to check if the exception is a subclass of CME --- synapse/util/retryutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 43cf11f3f..49527f4d2 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -128,7 +128,7 @@ class RetryDestinationLimiter(object): ) valid_err_code = False - if exc_type is CodeMessageException: + if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: From 019cf013d6ea4a8182189d068dc44ec403cc58ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 10:47:45 +0100 Subject: [PATCH 45/51] Update changelog --- CHANGES.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index 65566adda..c2fb98247 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -4,6 +4,9 @@ Changes in synapse v0.17.0-rc1 (2016-07-28) This release changes the LDAP configuration format in a backwards incompatible way, see PR #843 for details. +This release contains significant security bug fixes regarding authenticating +events received over federation. Please upgrade. + Features: @@ -35,6 +38,7 @@ Changes: (PR #927) * Add metrics for psutil derived memory usage (PR #936) * Record device_id in client_ips (PR #938) +* Send the correct host header when fetching keys (PR #941) * Log the hostname the reCAPTCHA was completed on (PR #946) * Make the device id on e2e key upload optional (PR #956) * Add r0.2.0 to the "supported versions" list (PR #960) From 7861cfec0aaed29b4bea0aab8fe7e89c7f23adcb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 14:35:05 +0100 Subject: [PATCH 46/51] Add authors to changelog --- CHANGES.rst | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index c2fb98247..03668370a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,7 +12,7 @@ Features: * Add purge_media_cache admin API (PR #902) * Add deactivate account admin API (PR #903) -* Add optional pepper to password hashing (PR #907, #910) +* Add optional pepper to password hashing (PR #907, #910 by KentShikama) * Add an admin option to shared secret registration (breaks backwards compat) (PR #909) * Add purge local room history API (PR #911, #923, #924) @@ -28,14 +28,15 @@ Features: Changes: -* Rewrite LDAP Authentication against ldap3 (PR #843) +* Rewrite LDAP Authentication against ldap3 (PR #843 by mweinelt) * Linearize some federation endpoints based on (origin, room_id) (PR #879) * Remove the legacy v0 content upload API. (PR #888) * Use similar naming we use in email notifs for push (PR #894) -* Optionally include password hash in createUser endpoint (PR #905) +* Optionally include password hash in createUser endpoint (PR #905 by + KentShikama) * Use a query that postgresql optimises better for get_events_around (PR #906) * Fall back to 'username' if 'user' is not given for appservice registration. - (PR #927) + (PR #927 by Half-Shot) * Add metrics for psutil derived memory usage (PR #936) * Record device_id in client_ips (PR #938) * Send the correct host header when fetching keys (PR #941) From 367b594183c553436bb0338e9f26e42fa46424dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 14:56:09 +0100 Subject: [PATCH 47/51] Add some basic admin API docs --- docs/admin_api/README.rst | 12 ++++++++++++ docs/admin_api/purge_history_api.rst | 15 +++++++++++++++ docs/admin_api/purge_remote_media.rst | 19 +++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 docs/admin_api/README.rst create mode 100644 docs/admin_api/purge_history_api.rst create mode 100644 docs/admin_api/purge_remote_media.rst diff --git a/docs/admin_api/README.rst b/docs/admin_api/README.rst new file mode 100644 index 000000000..d4f564cfa --- /dev/null +++ b/docs/admin_api/README.rst @@ -0,0 +1,12 @@ +Admin APIs +========== + +This directory includes documentation for the various synapse specific admin +APIs available. + +Only users that are server admins can use these APIs. A user can be marked as a +server admin by updating the database directly, e.g.: + +``UPDATE users SET admin = 1 WHERE name = '@foo:bar.com'`` + +Restarting may be required for the changes to register. diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst new file mode 100644 index 000000000..986efe40f --- /dev/null +++ b/docs/admin_api/purge_history_api.rst @@ -0,0 +1,15 @@ +Purge History API +================= + +The purge history API allows server admins to purge historic events from their +database, reclaiming disk space. + +Depending on the amount of history being purged a call to the API may take +several minutes or longer. During this period users will not be able to +paginate further back in the room from the point being purged from. + +The API is simply: + +``POST /_matrix/client/r0/admin/purge_history//`` + +including an ``access_token`` of a server admin. diff --git a/docs/admin_api/purge_remote_media.rst b/docs/admin_api/purge_remote_media.rst new file mode 100644 index 000000000..749ed1b2b --- /dev/null +++ b/docs/admin_api/purge_remote_media.rst @@ -0,0 +1,19 @@ +Purge Remote Media API +====================== + +The purge remote media API allows server admins to purge old cached remote +media. + +The API is:: + + POST /_matrix/client/r0/admin/purge_history/ + + { + "before_ts": + } + +Which will remove all cached media that was last accessed before +````. + +If the user re-requests purged remote media, synapse will re-request the media +from the originating server. From 3c3246c078134124610afa40ec55626568c5627c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 15:08:37 +0100 Subject: [PATCH 48/51] Use correct path --- docs/admin_api/purge_remote_media.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/admin_api/purge_remote_media.rst b/docs/admin_api/purge_remote_media.rst index 749ed1b2b..b26c6a9e7 100644 --- a/docs/admin_api/purge_remote_media.rst +++ b/docs/admin_api/purge_remote_media.rst @@ -6,7 +6,7 @@ media. The API is:: - POST /_matrix/client/r0/admin/purge_history/ + POST /_matrix/client/r0/admin/purge_media_cache { "before_ts": From 370135ad0b7cf7ded04e9f2ca0c99f5470f5efc1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 28 Jul 2016 16:47:37 +0100 Subject: [PATCH 49/51] Comment get_unread_push_actions_for_user_in_range function --- synapse/storage/event_push_actions.py | 28 +++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3d93285f8..958dbcc22 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -119,9 +119,28 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None, + max_stream_ordering, limit=20): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. + + Args: + user_id (str) + min_stream_ordering + max_stream_ordering + limit (int) + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions", "received_ts". + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the most recent + # push actions def get_after_receipt(txn): + # XXX: Do we really need to GROUP BY user_id on the inner SELECT? + # XXX: NATURAL JOIN obfuscates which columns are being joined on the + # inner SELECT (the room_id and event_id), can we + # INNER JOIN ... USING instead? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "e.received_ts " @@ -160,7 +179,12 @@ class EventPushActionsStore(SQLBaseStore): "get_unread_push_actions_for_user_in_range", get_after_receipt ) + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. def get_no_receipt(txn): + # XXX: Does the inner SELECT really need to select from the events table? + # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" @@ -198,7 +222,7 @@ class EventPushActionsStore(SQLBaseStore): # Now sort it so it's ordered correctly, since currently it will # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered - # by received_ts + # by received_ts (most recent first) notifs.sort(key=lambda r: -(r['received_ts'] or 0)) # Now return the first `limit` From 0a7d3cd00f8b7e3ad0ba458c3ab9b40a2496545b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 20:24:24 +0100 Subject: [PATCH 50/51] Create separate methods for getting messages to push for the email and http pushers rather than trying to make a single method that will work with their conflicting requirements. The http pusher needs to get the messages in ascending stream order, and doesn't want to miss a message. The email pusher needs to get the messages in descending timestamp order, and doesn't mind if it misses messages. --- synapse/push/emailpusher.py | 5 +- synapse/push/httppusher.py | 3 +- synapse/replication/slave/storage/events.py | 7 +- synapse/storage/event_push_actions.py | 199 +++++++++++++++----- tests/storage/test_event_push_actions.py | 41 ++++ 5 files changed, 204 insertions(+), 51 deletions(-) create mode 100644 tests/storage/test_event_push_actions.py diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 12a3ec7fd..e224b6829 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -140,9 +140,8 @@ class EmailPusher(object): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( - self.user_id, start, self.max_stream_ordering - ) + fn = self.store.get_unread_push_actions_for_user_in_range_for_email + unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2acc6cc21..9a7db6122 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -141,7 +141,8 @@ class HttpPusher(object): run once per pusher. """ - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + fn = self.store.get_unread_push_actions_for_user_in_range_for_http + unprocessed = yield fn( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d83946..6a644f138 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore): StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_unread_push_actions_for_user_in_range = ( - DataStore.get_unread_push_actions_for_user_in_range.__func__ + get_unread_push_actions_for_user_in_range_for_http = ( + DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ + ) + get_unread_push_actions_for_user_in_range_for_email = ( + DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__ ) get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 958dbcc22..5ab362bef 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -117,40 +117,149 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_push_actions_for_user_in_range(self, user_id, - min_stream_ordering, - max_stream_ordering, - limit=20): + def get_unread_push_actions_for_user_in_range_for_http( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): """Get a list of the most recent unread push actions for a given user, - within the given stream ordering range. + within the given stream ordering range. Called by the httppusher. Args: - user_id (str) - min_stream_ordering - max_stream_ordering - limit (int) + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions". + The list will be ordered by ascending stream_ordering. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the next + # push actions + def get_after_receipt(txn): + # find rooms that have a read receipt in them and return the next + # push actions + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + ") AS rl," + " event_push_actions AS ep" + " WHERE" + " ep.room_id = rl.room_id" + " AND (" + " ep.topological_ordering > rl.topological_ordering" + " OR (" + " ep.topological_ordering = rl.topological_ordering" + " AND ep.stream_ordering > rl.stream_ordering" + " )" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + after_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt + ) + + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. + def get_no_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + no_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + ) + + notifs = [ + { + "event_id": row[0], + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + } for row in after_read_receipt + no_read_receipt + ] + + # Now sort it so it's ordered correctly, since currently it will + # contain results from the first query, correctly ordered, followed + # by results from the second query, but we want them all ordered + # by stream_ordering, oldest first. + notifs.sort(key=lambda r: r['stream_ordering']) + + # Take only up to the limit. We have to stop at the limit because + # one of the subqueries may have hit the limit. + defer.returnValue(notifs[:limit]) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range_for_email( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the emailpusher + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. Returns: A promise which resolves to a list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts". + The list will be ordered by descending received_ts. The list will have between 0~limit entries. """ # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt(txn): - # XXX: Do we really need to GROUP BY user_id on the inner SELECT? - # XXX: NATURAL JOIN obfuscates which columns are being joined on the - # inner SELECT (the room_id and event_id), can we - # INNER JOIN ... USING instead? sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM (" - " SELECT room_id, user_id, " - " max(topological_ordering) as topological_ordering, " - " max(stream_ordering) as stream_ordering " - " FROM events" - " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" - " GROUP BY room_id, user_id" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" ") AS rl," " event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" @@ -165,47 +274,47 @@ class EventPushActionsStore(SQLBaseStore): " )" " AND ep.stream_ordering > ?" " AND ep.user_id = ?" - " AND ep.user_id = rl.user_id" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [min_stream_ordering, user_id] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_after_receipt + "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt ) # There are rooms with push actions in them but you don't have a read receipt in # them e.g. rooms you've been invited to, so get push actions for rooms which do # not have read receipts in them too. def get_no_receipt(txn): - # XXX: Does the inner SELECT really need to select from the events table? - # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.room_id not in (" - " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - ") AND ep.user_id = ? AND ep.stream_ordering > ?" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [user_id, user_id, min_stream_ordering] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() no_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_no_receipt + "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt ) # Make a list of dicts from the two sets of results. diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py new file mode 100644 index 000000000..e9044afa2 --- /dev/null +++ b/tests/storage/test_event_push_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + +USER_ID = "@user:example.com" + + +class EventPushActionsStoreTestCase(tests.unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_http(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_http( + USER_ID, 0, 1000, 20 + ) + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_email(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_email( + USER_ID, 0, 1000, 20 + ) From 8dad08a9509103f38d9eec5dc28d46e4a757fad8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 29 Jul 2016 09:57:13 +0100 Subject: [PATCH 51/51] Fix SQL to supply arguments in the same order --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5ab362bef..df4000d0d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -272,8 +272,8 @@ class EventPushActionsStore(SQLBaseStore): " AND ep.stream_ordering > rl.stream_ordering" " )" " )" - " AND ep.stream_ordering > ?" " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" " ORDER BY ep.stream_ordering DESC LIMIT ?" )