From 58367a9da2539abdbfe4dc817fba5b179b95334b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 12:59:45 +0000 Subject: [PATCH 01/50] Disable registration by default --- synapse/config/registration.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index cca8ab567..e603575da 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -31,3 +31,7 @@ class RegistrationConfig(Config): action='store_true', help="Disable registration of new users." ) + + @classmethod + def generate_config(cls, args, config_dir_path): + args.disable_registration = True From 69135f59aa87962b848f9f19cad6adc625821ba8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:23:37 +0000 Subject: [PATCH 02/50] Implement registering with shared secret. --- synapse/api/constants.py | 1 + synapse/config/registration.py | 20 +++++++++-- synapse/rest/client/v1/register.py | 57 ++++++++++++++++++++++++++++-- synapse/util/stringutils.py | 10 ++++++ 4 files changed, 83 insertions(+), 5 deletions(-) diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 420f963d9..b16bf4247 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -60,6 +60,7 @@ class LoginType(object): EMAIL_IDENTITY = u"m.login.email.identity" RECAPTCHA = u"m.login.recaptcha" APPLICATION_SERVICE = u"m.login.application_service" + SHARED_SECRET = u"org.matrix.login.shared_secret" class EventTypes(object): diff --git a/synapse/config/registration.py b/synapse/config/registration.py index e603575da..6a0aaea92 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -15,23 +15,37 @@ from ._base import Config +from synapse.util.stringutils import random_string_with_symbols + class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) self.disable_registration = args.disable_registration + self.registration_shared_secret = args.registration_shared_secret @classmethod def add_arguments(cls, parser): super(RegistrationConfig, cls).add_arguments(parser) reg_group = parser.add_argument_group("registration") + reg_group.add_argument( "--disable-registration", - action='store_true', - help="Disable registration of new users." + action='store_const', + const=True, + help="Disable registration of new users.", + ) + reg_group.add_argument( + "--registration-shared-secret", type=str, + help="If set, allows registration by anyone who also has the shared" + " secret, even if registration is otherwise disabled.", ) @classmethod def generate_config(cls, args, config_dir_path): - args.disable_registration = True + if args.disable_registration is None: + args.disable_registration = True + + if args.registration_shared_secret is None: + args.registration_shared_secret= random_string_with_symbols(50) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index f5acfb945..a7c9c5bb6 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -110,14 +110,22 @@ class RegisterRestServlet(ClientV1RestServlet): login_type = register_json["type"] is_application_server = login_type == LoginType.APPLICATION_SERVICE - if self.disable_registration and not is_application_server: + is_using_shared_secret = login_type == LoginType.SHARED_SECRET + + can_register = ( + not self.disable_registration + or is_application_server + or is_using_shared_secret + ) + if not can_register: raise SynapseError(403, "Registration has been disabled") stages = { LoginType.RECAPTCHA: self._do_recaptcha, LoginType.PASSWORD: self._do_password, LoginType.EMAIL_IDENTITY: self._do_email_identity, - LoginType.APPLICATION_SERVICE: self._do_app_service + LoginType.APPLICATION_SERVICE: self._do_app_service, + LoginType.SHARED_SECRET: self._do_shared_secret, } session_info = self._get_session_info(request, session) @@ -304,6 +312,51 @@ class RegisterRestServlet(ClientV1RestServlet): "home_server": self.hs.hostname, }) + @defer.inlineCallbacks + def _do_shared_secret(self, request, register_json, session): + yield run_on_reactor() + + if "mac" not in register_json: + raise SynapseError(400, "Expected mac.") + if "user" not in register_json: + raise SynapseError(400, "Expected 'user' key.") + if "password" not in register_json: + raise SynapseError(400, "Expected 'password' key.") + + if not self.hs.config.registration_shared_secret: + raise SynapseError(400, "Shared secret registration is not enabled") + + user = register_json["user"].encode("utf-8") + + # str() because otherwise hmac complains that 'unicode' does not + # have the buffer interface + got_mac = str(register_json["mac"]) + + want_mac = hmac.new( + key=self.hs.config.registration_shared_secret, + msg=user, + digestmod=sha1, + ).hexdigest() + + password = register_json["password"].encode("utf-8") + + if compare_digest(want_mac, got_mac): + handler = self.handlers.registration_handler + user_id, token = yield handler.register( + localpart=user, + password=password, + ) + self._remove_session(session) + defer.returnValue({ + "user_id": user_id, + "access_token": token, + "home_server": self.hs.hostname, + }) + else: + raise SynapseError( + 400, "HMAC incorrect", + ) + def _parse_json(request): try: diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index ea53a8085..52e66beae 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -16,6 +16,10 @@ import random import string +_string_with_symbols = ( + string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" +) + def origin_from_ucid(ucid): return ucid.split("@", 1)[1] @@ -23,3 +27,9 @@ def origin_from_ucid(ucid): def random_string(length): return ''.join(random.choice(string.ascii_letters) for _ in xrange(length)) + + +def random_string_with_symbols(length): + return ''.join( + random.choice(_string_with_symbols) for _ in xrange(length) + ) From dea236e4fa6dd9f42e2adc10858b118c814d28d4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:24:03 +0000 Subject: [PATCH 03/50] Add missing commas --- synapse/http/servlet.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index a4eb6c817..265559a3e 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -51,8 +51,8 @@ class RestServlet(object): pattern = self.PATTERN for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"): - if hasattr(self, "on_%s" % (method)): - method_handler = getattr(self, "on_%s" % (method)) + if hasattr(self, "on_%s" % (method,)): + method_handler = getattr(self, "on_%s" % (method,)) http_server.register_path(method, pattern, method_handler) else: raise NotImplementedError("RestServlet must register something.") From bcfce93ccdb28c044a8e778a4e113e053d6dfe62 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:25:30 +0000 Subject: [PATCH 04/50] Add 'register_new_user' script --- register_new_user | 149 ++++++++++++++++++++++++++++++++++++++++++++++ setup.py | 2 +- 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100755 register_new_user diff --git a/register_new_user b/register_new_user new file mode 100755 index 000000000..e368c4481 --- /dev/null +++ b/register_new_user @@ -0,0 +1,149 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import argparse +import getpass +import hashlib +import hmac +import json +import sys +import urllib2 +import yaml + + +def request_registration(user, password, server_location, shared_secret): + mac = hmac.new( + key=shared_secret, + msg=user, + digestmod=hashlib.sha1, + ).hexdigest() + + data = { + "user": user, + "password": password, + "mac": mac, + "type": "org.matrix.login.shared_secret", + } + + server_location = server_location.rstrip("/") + + print "Sending registration request..." + + req = urllib2.Request( + "%s/_matrix/client/api/v1/register" % (server_location,), + data=json.dumps(data), + headers={'Content-Type': 'application/json'} + ) + try: + f = urllib2.urlopen(req) + f.read() + f.close() + print "Success." + except urllib2.HTTPError as e: + print "ERROR! Received %d %s" % (e.code, e.reason,) + if 400 <= e.code < 500: + if e.info().type == "application/json": + resp = json.load(e) + if "error" in resp: + print resp["error"] + sys.exit(1) + + +def register_new_user(user, password, server_location, shared_secret): + if not user: + try: + default_user = getpass.getuser() + except: + default_user = None + + if default_user: + user = raw_input("New user localpart [%s]: " % (default_user,)) + if not user: + user = default_user + else: + user = raw_input("New user localpart: ") + + if not user: + print "Invalid user name" + sys.exit(1) + + if not password: + password = getpass.getpass("Password: ") + + if not password: + print "Password cannot be blank." + sys.exit(1) + + confirm_password = getpass.getpass("Confirm password: ") + + if password != confirm_password: + print "Passwords do not match" + sys.exit(1) + + request_registration(user, password, server_location, shared_secret) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Used to register new users with a given home server when" + " registration has been disabled. The home server must be" + " configured with the 'registration_shared_secret' option" + " set.", + ) + parser.add_argument( + "-u", "--user", + default=None, + help="Local part of the new user. Will prompt if omitted.", + ) + parser.add_argument( + "-p", "--password", + default=None, + help="New password for user. Will prompt if omitted.", + ) + + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument( + "-c", "--config", + type=argparse.FileType('r'), + help="Path to server config file. Used to read in shared secret.", + ) + + group.add_argument( + "-k", "--shared-secret", + help="Shared secret as defined in server config file.", + ) + + parser.add_argument( + "server_url", + default="https://localhost:8480", + nargs='?', + help="URL to use to talk to the home server. Defaults to " + " 'https://localhost:8480'.", + ) + + args = parser.parse_args() + + if "config" in args and args.config: + config = yaml.safe_load(args.config) + secret = config.get("registration_shared_secret", None) + if not secret: + print "No 'registration_shared_secret' defined in config." + sys.exit(1) + else: + secret = args.shared_secret + + register_new_user(args.user, args.password, args.server_url, secret) diff --git a/setup.py b/setup.py index 2d812fa38..a45dfb6e0 100755 --- a/setup.py +++ b/setup.py @@ -55,5 +55,5 @@ setup( include_package_data=True, zip_safe=False, long_description=long_description, - scripts=["synctl"], + scripts=["synctl", "register_new_user"], ) From 9266cb0a220f83061ccf99b9c031fb9383c55c7f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:26:00 +0000 Subject: [PATCH 05/50] PEP8 --- synapse/config/registration.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 6a0aaea92..e015680d0 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -48,4 +48,4 @@ class RegistrationConfig(Config): args.disable_registration = True if args.registration_shared_secret is None: - args.registration_shared_secret= random_string_with_symbols(50) + args.registration_shared_secret = random_string_with_symbols(50) From 598c47a10835accdc2fc382fed8db803e7a33deb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:29:38 +0000 Subject: [PATCH 06/50] Change default server url to match default ports --- register_new_user | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/register_new_user b/register_new_user index e368c4481..daddadc30 100755 --- a/register_new_user +++ b/register_new_user @@ -129,10 +129,10 @@ if __name__ == "__main__": parser.add_argument( "server_url", - default="https://localhost:8480", + default="https://localhost:8448", nargs='?', help="URL to use to talk to the home server. Defaults to " - " 'https://localhost:8480'.", + " 'https://localhost:8448'.", ) args = parser.parse_args() From 7393c5ce4c831640f771ca32c8f22f7f2fd7fba2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 15:33:21 +0000 Subject: [PATCH 07/50] Rename register script to 'register_new_matrix_user' --- register_new_user => register_new_matrix_user | 0 setup.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename register_new_user => register_new_matrix_user (100%) diff --git a/register_new_user b/register_new_matrix_user similarity index 100% rename from register_new_user rename to register_new_matrix_user diff --git a/setup.py b/setup.py index a45dfb6e0..45943adb2 100755 --- a/setup.py +++ b/setup.py @@ -55,5 +55,5 @@ setup( include_package_data=True, zip_safe=False, long_description=long_description, - scripts=["synctl", "register_new_user"], + scripts=["synctl", "register_new_matrix_user"], ) From 98a3825614328887ad1d855d2d1076496e49be6b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 16:49:18 +0000 Subject: [PATCH 08/50] Allow enabling of registration with --disable-registration false --- synapse/config/registration.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index e015680d0..3fed8364c 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -17,12 +17,17 @@ from ._base import Config from synapse.util.stringutils import random_string_with_symbols +import distutils.util + class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) - self.disable_registration = args.disable_registration + + self.disable_registration = bool( + distutils.util.strtobool(str(args.disable_registration)) + ) self.registration_shared_secret = args.registration_shared_secret @classmethod @@ -32,8 +37,9 @@ class RegistrationConfig(Config): reg_group.add_argument( "--disable-registration", - action='store_const', const=True, + default=True, + nargs='?', help="Disable registration of new users.", ) reg_group.add_argument( From a1abee013c5cd9c1251f6544dcd0b89779c8e6e4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Mar 2015 17:06:21 +0000 Subject: [PATCH 09/50] Add note about disabling registration by default --- README.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/README.rst b/README.rst index c2af7c933..d1c0e9bd1 100644 --- a/README.rst +++ b/README.rst @@ -126,6 +126,17 @@ To set up your homeserver, run (in your virtualenv, as before):: Substituting your host and domain name as appropriate. +By default, registration of new users is disabled. You can either enable +registration in the config (it is then recommended to also set up CAPTCHA), or +you can use the command line to register new users:: + + $ source ~/.synapse/bin/activate + $ register_new_matrix_user -c homeserver.yaml https://localhost:8448 + New user localpart: erikj + Password: + Confirm password: + Success! + For reliable VoIP calls to be routed via this homeserver, you MUST configure a TURN server. See docs/turn-howto.rst for details. From ab8229479bddd89546ab486152344e80f01be820 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 00:17:25 +0000 Subject: [PATCH 10/50] Respect ban membership --- synapse/api/auth.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index b176db8ce..96963d743 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -166,6 +166,7 @@ class Auth(object): target = auth_events.get(key) target_in_room = target and target.membership == Membership.JOIN + target_banned = target and target.membership == Membership.BAN key = (EventTypes.JoinRules, "", ) join_rule_event = auth_events.get(key) @@ -194,6 +195,7 @@ class Auth(object): { "caller_in_room": caller_in_room, "caller_invited": caller_invited, + "target_banned": target_banned, "target_in_room": target_in_room, "membership": membership, "join_rule": join_rule, @@ -202,6 +204,11 @@ class Auth(object): } ) + if ban_level: + ban_level = int(ban_level) + else: + ban_level = 50 # FIXME (erikj): What should we do here? + if Membership.INVITE == membership: # TODO (erikj): We should probably handle this more intelligently # PRIVATE join rules. @@ -212,6 +219,10 @@ class Auth(object): 403, "%s not in room %s." % (event.user_id, event.room_id,) ) + elif target_banned: + raise AuthError( + 403, "%s is banned from the room" % (target_user_id,) + ) elif target_in_room: # the target is already in the room. raise AuthError(403, "%s is already in the room." % target_user_id) @@ -221,6 +232,8 @@ class Auth(object): # joined: It's a NOOP if event.user_id != target_user_id: raise AuthError(403, "Cannot force another user to join.") + elif target_banned: + raise AuthError(403, "You are banned from this room") elif join_rule == JoinRules.PUBLIC: pass elif join_rule == JoinRules.INVITE: @@ -238,6 +251,10 @@ class Auth(object): 403, "%s not in room %s." % (target_user_id, event.room_id,) ) + elif target_banned and user_level < ban_level: + raise AuthError( + 403, "You cannot unban user &s." % (target_user_id,) + ) elif target_user_id != event.user_id: if kick_level: kick_level = int(kick_level) @@ -249,11 +266,6 @@ class Auth(object): 403, "You cannot kick user %s." % target_user_id ) elif Membership.BAN == membership: - if ban_level: - ban_level = int(ban_level) - else: - ban_level = 50 # FIXME (erikj): What should we do here? - if user_level < ban_level: raise AuthError(403, "You don't have permission to ban") else: From ea8590cf6626364e9532860548a5f1ae3b172d80 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 00:18:08 +0000 Subject: [PATCH 11/50] Make context.auth_events grap auth events from current state. Otherwise auth is wrong. --- synapse/api/auth.py | 8 +++++++- synapse/state.py | 22 +++++----------------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 96963d743..4873cf9d1 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -28,6 +28,12 @@ import logging logger = logging.getLogger(__name__) +AuthEventTypes = ( + EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels, + EventTypes.JoinRules, +) + + class Auth(object): def __init__(self, hs): @@ -427,7 +433,7 @@ class Auth(object): context.auth_events = { k: v for k, v in context.current_state.items() - if v.event_id in auth_ids + if v.type in AuthEventTypes } def compute_auth_events(self, event, current_state): diff --git a/synapse/state.py b/synapse/state.py index 80cced351..345046cd8 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -21,6 +21,7 @@ from synapse.util.async import run_on_reactor from synapse.util.expiringcache import ExpiringCache from synapse.api.constants import EventTypes from synapse.api.errors import AuthError +from synapse.api.auth import AuthEventTypes from synapse.events.snapshot import EventContext from collections import namedtuple @@ -38,12 +39,6 @@ def _get_state_key_from_event(event): KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key")) -AuthEventTypes = ( - EventTypes.Create, EventTypes.Member, EventTypes.PowerLevels, - EventTypes.JoinRules, -) - - SIZE_OF_CACHE = 1000 EVICTION_TIMEOUT_SECONDS = 20 @@ -187,17 +182,10 @@ class StateHandler(object): replaces = context.current_state[key] event.unsigned["replaces_state"] = replaces.event_id - if hasattr(event, "auth_events") and event.auth_events: - auth_ids = self.hs.get_auth().compute_auth_events( - event, context.current_state - ) - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.event_id in auth_ids - } - else: - context.auth_events = {} + context.auth_events = { + k: e for k, e in context.current_state.items() + if k[0] in AuthEventTypes + } context.prev_state_events = prev_state defer.returnValue(context) From 758d114cbce3b71f4253bf49669ec366185bfde9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 00:27:59 +0000 Subject: [PATCH 12/50] Send all membership events to the remote homeserver --- synapse/handlers/_base.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1773fa20a..349a52b85 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -133,10 +133,9 @@ class BaseHandler(object): for k, s in context.current_state.items(): try: if k[0] == EventTypes.Member: - if s.content["membership"] == Membership.JOIN: - destinations.add( - UserID.from_string(s.state_key).domain - ) + destinations.add( + UserID.from_string(s.state_key).domain + ) except SynapseError: logger.warn( "Failed to get destination from event %s", s.event_id From e7ce5d8b062561b17df0441f5b1be38026b0d2b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 00:30:59 +0000 Subject: [PATCH 13/50] Fix test --- tests/handlers/test_room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 6417f7330..0da7eb877 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -219,7 +219,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, destinations=set() + event, destinations=set(['red']) ) self.datastore.persist_event.assert_called_once_with( From b2e6ee5b43ebcd9e7ba37a56ed22b26c63b01370 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 13:06:23 +0000 Subject: [PATCH 14/50] Remove concept of context.auth_events, instead use context.current_state --- synapse/api/auth.py | 6 ------ synapse/events/snapshot.py | 3 +-- synapse/handlers/_base.py | 6 +++--- synapse/handlers/federation.py | 8 +++----- synapse/state.py | 17 ----------------- 5 files changed, 7 insertions(+), 33 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 4873cf9d1..90f9eb684 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -430,12 +430,6 @@ class Auth(object): builder.auth_events = auth_events_entries - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.type in AuthEventTypes - } - def compute_auth_events(self, event, current_state): if event.type == EventTypes.Create: return [] diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 7e98bdef2..4ecadf087 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -16,8 +16,7 @@ class EventContext(object): - def __init__(self, current_state=None, auth_events=None): + def __init__(self, current_state=None): self.current_state = current_state - self.auth_events = auth_events self.state_group = None self.rejected = False diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 349a52b85..261335b27 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -90,8 +90,8 @@ class BaseHandler(object): event = builder.build() logger.debug( - "Created event %s with auth_events: %s, current state: %s", - event.event_id, context.auth_events, context.current_state, + "Created event %s with current state: %s", + event.event_id, context.current_state, ) defer.returnValue( @@ -106,7 +106,7 @@ class BaseHandler(object): # We now need to go and hit out to wherever we need to hit out to. if not suppress_auth: - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=context.current_state) yield self.store.persist_event(event, context=context) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae4e9b316..65cfacba2 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -464,11 +464,9 @@ class FederationHandler(BaseHandler): builder=builder, ) - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=context.current_state) - pdu = event - - defer.returnValue(pdu) + defer.returnValue(event) @defer.inlineCallbacks @log_function @@ -705,7 +703,7 @@ class FederationHandler(BaseHandler): ) if not auth_events: - auth_events = context.auth_events + auth_events = context.current_state logger.debug( "_handle_new_event: %s, auth_events: %s", diff --git a/synapse/state.py b/synapse/state.py index 345046cd8..ba2500d61 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -134,18 +134,6 @@ class StateHandler(object): } context.state_group = None - if hasattr(event, "auth_events") and event.auth_events: - auth_ids = self.hs.get_auth().compute_auth_events( - event, context.current_state - ) - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.event_id in auth_ids - } - else: - context.auth_events = {} - if event.is_state(): key = (event.type, event.state_key) if key in context.current_state: @@ -182,11 +170,6 @@ class StateHandler(object): replaces = context.current_state[key] event.unsigned["replaces_state"] = replaces.event_id - context.auth_events = { - k: e for k, e in context.current_state.items() - if k[0] in AuthEventTypes - } - context.prev_state_events = prev_state defer.returnValue(context) From 250e143084dd1e6d29c6378abaa3b5177323ebf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 13:11:42 +0000 Subject: [PATCH 15/50] Use 403 instead of 400 --- synapse/rest/client/v1/register.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index a7c9c5bb6..86519fd9d 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -354,7 +354,7 @@ class RegisterRestServlet(ClientV1RestServlet): }) else: raise SynapseError( - 400, "HMAC incorrect", + 403, "HMAC incorrect", ) From 8bad40701b00cbbedd5bf1f4c32a2f7ac77b200b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 13:13:07 +0000 Subject: [PATCH 16/50] Comment. --- synapse/config/registration.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/config/registration.py b/synapse/config/registration.py index 3fed8364c..4401e774d 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -25,6 +25,9 @@ class RegistrationConfig(Config): def __init__(self, args): super(RegistrationConfig, self).__init__(args) + # `args.disable_registration` may either be a bool or a string depending + # on if the option was given a value (e.g. --disable-registration=false + # would set `args.disable_registration` to "false" not False.) self.disable_registration = bool( distutils.util.strtobool(str(args.disable_registration)) ) From c2c9471cbab2795edaced4d89ed9ef80ef129afe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 13:16:37 +0000 Subject: [PATCH 17/50] Don't block waiting on waking up all the listeners when sending an event. --- synapse/handlers/_base.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 1773fa20a..7f07f0521 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -142,7 +142,16 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - yield self.notifier.on_new_room_event(event, extra_users=extra_users) + # Don't block waiting on waking up all the listeners. + d = self.notifier.on_new_room_event(event, extra_users=extra_users) + + def log_failure(f): + logger.warn( + "Failed to notify about %s: %s", + event.event_id, f.value + ) + + d.addErrback(log_failure) yield federation_handler.handle_new_event( event, destinations=destinations, From 857810d2dd5e3ca6fe39b3bec7d76d75cb0c94ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 15:12:47 +0000 Subject: [PATCH 18/50] Revert incorrect changes to where we send events --- synapse/handlers/_base.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 261335b27..2a9d9ec13 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -133,9 +133,10 @@ class BaseHandler(object): for k, s in context.current_state.items(): try: if k[0] == EventTypes.Member: - destinations.add( - UserID.from_string(s.state_key).domain - ) + if s.content["membership"] == Membership.JOIN: + destinations.add( + UserID.from_string(s.state_key).domain + ) except SynapseError: logger.warn( "Failed to get destination from event %s", s.event_id From f1d2b94e0b6fbdde811a7deef3ab4ab7386a207f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 15:13:05 +0000 Subject: [PATCH 19/50] Copy dict of context.current_state before changing it. --- synapse/storage/state.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 71db16d0e..456e4bd45 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): if context.current_state is None: return - state_events = context.current_state + state_events = dict(context.current_state) if event.is_state(): state_events[(event.type, event.state_key)] = event From 6df319b6f07f2acce0c1b9aa19fd9f6005aee4ac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Mar 2015 15:15:14 +0000 Subject: [PATCH 20/50] Fix tests --- tests/handlers/test_room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_room.py b/tests/handlers/test_room.py index 0da7eb877..6417f7330 100644 --- a/tests/handlers/test_room.py +++ b/tests/handlers/test_room.py @@ -219,7 +219,7 @@ class RoomMemberHandlerTestCase(unittest.TestCase): yield room_handler.change_membership(event, context) self.federation.handle_new_event.assert_called_once_with( - event, destinations=set(['red']) + event, destinations=set() ) self.datastore.persist_event.assert_called_once_with( From be170b1426f31fb56fdb06e4b52ba3fdf617f246 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 16 Mar 2015 17:21:59 +0000 Subject: [PATCH 21/50] Add a metric for the scheduling latency of SQL queries --- synapse/storage/_base.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 40f2fc6d7..2979a8352 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -38,6 +38,8 @@ transaction_logger = logging.getLogger("synapse.storage.txn") metrics = synapse.metrics.get_metrics_for("synapse.storage") +sql_scheduling_timer = metrics.register_distribution("schedule_time") + sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) @@ -240,6 +242,8 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" current_context = LoggingContext.current_context() + start_time = time.time() * 1000 + def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) @@ -252,6 +256,7 @@ class SQLBaseStore(object): name = "%s-%x" % (desc, txn_id, ) + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) From 3f7a31d3663049eeab6fff90e18d060966c51853 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 16 Mar 2015 18:31:29 +0000 Subject: [PATCH 22/50] Add a DistributionMetric to HTTP request/response processing time in the server --- synapse/http/server.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/synapse/http/server.py b/synapse/http/server.py index f1376ee24..dee49b9e1 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -46,6 +46,11 @@ outgoing_responses_counter = metrics.register_counter( labels=["method", "code"], ) +response_timer = metrics.register_distribution( + "response_time", + labels=["method", "servlet"] +) + class HttpServer(object): """ Interface for registering callbacks on a HTTP server @@ -169,6 +174,10 @@ class JsonResource(HttpServer, resource.Resource): code, response = yield callback(request, *args) self._send_response(request, code, response) + response_timer.inc_by( + self.clock.time_msec() - start, request.method, servlet_classname + ) + return # Huh. No one wanted to handle that? Fiiiiiine. Send 400. From 7564dac8cbb245581c4cba19717f1c30b431059e Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Mar 2015 12:45:37 +0100 Subject: [PATCH 23/50] Wire up the webclient option It existed but was hardcoded to True. Give it an underscore for consistency. Also don't pull in syweb unless we're actually using the web client. --- synapse/app/homeserver.py | 8 +++++--- synapse/config/server.py | 4 +++- synapse/python_dependencies.py | 20 ++++++++++++++++---- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 15c454af7..500cae05f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -60,7 +60,6 @@ import re import resource import subprocess import sqlite3 -import syweb logger = logging.getLogger(__name__) @@ -83,6 +82,7 @@ class SynapseHomeServer(HomeServer): return AppServiceRestResource(self) def build_resource_for_web_client(self): + import syweb syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") return File(webclient_path) # TODO configurable? @@ -130,7 +130,7 @@ class SynapseHomeServer(HomeServer): True. """ config = self.get_config() - web_client = config.webclient + web_client = config.web_client # list containing (path_str, Resource) e.g: # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ] @@ -343,7 +343,8 @@ def setup(config_options): config.setup_logging() - check_requirements() + # check any extra requirements we have now we have a config + check_requirements(config) version_string = get_version_string() @@ -450,6 +451,7 @@ def run(hs): def main(): with LoggingContext("main"): + # check base requirements check_requirements() hs = setup(sys.argv[1:]) run(hs) diff --git a/synapse/config/server.py b/synapse/config/server.py index b042d4eed..58a828cc4 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -28,7 +28,7 @@ class ServerConfig(Config): self.unsecure_port = args.unsecure_port self.daemonize = args.daemonize self.pid_file = self.abspath(args.pid_file) - self.webclient = True + self.web_client = args.web_client self.manhole = args.manhole self.soft_file_limit = args.soft_file_limit @@ -68,6 +68,8 @@ class ServerConfig(Config): server_group.add_argument('--pid-file', default="homeserver.pid", help="When running as a daemon, the file to" " store the pid in") + server_group.add_argument('--web_client', default=True, type=bool, + help="Whether or not to serve a web client") server_group.add_argument("--manhole", metavar="PORT", dest="manhole", type=int, help="Turn on the twisted telnet manhole" diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8a5849d96..e27ecbed2 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -5,7 +5,6 @@ logger = logging.getLogger(__name__) REQUIREMENTS = { "syutil>=0.0.3": ["syutil"], - "matrix_angular_sdk>=0.6.5": ["syweb>=0.6.5"], "Twisted==14.0.2": ["twisted==14.0.2"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], "pyopenssl>=0.14": ["OpenSSL>=0.14"], @@ -18,6 +17,19 @@ REQUIREMENTS = { "pillow": ["PIL"], "pydenticon": ["pydenticon"], } +CONDITIONAL_REQUIREMENTS = { + "web_client": { + "matrix_angular_sdk>=0.6.5": ["syweb>=0.6.5"], + } +} + + +def requirements(config=None, include_conditional=False): + reqs = REQUIREMENTS.copy() + for key,req in CONDITIONAL_REQUIREMENTS.items(): + if (config and getattr(config, key)) or include_conditional: + reqs.update(req) + return reqs def github_link(project, version, egg): @@ -46,10 +58,10 @@ class MissingRequirementError(Exception): pass -def check_requirements(): +def check_requirements(config=None): """Checks that all the modules needed by synapse have been correctly installed and are at the correct version""" - for dependency, module_requirements in REQUIREMENTS.items(): + for dependency, module_requirements in requirements(config, include_conditional=False).items(): for module_requirement in module_requirements: if ">=" in module_requirement: module_name, required_version = module_requirement.split(">=") @@ -110,7 +122,7 @@ def list_requirements(): egg = link.split("#egg=")[1] linked.append(egg.split('-')[0]) result.append(link) - for requirement in REQUIREMENTS: + for requirement in requirements(include_conditional=True): is_linked = False for link in linked: if requirement.replace('-', '_').startswith(link): From 6d33f97703f95607da5777299404cb784e1930aa Mon Sep 17 00:00:00 2001 From: David Baker Date: Tue, 17 Mar 2015 11:53:55 +0000 Subject: [PATCH 24/50] pep8 --- synapse/python_dependencies.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index e27ecbed2..6b6d5508b 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -26,7 +26,7 @@ CONDITIONAL_REQUIREMENTS = { def requirements(config=None, include_conditional=False): reqs = REQUIREMENTS.copy() - for key,req in CONDITIONAL_REQUIREMENTS.items(): + for key, req in CONDITIONAL_REQUIREMENTS.items(): if (config and getattr(config, key)) or include_conditional: reqs.update(req) return reqs @@ -61,7 +61,8 @@ class MissingRequirementError(Exception): def check_requirements(config=None): """Checks that all the modules needed by synapse have been correctly installed and are at the correct version""" - for dependency, module_requirements in requirements(config, include_conditional=False).items(): + for dependency, module_requirements in ( + requirements(config, include_conditional=False).items()): for module_requirement in module_requirements: if ">=" in module_requirement: module_name, required_version = module_requirement.split(">=") From 1489521ee5e5edd4d3d8735e1a3a5f6a9c7ee58b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 17 Mar 2015 17:19:22 +0000 Subject: [PATCH 25/50] Be polite and ensure we use @functools.wraps() when creating a function decorator --- synapse/storage/_base.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2979a8352..9125bb119 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -25,6 +25,7 @@ import synapse.metrics from twisted.internet import defer from collections import namedtuple, OrderedDict +import functools import simplejson as json import sys import time @@ -81,6 +82,7 @@ def cached(max_entries=1000): cache[key] = value + @functools.wraps(orig) @defer.inlineCallbacks def wrapped(self, key): if key in cache: From 93978c5e2beb22b097061e58182777e9c2257228 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 17 Mar 2015 17:24:51 +0000 Subject: [PATCH 26/50] @cached() annotate get_user_by_token() - achieves a minor DB performance improvement --- synapse/api/auth.py | 2 +- synapse/storage/registration.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 90f9eb684..64f605b96 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -388,7 +388,7 @@ class Auth(object): AuthError if no user by that token exists or the token is invalid. """ try: - ret = yield self.store.get_user_by_token(token=token) + ret = yield self.store.get_user_by_token(token) if not ret: raise StoreError(400, "Unknown token") user_info = { diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index adc8fc079..3c2f1d6a1 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -19,7 +19,7 @@ from sqlite3 import IntegrityError from synapse.api.errors import StoreError, Codes -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached class RegistrationStore(SQLBaseStore): @@ -91,6 +91,11 @@ class RegistrationStore(SQLBaseStore): "get_user_by_id", self.cursor_to_dict, query, user_id ) + @cached() + # TODO(paul): Currently there's no code to invalidate this cache. That + # means if/when we ever add internal ways to invalidate access tokens or + # change whether a user is a server admin, those will need to invoke + # store.get_user_by_token.invalidate(token) def get_user_by_token(self, token): """Get a user from the given access token. From bb246091585dc648177f70a022a185784b3ff862 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2015 11:19:47 +0000 Subject: [PATCH 27/50] Clean out event_forward_extremities table when the server rejoins the room --- synapse/handlers/federation.py | 2 ++ synapse/storage/event_federation.py | 12 ++++++++++++ 2 files changed, 14 insertions(+) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 65cfacba2..15ba417e0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -290,6 +290,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) + yield self.store.clean_room_for_join(room_id) + origin, pdu = yield self.replication_layer.make_join( target_hosts, room_id, diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2deda8ac5..032334bfd 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -429,3 +429,15 @@ class EventFederationStore(SQLBaseStore): ) return events[:limit] + + def clean_room_for_join(self, room_id): + return self.runInteraction( + "clean_room_for_join", + self._clean_room_for_join_txn, + room_id, + ) + + def _clean_room_for_join_txn(self, txn, room_id): + query = "DELETE FROM event_forward_extremities WHERE room_id = ?" + + txn.execute(query, (room_id,)) From 57976f646ffe60eeb5fafce646983641fbfd7944 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2015 11:30:04 +0000 Subject: [PATCH 28/50] Do more validation of incoming request --- synapse/rest/client/v1/register.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 86519fd9d..ccc457924 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -316,11 +316,11 @@ class RegisterRestServlet(ClientV1RestServlet): def _do_shared_secret(self, request, register_json, session): yield run_on_reactor() - if "mac" not in register_json: + if not isinstance(register_json.get("mac", None), basestring): raise SynapseError(400, "Expected mac.") - if "user" not in register_json: + if not isinstance(register_json.get("user", None), basestring): raise SynapseError(400, "Expected 'user' key.") - if "password" not in register_json: + if not isinstance(register_json.get("password", None), basestring): raise SynapseError(400, "Expected 'password' key.") if not self.hs.config.registration_shared_secret: From f88db7ac0bc36974240db869606634b817471842 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2015 11:33:46 +0000 Subject: [PATCH 29/50] Factor out user id validation checks --- synapse/handlers/register.py | 8 ++++++++ synapse/rest/client/v1/register.py | 14 +++++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index cda4a8502..c25e32109 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -31,6 +31,7 @@ import base64 import bcrypt import json import logging +import urllib logger = logging.getLogger(__name__) @@ -63,6 +64,13 @@ class RegistrationHandler(BaseHandler): password_hash = bcrypt.hashpw(password, bcrypt.gensalt()) if localpart: + if localpart and urllib.quote(localpart) != localpart: + raise SynapseError( + 400, + "User ID must only contain characters which do not" + " require URL encoding." + ) + user = UserID(localpart, self.hs.hostname) user_id = user.to_string() diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index ccc457924..a56834e36 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -27,7 +27,6 @@ from hashlib import sha1 import hmac import simplejson as json import logging -import urllib logger = logging.getLogger(__name__) @@ -263,14 +262,11 @@ class RegisterRestServlet(ClientV1RestServlet): ) password = register_json["password"].encode("utf-8") - desired_user_id = (register_json["user"].encode("utf-8") - if "user" in register_json else None) - if (desired_user_id - and urllib.quote(desired_user_id) != desired_user_id): - raise SynapseError( - 400, - "User ID must only contain characters which do not " + - "require URL encoding.") + desired_user_id = ( + register_json["user"].encode("utf-8") + if "user" in register_json else None + ) + handler = self.handlers.registration_handler (user_id, token) = yield handler.register( localpart=desired_user_id, From 6cc046302f634c0cca8371108b7442600c531d5e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2015 11:41:00 +0000 Subject: [PATCH 30/50] Bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index f46a6df1f..e134fb241 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.8.0" +__version__ = "0.8.1" From 0a55a2b69229fff4e115f65a34e5d8db2ddd522d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Mar 2015 11:48:47 +0000 Subject: [PATCH 31/50] Update CHANGES --- CHANGES.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index f89542a2b..da31af960 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,12 @@ +Changes in synapse v0.8.1 (2015-03-18) +====================================== + +* Disable registration by default. New users can be added using the command + ``register_new_matrix_user`` or by enabling registration in the config. +* Add metrics to synapse. To enable metrics use config options + ``enable_metrics`` and ``metrics_port``. +* Fix bug where banning only kicked the user. + Changes in synapse v0.8.0 (2015-03-06) ====================================== From 64cf1483e5386c961a8836b610e29a4bb32dbed7 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 19 Mar 2015 11:21:34 +0000 Subject: [PATCH 32/50] D'oh - setup.py used the dict directly: make it use the wrapper function. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 45943adb2..ab24159be 100755 --- a/setup.py +++ b/setup.py @@ -45,7 +45,7 @@ setup( version=version, packages=find_packages(exclude=["tests", "tests.*"]), description="Reference Synapse Home Server", - install_requires=dependencies["REQUIREMENTS"].keys(), + install_requires=dependencies['requirements'](include_conditional=True).keys(), setup_requires=[ "Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0 "setuptools_trial", From 0dcb145c7e55a76086ac13955c45dfd76e583a18 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 11:26:03 +0000 Subject: [PATCH 33/50] Bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index e134fb241..ca53f2db8 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.8.1" +__version__ = "0.8.1-r1" From c178e4e6cacfdcf3f69361bcf3eaa24f939c7c69 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 17:48:21 +0000 Subject: [PATCH 34/50] Add missing servlet to list --- synapse/federation/transport/server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 7838a8136..2bfe0f3c9 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -361,4 +361,5 @@ SERVLET_CLASSES = ( FederationInviteServlet, FederationQueryAuthServlet, FederationGetMissingEventsServlet, + FederationEventAuthServlet, ) From 56f2d316769eeb6327585de2c640ecfb90a3af4d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Mar 2015 17:48:33 +0000 Subject: [PATCH 35/50] Bump version --- synapse/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/__init__.py b/synapse/__init__.py index ca53f2db8..749a60329 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.8.1-r1" +__version__ = "0.8.1-r2" From 87db64b83962873a3cf2af951e4c4bc2e4d50d76 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 13:52:56 +0000 Subject: [PATCH 36/50] Rearrange storage modules --- synapse/storage/__init__.py | 472 +----------------------------------- synapse/storage/_base.py | 7 + synapse/storage/events.py | 394 ++++++++++++++++++++++++++++++ synapse/storage/feedback.py | 47 ---- synapse/storage/room.py | 37 +++ synapse/storage/state.py | 32 +++ synapse/storage/stream.py | 19 ++ 7 files changed, 493 insertions(+), 515 deletions(-) create mode 100644 synapse/storage/events.py delete mode 100644 synapse/storage/feedback.py diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4b16f445d..4295f7348 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,14 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -from synapse.util.logutils import log_function -from synapse.api.constants import EventTypes - from .appservice import ApplicationServiceStore from .directory import DirectoryStore -from .feedback import FeedbackStore +from .events import EventsStore from .presence import PresenceStore from .profile import ProfileStore from .registration import RegistrationStore @@ -39,11 +34,6 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore -from syutil.base64util import decode_base64 -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import compute_event_reference_hash - import fnmatch import imp @@ -62,15 +52,8 @@ SCHEMA_VERSION = 14 dir_path = os.path.abspath(os.path.dirname(__file__)) -class _RollbackButIsFineException(Exception): - """ This exception is used to rollback a transaction without implying - something went wrong. - """ - pass - - class DataStore(RoomMemberStore, RoomStore, - RegistrationStore, StreamStore, ProfileStore, FeedbackStore, + RegistrationStore, StreamStore, ProfileStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, ApplicationServiceStore, @@ -79,7 +62,8 @@ class DataStore(RoomMemberStore, RoomStore, RejectionsStore, FilteringStore, PusherStore, - PushRuleStore + PushRuleStore, + EventsStore, ): def __init__(self, hs): @@ -89,422 +73,6 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None - @defer.inlineCallbacks - @log_function - def persist_event(self, event, context, backfilled=False, - is_new_state=True, current_state=None): - stream_ordering = None - if backfilled: - if not self.min_token_deferred.called: - yield self.min_token_deferred - self.min_token -= 1 - stream_ordering = self.min_token - - try: - yield self.runInteraction( - "persist_event", - self._persist_event_txn, - event=event, - context=context, - backfilled=backfilled, - stream_ordering=stream_ordering, - is_new_state=is_new_state, - current_state=current_state, - ) - except _RollbackButIsFineException: - pass - - @defer.inlineCallbacks - def get_event(self, event_id, check_redacted=True, - get_prev_content=False, allow_rejected=False, - allow_none=False): - """Get an event from the database by event_id. - - Args: - event_id (str): The event_id of the event to fetch - check_redacted (bool): If True, check if event has been redacted - and redact it. - get_prev_content (bool): If True and event is a state event, - include the previous states content in the unsigned field. - allow_rejected (bool): If True return rejected events. - allow_none (bool): If True, return None if no event found, if - False throw an exception. - - Returns: - Deferred : A FrozenEvent. - """ - event = yield self.runInteraction( - "get_event", self._get_event_txn, - event_id, - check_redacted=check_redacted, - get_prev_content=get_prev_content, - allow_rejected=allow_rejected, - ) - - if not event and not allow_none: - raise RuntimeError("Could not find event %s" % (event_id,)) - - defer.returnValue(event) - - @log_function - def _persist_event_txn(self, txn, event, context, backfilled, - stream_ordering=None, is_new_state=True, - current_state=None): - - # Remove the any existing cache entries for the event_id - self._get_event_cache.pop(event.event_id) - - # We purposefully do this first since if we include a `current_state` - # key, we *want* to update the `current_state_events` table - if current_state: - txn.execute( - "DELETE FROM current_state_events WHERE room_id = ?", - (event.room_id,) - ) - - for s in current_state: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": s.event_id, - "room_id": s.room_id, - "type": s.type, - "state_key": s.state_key, - }, - or_replace=True, - ) - - if event.is_state() and is_new_state: - if not backfilled and not context.rejected: - self._simple_insert_txn( - txn, - table="state_forward_extremities", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for prev_state_id, _ in event.prev_state: - self._simple_delete_txn( - txn, - table="state_forward_extremities", - keyvalues={ - "event_id": prev_state_id, - } - ) - - outlier = event.internal_metadata.is_outlier() - - if not outlier: - self._store_state_groups_txn(txn, event, context) - - self._update_min_depth_for_room_txn( - txn, - event.room_id, - event.depth - ) - - self._handle_prev_events( - txn, - outlier=outlier, - event_id=event.event_id, - prev_events=event.prev_events, - room_id=event.room_id, - ) - - have_persisted = self._simple_select_one_onecol_txn( - txn, - table="event_json", - keyvalues={"event_id": event.event_id}, - retcol="event_id", - allow_none=True, - ) - - metadata_json = encode_canonical_json( - event.internal_metadata.get_dict() - ) - - # If we have already persisted this event, we don't need to do any - # more processing. - # The processing above must be done on every call to persist event, - # since they might not have happened on previous calls. For example, - # if we are persisting an event that we had persisted as an outlier, - # but is no longer one. - if have_persisted: - if not outlier: - sql = ( - "UPDATE event_json SET internal_metadata = ?" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (metadata_json.decode("UTF-8"), event.event_id,) - ) - - sql = ( - "UPDATE events SET outlier = 0" - " WHERE event_id = ?" - ) - txn.execute( - sql, - (event.event_id,) - ) - return - - if event.type == EventTypes.Member: - self._store_room_member_txn(txn, event) - elif event.type == EventTypes.Feedback: - self._store_feedback_txn(txn, event) - elif event.type == EventTypes.Name: - self._store_room_name_txn(txn, event) - elif event.type == EventTypes.Topic: - self._store_room_topic_txn(txn, event) - elif event.type == EventTypes.Redaction: - self._store_redaction(txn, event) - - event_dict = { - k: v - for k, v in event.get_dict().items() - if k not in [ - "redacted", - "redacted_because", - ] - } - - self._simple_insert_txn( - txn, - table="event_json", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "internal_metadata": metadata_json.decode("UTF-8"), - "json": encode_canonical_json(event_dict).decode("UTF-8"), - }, - or_replace=True, - ) - - content = encode_canonical_json( - event.content - ).decode("UTF-8") - - vals = { - "topological_ordering": event.depth, - "event_id": event.event_id, - "type": event.type, - "room_id": event.room_id, - "content": content, - "processed": True, - "outlier": outlier, - "depth": event.depth, - } - - if stream_ordering is not None: - vals["stream_ordering"] = stream_ordering - - unrec = { - k: v - for k, v in event.get_dict().items() - if k not in vals.keys() and k not in [ - "redacted", - "redacted_because", - "signatures", - "hashes", - "prev_events", - ] - } - - vals["unrecognized_keys"] = encode_canonical_json( - unrec - ).decode("UTF-8") - - try: - self._simple_insert_txn( - txn, - "events", - vals, - or_replace=(not outlier), - or_ignore=bool(outlier), - ) - except: - logger.warn( - "Failed to persist, probably duplicate: %s", - event.event_id, - exc_info=True, - ) - raise _RollbackButIsFineException("_persist_event") - - if context.rejected: - self._store_rejections_txn(txn, event.event_id, context.rejected) - - if event.is_state(): - vals = { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } - - # TODO: How does this work with backfilling? - if hasattr(event, "replaces_state"): - vals["prev_state"] = event.replaces_state - - self._simple_insert_txn( - txn, - "state_events", - vals, - or_replace=True, - ) - - if is_new_state and not context.rejected: - self._simple_insert_txn( - txn, - "current_state_events", - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - }, - or_replace=True, - ) - - for e_id, h in event.prev_state: - self._simple_insert_txn( - txn, - table="event_edges", - values={ - "event_id": event.event_id, - "prev_event_id": e_id, - "room_id": event.room_id, - "is_state": 1, - }, - or_ignore=True, - ) - - for hash_alg, hash_base64 in event.hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_event_content_hash_txn( - txn, event.event_id, hash_alg, hash_bytes, - ) - - for prev_event_id, prev_hashes in event.prev_events: - for alg, hash_base64 in prev_hashes.items(): - hash_bytes = decode_base64(hash_base64) - self._store_prev_event_hash_txn( - txn, event.event_id, prev_event_id, alg, hash_bytes - ) - - for auth_id, _ in event.auth_events: - self._simple_insert_txn( - txn, - table="event_auth", - values={ - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - }, - or_ignore=True, - ) - - (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) - self._store_event_reference_hash_txn( - txn, event.event_id, ref_alg, ref_hash_bytes - ) - - def _store_redaction(self, txn, event): - # invalidate the cache for the redacted event - self._get_event_cache.pop(event.redacts) - txn.execute( - "INSERT OR IGNORE INTO redactions " - "(event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts) - ) - - @defer.inlineCallbacks - def get_current_state(self, room_id, event_type=None, state_key=""): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - if event_type and state_key is not None: - sql += " AND s.type = ? AND s.state_key = ? " - args = (room_id, event_type, state_key) - elif event_type: - sql += " AND s.type = ?" - args = (room_id, event_type) - else: - args = (room_id, ) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - defer.returnValue(events) - - @defer.inlineCallbacks - def get_room_name_and_aliases(self, room_id): - del_sql = ( - "SELECT event_id FROM redactions WHERE redacts = e.event_id " - "LIMIT 1" - ) - - sql = ( - "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " - "INNER JOIN current_state_events as c ON e.event_id = c.event_id " - "INNER JOIN state_events as s ON e.event_id = s.event_id " - "WHERE c.room_id = ? " - ) % { - "redacted": del_sql, - } - - sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" - sql += " OR s.type = 'm.room.aliases')" - args = (room_id,) - - results = yield self._execute_and_decode("get_current_state", sql, *args) - - events = yield self._parse_events(results) - - name = None - aliases = [] - - for e in events: - if e.type == 'm.room.name': - if 'name' in e.content: - name = e.content['name'] - elif e.type == 'm.room.aliases': - if 'aliases' in e.content: - aliases.extend(e.content['aliases']) - - defer.returnValue((name, aliases)) - - @defer.inlineCallbacks - def _get_min_token(self): - row = yield self._execute( - "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" - ) - - self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 - self.min_token = min(self.min_token, -1) - - logger.debug("min_token is: %s", self.min_token) - - defer.returnValue(self.min_token) - def insert_client_ip(self, user, access_token, device_id, ip, user_agent): return self._simple_insert( "user_ips", @@ -527,38 +95,6 @@ class DataStore(RoomMemberStore, RoomStore, ], ) - def have_events(self, event_ids): - """Given a list of event ids, check if we have already processed them. - - Returns: - dict: Has an entry for each event id we already have seen. Maps to - the rejected reason string if we rejected the event, else maps to - None. - """ - if not event_ids: - return defer.succeed({}) - - def f(txn): - sql = ( - "SELECT e.event_id, reason FROM events as e " - "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE e.event_id = ?" - ) - - res = {} - for event_id in event_ids: - txn.execute(sql, (event_id,)) - row = txn.fetchone() - if row: - _, rejected = row - res[event_id] = rejected - - return res - - return self.runInteraction( - "have_events", f, - ) - def read_schema(path): """ Read the named database schema. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9125bb119..0260b4e64 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -789,6 +789,13 @@ class SQLBaseStore(object): return result[0] if result else None +class _RollbackButIsFineException(Exception): + """ This exception is used to rollback a transaction without implying + something went wrong. + """ + pass + + class Table(object): """ A base class used to store information about a particular table. """ diff --git a/synapse/storage/events.py b/synapse/storage/events.py new file mode 100644 index 000000000..b295dc5b2 --- /dev/null +++ b/synapse/storage/events.py @@ -0,0 +1,394 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from _base import SQLBaseStore, _RollbackButIsFineException + +from twisted.internet import defer + +from synapse.util.logutils import log_function +from synapse.api.constants import EventTypes +from synapse.crypto.event_signing import compute_event_reference_hash + +from syutil.base64util import decode_base64 +from syutil.jsonutil import encode_canonical_json + +import logging + +logger = logging.getLogger(__name__) + + +class EventsStore(SQLBaseStore): + @defer.inlineCallbacks + @log_function + def persist_event(self, event, context, backfilled=False, + is_new_state=True, current_state=None): + stream_ordering = None + if backfilled: + if not self.min_token_deferred.called: + yield self.min_token_deferred + self.min_token -= 1 + stream_ordering = self.min_token + + try: + yield self.runInteraction( + "persist_event", + self._persist_event_txn, + event=event, + context=context, + backfilled=backfilled, + stream_ordering=stream_ordering, + is_new_state=is_new_state, + current_state=current_state, + ) + except _RollbackButIsFineException: + pass + + @defer.inlineCallbacks + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + """Get an event from the database by event_id. + + Args: + event_id (str): The event_id of the event to fetch + check_redacted (bool): If True, check if event has been redacted + and redact it. + get_prev_content (bool): If True and event is a state event, + include the previous states content in the unsigned field. + allow_rejected (bool): If True return rejected events. + allow_none (bool): If True, return None if no event found, if + False throw an exception. + + Returns: + Deferred : A FrozenEvent. + """ + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) + + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) + + defer.returnValue(event) + + @log_function + def _persist_event_txn(self, txn, event, context, backfilled, + stream_ordering=None, is_new_state=True, + current_state=None): + + # Remove the any existing cache entries for the event_id + self._get_event_cache.pop(event.event_id) + + # We purposefully do this first since if we include a `current_state` + # key, we *want* to update the `current_state_events` table + if current_state: + txn.execute( + "DELETE FROM current_state_events WHERE room_id = ?", + (event.room_id,) + ) + + for s in current_state: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": s.event_id, + "room_id": s.room_id, + "type": s.type, + "state_key": s.state_key, + }, + or_replace=True, + ) + + if event.is_state() and is_new_state: + if not backfilled and not context.rejected: + self._simple_insert_txn( + txn, + table="state_forward_extremities", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + or_replace=True, + ) + + for prev_state_id, _ in event.prev_state: + self._simple_delete_txn( + txn, + table="state_forward_extremities", + keyvalues={ + "event_id": prev_state_id, + } + ) + + outlier = event.internal_metadata.is_outlier() + + if not outlier: + self._store_state_groups_txn(txn, event, context) + + self._update_min_depth_for_room_txn( + txn, + event.room_id, + event.depth + ) + + self._handle_prev_events( + txn, + outlier=outlier, + event_id=event.event_id, + prev_events=event.prev_events, + room_id=event.room_id, + ) + + have_persisted = self._simple_select_one_onecol_txn( + txn, + table="event_json", + keyvalues={"event_id": event.event_id}, + retcol="event_id", + allow_none=True, + ) + + metadata_json = encode_canonical_json( + event.internal_metadata.get_dict() + ) + + # If we have already persisted this event, we don't need to do any + # more processing. + # The processing above must be done on every call to persist event, + # since they might not have happened on previous calls. For example, + # if we are persisting an event that we had persisted as an outlier, + # but is no longer one. + if have_persisted: + if not outlier: + sql = ( + "UPDATE event_json SET internal_metadata = ?" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (metadata_json.decode("UTF-8"), event.event_id,) + ) + + sql = ( + "UPDATE events SET outlier = 0" + " WHERE event_id = ?" + ) + txn.execute( + sql, + (event.event_id,) + ) + return + + if event.type == EventTypes.Member: + self._store_room_member_txn(txn, event) + elif event.type == EventTypes.Feedback: + self._store_feedback_txn(txn, event) + elif event.type == EventTypes.Name: + self._store_room_name_txn(txn, event) + elif event.type == EventTypes.Topic: + self._store_room_topic_txn(txn, event) + elif event.type == EventTypes.Redaction: + self._store_redaction(txn, event) + + event_dict = { + k: v + for k, v in event.get_dict().items() + if k not in [ + "redacted", + "redacted_because", + ] + } + + self._simple_insert_txn( + txn, + table="event_json", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "internal_metadata": metadata_json.decode("UTF-8"), + "json": encode_canonical_json(event_dict).decode("UTF-8"), + }, + or_replace=True, + ) + + content = encode_canonical_json( + event.content + ).decode("UTF-8") + + vals = { + "topological_ordering": event.depth, + "event_id": event.event_id, + "type": event.type, + "room_id": event.room_id, + "content": content, + "processed": True, + "outlier": outlier, + "depth": event.depth, + } + + if stream_ordering is not None: + vals["stream_ordering"] = stream_ordering + + unrec = { + k: v + for k, v in event.get_dict().items() + if k not in vals.keys() and k not in [ + "redacted", + "redacted_because", + "signatures", + "hashes", + "prev_events", + ] + } + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") + + try: + self._simple_insert_txn( + txn, + "events", + vals, + or_replace=(not outlier), + or_ignore=bool(outlier), + ) + except: + logger.warn( + "Failed to persist, probably duplicate: %s", + event.event_id, + exc_info=True, + ) + raise _RollbackButIsFineException("_persist_event") + + if context.rejected: + self._store_rejections_txn(txn, event.event_id, context.rejected) + + if event.is_state(): + vals = { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + } + + # TODO: How does this work with backfilling? + if hasattr(event, "replaces_state"): + vals["prev_state"] = event.replaces_state + + self._simple_insert_txn( + txn, + "state_events", + vals, + ) + + if is_new_state and not context.rejected: + self._simple_insert_txn( + txn, + "current_state_events", + { + "event_id": event.event_id, + "room_id": event.room_id, + "type": event.type, + "state_key": event.state_key, + }, + ) + + for e_id, h in event.prev_state: + self._simple_insert_txn( + txn, + table="event_edges", + values={ + "event_id": event.event_id, + "prev_event_id": e_id, + "room_id": event.room_id, + "is_state": 1, + }, + ) + + for hash_alg, hash_base64 in event.hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_event_content_hash_txn( + txn, event.event_id, hash_alg, hash_bytes, + ) + + for prev_event_id, prev_hashes in event.prev_events: + for alg, hash_base64 in prev_hashes.items(): + hash_bytes = decode_base64(hash_base64) + self._store_prev_event_hash_txn( + txn, event.event_id, prev_event_id, alg, hash_bytes + ) + + for auth_id, _ in event.auth_events: + self._simple_insert_txn( + txn, + table="event_auth", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "auth_id": auth_id, + }, + ) + + (ref_alg, ref_hash_bytes) = compute_event_reference_hash(event) + self._store_event_reference_hash_txn( + txn, event.event_id, ref_alg, ref_hash_bytes + ) + + def _store_redaction(self, txn, event): + # invalidate the cache for the redacted event + self._get_event_cache.pop(event.redacts) + txn.execute( + "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", + (event.event_id, event.redacts) + ) + + def have_events(self, event_ids): + """Given a list of event ids, check if we have already processed them. + + Returns: + dict: Has an entry for each event id we already have seen. Maps to + the rejected reason string if we rejected the event, else maps to + None. + """ + if not event_ids: + return defer.succeed({}) + + def f(txn): + sql = ( + "SELECT e.event_id, reason FROM events as e " + "LEFT JOIN rejections as r ON e.event_id = r.event_id " + "WHERE e.event_id = ?" + ) + + res = {} + for event_id in event_ids: + txn.execute(sql, (event_id,)) + row = txn.fetchone() + if row: + _, rejected = row + res[event_id] = rejected + + return res + + return self.runInteraction( + "have_events", f, + ) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py deleted file mode 100644 index 8eab769b7..000000000 --- a/synapse/storage/feedback.py +++ /dev/null @@ -1,47 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014, 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from twisted.internet import defer - -from ._base import SQLBaseStore - - -class FeedbackStore(SQLBaseStore): - - def _store_feedback_txn(self, txn, event): - self._simple_insert_txn(txn, "feedback", { - "event_id": event.event_id, - "feedback_type": event.content["type"], - "room_id": event.room_id, - "target_event_id": event.content["target_event_id"], - "sender": event.user_id, - }) - - @defer.inlineCallbacks - def get_feedback_for_event(self, event_id): - sql = ( - "SELECT events.* FROM events INNER JOIN feedback " - "ON events.event_id = feedback.event_id " - "WHERE feedback.target_event_id = ? " - ) - - rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) - - defer.returnValue( - [ - (yield self._parse_events(r)) - for r in rows - ] - ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 549c9af39..71bae1545 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -158,6 +158,43 @@ class RoomStore(SQLBaseStore): } ) + @defer.inlineCallbacks + def get_room_name_and_aliases(self, room_id): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + sql += " AND ((s.type = 'm.room.name' AND s.state_key = '')" + sql += " OR s.type = 'm.room.aliases')" + args = (room_id,) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + + name = None + aliases = [] + + for e in events: + if e.type == 'm.room.name': + if 'name' in e.content: + name = e.content['name'] + elif e.type == 'm.room.aliases': + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) + + defer.returnValue((name, aliases)) + class RoomsTable(Table): table_name = "rooms" diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 456e4bd45..58dbf2802 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -15,6 +15,8 @@ from ._base import SQLBaseStore +from twisted.internet import defer + import logging logger = logging.getLogger(__name__) @@ -122,3 +124,33 @@ class StateStore(SQLBaseStore): }, or_replace=True, ) + + @defer.inlineCallbacks + def get_current_state(self, room_id, event_type=None, state_key=""): + del_sql = ( + "SELECT event_id FROM redactions WHERE redacts = e.event_id " + "LIMIT 1" + ) + + sql = ( + "SELECT e.*, (%(redacted)s) AS redacted FROM events as e " + "INNER JOIN current_state_events as c ON e.event_id = c.event_id " + "INNER JOIN state_events as s ON e.event_id = s.event_id " + "WHERE c.room_id = ? " + ) % { + "redacted": del_sql, + } + + if event_type and state_key is not None: + sql += " AND s.type = ? AND s.state_key = ? " + args = (room_id, event_type, state_key) + elif event_type: + sql += " AND s.type = ?" + args = (room_id, event_type) + else: + args = (room_id, ) + + results = yield self._execute_and_decode("get_current_state", sql, *args) + + events = yield self._parse_events(results) + defer.returnValue(events) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc52221..df234efdf 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -419,6 +419,25 @@ class StreamStore(SQLBaseStore): self._get_room_events_max_id_txn ) + @defer.inlineCallbacks + def _get_min_token(self): + row = yield self._execute( + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" + ) + + self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 + self.min_token = min(self.min_token, -1) + + logger.debug("min_token is: %s", self.min_token) + + defer.returnValue(self.min_token) + + def get_next_stream_id(self): + with self._next_stream_id_lock: + i = self._next_stream_id + self._next_stream_id += 1 + return i + def _get_room_events_max_id_txn(self, txn): txn.execute( "SELECT MAX(stream_ordering) as m FROM events" From 91cb46191d23e840f6772a3113580a1b77c60ef0 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 17 Mar 2015 18:38:55 +0000 Subject: [PATCH 37/50] Allow @cached-wrapped functions to have more or fewer than 1 argument; assert on the total count of them though --- synapse/storage/_base.py | 40 +++++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 9125bb119..f483bd152 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -54,13 +54,12 @@ cache_counter = metrics.register_cache( # TODO(paul): -# * more generic key management # * consider other eviction strategies - LRU? -def cached(max_entries=1000): +def cached(max_entries=1000, num_args=1): """ A method decorator that applies a memoizing cache around the function. - The function is presumed to take one additional argument, which is used as - the key for the cache. Cache hits are served directly from the cache; + The function is presumed to take zero or more arguments, which are used in + a tuple as the key for the cache. Hits are served directly from the cache; misses use the function body to generate the value. The wrapped function has an additional member, a callable called @@ -76,26 +75,41 @@ def cached(max_entries=1000): caches_by_name[name] = cache - def prefill(key, value): + def prefill(*args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != num_args: + raise ValueError("Expected a call to have %d arguments", num_args) + while len(cache) > max_entries: cache.popitem(last=False) - cache[key] = value + cache[keyargs] = value @functools.wraps(orig) @defer.inlineCallbacks - def wrapped(self, key): - if key in cache: + def wrapped(self, *keyargs): + if len(keyargs) != num_args: + raise ValueError("Expected a call to have %d arguments", num_args) + + if keyargs in cache: cache_counter.inc_hits(name) - defer.returnValue(cache[key]) + defer.returnValue(cache[keyargs]) cache_counter.inc_misses(name) - ret = yield orig(self, key) - prefill(key, ret) + ret = yield orig(self, *keyargs) + + prefill_args = keyargs + (ret,) + prefill(*prefill_args) + defer.returnValue(ret) - def invalidate(key): - cache.pop(key, None) + def invalidate(*keyargs): + if len(keyargs) != num_args: + raise ValueError("Expected a call to have %d arguments", num_args) + + cache.pop(keyargs, None) wrapped.invalidate = invalidate wrapped.prefill = prefill From 7e282a53a542832f44119b886b2d4d474bbe1d0f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 14:59:48 +0000 Subject: [PATCH 38/50] Tidy up _simple_... methods --- synapse/storage/_base.py | 101 ++++++++++++++++++++++-------------- synapse/storage/presence.py | 1 - tests/storage/test_base.py | 2 +- tests/storage/test_room.py | 2 +- 4 files changed, 63 insertions(+), 43 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0260b4e64..a6f94de01 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -399,7 +399,7 @@ class SQLBaseStore(object): txn.execute(sql, allvalues.values()) def _simple_select_one(self, table, keyvalues, retcols, - allow_none=False): + allow_none=False, desc="_simple_select_one"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it. @@ -411,8 +411,10 @@ class SQLBaseStore(object): allow_none : If true, return None instead of failing if the SELECT statement returns no rows """ - return self._simple_selectupdate_one( - table, keyvalues, retcols=retcols, allow_none=allow_none + return self.runInteraction( + desc, + self._simple_select_one_txn, + table, keyvalues, retcols, allow_none, ) def _simple_select_one_onecol(self, table, keyvalues, retcol, @@ -523,7 +525,7 @@ class SQLBaseStore(object): return self.cursor_to_dict(txn) def _simple_update_one(self, table, keyvalues, updatevalues, - retcols=None): + desc="_simple_update_one"): """Executes an UPDATE query on the named table, setting new values for columns in a row matching the key values. @@ -541,51 +543,70 @@ class SQLBaseStore(object): get-and-set. This can be used to implement compare-and-set by putting the update column in the 'keyvalues' dict as well. """ - return self._simple_selectupdate_one(table, keyvalues, updatevalues, - retcols=retcols) + return self.runInteraction( + desc, + self._simple_update_one_txn, + table, keyvalues, updatevalues, + ) + + def _simple_update_one_txn(self, txn, table, keyvalues, updatevalues): + update_sql = "UPDATE %s SET %s WHERE %s" % ( + table, + ", ".join("%s = ?" % (k,) for k in updatevalues), + " AND ".join("%s = ?" % (k,) for k in keyvalues) + ) + + txn.execute( + update_sql, + updatevalues.values() + keyvalues.values() + ) + + if txn.rowcount == 0: + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + def _simple_select_one_txn(self, txn, table, keyvalues, retcols, + allow_none=False): + select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( + ", ".join(retcols), + table, + " AND ".join("%s = ?" % (k) for k in keyvalues) + ) + + txn.execute(select_sql, keyvalues.values()) + + row = txn.fetchone() + if not row: + if allow_none: + return None + raise StoreError(404, "No row found") + if txn.rowcount > 1: + raise StoreError(500, "More than one row matched") + + return dict(zip(retcols, row)) def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, retcols=None, allow_none=False): """ Combined SELECT then UPDATE.""" - if retcols: - select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( - ", ".join(retcols), - table, - " AND ".join("%s = ?" % (k) for k in keyvalues) - ) - - if updatevalues: - update_sql = "UPDATE %s SET %s WHERE %s" % ( - table, - ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) - ) - def func(txn): ret = None if retcols: - txn.execute(select_sql, keyvalues.values()) - - row = txn.fetchone() - if not row: - if allow_none: - return None - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") - - ret = dict(zip(retcols, row)) - - if updatevalues: - txn.execute( - update_sql, - updatevalues.values() + keyvalues.values() + ret = self._simple_select_one_txn( + txn, + table=table, + keyvalues=keyvalues, + retcols=retcols, + allow_none=allow_none, ) - if txn.rowcount == 0: - raise StoreError(404, "No row found") - if txn.rowcount > 1: - raise StoreError(500, "More than one row matched") + if updatevalues: + self._simple_update_one_txn( + txn, + table=table, + keyvalues=keyvalues, + updatevalues=updatevalues, + ) return ret return self.runInteraction("_simple_selectupdate_one", func) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 1dcd34723..0084d67e5 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -45,7 +45,6 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, - retcols=["state"], ) def allow_presence_visible(self, observed_localpart, observer_userid): diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py index 55fbffa7a..7f5845cf0 100644 --- a/tests/storage/test_base.py +++ b/tests/storage/test_base.py @@ -180,7 +180,7 @@ class SQLBaseStoreTestCase(unittest.TestCase): self.mock_txn.rowcount = 1 self.mock_txn.fetchone.return_value = ("Old Value",) - ret = yield self.datastore._simple_update_one( + ret = yield self.datastore._simple_selectupdate_one( table="tablename", keyvalues={"keycol": "TheKey"}, updatevalues={"columname": "New Value"}, diff --git a/tests/storage/test_room.py b/tests/storage/test_room.py index c88dd446f..ab7625a3c 100644 --- a/tests/storage/test_room.py +++ b/tests/storage/test_room.py @@ -44,7 +44,7 @@ class RoomStoreTestCase(unittest.TestCase): @defer.inlineCallbacks def test_get_room(self): - self.assertObjectHasAttributes( + self.assertDictContainsSubset( {"room_id": self.room.to_string(), "creator": self.u_creator.to_string(), "is_public": True}, From fce01140057c54714b591aa327095dcee9936b4b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 15:05:44 +0000 Subject: [PATCH 39/50] Start removing Tables --- synapse/storage/room.py | 35 ++++++++++++++++----------------- synapse/storage/transactions.py | 20 +++++++++++-------- 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 71bae1545..a16d32192 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -15,11 +15,9 @@ from twisted.internet import defer -from sqlite3 import IntegrityError - from synapse.api.errors import StoreError -from ._base import SQLBaseStore, Table +from ._base import SQLBaseStore import collections import logging @@ -27,8 +25,9 @@ import logging logger = logging.getLogger(__name__) -OpsLevel = collections.namedtuple("OpsLevel", ( - "ban_level", "kick_level", "redact_level") +OpsLevel = collections.namedtuple( + "OpsLevel", + ("ban_level", "kick_level", "redact_level",) ) @@ -47,13 +46,14 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert(RoomsTable.table_name, dict( - room_id=room_id, - creator=room_creator_user_id, - is_public=is_public - )) - except IntegrityError: - raise StoreError(409, "Room ID in use.") + yield self._simple_insert( + RoomsTable.table_name, + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + } + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -66,9 +66,10 @@ class RoomStore(SQLBaseStore): Returns: A namedtuple containing the room information, or an empty list. """ - query = RoomsTable.select_statement("room_id=?") - return self._execute( - "get_room", RoomsTable.decode_single_result, query, room_id, + return self._simple_select_one( + table=RoomsTable.table_name, + keyvalues={"room_id": room_id}, + retcols=RoomsTable.fields, ) @defer.inlineCallbacks @@ -196,7 +197,7 @@ class RoomStore(SQLBaseStore): defer.returnValue((name, aliases)) -class RoomsTable(Table): +class RoomsTable(object): table_name = "rooms" fields = [ @@ -204,5 +205,3 @@ class RoomsTable(Table): "is_public", "creator" ] - - EntryType = collections.namedtuple("RoomEntry", fields) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index 0b8a3b7a0..b777395e0 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -46,15 +46,19 @@ class TransactionStore(SQLBaseStore): ) def _get_received_txn_response(self, txn, transaction_id, origin): - where_clause = "transaction_id = ? AND origin = ?" - query = ReceivedTransactionsTable.select_statement(where_clause) + result = self._simple_select_one_txn( + txn, + table=ReceivedTransactionsTable.table_name, + keyvalues={ + "transaction_id": transaction_id, + "origin": origin, + }, + retcols=ReceivedTransactionsTable.fields, + allow_none=True, + ) - txn.execute(query, (transaction_id, origin)) - - results = ReceivedTransactionsTable.decode_results(txn.fetchall()) - - if results and results[0].response_code: - return (results[0].response_code, results[0].response_json) + if result and result.response_code: + return result["response_code"], result["response_json"] else: return None From ceb61daa70d30b56584bab61e17e68fd868d5264 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 15:44:06 +0000 Subject: [PATCH 40/50] Add the tiniest of tiny one-element caches to get_room_events_max_id() as it's read every time someone hits eventstream --- synapse/storage/__init__.py | 1 + synapse/storage/stream.py | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4b16f445d..70cb8a3ae 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -111,6 +111,7 @@ class DataStore(RoomMemberStore, RoomStore, is_new_state=is_new_state, current_state=current_state, ) + self.get_room_events_max_id.invalidate() except _RollbackButIsFineException: pass diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 09bc52221..850ab9e0e 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -35,7 +35,7 @@ what sort order was used: from twisted.internet import defer -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.logutils import log_function @@ -413,6 +413,7 @@ class StreamStore(SQLBaseStore): "get_recent_events_for_room", get_recent_events_for_room_txn ) + @cached(num_args=0) def get_room_events_max_id(self): return self.runInteraction( "get_room_events_max_id", From dc0c989ef43ecfe3b4159e0cf16c50a7f38c1f20 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 15:59:18 +0000 Subject: [PATCH 41/50] Give sensible names for '_simple_...' transactions --- synapse/storage/__init__.py | 4 +++- synapse/storage/_base.py | 37 ++++++++++++++++------------- synapse/storage/directory.py | 7 +++++- synapse/storage/filtering.py | 1 + synapse/storage/media_repository.py | 20 +++++++++++----- synapse/storage/presence.py | 11 +++++++++ synapse/storage/profile.py | 5 ++++ synapse/storage/push_rule.py | 9 ++++--- synapse/storage/pusher.py | 16 +++++++++---- synapse/storage/registration.py | 9 +++++-- synapse/storage/rejections.py | 3 ++- synapse/storage/room.py | 6 +++-- synapse/storage/roommember.py | 3 ++- 13 files changed, 93 insertions(+), 38 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4295f7348..76e7bdfae 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -83,7 +83,8 @@ class DataStore(RoomMemberStore, RoomStore, "ip": ip, "user_agent": user_agent, "last_seen": int(self._clock.time_msec()), - } + }, + desc="insert_client_ip", ) def get_user_ip_and_agents(self, user): @@ -93,6 +94,7 @@ class DataStore(RoomMemberStore, RoomStore, retcols=[ "device_id", "access_token", "ip", "user_agent", "last_seen" ], + desc="get_user_ip_and_agents", ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a6f94de01..0aab9a8af 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -321,7 +321,8 @@ class SQLBaseStore(object): # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. - def _simple_insert(self, table, values, or_replace=False, or_ignore=False): + def _simple_insert(self, table, values, or_replace=False, or_ignore=False, + desc="_simple_insert"): """Executes an INSERT query on the named table. Args: @@ -330,7 +331,7 @@ class SQLBaseStore(object): or_replace : bool; if True performs an INSERT OR REPLACE """ return self.runInteraction( - "_simple_insert", + desc, self._simple_insert_txn, table, values, or_replace=or_replace, or_ignore=or_ignore, ) @@ -354,7 +355,7 @@ class SQLBaseStore(object): txn.execute(sql, values.values()) return txn.lastrowid - def _simple_upsert(self, table, keyvalues, values): + def _simple_upsert(self, table, keyvalues, values, desc="_simple_upsert"): """ Args: table (str): The table to upsert into @@ -363,7 +364,7 @@ class SQLBaseStore(object): Returns: A deferred """ return self.runInteraction( - "_simple_upsert", + desc, self._simple_upsert_txn, table, keyvalues, values ) @@ -418,7 +419,8 @@ class SQLBaseStore(object): ) def _simple_select_one_onecol(self, table, keyvalues, retcol, - allow_none=False): + allow_none=False, + desc="_simple_select_one_onecol"): """Executes a SELECT query on the named table, which is expected to return a single row, returning a single column from it." @@ -428,7 +430,7 @@ class SQLBaseStore(object): retcol : string giving the name of the column to return """ return self.runInteraction( - "_simple_select_one_onecol", + desc, self._simple_select_one_onecol_txn, table, keyvalues, retcol, allow_none=allow_none, ) @@ -464,7 +466,8 @@ class SQLBaseStore(object): return [r[0] for r in txn.fetchall()] - def _simple_select_onecol(self, table, keyvalues, retcol): + def _simple_select_onecol(self, table, keyvalues, retcol, + desc="_simple_select_onecol"): """Executes a SELECT query on the named table, which returns a list comprising of the values of the named column from the selected rows. @@ -477,12 +480,13 @@ class SQLBaseStore(object): Deferred: Results in a list """ return self.runInteraction( - "_simple_select_onecol", + desc, self._simple_select_onecol_txn, table, keyvalues, retcol ) - def _simple_select_list(self, table, keyvalues, retcols): + def _simple_select_list(self, table, keyvalues, retcols, + desc="_simple_select_list"): """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. @@ -493,7 +497,7 @@ class SQLBaseStore(object): retcols : list of strings giving the names of the columns to return """ return self.runInteraction( - "_simple_select_list", + desc, self._simple_select_list_txn, table, keyvalues, retcols ) @@ -587,7 +591,8 @@ class SQLBaseStore(object): return dict(zip(retcols, row)) def _simple_selectupdate_one(self, table, keyvalues, updatevalues=None, - retcols=None, allow_none=False): + retcols=None, allow_none=False, + desc="_simple_selectupdate_one"): """ Combined SELECT then UPDATE.""" def func(txn): ret = None @@ -609,9 +614,9 @@ class SQLBaseStore(object): ) return ret - return self.runInteraction("_simple_selectupdate_one", func) + return self.runInteraction(desc, func) - def _simple_delete_one(self, table, keyvalues): + def _simple_delete_one(self, table, keyvalues, desc="_simple_delete_one"): """Executes a DELETE query on the named table, expecting to delete a single row. @@ -630,9 +635,9 @@ class SQLBaseStore(object): raise StoreError(404, "No row found") if txn.rowcount > 1: raise StoreError(500, "more than one row matched") - return self.runInteraction("_simple_delete_one", func) + return self.runInteraction(desc, func) - def _simple_delete(self, table, keyvalues): + def _simple_delete(self, table, keyvalues, desc="_simple_delete"): """Executes a DELETE query on the named table. Args: @@ -640,7 +645,7 @@ class SQLBaseStore(object): keyvalues : dict of column names and values to select the row with """ - return self.runInteraction("_simple_delete", self._simple_delete_txn) + return self.runInteraction(desc, self._simple_delete_txn) def _simple_delete_txn(self, txn, table, keyvalues): sql = "DELETE FROM %s WHERE %s" % ( diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 68b7d5969..6672752fe 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -48,6 +48,7 @@ class DirectoryStore(SQLBaseStore): {"room_alias": room_alias.to_string()}, "room_id", allow_none=True, + desc="get_association_from_room_alias", ) if not room_id: @@ -58,6 +59,7 @@ class DirectoryStore(SQLBaseStore): "room_alias_servers", {"room_alias": room_alias.to_string()}, "server", + desc="get_association_from_room_alias", ) if not servers: @@ -87,6 +89,7 @@ class DirectoryStore(SQLBaseStore): "room_alias": room_alias.to_string(), "room_id": room_id, }, + desc="create_room_alias_association", ) except sqlite3.IntegrityError: raise SynapseError( @@ -100,7 +103,8 @@ class DirectoryStore(SQLBaseStore): { "room_alias": room_alias.to_string(), "server": server, - } + }, + desc="create_room_alias_association", ) def delete_room_alias(self, room_alias): @@ -139,4 +143,5 @@ class DirectoryStore(SQLBaseStore): "room_aliases", {"room_id": room_id}, "room_alias", + desc="get_aliases_for_room", ) diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index 457a11fd0..880011657 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -31,6 +31,7 @@ class FilteringStore(SQLBaseStore): }, retcol="filter_json", allow_none=False, + desc="get_user_filter", ) defer.returnValue(json.loads(def_json)) diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py index 7101d2bee..7bf57234f 100644 --- a/synapse/storage/media_repository.py +++ b/synapse/storage/media_repository.py @@ -32,6 +32,7 @@ class MediaRepositoryStore(SQLBaseStore): {"media_id": media_id}, ("media_type", "media_length", "upload_name", "created_ts"), allow_none=True, + desc="get_local_media", ) def store_local_media(self, media_id, media_type, time_now_ms, upload_name, @@ -45,7 +46,8 @@ class MediaRepositoryStore(SQLBaseStore): "upload_name": upload_name, "media_length": media_length, "user_id": user_id.to_string(), - } + }, + desc="store_local_media", ) def get_local_media_thumbnails(self, media_id): @@ -55,7 +57,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", - ) + ), + desc="get_local_media_thumbnails", ) def store_local_thumbnail(self, media_id, thumbnail_width, @@ -70,7 +73,8 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_method": thumbnail_method, "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, - } + }, + desc="store_local_thumbnail", ) def get_cached_remote_media(self, origin, media_id): @@ -82,6 +86,7 @@ class MediaRepositoryStore(SQLBaseStore): "filesystem_id", ), allow_none=True, + desc="get_cached_remote_media", ) def store_cached_remote_media(self, origin, media_id, media_type, @@ -97,7 +102,8 @@ class MediaRepositoryStore(SQLBaseStore): "created_ts": time_now_ms, "upload_name": upload_name, "filesystem_id": filesystem_id, - } + }, + desc="store_cached_remote_media", ) def get_remote_media_thumbnails(self, origin, media_id): @@ -107,7 +113,8 @@ class MediaRepositoryStore(SQLBaseStore): ( "thumbnail_width", "thumbnail_height", "thumbnail_method", "thumbnail_type", "thumbnail_length", "filesystem_id", - ) + ), + desc="get_remote_media_thumbnails", ) def store_remote_media_thumbnail(self, origin, media_id, filesystem_id, @@ -125,5 +132,6 @@ class MediaRepositoryStore(SQLBaseStore): "thumbnail_type": thumbnail_type, "thumbnail_length": thumbnail_length, "filesystem_id": filesystem_id, - } + }, + desc="store_remote_media_thumbnail", ) diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 0084d67e5..87fba5543 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -21,6 +21,7 @@ class PresenceStore(SQLBaseStore): return self._simple_insert( table="presence", values={"user_id": user_localpart}, + desc="create_presence", ) def has_presence_state(self, user_localpart): @@ -29,6 +30,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": user_localpart}, retcols=["user_id"], allow_none=True, + desc="has_presence_state", ) def get_presence_state(self, user_localpart): @@ -36,6 +38,7 @@ class PresenceStore(SQLBaseStore): table="presence", keyvalues={"user_id": user_localpart}, retcols=["state", "status_msg", "mtime"], + desc="get_presence_state", ) def set_presence_state(self, user_localpart, new_state): @@ -45,6 +48,7 @@ class PresenceStore(SQLBaseStore): updatevalues={"state": new_state["state"], "status_msg": new_state["status_msg"], "mtime": self._clock.time_msec()}, + desc="set_presence_state", ) def allow_presence_visible(self, observed_localpart, observer_userid): @@ -52,6 +56,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", values={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="allow_presence_visible", ) def disallow_presence_visible(self, observed_localpart, observer_userid): @@ -59,6 +64,7 @@ class PresenceStore(SQLBaseStore): table="presence_allow_inbound", keyvalues={"observed_user_id": observed_localpart, "observer_user_id": observer_userid}, + desc="disallow_presence_visible", ) def is_presence_visible(self, observed_localpart, observer_userid): @@ -68,6 +74,7 @@ class PresenceStore(SQLBaseStore): "observer_user_id": observer_userid}, retcols=["observed_user_id"], allow_none=True, + desc="is_presence_visible", ) def add_presence_list_pending(self, observer_localpart, observed_userid): @@ -76,6 +83,7 @@ class PresenceStore(SQLBaseStore): values={"user_id": observer_localpart, "observed_user_id": observed_userid, "accepted": False}, + desc="add_presence_list_pending", ) def set_presence_list_accepted(self, observer_localpart, observed_userid): @@ -84,6 +92,7 @@ class PresenceStore(SQLBaseStore): keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, updatevalues={"accepted": True}, + desc="set_presence_list_accepted", ) def get_presence_list(self, observer_localpart, accepted=None): @@ -95,6 +104,7 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues=keyvalues, retcols=["observed_user_id", "accepted"], + desc="get_presence_list", ) def del_presence_list(self, observer_localpart, observed_userid): @@ -102,4 +112,5 @@ class PresenceStore(SQLBaseStore): table="presence_list", keyvalues={"user_id": observer_localpart, "observed_user_id": observed_userid}, + desc="del_presence_list", ) diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py index 153c7ad02..a6e52cb24 100644 --- a/synapse/storage/profile.py +++ b/synapse/storage/profile.py @@ -21,6 +21,7 @@ class ProfileStore(SQLBaseStore): return self._simple_insert( table="profiles", values={"user_id": user_localpart}, + desc="create_profile", ) def get_profile_displayname(self, user_localpart): @@ -28,6 +29,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="displayname", + desc="get_profile_displayname", ) def set_profile_displayname(self, user_localpart, new_displayname): @@ -35,6 +37,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"displayname": new_displayname}, + desc="set_profile_displayname", ) def get_profile_avatar_url(self, user_localpart): @@ -42,6 +45,7 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, retcol="avatar_url", + desc="get_profile_avatar_url", ) def set_profile_avatar_url(self, user_localpart, new_avatar_url): @@ -49,4 +53,5 @@ class ProfileStore(SQLBaseStore): table="profiles", keyvalues={"user_id": user_localpart}, updatevalues={"avatar_url": new_avatar_url}, + desc="set_profile_avatar_url", ) diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index d769db2c7..c47bdc286 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -50,7 +50,8 @@ class PushRuleStore(SQLBaseStore): results = yield self._simple_select_list( PushRuleEnableTable.table_name, {'user_name': user_name}, - PushRuleEnableTable.fields + PushRuleEnableTable.fields, + desc="get_push_rules_enabled_for_user", ) defer.returnValue( {r['rule_id']: False if r['enabled'] == 0 else True for r in results} @@ -201,7 +202,8 @@ class PushRuleStore(SQLBaseStore): """ yield self._simple_delete_one( PushRuleTable.table_name, - {'user_name': user_name, 'rule_id': rule_id} + {'user_name': user_name, 'rule_id': rule_id}, + desc="delete_push_rule", ) @defer.inlineCallbacks @@ -209,7 +211,8 @@ class PushRuleStore(SQLBaseStore): yield self._simple_upsert( PushRuleEnableTable.table_name, {'user_name': user_name, 'rule_id': rule_id}, - {'enabled': enabled} + {'enabled': enabled}, + desc="set_push_rule_enabled", ) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 587dada68..000502b4f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -114,7 +114,9 @@ class PusherStore(SQLBaseStore): ts=pushkey_ts, lang=lang, data=data - )) + ), + desc="add_pusher", + ) except Exception as e: logger.error("create_pusher with failed: %s", e) raise StoreError(500, "Problem creating pusher.") @@ -123,7 +125,8 @@ class PusherStore(SQLBaseStore): def delete_pusher_by_app_id_pushkey(self, app_id, pushkey): yield self._simple_delete_one( PushersTable.table_name, - dict(app_id=app_id, pushkey=pushkey) + {"app_id": app_id, "pushkey": pushkey}, + desc="delete_pusher_by_app_id_pushkey", ) @defer.inlineCallbacks @@ -131,7 +134,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token} + {'last_token': last_token}, + desc="update_pusher_last_token", ) @defer.inlineCallbacks @@ -140,7 +144,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'last_token': last_token, 'last_success': last_success} + {'last_token': last_token, 'last_success': last_success}, + desc="update_pusher_last_token_and_success", ) @defer.inlineCallbacks @@ -148,7 +153,8 @@ class PusherStore(SQLBaseStore): yield self._simple_update_one( PushersTable.table_name, {'app_id': app_id, 'pushkey': pushkey}, - {'failing_since': failing_since} + {'failing_since': failing_since}, + desc="update_pusher_failing_since", ) diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3c2f1d6a1..f24154f14 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -39,7 +39,10 @@ class RegistrationStore(SQLBaseStore): Raises: StoreError if there was a problem adding this. """ - row = yield self._simple_select_one("users", {"name": user_id}, ["id"]) + row = yield self._simple_select_one( + "users", {"name": user_id}, ["id"], + desc="add_access_token_to_user", + ) if not row: raise StoreError(400, "Bad user ID supplied.") row_id = row["id"] @@ -48,7 +51,8 @@ class RegistrationStore(SQLBaseStore): { "user_id": row_id, "token": token - } + }, + desc="add_access_token_to_user", ) @defer.inlineCallbacks @@ -120,6 +124,7 @@ class RegistrationStore(SQLBaseStore): keyvalues={"name": user.to_string()}, retcol="admin", allow_none=True, + desc="is_server_admin", ) defer.returnValue(res if res else False) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 4e1a9a278..0838eb3d1 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -29,7 +29,7 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, "reason": reason, "last_check": self._clock.time_msec(), - } + }, ) def get_rejection_reason(self, event_id): @@ -40,4 +40,5 @@ class RejectionsStore(SQLBaseStore): "event_id": event_id, }, allow_none=True, + desc="get_rejection_reason", ) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index a16d32192..be3e28c2e 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -52,7 +52,8 @@ class RoomStore(SQLBaseStore): "room_id": room_id, "creator": room_creator_user_id, "is_public": is_public, - } + }, + desc="store_room", ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) @@ -70,6 +71,7 @@ class RoomStore(SQLBaseStore): table=RoomsTable.table_name, keyvalues={"room_id": room_id}, retcols=RoomsTable.fields, + desc="get_room", ) @defer.inlineCallbacks @@ -144,7 +146,7 @@ class RoomStore(SQLBaseStore): "event_id": event.event_id, "room_id": event.room_id, "topic": event.content["topic"], - } + }, ) def _store_room_name_txn(self, txn, event): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 65ffb4627..52c37c76f 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -212,7 +212,8 @@ class RoomMemberStore(SQLBaseStore): return self._simple_select_onecol( "room_hosts", {"room_id": room_id}, - "host" + "host", + desc="get_joined_hosts_for_room", ) def _get_members_by_dict(self, where_dict): From 80cd08c190000c0b49e476b0a8283317edab9a8c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 20 Mar 2015 16:03:25 +0000 Subject: [PATCH 42/50] PEP8 --- synapse/storage/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 0ada6029f..e53630a68 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -585,7 +585,7 @@ class SQLBaseStore(object): raise StoreError(500, "More than one row matched") def _simple_select_one_txn(self, txn, table, keyvalues, retcols, - allow_none=False): + allow_none=False): select_sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % ( ", ".join(retcols), table, From b1022ed8b5df2d9827cf0437574fce4154eb606e Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 17:28:33 +0000 Subject: [PATCH 43/50] func(*EXPR) is valid Python syntax, really... --- synapse/storage/_base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e53630a68..2552a74f8 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -100,8 +100,7 @@ def cached(max_entries=1000, num_args=1): cache_counter.inc_misses(name) ret = yield orig(self, *keyargs) - prefill_args = keyargs + (ret,) - prefill(*prefill_args) + prefill(*keyargs + (ret,)) defer.returnValue(ret) From 0f86312c4cb262ad1b69207dd46712707dee75bb Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 18:13:49 +0000 Subject: [PATCH 44/50] Pull out the cache logic from the @cached wrapper into its own class we can reuse --- synapse/storage/_base.py | 89 ++++++++++++++++++++++--------------- tests/storage/test__base.py | 34 +++++++++++++- 2 files changed, 87 insertions(+), 36 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2552a74f8..27ea65a0f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -53,6 +53,47 @@ cache_counter = metrics.register_cache( ) +class Cache(object): + + def __init__(self, name, max_entries=1000, keylen=1): + self.cache = OrderedDict() + + self.max_entries = max_entries + self.name = name + self.keylen = keylen + + caches_by_name[name] = self.cache + + def get(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + if keyargs in self.cache: + cache_counter.inc_hits(self.name) + return self.cache[keyargs] + + cache_counter.inc_misses(self.name) + raise KeyError() + + def prefill(self, *args): # because I can't *keyargs, value + keyargs = args[:-1] + value = args[-1] + + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + while len(self.cache) > self.max_entries: + self.cache.popitem(last=False) + + self.cache[keyargs] = value + + def invalidate(self, *keyargs): + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) + + self.cache.pop(keyargs, None) + + # TODO(paul): # * consider other eviction strategies - LRU? def cached(max_entries=1000, num_args=1): @@ -70,48 +111,26 @@ def cached(max_entries=1000, num_args=1): calling the calculation function. """ def wrap(orig): - cache = OrderedDict() - name = orig.__name__ - - caches_by_name[name] = cache - - def prefill(*args): # because I can't *keyargs, value - keyargs = args[:-1] - value = args[-1] - - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) - - while len(cache) > max_entries: - cache.popitem(last=False) - - cache[keyargs] = value + cache = Cache( + name=orig.__name__, + max_entries=max_entries, + keylen=num_args, + ) @functools.wraps(orig) @defer.inlineCallbacks def wrapped(self, *keyargs): - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) + try: + defer.returnValue(cache.get(*keyargs)) + except KeyError: + ret = yield orig(self, *keyargs) - if keyargs in cache: - cache_counter.inc_hits(name) - defer.returnValue(cache[keyargs]) + cache.prefill(*keyargs + (ret,)) - cache_counter.inc_misses(name) - ret = yield orig(self, *keyargs) + defer.returnValue(ret) - prefill(*keyargs + (ret,)) - - defer.returnValue(ret) - - def invalidate(*keyargs): - if len(keyargs) != num_args: - raise ValueError("Expected a call to have %d arguments", num_args) - - cache.pop(keyargs, None) - - wrapped.invalidate = invalidate - wrapped.prefill = prefill + wrapped.invalidate = cache.invalidate + wrapped.prefill = cache.prefill return wrapped return wrap diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 55d22f665..783abc2b0 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -17,7 +17,39 @@ from tests import unittest from twisted.internet import defer -from synapse.storage._base import cached +from synapse.storage._base import Cache, cached + + +class CacheTestCase(unittest.TestCase): + + def setUp(self): + self.cache = Cache("test") + + def test_empty(self): + failed = False + try: + self.cache.get("foo") + except KeyError: + failed = True + + self.assertTrue(failed) + + def test_hit(self): + self.cache.prefill("foo", 123) + + self.assertEquals(self.cache.get("foo"), 123) + + def test_invalidate(self): + self.cache.prefill("foo", 123) + self.cache.invalidate("foo") + + failed = False + try: + self.cache.get("foo") + except KeyError: + failed = True + + self.assertTrue(failed) class CacheDecoratorTestCase(unittest.TestCase): From a63b4f71013f6a4e96b2b703c3a469fc8a9a5d57 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 17:08:15 +0000 Subject: [PATCH 45/50] Remember the 'last seen' time for a given user/IP/device combination and only bother INSERTing another if it's stale --- synapse/storage/__init__.py | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 76e7bdfae..c69d11261 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + +from ._base import Cache from .appservice import ApplicationServiceStore from .directory import DirectoryStore from .events import EventsStore @@ -51,6 +54,11 @@ SCHEMA_VERSION = 14 dir_path = os.path.abspath(os.path.dirname(__file__)) +# Number of msec of granularity to store the user IP 'last seen' time. Smaller +# times give more inserts into the database even for readonly API hits +# 120 seconds == 2 minutes +LAST_SEEN_GRANULARITY = 120*1000 + class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, @@ -73,8 +81,28 @@ class DataStore(RoomMemberStore, RoomStore, self.min_token_deferred = self._get_min_token() self.min_token = None + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + ) + + @defer.inlineCallbacks def insert_client_ip(self, user, access_token, device_id, ip, user_agent): - return self._simple_insert( + now = int(self._clock.time_msec()) + key = (user.to_string(), access_token, device_id, ip) + + try: + last_seen = self.client_ip_last_seen.get(*key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + defer.returnValue(None) + + self.client_ip_last_seen.prefill(*key + (now,)) + + yield self._simple_insert( "user_ips", { "user": user.to_string(), @@ -82,7 +110,7 @@ class DataStore(RoomMemberStore, RoomStore, "device_id": device_id, "ip": ip, "user_agent": user_agent, - "last_seen": int(self._clock.time_msec()), + "last_seen": now, }, desc="insert_client_ip", ) From 72d84064094be60a907ca515739e2a4ea1af0bd5 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Fri, 20 Mar 2015 19:21:13 +0000 Subject: [PATCH 46/50] Put a cache on get_aliases_for_room --- synapse/storage/directory.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py index 6672752fe..0199539fe 100644 --- a/synapse/storage/directory.py +++ b/synapse/storage/directory.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached from synapse.api.errors import SynapseError @@ -106,14 +106,19 @@ class DirectoryStore(SQLBaseStore): }, desc="create_room_alias_association", ) + self.get_aliases_for_room.invalidate(room_id) + @defer.inlineCallbacks def delete_room_alias(self, room_alias): - return self.runInteraction( + room_id = yield self.runInteraction( "delete_room_alias", self._delete_room_alias_txn, room_alias, ) + self.get_aliases_for_room.invalidate(room_id) + defer.returnValue(room_id) + def _delete_room_alias_txn(self, txn, room_alias): cursor = txn.execute( "SELECT room_id FROM room_aliases WHERE room_alias = ?", @@ -138,6 +143,7 @@ class DirectoryStore(SQLBaseStore): return room_id + @cached() def get_aliases_for_room(self, room_id): return self._simple_select_onecol( "room_aliases", From ed008e85a8a2d9254d4d6f23cc7eb47ee52d0989 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Mon, 23 Mar 2015 17:25:44 +0000 Subject: [PATCH 47/50] Reduce activity timer granularity to avoid too many quick updates (SYN-247) --- synapse/handlers/presence.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 731df0064..bbc7a0f20 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -33,6 +33,10 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) +# Don't bother bumping "last active" time if it differs by less than 60 seconds +LAST_ACTIVE_GRANULARITY = 60*1000 + + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): """Partition the list by the result of func applied to each element.""" @@ -282,6 +286,10 @@ class PresenceHandler(BaseHandler): if now is None: now = self.clock.time_msec() + prev_state = self._get_or_make_usercache(user) + if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY: + return + self.changed_presencelike_data(user, {"last_active": now}) def changed_presencelike_data(self, user, state): From 7ab9f91a605d501cadee1c212eca2ef0467adc50 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 18:50:43 +0000 Subject: [PATCH 48/50] Unit-test that Cache() key eviction is ordered --- tests/storage/test__base.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index 783abc2b0..b6853ba2d 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -51,6 +51,24 @@ class CacheTestCase(unittest.TestCase): self.assertTrue(failed) + def test_eviction(self): + cache = Cache("test", max_entries=2) + + cache.prefill(1, "one") + cache.prefill(2, "two") + cache.prefill(3, "three") # 1 will be evicted + + failed = False + try: + cache.get(1) + except KeyError: + failed = True + + self.assertTrue(failed) + + cache.get(2) + cache.get(3) + class CacheDecoratorTestCase(unittest.TestCase): From d6b3ea75d4eba6961242ce68d5df90557b00609b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:04:59 +0000 Subject: [PATCH 49/50] Implement the 'key in dict' test for LruCache() --- synapse/util/lrucache.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index 65d579290..2f7b615f7 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -90,12 +90,16 @@ class LruCache(object): def cache_len(): return len(cache) + def cache_contains(key): + return key in cache + self.sentinel = object() self.get = cache_get self.set = cache_set self.setdefault = cache_set_default self.pop = cache_pop self.len = cache_len + self.contains = cache_contains def __getitem__(self, key): result = self.get(key, self.sentinel) @@ -114,3 +118,6 @@ class LruCache(object): def __len__(self): return self.len() + + def __contains__(self, key): + return self.contains(key) From 9ba6487b3fe985c4ec84b02d9804aea7e2df6c40 Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Wed, 25 Mar 2015 19:05:34 +0000 Subject: [PATCH 50/50] Allow a choice of LRU behaviour for Cache() by using LruCache() or OrderedDict() --- synapse/storage/_base.py | 20 ++++++++++++-------- tests/storage/test__base.py | 22 ++++++++++++++++++++++ 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 27ea65a0f..6fa63f052 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -55,10 +55,14 @@ cache_counter = metrics.register_cache( class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1): - self.cache = OrderedDict() + def __init__(self, name, max_entries=1000, keylen=1, lru=False): + if lru: + self.cache = LruCache(max_size=max_entries) + self.max_entries = None + else: + self.cache = OrderedDict() + self.max_entries = max_entries - self.max_entries = max_entries self.name = name self.keylen = keylen @@ -82,8 +86,9 @@ class Cache(object): if len(keyargs) != self.keylen: raise ValueError("Expected a key to have %d items", self.keylen) - while len(self.cache) > self.max_entries: - self.cache.popitem(last=False) + if self.max_entries is not None: + while len(self.cache) >= self.max_entries: + self.cache.popitem(last=False) self.cache[keyargs] = value @@ -94,9 +99,7 @@ class Cache(object): self.cache.pop(keyargs, None) -# TODO(paul): -# * consider other eviction strategies - LRU? -def cached(max_entries=1000, num_args=1): +def cached(max_entries=1000, num_args=1, lru=False): """ A method decorator that applies a memoizing cache around the function. The function is presumed to take zero or more arguments, which are used in @@ -115,6 +118,7 @@ def cached(max_entries=1000, num_args=1): name=orig.__name__, max_entries=max_entries, keylen=num_args, + lru=lru, ) @functools.wraps(orig) diff --git a/tests/storage/test__base.py b/tests/storage/test__base.py index b6853ba2d..96caf8c4c 100644 --- a/tests/storage/test__base.py +++ b/tests/storage/test__base.py @@ -69,6 +69,28 @@ class CacheTestCase(unittest.TestCase): cache.get(2) cache.get(3) + def test_eviction_lru(self): + cache = Cache("test", max_entries=2, lru=True) + + cache.prefill(1, "one") + cache.prefill(2, "two") + + # Now access 1 again, thus causing 2 to be least-recently used + cache.get(1) + + cache.prefill(3, "three") + + failed = False + try: + cache.get(2) + except KeyError: + failed = True + + self.assertTrue(failed) + + cache.get(1) + cache.get(3) + class CacheDecoratorTestCase(unittest.TestCase):