From 248e6770ca0faadf574cfd62f72d8e200cb5b57a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 10:30:12 +0100 Subject: [PATCH 01/24] Cache federation state responses --- synapse/federation/federation_server.py | 68 +++++++++++++++++-------- synapse/handlers/federation.py | 7 +-- synapse/handlers/room.py | 4 +- synapse/handlers/sync.py | 2 +- synapse/util/caches/response_cache.py | 13 ++++- 5 files changed, 61 insertions(+), 33 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 85f5e752f..d15c7e1b4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,10 +21,11 @@ from .units import Transaction, Edu from synapse.util.async import Linearizer from synapse.util.logutils import log_function +from synapse.util.caches.response_cache import ResponseCache from synapse.events import FrozenEvent import synapse.metrics -from synapse.api.errors import FederationError, SynapseError +from synapse.api.errors import AuthError, FederationError, SynapseError from synapse.crypto.event_signing import compute_event_signature @@ -48,9 +49,15 @@ class FederationServer(FederationBase): def __init__(self, hs): super(FederationServer, self).__init__(hs) + self.auth = hs.get_auth() + self._room_pdu_linearizer = Linearizer() self._server_linearizer = Linearizer() + # We cache responses to state queries, as they take a while and often + # come in waves. + self._state_resp_cache = ResponseCache(hs, timeout_ms=30000) + def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -188,28 +195,45 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function def on_context_state_request(self, origin, room_id, event_id): - with (yield self._server_linearizer.queue((origin, room_id))): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] - ) + if not event_id: + raise NotImplementedError("Specify an event") - for event in auth_chain: - # We sign these again because there was a bug where we - # incorrectly signed things the first time round - if self.hs.is_mine_id(event.event_id): - event.signatures.update( - compute_event_signature( - event, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - else: - raise NotImplementedError("Specify an event") + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") + + result = self._state_resp_cache.get((room_id, event_id)) + if not result: + with (yield self._server_linearizer.queue((origin, room_id))): + resp = yield self.response_cache.set( + (room_id, event_id), + self._on_context_state_request_compute(room_id, event_id) + ) + else: + resp = yield result + + defer.returnValue((200, resp)) + + @defer.inlineCallbacks + def _on_context_state_request_compute(self, room_id, event_id): + pdus = yield self.handler.get_state_for_pdu( + room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + + for event in auth_chain: + # We sign these again because there was a bug where we + # incorrectly signed things the first time round + if self.hs.is_mine_id(event.event_id): + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) defer.returnValue((200, { "pdus": [pdu.get_pdu_json() for pdu in pdus], diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf1..fcad41d7b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -991,14 +991,9 @@ class FederationHandler(BaseHandler): defer.returnValue(None) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): + def get_state_for_pdu(self, room_id, event_id): yield run_on_reactor() - if do_auth: - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") - state_groups = yield self.store.get_state_groups( room_id, [event_id] ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ae44c7a55..bf6b1c153 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -345,8 +345,8 @@ class RoomCreationHandler(BaseHandler): class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache() - self.remote_list_request_cache = ResponseCache() + self.response_cache = ResponseCache(hs) + self.remote_list_request_cache = ResponseCache(hs) self.remote_list_cache = {} self.fetch_looping_call = hs.get_clock().looping_call( self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index be26a491f..0ee4ebe50 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -138,7 +138,7 @@ class SyncHandler(object): self.presence_handler = hs.get_presence_handler() self.event_sources = hs.get_event_sources() self.clock = hs.get_clock() - self.response_cache = ResponseCache() + self.response_cache = ResponseCache(hs) def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0, full_state=False): diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 36686b479..00af53988 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -24,9 +24,12 @@ class ResponseCache(object): used rather than trying to compute a new response. """ - def __init__(self): + def __init__(self, hs, timeout_ms=0): self.pending_result_cache = {} # Requests that haven't finished yet. + self.clock = hs.get_clock() + self.timeout_sec = timeout_ms / 1000. + def get(self, key): result = self.pending_result_cache.get(key) if result is not None: @@ -39,7 +42,13 @@ class ResponseCache(object): self.pending_result_cache[key] = result def remove(r): - self.pending_result_cache.pop(key, None) + if self.timeout_sec: + self.clock.call_later( + self.timeout_sec, + self.pending_result_cache.pop, key, None, + ) + else: + self.pending_result_cache.pop(key, None) return r result.addBoth(remove) From aede7248ab04118b83d7787547b9cf3fd615e7ad Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 21 Jul 2016 17:37:44 +0100 Subject: [PATCH 02/24] Split out a FederationReader process --- synapse/app/federation_reader.py | 200 ++++++++++++++++++++ synapse/replication/slave/storage/events.py | 5 + synapse/replication/slave/storage/keys.py | 29 +++ synapse/storage/keys.py | 4 + 4 files changed, 238 insertions(+) create mode 100644 synapse/app/federation_reader.py create mode 100644 synapse/replication/slave/storage/keys.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py new file mode 100644 index 000000000..98a18f9b3 --- /dev/null +++ b/synapse/app/federation_reader.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.http.site import SynapseSite +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.async import sleep +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.api.urls import FEDERATION_PREFIX +from synapse.federation.transport.server import TransportLayerServer +from synapse.crypto import context_factory + + +from twisted.internet import reactor, defer +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.federation_reader") + + +class FederationReaderSlavedStore( + SlavedEventStore, + SlavedKeyStore, + BaseSlavedStore, +): + pass + + +class FederationReaderServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_address = listener_config.get("bind_address", "") + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "federation": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self), + }) + + root_resource = create_resource_tree(resources, Resource()) + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=bind_address + ) + logger.info("Synapse federation reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=listener.get("bind_address", '127.0.0.1') + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + @defer.inlineCallbacks + def replicate(self): + http_client = self.get_simple_http_client() + store = self.get_datastore() + replication_url = self.config.worker_replication_url + + while True: + try: + args = store.stream_positions() + args["timeout"] = 30000 + result = yield http_client.get_json(replication_url, args=args) + yield store.process_replication(result) + except: + logger.exception("Error replicating from %r", replication_url) + yield sleep(5) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse federation reader", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.federation_reader" + + setup_logging(config.worker_log_config, config.worker_log_file) + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FederationReaderServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string=get_version_string("Synapse", synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.get_handlers() + ss.start_listening(config.worker_listeners) + + def run(): + with LoggingContext("run"): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ss.get_datastore().start_profiling() + ss.replicate() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-federation-reader", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d83946..2ba1e6b80 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -142,6 +142,11 @@ class SlavedEventStore(BaseSlavedStore): _get_events_around_txn = DataStore._get_events_around_txn.__func__ _get_some_state_from_cache = DataStore._get_some_state_from_cache.__func__ + get_backfill_events = DataStore.get_backfill_events.__func__ + _get_backfill_events = DataStore._get_backfill_events.__func__ + get_missing_events = DataStore.get_missing_events.__func__ + _get_missing_events = DataStore._get_missing_events.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py new file mode 100644 index 000000000..c1c895439 --- /dev/null +++ b/synapse/replication/slave/storage/keys.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.keys import KeyStore + + +class SlavedKeyStore(BaseSlavedStore): + # TODO: use the cached version and invalidate deleted tokens + get_all_server_verify_keys = defer.inlineCallbacks(KeyStore.__dict__[ + "get_all_server_verify_keys" + ].orig) + + get_server_verify_keys = DataStore.get_server_verify_keys.__func__ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index a495a8a7d..1195efec0 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -22,6 +22,10 @@ import OpenSSL from signedjson.key import decode_verify_key_bytes import hashlib +import logging + +logger = logging.getLogger(__name__) + class KeyStore(SQLBaseStore): """Persistence for signature verification keys and tls X.509 certificates From 05f6447301ddc72cec7564f9d39f3e16aaa728c6 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 27 Jul 2016 17:54:26 +0100 Subject: [PATCH 03/24] Forbid non-ASes from registering users whose names begin with '_' (SYN-738) --- synapse/handlers/register.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index b9b5880d6..dd75c4fec 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -53,6 +53,13 @@ class RegistrationHandler(BaseHandler): Codes.INVALID_USERNAME ) + if localpart[0] == '_': + raise SynapseError( + 400, + "User ID may not begin with _", + Codes.INVALID_USERNAME + ) + user = UserID(localpart, self.hs.hostname) user_id = user.to_string() From bf81e38d365b79130b5e04053de0eaff94b0d472 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 10:29:12 +0100 Subject: [PATCH 04/24] Fix retry utils to check if the exception is a subclass of CME --- synapse/util/retryutils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 43cf11f3f..49527f4d2 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -128,7 +128,7 @@ class RetryDestinationLimiter(object): ) valid_err_code = False - if exc_type is CodeMessageException: + if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 if exc_type is None or valid_err_code: From 367b594183c553436bb0338e9f26e42fa46424dc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 14:56:09 +0100 Subject: [PATCH 05/24] Add some basic admin API docs --- docs/admin_api/README.rst | 12 ++++++++++++ docs/admin_api/purge_history_api.rst | 15 +++++++++++++++ docs/admin_api/purge_remote_media.rst | 19 +++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 docs/admin_api/README.rst create mode 100644 docs/admin_api/purge_history_api.rst create mode 100644 docs/admin_api/purge_remote_media.rst diff --git a/docs/admin_api/README.rst b/docs/admin_api/README.rst new file mode 100644 index 000000000..d4f564cfa --- /dev/null +++ b/docs/admin_api/README.rst @@ -0,0 +1,12 @@ +Admin APIs +========== + +This directory includes documentation for the various synapse specific admin +APIs available. + +Only users that are server admins can use these APIs. A user can be marked as a +server admin by updating the database directly, e.g.: + +``UPDATE users SET admin = 1 WHERE name = '@foo:bar.com'`` + +Restarting may be required for the changes to register. diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst new file mode 100644 index 000000000..986efe40f --- /dev/null +++ b/docs/admin_api/purge_history_api.rst @@ -0,0 +1,15 @@ +Purge History API +================= + +The purge history API allows server admins to purge historic events from their +database, reclaiming disk space. + +Depending on the amount of history being purged a call to the API may take +several minutes or longer. During this period users will not be able to +paginate further back in the room from the point being purged from. + +The API is simply: + +``POST /_matrix/client/r0/admin/purge_history//`` + +including an ``access_token`` of a server admin. diff --git a/docs/admin_api/purge_remote_media.rst b/docs/admin_api/purge_remote_media.rst new file mode 100644 index 000000000..749ed1b2b --- /dev/null +++ b/docs/admin_api/purge_remote_media.rst @@ -0,0 +1,19 @@ +Purge Remote Media API +====================== + +The purge remote media API allows server admins to purge old cached remote +media. + +The API is:: + + POST /_matrix/client/r0/admin/purge_history/ + + { + "before_ts": + } + +Which will remove all cached media that was last accessed before +````. + +If the user re-requests purged remote media, synapse will re-request the media +from the originating server. From 3c3246c078134124610afa40ec55626568c5627c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 15:08:37 +0100 Subject: [PATCH 06/24] Use correct path --- docs/admin_api/purge_remote_media.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/admin_api/purge_remote_media.rst b/docs/admin_api/purge_remote_media.rst index 749ed1b2b..b26c6a9e7 100644 --- a/docs/admin_api/purge_remote_media.rst +++ b/docs/admin_api/purge_remote_media.rst @@ -6,7 +6,7 @@ media. The API is:: - POST /_matrix/client/r0/admin/purge_history/ + POST /_matrix/client/r0/admin/purge_media_cache { "before_ts": From 6ede23ff1b956e72b3a2864e85accb8c05fff6f0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Jul 2016 15:51:43 +0100 Subject: [PATCH 07/24] Add more key storage funcs into slave store --- synapse/replication/slave/storage/keys.py | 16 +++++++---- synapse/storage/keys.py | 34 +++++++++++------------ 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py index c1c895439..dd2ae49e4 100644 --- a/synapse/replication/slave/storage/keys.py +++ b/synapse/replication/slave/storage/keys.py @@ -13,17 +13,21 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.keys import KeyStore class SlavedKeyStore(BaseSlavedStore): - # TODO: use the cached version and invalidate deleted tokens - get_all_server_verify_keys = defer.inlineCallbacks(KeyStore.__dict__[ - "get_all_server_verify_keys" - ].orig) + _get_server_verify_key = KeyStore.__dict__[ + "_get_server_verify_key" + ] get_server_verify_keys = DataStore.get_server_verify_keys.__func__ + store_server_verify_key = DataStore.store_server_verify_key.__func__ + + get_server_certificate = DataStore.get_server_certificate.__func__ + store_server_certificate = DataStore.store_server_certificate.__func__ + + get_server_keys_json = DataStore.get_server_keys_json.__func__ + store_server_keys_json = DataStore.store_server_keys_json.__func__ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 1195efec0..86b37b9dd 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -78,22 +78,22 @@ class KeyStore(SQLBaseStore): ) @cachedInlineCallbacks() - def get_all_server_verify_keys(self, server_name): - rows = yield self._simple_select_list( + def _get_server_verify_key(self, server_name, key_id): + verify_key_bytes = yield self._simple_select_one_onecol( table="server_signature_keys", keyvalues={ "server_name": server_name, + "key_id": key_id, }, - retcols=["key_id", "verify_key"], - desc="get_all_server_verify_keys", + retcol="verify_key", + desc="_get_server_verify_key", + allow_none=True, ) - defer.returnValue({ - row["key_id"]: decode_verify_key_bytes( - row["key_id"], str(row["verify_key"]) - ) - for row in rows - }) + if verify_key_bytes: + defer.returnValue(decode_verify_key_bytes( + key_id, str(verify_key_bytes) + )) @defer.inlineCallbacks def get_server_verify_keys(self, server_name, key_ids): @@ -105,12 +105,12 @@ class KeyStore(SQLBaseStore): Returns: (list of VerifyKey): The verification keys. """ - keys = yield self.get_all_server_verify_keys(server_name) - defer.returnValue({ - k: keys[k] - for k in key_ids - if k in keys and keys[k] - }) + keys = {} + for key_id in key_ids: + key = yield self._get_server_verify_key(server_name, key_id) + if key: + keys[key_id] = key + defer.returnValue(keys) @defer.inlineCallbacks def store_server_verify_key(self, server_name, from_server, time_now_ms, @@ -137,8 +137,6 @@ class KeyStore(SQLBaseStore): desc="store_server_verify_key", ) - self.get_all_server_verify_keys.invalidate((server_name,)) - def store_server_keys_json(self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes): """Stores the JSON bytes for a set of keys from a server From 1e2740caabe348e4131fe6bd2d777fc7483909a4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 16:08:33 +0100 Subject: [PATCH 08/24] Handle the case of missing auth events when joining a room --- synapse/handlers/federation.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3f138daf1..cab7efb5d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -124,7 +124,7 @@ class FederationHandler(BaseHandler): try: event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) except AuthError as e: raise FederationError( @@ -637,7 +637,7 @@ class FederationHandler(BaseHandler): pass event_stream_id, max_stream_id = yield self._persist_auth_tree( - auth_chain, state, event + origin, auth_chain, state, event ) with PreserveLoggingContext(): @@ -1155,7 +1155,7 @@ class FederationHandler(BaseHandler): ) @defer.inlineCallbacks - def _persist_auth_tree(self, auth_events, state, event): + def _persist_auth_tree(self, origin, auth_events, state, event): """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event seperately. @@ -1172,7 +1172,7 @@ class FederationHandler(BaseHandler): event_map = { e.event_id: e - for e in auth_events + for e in itertools.chain(auth_events, state, [event]) } create_event = None @@ -1181,10 +1181,29 @@ class FederationHandler(BaseHandler): create_event = e break + missing_auth_events = set() + for e in itertools.chain(auth_events, state, [event]): + for e_id, _ in e.auth_events: + if e_id not in event_map: + missing_auth_events.add(e_id) + + for e_id in missing_auth_events: + m_ev = yield self.replication_layer.get_pdu( + [origin], + e_id, + outlier=True, + timeout=10000, + ) + if m_ev and m_ev.event_id == e_id: + event_map[e_id] = m_ev + else: + logger.info("Failed to find auth event %r", e_id) + for e in itertools.chain(auth_events, state, [event]): auth_for_e = { (event_map[e_id].type, event_map[e_id].state_key): event_map[e_id] for e_id, _ in e.auth_events + if e_id in event_map } if create_event: auth_for_e[(EventTypes.Create, "")] = create_event From 0fcbca531f448e3cef50074404cbf7af457105f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 16:36:28 +0100 Subject: [PATCH 09/24] Add get_auth_chain to slave store --- synapse/replication/slave/storage/events.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 2ba1e6b80..fcd0f14a6 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -147,6 +147,10 @@ class SlavedEventStore(BaseSlavedStore): get_missing_events = DataStore.get_missing_events.__func__ _get_missing_events = DataStore._get_missing_events.__func__ + get_auth_chain = DataStore.get_auth_chain.__func__ + get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ + _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() From 370135ad0b7cf7ded04e9f2ca0c99f5470f5efc1 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Thu, 28 Jul 2016 16:47:37 +0100 Subject: [PATCH 10/24] Comment get_unread_push_actions_for_user_in_range function --- synapse/storage/event_push_actions.py | 28 +++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 3d93285f8..958dbcc22 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -119,9 +119,28 @@ class EventPushActionsStore(SQLBaseStore): @defer.inlineCallbacks def get_unread_push_actions_for_user_in_range(self, user_id, min_stream_ordering, - max_stream_ordering=None, + max_stream_ordering, limit=20): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. + + Args: + user_id (str) + min_stream_ordering + max_stream_ordering + limit (int) + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions", "received_ts". + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the most recent + # push actions def get_after_receipt(txn): + # XXX: Do we really need to GROUP BY user_id on the inner SELECT? + # XXX: NATURAL JOIN obfuscates which columns are being joined on the + # inner SELECT (the room_id and event_id), can we + # INNER JOIN ... USING instead? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " "e.received_ts " @@ -160,7 +179,12 @@ class EventPushActionsStore(SQLBaseStore): "get_unread_push_actions_for_user_in_range", get_after_receipt ) + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. def get_no_receipt(txn): + # XXX: Does the inner SELECT really need to select from the events table? + # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" @@ -198,7 +222,7 @@ class EventPushActionsStore(SQLBaseStore): # Now sort it so it's ordered correctly, since currently it will # contain results from the first query, correctly ordered, followed # by results from the second query, but we want them all ordered - # by received_ts + # by received_ts (most recent first) notifs.sort(key=lambda r: -(r['received_ts'] or 0)) # Now return the first `limit` From 76b89d0edb9df7c5d8b595b85ff895367631fdf2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:03:40 +0100 Subject: [PATCH 11/24] Add slace storage functions for public room list --- synapse/app/federation_reader.py | 4 ++++ .../replication/slave/storage/directory.py | 23 +++++++++++++++++++ synapse/replication/slave/storage/room.py | 21 +++++++++++++++++ 3 files changed, 48 insertions(+) create mode 100644 synapse/replication/slave/storage/directory.py create mode 100644 synapse/replication/slave/storage/room.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 98a18f9b3..2e5ba0901 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -24,6 +24,8 @@ from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import sleep @@ -52,6 +54,8 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( SlavedEventStore, SlavedKeyStore, + RoomStore, + DirectoryStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/directory.py b/synapse/replication/slave/storage/directory.py new file mode 100644 index 000000000..5fbe3a303 --- /dev/null +++ b/synapse/replication/slave/storage/directory.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.directory import DirectoryStore + + +class DirectoryStore(BaseSlavedStore): + get_aliases_for_room = DirectoryStore.__dict__[ + "get_aliases_for_room" + ].orig diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py new file mode 100644 index 000000000..d5bb0f98e --- /dev/null +++ b/synapse/replication/slave/storage/room.py @@ -0,0 +1,21 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage import DataStore + + +class RoomStore(BaseSlavedStore): + get_public_room_ids = DataStore.get_public_room_ids.__func__ From ec8b217722be15fe110be77c7c7909a7758202cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 28 Jul 2016 17:35:53 +0100 Subject: [PATCH 12/24] Add destination retry to slave store --- synapse/app/federation_reader.py | 2 ++ .../replication/slave/storage/transactions.py | 30 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 synapse/replication/slave/storage/transactions.py diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 2e5ba0901..58d425f9a 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -25,6 +25,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -56,6 +57,7 @@ class FederationReaderSlavedStore( SlavedKeyStore, RoomStore, DirectoryStore, + TransactionStore, BaseSlavedStore, ): pass diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py new file mode 100644 index 000000000..6f2ba98af --- /dev/null +++ b/synapse/replication/slave/storage/transactions.py @@ -0,0 +1,30 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from ._base import BaseSlavedStore +from synapse.storage import DataStore +from synapse.storage.transactions import TransactionStore + + +class TransactionStore(BaseSlavedStore): + get_destination_retry_timings = TransactionStore.__dict__[ + "get_destination_retry_timings" + ].orig + _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + + # For now, don't record the destination rety timings + def set_destination_retry_timings(*args, **kwargs): + return defer.succeed(None) From 0a7d3cd00f8b7e3ad0ba458c3ab9b40a2496545b Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 28 Jul 2016 20:24:24 +0100 Subject: [PATCH 13/24] Create separate methods for getting messages to push for the email and http pushers rather than trying to make a single method that will work with their conflicting requirements. The http pusher needs to get the messages in ascending stream order, and doesn't want to miss a message. The email pusher needs to get the messages in descending timestamp order, and doesn't mind if it misses messages. --- synapse/push/emailpusher.py | 5 +- synapse/push/httppusher.py | 3 +- synapse/replication/slave/storage/events.py | 7 +- synapse/storage/event_push_actions.py | 199 +++++++++++++++----- tests/storage/test_event_push_actions.py | 41 ++++ 5 files changed, 204 insertions(+), 51 deletions(-) create mode 100644 tests/storage/test_event_push_actions.py diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 12a3ec7fd..e224b6829 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -140,9 +140,8 @@ class EmailPusher(object): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( - self.user_id, start, self.max_stream_ordering - ) + fn = self.store.get_unread_push_actions_for_user_in_range_for_email + unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 2acc6cc21..9a7db6122 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -141,7 +141,8 @@ class HttpPusher(object): run once per pusher. """ - unprocessed = yield self.store.get_unread_push_actions_for_user_in_range( + fn = self.store.get_unread_push_actions_for_user_in_range_for_http + unprocessed = yield fn( self.user_id, self.last_stream_ordering, self.max_stream_ordering ) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 369d83946..6a644f138 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -93,8 +93,11 @@ class SlavedEventStore(BaseSlavedStore): StreamStore.__dict__["get_recent_event_ids_for_room"] ) - get_unread_push_actions_for_user_in_range = ( - DataStore.get_unread_push_actions_for_user_in_range.__func__ + get_unread_push_actions_for_user_in_range_for_http = ( + DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__ + ) + get_unread_push_actions_for_user_in_range_for_email = ( + DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__ ) get_push_action_users_in_range = ( DataStore.get_push_action_users_in_range.__func__ diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 958dbcc22..5ab362bef 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -117,40 +117,149 @@ class EventPushActionsStore(SQLBaseStore): defer.returnValue(ret) @defer.inlineCallbacks - def get_unread_push_actions_for_user_in_range(self, user_id, - min_stream_ordering, - max_stream_ordering, - limit=20): + def get_unread_push_actions_for_user_in_range_for_http( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): """Get a list of the most recent unread push actions for a given user, - within the given stream ordering range. + within the given stream ordering range. Called by the httppusher. Args: - user_id (str) - min_stream_ordering - max_stream_ordering - limit (int) + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. + Returns: + A promise which resolves to a list of dicts with the keys "event_id", + "room_id", "stream_ordering", "actions". + The list will be ordered by ascending stream_ordering. + The list will have between 0~limit entries. + """ + # find rooms that have a read receipt in them and return the next + # push actions + def get_after_receipt(txn): + # find rooms that have a read receipt in them and return the next + # push actions + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + ") AS rl," + " event_push_actions AS ep" + " WHERE" + " ep.room_id = rl.room_id" + " AND (" + " ep.topological_ordering > rl.topological_ordering" + " OR (" + " ep.topological_ordering = rl.topological_ordering" + " AND ep.stream_ordering > rl.stream_ordering" + " )" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + after_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt + ) + + # There are rooms with push actions in them but you don't have a read receipt in + # them e.g. rooms you've been invited to, so get push actions for rooms which do + # not have read receipts in them too. + def get_no_receipt(txn): + sql = ( + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM event_push_actions AS ep" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering ASC LIMIT ?" + ) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] + txn.execute(sql, args) + return txn.fetchall() + no_read_receipt = yield self.runInteraction( + "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt + ) + + notifs = [ + { + "event_id": row[0], + "room_id": row[1], + "stream_ordering": row[2], + "actions": json.loads(row[3]), + } for row in after_read_receipt + no_read_receipt + ] + + # Now sort it so it's ordered correctly, since currently it will + # contain results from the first query, correctly ordered, followed + # by results from the second query, but we want them all ordered + # by stream_ordering, oldest first. + notifs.sort(key=lambda r: r['stream_ordering']) + + # Take only up to the limit. We have to stop at the limit because + # one of the subqueries may have hit the limit. + defer.returnValue(notifs[:limit]) + + @defer.inlineCallbacks + def get_unread_push_actions_for_user_in_range_for_email( + self, user_id, min_stream_ordering, max_stream_ordering, limit=20 + ): + """Get a list of the most recent unread push actions for a given user, + within the given stream ordering range. Called by the emailpusher + + Args: + user_id (str): The user to fetch push actions for. + min_stream_ordering(int): The exclusive lower bound on the + stream ordering of event push actions to fetch. + max_stream_ordering(int): The inclusive upper bound on the + stream ordering of event push actions to fetch. + limit (int): The maximum number of rows to return. Returns: A promise which resolves to a list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts". + The list will be ordered by descending received_ts. The list will have between 0~limit entries. """ # find rooms that have a read receipt in them and return the most recent # push actions def get_after_receipt(txn): - # XXX: Do we really need to GROUP BY user_id on the inner SELECT? - # XXX: NATURAL JOIN obfuscates which columns are being joined on the - # inner SELECT (the room_id and event_id), can we - # INNER JOIN ... USING instead? sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, " - "e.received_ts " - "FROM (" - " SELECT room_id, user_id, " - " max(topological_ordering) as topological_ordering, " - " max(stream_ordering) as stream_ordering " - " FROM events" - " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'" - " GROUP BY room_id, user_id" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " e.received_ts" + " FROM (" + " SELECT room_id," + " MAX(topological_ordering) as topological_ordering," + " MAX(stream_ordering) as stream_ordering" + " FROM events" + " INNER JOIN receipts_linearized USING (room_id, event_id)" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" ") AS rl," " event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" @@ -165,47 +274,47 @@ class EventPushActionsStore(SQLBaseStore): " )" " AND ep.stream_ordering > ?" " AND ep.user_id = ?" - " AND ep.user_id = rl.user_id" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [min_stream_ordering, user_id] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() after_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_after_receipt + "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt ) # There are rooms with push actions in them but you don't have a read receipt in # them e.g. rooms you've been invited to, so get push actions for rooms which do # not have read receipts in them too. def get_no_receipt(txn): - # XXX: Does the inner SELECT really need to select from the events table? - # We're just extracting the room_id, so isn't receipts_linearized enough? sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," " e.received_ts" " FROM event_push_actions AS ep" - " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id" - " WHERE ep.room_id not in (" - " SELECT room_id FROM events NATURAL JOIN receipts_linearized" - " WHERE receipt_type = 'm.read' AND user_id = ?" - " GROUP BY room_id" - ") AND ep.user_id = ? AND ep.stream_ordering > ?" + " INNER JOIN events AS e USING (room_id, event_id)" + " WHERE" + " ep.room_id NOT IN (" + " SELECT room_id FROM receipts_linearized" + " WHERE receipt_type = 'm.read' AND user_id = ?" + " GROUP BY room_id" + " )" + " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" + " AND ep.stream_ordering <= ?" + " ORDER BY ep.stream_ordering DESC LIMIT ?" ) - args = [user_id, user_id, min_stream_ordering] - if max_stream_ordering is not None: - sql += " AND ep.stream_ordering <= ?" - args.append(max_stream_ordering) - sql += " ORDER BY ep.stream_ordering DESC LIMIT ?" - args.append(limit) + args = [ + user_id, user_id, + min_stream_ordering, max_stream_ordering, limit, + ] txn.execute(sql, args) return txn.fetchall() no_read_receipt = yield self.runInteraction( - "get_unread_push_actions_for_user_in_range", get_no_receipt + "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt ) # Make a list of dicts from the two sets of results. diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py new file mode 100644 index 000000000..e9044afa2 --- /dev/null +++ b/tests/storage/test_event_push_actions.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import tests.unittest +import tests.utils + +USER_ID = "@user:example.com" + + +class EventPushActionsStoreTestCase(tests.unittest.TestCase): + + @defer.inlineCallbacks + def setUp(self): + hs = yield tests.utils.setup_test_homeserver() + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_http(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_http( + USER_ID, 0, 1000, 20 + ) + + @defer.inlineCallbacks + def test_get_unread_push_actions_for_user_in_range_for_email(self): + yield self.store.get_unread_push_actions_for_user_in_range_for_email( + USER_ID, 0, 1000, 20 + ) From 8dad08a9509103f38d9eec5dc28d46e4a757fad8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Fri, 29 Jul 2016 09:57:13 +0100 Subject: [PATCH 14/24] Fix SQL to supply arguments in the same order --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5ab362bef..df4000d0d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -272,8 +272,8 @@ class EventPushActionsStore(SQLBaseStore): " AND ep.stream_ordering > rl.stream_ordering" " )" " )" - " AND ep.stream_ordering > ?" " AND ep.user_id = ?" + " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) From 3d13c3a2952263c38111fcf95d625e316416b52b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 10:45:05 +0100 Subject: [PATCH 15/24] Update docstring --- synapse/handlers/federation.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cab7efb5d..958362938 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1160,6 +1160,12 @@ class FederationHandler(BaseHandler): state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Args: + origin (str): Where the events came from + auth_events (list) + state (list) + event (Event) + Returns: 2-tuple of (event_stream_id, max_stream_id) from the persist_event call for `event` From c51a52f3002abf4597952e07759c6ab3016e3497 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 11:17:04 +0100 Subject: [PATCH 16/24] Mention that func will fetch auth events --- synapse/handlers/federation.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 958362938..1323235b6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1160,6 +1160,8 @@ class FederationHandler(BaseHandler): state and event. Then persists the auth chain and state atomically. Persists the event seperately. + Will attempt to fetch missing auth events. + Args: origin (str): Where the events came from auth_events (list) From 74106ba17177db837bea06c35b39dbf1adc75648 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 29 Jul 2016 11:45:03 +0100 Subject: [PATCH 17/24] Make jenkins dendron test federation read apis --- jenkins-dendron-postgres.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 50268e098..9e3b2df9c 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -82,6 +82,7 @@ echo >&2 "Running sytest with PostgreSQL"; --dendron $WORKSPACE/dendron/bin/dendron \ --pusher \ --synchrotron \ + --federation-reader \ --port-range ${PORT_BASE}:$((PORT_BASE+PORT_COUNT-1)) cd .. From 271d3e78652ef7a477af2b058bdd7c13e4816076 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2016 15:25:24 +0100 Subject: [PATCH 18/24] Fix adding emails on registration Synapse was not adding email addresses to accounts registered with an email address, due to too many different variables called 'result'. Rename both of them. Also remove the defer.returnValue() with no params because that's not a thing. --- synapse/rest/client/v2_alpha/register.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index 9f599ea8b..943f5676a 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -196,12 +196,12 @@ class RegisterRestServlet(RestServlet): [LoginType.EMAIL_IDENTITY] ] - authed, result, params, session_id = yield self.auth_handler.check_auth( + authed, auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) if not authed: - defer.returnValue((401, result)) + defer.returnValue((401, auth_result)) return if registered_user_id is not None: @@ -236,18 +236,18 @@ class RegisterRestServlet(RestServlet): add_email = True - result = yield self._create_registration_details( + return_dict = yield self._create_registration_details( registered_user_id, params ) - if add_email and result and LoginType.EMAIL_IDENTITY in result: - threepid = result[LoginType.EMAIL_IDENTITY] + if add_email and auth_result and LoginType.EMAIL_IDENTITY in auth_result: + threepid = auth_result[LoginType.EMAIL_IDENTITY] yield self._register_email_threepid( - registered_user_id, threepid, result["access_token"], + registered_user_id, threepid, return_dict["access_token"], params.get("bind_email") ) - defer.returnValue((200, result)) + defer.returnValue((200, return_dict)) def on_OPTIONS(self, _): return 200, {} @@ -356,8 +356,6 @@ class RegisterRestServlet(RestServlet): else: logger.info("bind_email not specified: not binding email") - defer.returnValue() - @defer.inlineCallbacks def _create_registration_details(self, user_id, params): """Complete registration of newly-registered user From b260f92936e7e80ee9885755d608d58ffb9101ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sun, 31 Jul 2016 15:30:13 +0100 Subject: [PATCH 19/24] Ignore AlreadyCalled errors on timer cancel --- synapse/push/emailpusher.py | 12 ++++++++++-- synapse/push/httppusher.py | 7 ++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e224b6829..6600c9cd5 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -14,6 +14,7 @@ # limitations under the License. from twisted.internet import defer, reactor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled import logging @@ -92,7 +93,11 @@ class EmailPusher(object): def on_stop(self): if self.timed_call: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass + self.timed_call = None @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): @@ -189,7 +194,10 @@ class EmailPusher(object): soonest_due_at = should_notify_at if self.timed_call is not None: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass self.timed_call = None if soonest_due_at is not None: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9a7db6122..feedb075e 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -16,6 +16,7 @@ from synapse.push import PusherConfigException from twisted.internet import defer, reactor +from twisted.internet.error import AlreadyCalled, AlreadyCancelled import logging import push_rule_evaluator @@ -109,7 +110,11 @@ class HttpPusher(object): def on_stop(self): if self.timed_call: - self.timed_call.cancel() + try: + self.timed_call.cancel() + except (AlreadyCalled, AlreadyCancelled): + pass + self.timed_call = None @defer.inlineCallbacks def _process(self): From bfeaab6dfc84adc38e5990a7f26c5b7148606a28 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 1 Aug 2016 17:12:02 +0100 Subject: [PATCH 20/24] missing --upgrade --- UPGRADE.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/UPGRADE.rst b/UPGRADE.rst index 699f04c2c..9f044719a 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -27,7 +27,7 @@ running: # Pull the latest version of the master branch. git pull # Update the versions of synapse's python dependencies. - python synapse/python_dependencies.py | xargs -n1 pip install + python synapse/python_dependencies.py | xargs -n1 pip install --upgrade Upgrading to v0.15.0 From 55e8a8788895b0c6b6b5a27d153f6d9e7e21d68b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 13:41:17 +0100 Subject: [PATCH 21/24] Change default jenkins port base and count --- jenkins-dendron-postgres.sh | 4 ++-- jenkins-postgres.sh | 4 ++-- jenkins-sqlite.sh | 5 +++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh index 9e3b2df9c..f715cd559 100755 --- a/jenkins-dendron-postgres.sh +++ b/jenkins-dendron-postgres.sh @@ -69,8 +69,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh diff --git a/jenkins-postgres.sh b/jenkins-postgres.sh index 2f0768fcb..7a43df0d5 100755 --- a/jenkins-postgres.sh +++ b/jenkins-postgres.sh @@ -43,8 +43,8 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_BASE:=8000} -: ${PORT_COUNT=20} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} ./jenkins/prep_sytest_for_postgres.sh diff --git a/jenkins-sqlite.sh b/jenkins-sqlite.sh index da603c5af..27e61af6e 100755 --- a/jenkins-sqlite.sh +++ b/jenkins-sqlite.sh @@ -41,8 +41,9 @@ cd sytest git checkout "${GIT_BRANCH}" || (echo >&2 "No ref ${GIT_BRANCH} found, falling back to develop" ; git checkout develop) -: ${PORT_COUNT=20} -: ${PORT_BASE:=8000} +: ${PORT_BASE:=20000} +: ${PORT_COUNT=100} + ./jenkins/install_and_run.sh --coverage \ --python $TOX_BIN/python \ --synapse-directory $WORKSPACE \ From fcde5b2a9782d1f49f56d0e8ce694e66eeb6c04f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 15:06:08 +0100 Subject: [PATCH 22/24] Print authorization header for federation_client.py --- scripts-dev/federation_client.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index ea62dceb3..caa3cee4e 100644 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -116,11 +116,12 @@ def get_json(origin_name, origin_key, destination, path): authorization_headers = [] for key, sig in signed_json["signatures"][origin_name].items(): - authorization_headers.append(bytes( - "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( - origin_name, key, sig, - ) - )) + header = "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( + origin_name, key, sig, + ) + authorization_headers.append(bytes(header)) + sys.stderr.write(header) + sys.stderr.write("\n") result = requests.get( lookup(destination, path), From b3d5c4ad9d0c6d858cae1c46bebf0c9442f0187b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 16:42:21 +0100 Subject: [PATCH 23/24] Fix response cache --- synapse/federation/federation_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index d15c7e1b4..8f6955ac1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -205,7 +205,7 @@ class FederationServer(FederationBase): result = self._state_resp_cache.get((room_id, event_id)) if not result: with (yield self._server_linearizer.queue((origin, room_id))): - resp = yield self.response_cache.set( + resp = yield self._state_resp_cache.set( (room_id, event_id), self._on_context_state_request_compute(room_id, event_id) ) From c9154b970c0af5eb19c43a401f44de95afd3f7de Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 Aug 2016 16:45:53 +0100 Subject: [PATCH 24/24] Don't double wrap 200 --- synapse/federation/federation_server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8f6955ac1..612d274bd 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -235,10 +235,10 @@ class FederationServer(FederationBase): ) ) - defer.returnValue((200, { + defer.returnValue({ "pdus": [pdu.get_pdu_json() for pdu in pdus], "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], - })) + }) @defer.inlineCallbacks @log_function