diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 071abb899..aa15f73f3 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -46,6 +46,7 @@ class Codes(object): THREEPID_AUTH_FAILED = "M_THREEPID_AUTH_FAILED" THREEPID_IN_USE = "M_THREEPID_IN_USE" THREEPID_NOT_FOUND = "M_THREEPID_NOT_FOUND" + THREEPID_DENIED = "M_THREEPID_DENIED" INVALID_USERNAME = "M_INVALID_USERNAME" SERVER_NOT_TRUSTED = "M_SERVER_NOT_TRUSTED" diff --git a/synapse/config/registration.py b/synapse/config/registration.py index ef917fc9f..336959094 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -31,6 +31,8 @@ class RegistrationConfig(Config): strtobool(str(config["disable_registration"])) ) + self.registrations_require_3pid = config.get("registrations_require_3pid", []) + self.allowed_local_3pids = config.get("allowed_local_3pids", []) self.registration_shared_secret = config.get("registration_shared_secret") self.bcrypt_rounds = config.get("bcrypt_rounds", 12) @@ -52,6 +54,23 @@ class RegistrationConfig(Config): # Enable registration for new users. enable_registration: False + # The user must provide all of the below types of 3PID when registering. + # + # registrations_require_3pid: + # - email + # - msisdn + + # Mandate that users are only allowed to associate certain formats of + # 3PIDs with accounts on this server. + # + # allowed_local_3pids: + # - medium: email + # pattern: ".*@matrix\\.org" + # - medium: email + # pattern: ".*@vector\\.im" + # - medium: msisdn + # pattern: "\\+44" + # If set, allows registration by anyone who also has the shared # secret, even if registration is otherwise disabled. registration_shared_secret: "%(registration_shared_secret)s" diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 6baa47493..25ea77738 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -16,6 +16,8 @@ from ._base import Config, ConfigError from collections import namedtuple +from synapse.util.module_loader import load_module + MISSING_NETADDR = ( "Missing netaddr library. This is required for URL preview API." @@ -36,6 +38,14 @@ ThumbnailRequirement = namedtuple( "ThumbnailRequirement", ["width", "height", "method", "media_type"] ) +MediaStorageProviderConfig = namedtuple( + "MediaStorageProviderConfig", ( + "store_local", # Whether to store newly uploaded local files + "store_remote", # Whether to store newly downloaded remote files + "store_synchronous", # Whether to wait for successful storage for local uploads + ), +) + def parse_thumbnail_requirements(thumbnail_sizes): """ Takes a list of dictionaries with "width", "height", and "method" keys @@ -73,16 +83,61 @@ class ContentRepositoryConfig(Config): self.media_store_path = self.ensure_directory(config["media_store_path"]) - self.backup_media_store_path = config.get("backup_media_store_path") - if self.backup_media_store_path: - self.backup_media_store_path = self.ensure_directory( - self.backup_media_store_path - ) + backup_media_store_path = config.get("backup_media_store_path") - self.synchronous_backup_media_store = config.get( + synchronous_backup_media_store = config.get( "synchronous_backup_media_store", False ) + storage_providers = config.get("media_storage_providers", []) + + if backup_media_store_path: + if storage_providers: + raise ConfigError( + "Cannot use both 'backup_media_store_path' and 'storage_providers'" + ) + + storage_providers = [{ + "module": "file_system", + "store_local": True, + "store_synchronous": synchronous_backup_media_store, + "store_remote": True, + "config": { + "directory": backup_media_store_path, + } + }] + + # This is a list of config that can be used to create the storage + # providers. The entries are tuples of (Class, class_config, + # MediaStorageProviderConfig), where Class is the class of the provider, + # the class_config the config to pass to it, and + # MediaStorageProviderConfig are options for StorageProviderWrapper. + # + # We don't create the storage providers here as not all workers need + # them to be started. + self.media_storage_providers = [] + + for provider_config in storage_providers: + # We special case the module "file_system" so as not to need to + # expose FileStorageProviderBackend + if provider_config["module"] == "file_system": + provider_config["module"] = ( + "synapse.rest.media.v1.storage_provider" + ".FileStorageProviderBackend" + ) + + provider_class, parsed_config = load_module(provider_config) + + wrapper_config = MediaStorageProviderConfig( + provider_config.get("store_local", False), + provider_config.get("store_remote", False), + provider_config.get("store_synchronous", False), + ) + + self.media_storage_providers.append( + (provider_class, parsed_config, wrapper_config,) + ) + self.uploads_path = self.ensure_directory(config["uploads_path"]) self.dynamic_thumbnails = config["dynamic_thumbnails"] self.thumbnail_requirements = parse_thumbnail_requirements( @@ -127,13 +182,19 @@ class ContentRepositoryConfig(Config): # Directory where uploaded images and attachments are stored. media_store_path: "%(media_store)s" - # A secondary directory where uploaded images and attachments are - # stored as a backup. - # backup_media_store_path: "%(media_store)s" - - # Whether to wait for successful write to backup media store before - # returning successfully. - # synchronous_backup_media_store: false + # Media storage providers allow media to be stored in different + # locations. + # media_storage_providers: + # - module: file_system + # # Whether to write new local files. + # store_local: false + # # Whether to write new remote media + # store_remote: false + # # Whether to block upload requests waiting for write to this + # # provider to complete + # store_synchronous: false + # config: + # directory: /mnt/some/other/directory # Directory where in-progress uploads are stored. uploads_path: "%(uploads_path)s" diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 5b808beac..9021d4d57 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -25,6 +25,7 @@ from synapse.http.client import CaptchaServerHttpClient from synapse import types from synapse.types import UserID from synapse.util.async import run_on_reactor +from synapse.util.threepids import check_3pid_allowed from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -293,7 +294,7 @@ class RegistrationHandler(BaseHandler): """ for c in threepidCreds: - logger.info("validating theeepidcred sid %s on id server %s", + logger.info("validating threepidcred sid %s on id server %s", c['sid'], c['idServer']) try: identity_handler = self.hs.get_handlers().identity_handler @@ -307,6 +308,11 @@ class RegistrationHandler(BaseHandler): logger.info("got threepid with medium '%s' and address '%s'", threepid['medium'], threepid['address']) + if not check_3pid_allowed(self.hs, threepid['medium'], threepid['address']): + raise RegistrationError( + 403, "Third party identifier is not allowed" + ) + @defer.inlineCallbacks def bind_emails(self, user_id, threepidCreds): """Links emails with a user ID and informs an identity server. diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index f480aae61..1e783e5ff 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -15,6 +15,9 @@ from itertools import chain +import logging + +logger = logging.getLogger(__name__) def flatten(items): @@ -153,7 +156,11 @@ class CallbackMetric(BaseMetric): self.callback = callback def render(self): - value = self.callback() + try: + value = self.callback() + except Exception: + logger.exception("Failed to render %s", self.name) + return ["# FAILED to render " + self.name] if self.is_scalar(): return list(self._render_for_labels([], value)) diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index 32ed1d3ab..5c5fa8f7a 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -70,10 +70,15 @@ class RegisterRestServlet(ClientV1RestServlet): self.handlers = hs.get_handlers() def on_GET(self, request): + + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid + + flows = [] if self.hs.config.enable_registration_captcha: - return ( - 200, - {"flows": [ + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [ @@ -82,27 +87,34 @@ class RegisterRestServlet(ClientV1RestServlet): LoginType.PASSWORD ] }, + ]) + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.RECAPTCHA, "stages": [LoginType.RECAPTCHA, LoginType.PASSWORD] } - ]} - ) + ]) else: - return ( - 200, - {"flows": [ + # only support the email-only flow if we don't require MSISDN 3PIDs + if require_email or not require_msisdn: + flows.extend([ { "type": LoginType.EMAIL_IDENTITY, "stages": [ LoginType.EMAIL_IDENTITY, LoginType.PASSWORD ] - }, + } + ]) + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([ { "type": LoginType.PASSWORD } - ]} - ) + ]) + return (200, {"flows": flows}) @defer.inlineCallbacks def on_POST(self, request): diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 682a0af9f..867ec8602 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -195,15 +195,20 @@ class RoomSendEventRestServlet(ClientV1RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) content = parse_json_object_from_request(request) + event_dict = { + "type": event_type, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + } + + if 'ts' in request.args and requester.app_service: + event_dict['origin_server_ts'] = parse_integer(request, "ts", 0) + msg_handler = self.handlers.message_handler event = yield msg_handler.create_and_send_nonmember_event( requester, - { - "type": event_type, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - }, + event_dict, txn_id=txn_id, ) diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py index 385a3ad2e..30523995a 100644 --- a/synapse/rest/client/v2_alpha/account.py +++ b/synapse/rest/client/v2_alpha/account.py @@ -26,6 +26,7 @@ from synapse.http.servlet import ( ) from synapse.util.async import run_on_reactor from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler logger = logging.getLogger(__name__) @@ -47,6 +48,11 @@ class EmailPasswordRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -78,6 +84,11 @@ class MsisdnPasswordRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) @@ -217,6 +228,11 @@ class EmailThreepidRequestTokenRestServlet(RestServlet): if absent: raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'email', body['email'] ) @@ -255,6 +271,11 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.datastore.get_user_id_by_threepid( 'msisdn', msisdn ) diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py index e9d88a889..3abfe3547 100644 --- a/synapse/rest/client/v2_alpha/register.py +++ b/synapse/rest/client/v2_alpha/register.py @@ -26,6 +26,7 @@ from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, assert_params_in_request, parse_string ) from synapse.util.msisdn import phone_number_to_msisdn +from synapse.util.threepids import check_3pid_allowed from ._base import client_v2_patterns, interactive_auth_handler @@ -70,6 +71,11 @@ class EmailRegisterRequestTokenRestServlet(RestServlet): 'id_server', 'client_secret', 'email', 'send_attempt' ]) + if not check_3pid_allowed(self.hs, "email", body['email']): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'email', body['email'] ) @@ -105,6 +111,11 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet): msisdn = phone_number_to_msisdn(body['country'], body['phone_number']) + if not check_3pid_allowed(self.hs, "msisdn", msisdn): + raise SynapseError( + 403, "Third party identifier is not allowed", Codes.THREEPID_DENIED, + ) + existingUid = yield self.hs.get_datastore().get_user_id_by_threepid( 'msisdn', msisdn ) @@ -305,31 +316,67 @@ class RegisterRestServlet(RestServlet): if 'x_show_msisdn' in body and body['x_show_msisdn']: show_msisdn = True + # FIXME: need a better error than "no auth flow found" for scenarios + # where we required 3PID for registration but the user didn't give one + require_email = 'email' in self.hs.config.registrations_require_3pid + require_msisdn = 'msisdn' in self.hs.config.registrations_require_3pid + + flows = [] if self.hs.config.enable_registration_captcha: - flows = [ - [LoginType.RECAPTCHA], - [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], - ] + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([[LoginType.RECAPTCHA]]) + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]]) + if show_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs + if not require_email: + flows.extend([[LoginType.MSISDN, LoginType.RECAPTCHA]]) + # always let users provide both MSISDN & email flows.extend([ - [LoginType.MSISDN, LoginType.RECAPTCHA], [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA], ]) else: - flows = [ - [LoginType.DUMMY], - [LoginType.EMAIL_IDENTITY], - ] + # only support 3PIDless registration if no 3PIDs are required + if not require_email and not require_msisdn: + flows.extend([[LoginType.DUMMY]]) + # only support the email-only flow if we don't require MSISDN 3PIDs + if not require_msisdn: + flows.extend([[LoginType.EMAIL_IDENTITY]]) + if show_msisdn: + # only support the MSISDN-only flow if we don't require email 3PIDs + if not require_email or require_msisdn: + flows.extend([[LoginType.MSISDN]]) + # always let users provide both MSISDN & email flows.extend([ - [LoginType.MSISDN], - [LoginType.MSISDN, LoginType.EMAIL_IDENTITY], + [LoginType.MSISDN, LoginType.EMAIL_IDENTITY] ]) auth_result, params, session_id = yield self.auth_handler.check_auth( flows, body, self.hs.get_ip_from_request(request) ) + # Check that we're not trying to register a denied 3pid. + # + # the user-facing checks will probably already have happened in + # /register/email/requestToken when we requested a 3pid, but that's not + # guaranteed. + + if auth_result: + for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]: + if login_type in auth_result: + medium = auth_result[login_type].threepid['medium'] + address = auth_result[login_type].threepid['address'] + + if not check_3pid_allowed(self.hs, medium, address): + raise SynapseError( + 403, "Third party identifier is not allowed", + Codes.THREEPID_DENIED, + ) + if registered_user_id is not None: logger.info( "Already registered user ID %r for this session", diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index d61061434..485db8577 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -27,9 +27,7 @@ from .identicon_resource import IdenticonResource from .preview_url_resource import PreviewUrlResource from .filepath import MediaFilePaths from .thumbnailer import Thumbnailer -from .storage_provider import ( - StorageProviderWrapper, FileStorageProviderBackend, -) +from .storage_provider import StorageProviderWrapper from .media_storage import MediaStorage from synapse.http.matrixfederationclient import MatrixFederationHttpClient @@ -84,17 +82,13 @@ class MediaRepository(object): # potentially upload to. storage_providers = [] - # TODO: Move this into config and allow other storage providers to be - # defined. - if hs.config.backup_media_store_path: - backend = FileStorageProviderBackend( - self.primary_base_path, hs.config.backup_media_store_path, - ) + for clz, provider_config, wrapper_config in hs.config.media_storage_providers: + backend = clz(hs, provider_config) provider = StorageProviderWrapper( backend, - store=True, - store_synchronous=hs.config.synchronous_backup_media_store, - store_remote=True, + store_local=wrapper_config.store_local, + store_remote=wrapper_config.store_remote, + store_synchronous=wrapper_config.store_synchronous, ) storage_providers.append(provider) diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index 001e84578..041ae396c 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -164,6 +164,14 @@ class MediaStorage(object): str """ if file_info.url_cache: + if file_info.thumbnail: + return self.filepaths.url_cache_thumbnail_rel( + media_id=file_info.file_id, + width=file_info.thumbnail_width, + height=file_info.thumbnail_height, + content_type=file_info.thumbnail_type, + method=file_info.thumbnail_method, + ) return self.filepaths.url_cache_filepath_rel(file_info.file_id) if file_info.server_name: diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 2ad602e10..c188192f2 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -17,6 +17,7 @@ from twisted.internet import defer, threads from .media_storage import FileResponder +from synapse.config._base import Config from synapse.util.logcontext import preserve_fn import logging @@ -64,19 +65,19 @@ class StorageProviderWrapper(StorageProvider): Args: backend (StorageProvider) - store (bool): Whether to store new files or not. + store_local (bool): Whether to store new local files or not. store_synchronous (bool): Whether to wait for file to be successfully uploaded, or todo the upload in the backgroud. store_remote (bool): Whether remote media should be uploaded """ - def __init__(self, backend, store, store_synchronous, store_remote): + def __init__(self, backend, store_local, store_synchronous, store_remote): self.backend = backend - self.store = store + self.store_local = store_local self.store_synchronous = store_synchronous self.store_remote = store_remote def store_file(self, path, file_info): - if not self.store: + if not file_info.server_name and not self.store_local: return defer.succeed(None) if file_info.server_name and not self.store_remote: @@ -97,13 +98,13 @@ class FileStorageProviderBackend(StorageProvider): """A storage provider that stores files in a directory on a filesystem. Args: - cache_directory (str): Base path of the local media repository - base_directory (str): Base path to store new files + hs (HomeServer) + config: The config returned by `parse_config`. """ - def __init__(self, cache_directory, base_directory): - self.cache_directory = cache_directory - self.base_directory = base_directory + def __init__(self, hs, config): + self.cache_directory = hs.config.media_store_path + self.base_directory = config def store_file(self, path, file_info): """See StorageProvider.store_file""" @@ -125,3 +126,15 @@ class FileStorageProviderBackend(StorageProvider): backup_fname = os.path.join(self.base_directory, path) if os.path.isfile(backup_fname): return FileResponder(open(backup_fname, "rb")) + + @staticmethod + def parse_config(config): + """Called on startup to parse config supplied. This should parse + the config and raise if there is a problem. + + The returned value is passed into the constructor. + + In this case we only care about a single param, the directory, so let's + just pull that out. + """ + return Config.ensure_directory(config["directory"]) diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index c09f2dec4..12e84a2b7 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -67,7 +67,7 @@ class ThumbnailResource(Resource): yield self._respond_local_thumbnail( request, media_id, width, height, method, m_type ) - self.media_repo.mark_recently_accessed(server_name, media_id) + self.media_repo.mark_recently_accessed(None, media_id) else: if self.dynamic_thumbnails: yield self._select_or_generate_remote_thumbnail( @@ -79,7 +79,7 @@ class ThumbnailResource(Resource): request, server_name, media_id, width, height, method, m_type ) - self.media_repo.mark_recently_accessed(None, media_id) + self.media_repo.mark_recently_accessed(server_name, media_id) @defer.inlineCallbacks def _respond_local_thumbnail(self, request, media_id, width, height, diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py new file mode 100644 index 000000000..90a2608d6 --- /dev/null +++ b/synapse/util/file_consumer.py @@ -0,0 +1,139 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import threads, reactor + +from synapse.util.logcontext import make_deferred_yieldable, preserve_fn + +import Queue + + +class BackgroundFileConsumer(object): + """A consumer that writes to a file like object. Supports both push + and pull producers + + Args: + file_obj (file): The file like object to write to. Closed when + finished. + """ + + # For PushProducers pause if we have this many unwritten slices + _PAUSE_ON_QUEUE_SIZE = 5 + # And resume once the size of the queue is less than this + _RESUME_ON_QUEUE_SIZE = 2 + + def __init__(self, file_obj): + self._file_obj = file_obj + + # Producer we're registered with + self._producer = None + + # True if PushProducer, false if PullProducer + self.streaming = False + + # For PushProducers, indicates whether we've paused the producer and + # need to call resumeProducing before we get more data. + self._paused_producer = False + + # Queue of slices of bytes to be written. When producer calls + # unregister a final None is sent. + self._bytes_queue = Queue.Queue() + + # Deferred that is resolved when finished writing + self._finished_deferred = None + + # If the _writer thread throws an exception it gets stored here. + self._write_exception = None + + def registerProducer(self, producer, streaming): + """Part of IConsumer interface + + Args: + producer (IProducer) + streaming (bool): True if push based producer, False if pull + based. + """ + if self._producer: + raise Exception("registerProducer called twice") + + self._producer = producer + self.streaming = streaming + self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer) + if not streaming: + self._producer.resumeProducing() + + def unregisterProducer(self): + """Part of IProducer interface + """ + self._producer = None + if not self._finished_deferred.called: + self._bytes_queue.put_nowait(None) + + def write(self, bytes): + """Part of IProducer interface + """ + if self._write_exception: + raise self._write_exception + + if self._finished_deferred.called: + raise Exception("consumer has closed") + + self._bytes_queue.put_nowait(bytes) + + # If this is a PushProducer and the queue is getting behind + # then we pause the producer. + if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE: + self._paused_producer = True + self._producer.pauseProducing() + + def _writer(self): + """This is run in a background thread to write to the file. + """ + try: + while self._producer or not self._bytes_queue.empty(): + # If we've paused the producer check if we should resume the + # producer. + if self._producer and self._paused_producer: + if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE: + reactor.callFromThread(self._resume_paused_producer) + + bytes = self._bytes_queue.get() + + # If we get a None (or empty list) then that's a signal used + # to indicate we should check if we should stop. + if bytes: + self._file_obj.write(bytes) + + # If its a pull producer then we need to explicitly ask for + # more stuff. + if not self.streaming and self._producer: + reactor.callFromThread(self._producer.resumeProducing) + except Exception as e: + self._write_exception = e + raise + finally: + self._file_obj.close() + + def wait(self): + """Returns a deferred that resolves when finished writing to file + """ + return make_deferred_yieldable(self._finished_deferred) + + def _resume_paused_producer(self): + """Gets called if we should resume producing after being paused + """ + if self._paused_producer and self._producer: + self._paused_producer = False + self._producer.resumeProducing() diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 059bb7fed..e4b5687a4 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) # total number of times we have hit this block -response_count = metrics.register_counter( +block_counter = metrics.register_counter( "block_count", labels=["block_name"], alternative_names=( @@ -76,7 +76,7 @@ block_db_txn_count = metrics.register_counter( block_db_txn_duration = metrics.register_counter( "block_db_txn_duration_seconds", labels=["block_name"], alternative_names=( - metrics.name_prefix + "_block_db_txn_count:total", + metrics.name_prefix + "_block_db_txn_duration:total", ), ) @@ -131,6 +131,8 @@ class Measure(object): return duration = self.clock.time_msec() - self.start + + block_counter.inc(self.name) block_timer.inc_by(duration, self.name) context = LoggingContext.current_context() diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py new file mode 100644 index 000000000..75efa0117 --- /dev/null +++ b/synapse/util/threepids.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re + +logger = logging.getLogger(__name__) + + +def check_3pid_allowed(hs, medium, address): + """Checks whether a given format of 3PID is allowed to be used on this HS + + Args: + hs (synapse.server.HomeServer): server + medium (str): 3pid medium - e.g. email, msisdn + address (str): address within that medium (e.g. "wotan@matrix.org") + msisdns need to first have been canonicalised + Returns: + bool: whether the 3PID medium/address is allowed to be added to this HS + """ + + if hs.config.allowed_local_3pids: + for constraint in hs.config.allowed_local_3pids: + logger.debug( + "Checking 3PID %s (%s) against %s (%s)", + address, medium, constraint['pattern'], constraint['medium'], + ) + if ( + medium == constraint['medium'] and + re.match(constraint['pattern'], address) + ): + return True + else: + return True + + return False diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index 81063f19a..74f104e3b 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -15,6 +15,8 @@ from twisted.internet import defer, reactor from tests import unittest +import tempfile + from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory @@ -41,7 +43,9 @@ class BaseSlavedStoreTestCase(unittest.TestCase): self.event_id = 0 server_factory = ReplicationStreamProtocolFactory(self.hs) - listener = reactor.listenUNIX("\0xxx", server_factory) + # XXX: mktemp is unsafe and should never be used. but we're just a test. + path = tempfile.mktemp(prefix="base_slaved_store_test_case_socket") + listener = reactor.listenUNIX(path, server_factory) self.addCleanup(listener.stopListening) self.streamer = server_factory.streamer @@ -49,7 +53,7 @@ class BaseSlavedStoreTestCase(unittest.TestCase): client_factory = ReplicationClientFactory( self.hs, "client_name", self.replication_handler ) - client_connector = reactor.connectUNIX("\0xxx", client_factory) + client_connector = reactor.connectUNIX(path, client_factory) self.addCleanup(client_factory.stopTrying) self.addCleanup(client_connector.disconnect) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 096f771be..8aba45651 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -49,6 +49,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.hs.get_auth_handler = Mock(return_value=self.auth_handler) self.hs.get_device_handler = Mock(return_value=self.device_handler) self.hs.config.enable_registration = True + self.hs.config.registrations_require_3pid = [] self.hs.config.auto_join_rooms = [] # init the thing we're testing diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py new file mode 100644 index 000000000..76e223425 --- /dev/null +++ b/tests/util/test_file_consumer.py @@ -0,0 +1,176 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer, reactor +from mock import NonCallableMock + +from synapse.util.file_consumer import BackgroundFileConsumer + +from tests import unittest +from StringIO import StringIO + +import threading + + +class FileConsumerTests(unittest.TestCase): + + @defer.inlineCallbacks + def test_pull_consumer(self): + string_file = StringIO() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = DummyPullProducer() + + yield producer.register_with_consumer(consumer) + + yield producer.write_and_wait("Foo") + + self.assertEqual(string_file.getvalue(), "Foo") + + yield producer.write_and_wait("Bar") + + self.assertEqual(string_file.getvalue(), "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_consumer(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=[]) + + consumer.registerProducer(producer, True) + + consumer.write("Foo") + yield string_file.wait_for_n_writes(1) + + self.assertEqual(string_file.buffer, "Foo") + + consumer.write("Bar") + yield string_file.wait_for_n_writes(2) + + self.assertEqual(string_file.buffer, "FooBar") + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + @defer.inlineCallbacks + def test_push_producer_feedback(self): + string_file = BlockingStringWrite() + consumer = BackgroundFileConsumer(string_file) + + try: + producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"]) + + resume_deferred = defer.Deferred() + producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None) + + consumer.registerProducer(producer, True) + + number_writes = 0 + with string_file.write_lock: + for _ in range(consumer._PAUSE_ON_QUEUE_SIZE): + consumer.write("Foo") + number_writes += 1 + + producer.pauseProducing.assert_called_once() + + yield string_file.wait_for_n_writes(number_writes) + + yield resume_deferred + producer.resumeProducing.assert_called_once() + finally: + consumer.unregisterProducer() + + yield consumer.wait() + + self.assertTrue(string_file.closed) + + +class DummyPullProducer(object): + def __init__(self): + self.consumer = None + self.deferred = defer.Deferred() + + def resumeProducing(self): + d = self.deferred + self.deferred = defer.Deferred() + d.callback(None) + + def write_and_wait(self, bytes): + d = self.deferred + self.consumer.write(bytes) + return d + + def register_with_consumer(self, consumer): + d = self.deferred + self.consumer = consumer + self.consumer.registerProducer(self, False) + return d + + +class BlockingStringWrite(object): + def __init__(self): + self.buffer = "" + self.closed = False + self.write_lock = threading.Lock() + + self._notify_write_deferred = None + self._number_of_writes = 0 + + def write(self, bytes): + with self.write_lock: + self.buffer += bytes + self._number_of_writes += 1 + + reactor.callFromThread(self._notify_write) + + def close(self): + self.closed = True + + def _notify_write(self): + "Called by write to indicate a write happened" + with self.write_lock: + if not self._notify_write_deferred: + return + d = self._notify_write_deferred + self._notify_write_deferred = None + d.callback(None) + + @defer.inlineCallbacks + def wait_for_n_writes(self, n): + "Wait for n writes to have happened" + while True: + with self.write_lock: + if n <= self._number_of_writes: + return + + if not self._notify_write_deferred: + self._notify_write_deferred = defer.Deferred() + + d = self._notify_write_deferred + + yield d