Merge branch 'develop' into application-services-txn-reliability

Conflicts:
	synapse/storage/appservice.py
This commit is contained in:
Kegan Dougal 2015-03-16 10:09:15 +00:00
commit f9232c7917
45 changed files with 1192 additions and 372 deletions

View File

@ -13,7 +13,6 @@ General:
* Notify when invited to a new room. * Notify when invited to a new room.
* Notify for messages that don't match any rule. * Notify for messages that don't match any rule.
* Notify on incoming call. * Notify on incoming call.
* Notify if there were no matching rules.
Federation: Federation:

View File

@ -1,3 +1,5 @@
.. contents::
Introduction Introduction
============ ============
@ -250,7 +252,8 @@ fix try re-installing from PyPI or directly from
ArchLinux ArchLinux
--------- ---------
If running `$ synctl start` fails wit 'returned non-zero exit status 1', you will need to explicitly call Python2.7 - either running as:: If running `$ synctl start` fails with 'returned non-zero exit status 1',
you will need to explicitly call Python2.7 - either running as::
$ python2.7 -m synapse.app.homeserver --daemonize -c homeserver.yaml --pid-file homeserver.pid $ python2.7 -m synapse.app.homeserver --daemonize -c homeserver.yaml --pid-file homeserver.pid

View File

@ -175,13 +175,12 @@ sub on_room_message
my $verto_connecting = $loop->new_future; my $verto_connecting = $loop->new_future;
$bot_verto->connect( $bot_verto->connect(
%{ $CONFIG{"verto-bot"} }, %{ $CONFIG{"verto-bot"} },
on_connected => sub {
warn("[Verto] connected to websocket");
$verto_connecting->done($bot_verto) if not $verto_connecting->is_done;
},
on_connect_error => sub { die "Cannot connect to verto - $_[-1]" }, on_connect_error => sub { die "Cannot connect to verto - $_[-1]" },
on_resolve_error => sub { die "Cannot resolve to verto - $_[-1]" }, on_resolve_error => sub { die "Cannot resolve to verto - $_[-1]" },
); )->then( sub {
warn("[Verto] connected to websocket");
$verto_connecting->done($bot_verto) if not $verto_connecting->is_done;
});
Future->needs_all( Future->needs_all(
$bot_matrix->login( %{ $CONFIG{"matrix-bot"} } )->then( sub { $bot_matrix->login( %{ $CONFIG{"matrix-bot"} } )->then( sub {

View File

@ -273,10 +273,14 @@ Future->needs_all(
{ {
"as_token": "$as_token", "as_token": "$as_token",
"url": "$as_url", "url": "$as_url",
"namespaces": { "users": ["\@\\\\+.*"] } "namespaces": { "users": [ { "regex": "\@\\\\+.*", "exclusive": false } ] }
} }
EOT EOT
), )->then( sub{
my ($response) = (@_);
warn $response->as_string if ($response->code != 200);
return Future->done;
}),
$verto_connecting, $verto_connecting,
)->get; )->get;

View File

@ -26,6 +26,7 @@ from synapse.server import HomeServer
from synapse.python_dependencies import check_requirements from synapse.python_dependencies import check_requirements
from twisted.internet import reactor from twisted.internet import reactor
from twisted.application import service
from twisted.enterprise import adbapi from twisted.enterprise import adbapi
from twisted.web.resource import Resource from twisted.web.resource import Resource
from twisted.web.static import File from twisted.web.static import File
@ -46,6 +47,7 @@ from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
from synapse.rest.client.v1 import ClientV1RestResource from synapse.rest.client.v1 import ClientV1RestResource
from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from daemonize import Daemonize from daemonize import Daemonize
import twisted.manhole.telnet import twisted.manhole.telnet
@ -99,6 +101,12 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_server_key(self): def build_resource_for_server_key(self):
return LocalKey(self) return LocalKey(self)
def build_resource_for_metrics(self):
if self.get_config().enable_metrics:
return MetricsResource(self)
else:
return None
def build_db_pool(self): def build_db_pool(self):
return adbapi.ConnectionPool( return adbapi.ConnectionPool(
"sqlite3", self.get_db_name(), "sqlite3", self.get_db_name(),
@ -109,7 +117,7 @@ class SynapseHomeServer(HomeServer):
# so that :memory: sqlite works # so that :memory: sqlite works
) )
def create_resource_tree(self, web_client, redirect_root_to_web_client): def create_resource_tree(self, redirect_root_to_web_client):
"""Create the resource tree for this Home Server. """Create the resource tree for this Home Server.
This in unduly complicated because Twisted does not support putting This in unduly complicated because Twisted does not support putting
@ -121,6 +129,9 @@ class SynapseHomeServer(HomeServer):
location of the web client. This does nothing if web_client is not location of the web client. This does nothing if web_client is not
True. True.
""" """
config = self.get_config()
web_client = config.webclient
# list containing (path_str, Resource) e.g: # list containing (path_str, Resource) e.g:
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ] # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
desired_tree = [ desired_tree = [
@ -144,6 +155,10 @@ class SynapseHomeServer(HomeServer):
else: else:
self.root_resource = Resource() self.root_resource = Resource()
metrics_resource = self.get_resource_for_metrics()
if config.metrics_port is None and metrics_resource is not None:
desired_tree.append((METRICS_PREFIX, metrics_resource))
# ideally we'd just use getChild and putChild but getChild doesn't work # ideally we'd just use getChild and putChild but getChild doesn't work
# unless you give it a Request object IN ADDITION to the name :/ So # unless you give it a Request object IN ADDITION to the name :/ So
# instead, we'll store a copy of this mapping so we can actually add # instead, we'll store a copy of this mapping so we can actually add
@ -205,17 +220,32 @@ class SynapseHomeServer(HomeServer):
""" """
return "%s-%s" % (resource, path_seg) return "%s-%s" % (resource, path_seg)
def start_listening(self, secure_port, unsecure_port): def start_listening(self):
if secure_port is not None: config = self.get_config()
if not config.no_tls and config.bind_port is not None:
reactor.listenSSL( reactor.listenSSL(
secure_port, Site(self.root_resource), self.tls_context_factory config.bind_port,
Site(self.root_resource),
self.tls_context_factory,
interface=config.bind_host
) )
logger.info("Synapse now listening on port %d", secure_port) logger.info("Synapse now listening on port %d", config.bind_port)
if unsecure_port is not None:
if config.unsecure_port is not None:
reactor.listenTCP( reactor.listenTCP(
unsecure_port, Site(self.root_resource) config.unsecure_port,
Site(self.root_resource),
interface=config.bind_host
) )
logger.info("Synapse now listening on port %d", unsecure_port) logger.info("Synapse now listening on port %d", config.unsecure_port)
metrics_resource = self.get_resource_for_metrics()
if metrics_resource and config.metrics_port is not None:
reactor.listenTCP(
config.metrics_port, Site(metrics_resource), interface="127.0.0.1",
)
logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
def get_version_string(): def get_version_string():
@ -295,10 +325,19 @@ def change_resource_limit(soft_file_no):
logger.warn("Failed to set file limit: %s", e) logger.warn("Failed to set file limit: %s", e)
def setup(): def setup(config_options):
"""
Args:
config_options_options: The options passed to Synapse. Usually
`sys.argv[1:]`.
should_run (bool): Whether to start the reactor.
Returns:
HomeServer
"""
config = HomeServerConfig.load_config( config = HomeServerConfig.load_config(
"Synapse Homeserver", "Synapse Homeserver",
sys.argv[1:], config_options,
generate_section="Homeserver" generate_section="Homeserver"
) )
@ -330,7 +369,6 @@ def setup():
) )
hs.create_resource_tree( hs.create_resource_tree(
web_client=config.webclient,
redirect_root_to_web_client=True, redirect_root_to_web_client=True,
) )
@ -359,24 +397,47 @@ def setup():
f.namespace['hs'] = hs f.namespace['hs'] = hs
reactor.listenTCP(config.manhole, f, interface='127.0.0.1') reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
bind_port = config.bind_port hs.start_listening()
if config.no_tls:
bind_port = None
hs.start_listening(bind_port, config.unsecure_port)
hs.get_pusherpool().start() hs.get_pusherpool().start()
hs.get_state_handler().start_caching() hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling() hs.get_datastore().start_profiling()
hs.get_replication_layer().start_get_pdu_cache() hs.get_replication_layer().start_get_pdu_cache()
if config.daemonize: return hs
print config.pid_file
class SynapseService(service.Service):
"""A twisted Service class that will start synapse. Used to run synapse
via twistd and a .tac.
"""
def __init__(self, config):
self.config = config
def startService(self):
hs = setup(self.config)
change_resource_limit(hs.config.soft_file_limit)
def stopService(self):
return self._port.stopListening()
def run(hs):
def in_thread():
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
reactor.run()
if hs.config.daemonize:
print hs.config.pid_file
daemon = Daemonize( daemon = Daemonize(
app="synapse-homeserver", app="synapse-homeserver",
pid=config.pid_file, pid=hs.config.pid_file,
action=lambda: run(config), action=lambda: in_thread(),
auto_close_fds=False, auto_close_fds=False,
verbose=True, verbose=True,
logger=logger, logger=logger,
@ -384,20 +445,14 @@ def setup():
daemon.start() daemon.start()
else: else:
run(config) in_thread()
def run(config):
with LoggingContext("run"):
change_resource_limit(config.soft_file_limit)
reactor.run()
def main(): def main():
with LoggingContext("main"): with LoggingContext("main"):
check_requirements() check_requirements()
setup() hs = setup(sys.argv[1:])
run(hs)
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -23,11 +23,13 @@ from .captcha import CaptchaConfig
from .email import EmailConfig from .email import EmailConfig
from .voip import VoipConfig from .voip import VoipConfig
from .registration import RegistrationConfig from .registration import RegistrationConfig
from .metrics import MetricsConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
EmailConfig, VoipConfig, RegistrationConfig,): EmailConfig, VoipConfig, RegistrationConfig,
MetricsConfig,):
pass pass

36
synapse/config/metrics.py Normal file
View File

@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ._base import Config
class MetricsConfig(Config):
def __init__(self, args):
super(MetricsConfig, self).__init__(args)
self.enable_metrics = args.enable_metrics
self.metrics_port = args.metrics_port
@classmethod
def add_arguments(cls, parser):
super(MetricsConfig, cls).add_arguments(parser)
metrics_group = parser.add_argument_group("metrics")
metrics_group.add_argument(
'--enable-metrics', dest="enable_metrics", action="store_true",
help="Enable collection and rendering of performance metrics"
)
metrics_group.add_argument(
'--metrics-port', metavar="PORT", type=int,
help="Separate port to accept metrics requests on (on localhost)"
)

View File

@ -25,6 +25,7 @@ from synapse.api.errors import (
from synapse.util.expiringcache import ExpiringCache from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
import synapse.metrics
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
@ -36,9 +37,17 @@ import random
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
sent_edus_counter = metrics.register_counter("sent_edus")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
class FederationClient(FederationBase): class FederationClient(FederationBase):
def __init__(self):
self._get_pdu_cache = None
def start_get_pdu_cache(self): def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache( self._get_pdu_cache = ExpiringCache(
@ -68,6 +77,8 @@ class FederationClient(FederationBase):
order = self._order order = self._order
self._order += 1 self._order += 1
sent_pdus_destination_dist.inc_by(len(destinations))
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
# TODO, add errback, etc. # TODO, add errback, etc.
@ -87,6 +98,8 @@ class FederationClient(FederationBase):
content=content, content=content,
) )
sent_edus_counter.inc()
# TODO, add errback, etc. # TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu) self._transaction_queue.enqueue_edu(edu)
return defer.succeed(None) return defer.succeed(None)
@ -113,6 +126,8 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the a Deferred which will eventually yield a JSON object from the
response response
""" """
sent_queries_counter.inc(query_type)
return self.transport_layer.make_query( return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
) )

View File

@ -22,6 +22,7 @@ from .units import Transaction, Edu
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
import synapse.metrics
from synapse.api.errors import FederationError, SynapseError from synapse.api.errors import FederationError, SynapseError
@ -32,6 +33,15 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# synapse.federation.federation_server is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
received_pdus_counter = metrics.register_counter("received_pdus")
received_edus_counter = metrics.register_counter("received_edus")
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
class FederationServer(FederationBase): class FederationServer(FederationBase):
def set_handler(self, handler): def set_handler(self, handler):
@ -84,6 +94,8 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data): def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data) transaction = Transaction(**transaction_data)
received_pdus_counter.inc_by(len(transaction.pdus))
for p in transaction.pdus: for p in transaction.pdus:
if "unsigned" in p: if "unsigned" in p:
unsigned = p["unsigned"] unsigned = p["unsigned"]
@ -153,6 +165,8 @@ class FederationServer(FederationBase):
defer.returnValue((200, response)) defer.returnValue((200, response))
def received_edu(self, origin, edu_type, content): def received_edu(self, origin, edu_type, content):
received_edus_counter.inc()
if edu_type in self.edu_handlers: if edu_type in self.edu_handlers:
self.edu_handlers[edu_type](origin, content) self.edu_handlers[edu_type](origin, content)
else: else:
@ -204,6 +218,8 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks @defer.inlineCallbacks
def on_query_request(self, query_type, args): def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
if query_type in self.query_handlers: if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args) response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response)) defer.returnValue((200, response))

View File

@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import ( from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination, get_retry_limiter, NotRetryingDestination,
) )
import synapse.metrics
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
class TransactionQueue(object): class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at """This class makes sure we only have one transaction in flight at
@ -54,11 +57,25 @@ class TransactionQueue(object):
# done # done
self.pending_transactions = {} self.pending_transactions = {}
metrics.register_callback(
"pending_destinations",
lambda: len(self.pending_transactions),
)
# Is a mapping from destination -> list of # Is a mapping from destination -> list of
# tuple(pending pdus, deferred, order) # tuple(pending pdus, deferred, order)
self.pending_pdus_by_dest = {} self.pending_pdus_by_dest = pdus = {}
# destination -> list of tuple(edu, deferred) # destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = {} self.pending_edus_by_dest = edus = {}
metrics.register_callback(
"pending_pdus",
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
lambda: sum(map(len, edus.values())),
)
# destination -> list of tuple(failure, deferred) # destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {} self.pending_failures_by_dest = {}
@ -115,8 +132,8 @@ class TransactionQueue(object):
if not deferred.called: if not deferred.called:
deferred.errback(failure) deferred.errback(failure)
def log_failure(failure): def log_failure(f):
logger.warn("Failed to send pdu", failure.value) logger.warn("Failed to send pdu to %s: %s", destination, f.value)
deferred.addErrback(log_failure) deferred.addErrback(log_failure)
@ -143,8 +160,8 @@ class TransactionQueue(object):
if not deferred.called: if not deferred.called:
deferred.errback(failure) deferred.errback(failure)
def log_failure(failure): def log_failure(f):
logger.warn("Failed to send pdu", failure.value) logger.warn("Failed to send edu to %s: %s", destination, f.value)
deferred.addErrback(log_failure) deferred.addErrback(log_failure)
@ -174,7 +191,7 @@ class TransactionQueue(object):
deferred.errback(f) deferred.errback(f)
def log_failure(f): def log_failure(f):
logger.warn("Failed to send pdu", f.value) logger.warn("Failed to send failure to %s: %s", destination, f.value)
deferred.addErrback(log_failure) deferred.addErrback(log_failure)

View File

@ -19,6 +19,7 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError from synapse.api.errors import Codes, SynapseError
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
import functools
import logging import logging
import simplejson as json import simplejson as json
import re import re
@ -30,8 +31,9 @@ logger = logging.getLogger(__name__)
class TransportLayerServer(object): class TransportLayerServer(object):
"""Handles incoming federation HTTP requests""" """Handles incoming federation HTTP requests"""
# A method just so we can pass 'self' as the authenticator to the Servlets
@defer.inlineCallbacks @defer.inlineCallbacks
def _authenticate_request(self, request): def authenticate_request(self, request):
json_request = { json_request = {
"method": request.method, "method": request.method,
"uri": request.uri, "uri": request.uri,
@ -93,28 +95,6 @@ class TransportLayerServer(object):
defer.returnValue((origin, content)) defer.returnValue((origin, content))
def _with_authentication(self, handler):
@defer.inlineCallbacks
def new_handler(request, *args, **kwargs):
try:
(origin, content) = yield self._authenticate_request(request)
with self.ratelimiter.ratelimit(origin) as d:
yield d
response = yield handler(
origin, content, request.args, *args, **kwargs
)
except:
logger.exception("_authenticate_request failed")
raise
defer.returnValue(response)
return new_handler
def rate_limit_origin(self, handler):
def new_handler(origin, *args, **kwargs):
response = yield handler(origin, *args, **kwargs)
defer.returnValue(response)
return new_handler()
@log_function @log_function
def register_received_handler(self, handler): def register_received_handler(self, handler):
""" Register a handler that will be fired when we receive data. """ Register a handler that will be fired when we receive data.
@ -122,14 +102,12 @@ class TransportLayerServer(object):
Args: Args:
handler (TransportReceivedHandler) handler (TransportReceivedHandler)
""" """
self.received_handler = handler FederationSendServlet(
handler,
# This is when someone is trying to send us a bunch of data. authenticator=self,
self.server.register_path( ratelimiter=self.ratelimiter,
"PUT", server_name=self.server_name,
re.compile("^" + PREFIX + "/send/([^/]*)/$"), ).register(self.server)
self._with_authentication(self._on_send_request)
)
@log_function @log_function
def register_request_handler(self, handler): def register_request_handler(self, handler):
@ -138,136 +116,65 @@ class TransportLayerServer(object):
Args: Args:
handler (TransportRequestHandler) handler (TransportRequestHandler)
""" """
self.request_handler = handler for servletclass in SERVLET_CLASSES:
servletclass(
handler,
authenticator=self,
ratelimiter=self.ratelimiter,
).register(self.server)
# This is for when someone asks us for everything since version X
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/pull/$"),
self._with_authentication(
lambda origin, content, query:
handler.on_pull_request(query["origin"][0], query["v"])
)
)
# This is when someone asks for a data item for a given server class BaseFederationServlet(object):
# data_id pair. def __init__(self, handler, authenticator, ratelimiter):
self.server.register_path( self.handler = handler
"GET", self.authenticator = authenticator
re.compile("^" + PREFIX + "/event/([^/]*)/$"), self.ratelimiter = ratelimiter
self._with_authentication(
lambda origin, content, query, event_id:
handler.on_pdu_request(origin, event_id)
)
)
# This is when someone asks for all data for a given context. def _wrap(self, code):
self.server.register_path( authenticator = self.authenticator
"GET", ratelimiter = self.ratelimiter
re.compile("^" + PREFIX + "/state/([^/]*)/$"),
self._with_authentication(
lambda origin, content, query, context:
handler.on_context_state_request(
origin,
context,
query.get("event_id", [None])[0],
)
)
)
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/backfill/([^/]*)/$"),
self._with_authentication(
lambda origin, content, query, context:
self._on_backfill_request(
origin, context, query["v"], query["limit"]
)
)
)
# This is when we receive a server-server Query
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/query/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, query_type:
handler.on_query_request(
query_type,
{k: v[0].decode("utf-8") for k, v in query.items()}
)
)
)
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/make_join/([^/]*)/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, context, user_id:
self._on_make_join_request(
origin, content, query, context, user_id
)
)
)
self.server.register_path(
"GET",
re.compile("^" + PREFIX + "/event_auth/([^/]*)/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, context, event_id:
handler.on_event_auth(
origin, context, event_id,
)
)
)
self.server.register_path(
"PUT",
re.compile("^" + PREFIX + "/send_join/([^/]*)/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, context, event_id:
self._on_send_join_request(
origin, content, query,
)
)
)
self.server.register_path(
"PUT",
re.compile("^" + PREFIX + "/invite/([^/]*)/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, context, event_id:
self._on_invite_request(
origin, content, query,
)
)
)
self.server.register_path(
"POST",
re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"),
self._with_authentication(
lambda origin, content, query, context, event_id:
self._on_query_auth_request(
origin, content, event_id,
)
)
)
self.server.register_path(
"POST",
re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"),
self._with_authentication(
lambda origin, content, query, room_id:
self._get_missing_events(
origin, content, room_id,
)
)
)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @functools.wraps(code)
def _on_send_request(self, origin, content, query, transaction_id): def new_code(request, *args, **kwargs):
try:
(origin, content) = yield authenticator.authenticate_request(request)
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield code(
origin, content, request.args, *args, **kwargs
)
except:
logger.exception("authenticate_request failed")
raise
defer.returnValue(response)
# Extra logic that functools.wraps() doesn't finish
new_code.__self__ = code.__self__
return new_code
def register(self, server):
pattern = re.compile("^" + PREFIX + self.PATH + "$")
for method in ("GET", "PUT", "POST"):
code = getattr(self, "on_%s" % (method), None)
if code is None:
continue
server.register_path(method, pattern, self._wrap(code))
class FederationSendServlet(BaseFederationServlet):
PATH = "/send/([^/]*)/"
def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__(handler, **kwargs)
self.server_name = server_name
# This is when someone is trying to send us a bunch of data.
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, transaction_id):
""" Called on PUT /send/<transaction_id>/ """ Called on PUT /send/<transaction_id>/
Args: Args:
@ -305,8 +212,7 @@ class TransportLayerServer(object):
return return
try: try:
handler = self.received_handler code, response = yield self.handler.on_incoming_transaction(
code, response = yield handler.on_incoming_transaction(
transaction_data transaction_data
) )
except: except:
@ -315,65 +221,123 @@ class TransportLayerServer(object):
defer.returnValue((code, response)) defer.returnValue((code, response))
@log_function
def _on_backfill_request(self, origin, context, v_list, limits): class FederationPullServlet(BaseFederationServlet):
if not limits: PATH = "/pull/"
return defer.succeed(
(400, {"error": "Did not include limit param"}) # This is for when someone asks us for everything since version X
def on_GET(self, origin, content, query):
return self.handler.on_pull_request(query["origin"][0], query["v"])
class FederationEventServlet(BaseFederationServlet):
PATH = "/event/([^/]*)/"
# This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id):
return self.handler.on_pdu_request(origin, event_id)
class FederationStateServlet(BaseFederationServlet):
PATH = "/state/([^/]*)/"
# This is when someone asks for all data for a given context.
def on_GET(self, origin, content, query, context):
return self.handler.on_context_state_request(
origin,
context,
query.get("event_id", [None])[0],
) )
class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/([^/]*)/"
def on_GET(self, origin, content, query, context):
versions = query["v"]
limits = query["limit"]
if not limits:
return defer.succeed((400, {"error": "Did not include limit param"}))
limit = int(limits[-1]) limit = int(limits[-1])
versions = v_list return self.handler.on_backfill_request(origin, context, versions, limit)
return self.request_handler.on_backfill_request(
origin, context, versions, limit class FederationQueryServlet(BaseFederationServlet):
PATH = "/query/([^/]*)"
# This is when we receive a server-server Query
def on_GET(self, origin, content, query, query_type):
return self.handler.on_query_request(
query_type,
{k: v[0].decode("utf-8") for k, v in query.items()}
) )
class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/([^/]*)/([^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function def on_GET(self, origin, content, query, context, user_id):
def _on_make_join_request(self, origin, content, query, context, user_id): content = yield self.handler.on_make_join_request(context, user_id)
content = yield self.request_handler.on_make_join_request(
context, user_id,
)
defer.returnValue((200, content)) defer.returnValue((200, content))
@defer.inlineCallbacks
@log_function
def _on_send_join_request(self, origin, content, query):
content = yield self.request_handler.on_send_join_request(
origin, content,
)
class FederationEventAuthServlet(BaseFederationServlet):
PATH = "/event_auth/([^/]*)/([^/]*)"
def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id)
class FederationSendJoinServlet(BaseFederationServlet):
PATH = "/send_join/([^/]*)/([^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = yield self.handler.on_send_join_request(origin, content)
defer.returnValue((200, content)) defer.returnValue((200, content))
@defer.inlineCallbacks
@log_function
def _on_invite_request(self, origin, content, query):
content = yield self.request_handler.on_invite_request(
origin, content,
)
class FederationInviteServlet(BaseFederationServlet):
PATH = "/invite/([^/]*)/([^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
# TODO(paul): assert that context/event_id parsed from path actually
# match those given in content
content = yield self.handler.on_invite_request(origin, content)
defer.returnValue((200, content)) defer.returnValue((200, content))
class FederationQueryAuthServlet(BaseFederationServlet):
PATH = "/query_auth/([^/]*)/([^/]*)"
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function def on_POST(self, origin, content, query, context, event_id):
def _on_query_auth_request(self, origin, content, event_id): new_content = yield self.handler.on_query_auth_request(
new_content = yield self.request_handler.on_query_auth_request(
origin, content, event_id origin, content, event_id
) )
defer.returnValue((200, new_content)) defer.returnValue((200, new_content))
class FederationGetMissingEventsServlet(BaseFederationServlet):
# TODO(paul): Why does this path alone end with "/?" optional?
PATH = "/get_missing_events/([^/]*)/?"
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function def on_POST(self, origin, content, query, room_id):
def _get_missing_events(self, origin, content, room_id):
limit = int(content.get("limit", 10)) limit = int(content.get("limit", 10))
min_depth = int(content.get("min_depth", 0)) min_depth = int(content.get("min_depth", 0))
earliest_events = content.get("earliest_events", []) earliest_events = content.get("earliest_events", [])
latest_events = content.get("latest_events", []) latest_events = content.get("latest_events", [])
content = yield self.request_handler.on_get_missing_events( content = yield self.handler.on_get_missing_events(
origin, origin,
room_id=room_id, room_id=room_id,
earliest_events=earliest_events, earliest_events=earliest_events,
@ -383,3 +347,18 @@ class TransportLayerServer(object):
) )
defer.returnValue((200, content)) defer.returnValue((200, content))
SERVLET_CLASSES = (
FederationPullServlet,
FederationEventServlet,
FederationStateServlet,
FederationBackfillServlet,
FederationQueryServlet,
FederationMakeJoinServlet,
FederationEventServlet,
FederationSendJoinServlet,
FederationInviteServlet,
FederationQueryAuthServlet,
FederationGetMissingEventsServlet,
)

View File

@ -71,7 +71,7 @@ class EventStreamHandler(BaseHandler):
self._streams_per_user[auth_user] += 1 self._streams_per_user[auth_user] += 1
rm_handler = self.hs.get_handlers().room_member_handler rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(auth_user) room_ids = yield rm_handler.get_joined_rooms_for_user(auth_user)
if timeout: if timeout:
# If they've set a timeout set a minimum limit. # If they've set a timeout set a minimum limit.

View File

@ -21,6 +21,7 @@ from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID from synapse.types import UserID
import synapse.metrics
from ._base import BaseHandler from ._base import BaseHandler
@ -29,6 +30,8 @@ import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
# TODO(paul): Maybe there's one of these I can steal from somewhere # TODO(paul): Maybe there's one of these I can steal from somewhere
def partition(l, func): def partition(l, func):
@ -133,6 +136,11 @@ class PresenceHandler(BaseHandler):
self._user_cachemap = {} self._user_cachemap = {}
self._user_cachemap_latest_serial = 0 self._user_cachemap_latest_serial = 0
metrics.register_callback(
"userCachemap:size",
lambda: len(self._user_cachemap),
)
def _get_or_make_usercache(self, user): def _get_or_make_usercache(self, user):
"""If the cache entry doesn't exist, initialise a new one.""" """If the cache entry doesn't exist, initialise a new one."""
if user not in self._user_cachemap: if user not in self._user_cachemap:
@ -452,7 +460,7 @@ class PresenceHandler(BaseHandler):
# Also include people in all my rooms # Also include people in all my rooms
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_joined_rooms_for_user(user)
if state is None: if state is None:
state = yield self.store.get_presence_state(user.localpart) state = yield self.store.get_presence_state(user.localpart)
@ -596,7 +604,7 @@ class PresenceHandler(BaseHandler):
localusers.add(user) localusers.add(user)
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_joined_rooms_for_user(user)
if not localusers and not room_ids: if not localusers and not room_ids:
defer.returnValue(None) defer.returnValue(None)
@ -663,7 +671,7 @@ class PresenceHandler(BaseHandler):
) )
rm_handler = self.homeserver.get_handlers().room_member_handler rm_handler = self.homeserver.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(user) room_ids = yield rm_handler.get_joined_rooms_for_user(user)
if room_ids: if room_ids:
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids) logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)

View File

@ -197,9 +197,8 @@ class ProfileHandler(BaseHandler):
self.ratelimit(user.to_string()) self.ratelimit(user.to_string())
joins = yield self.store.get_rooms_for_user_where_membership_is( joins = yield self.store.get_rooms_for_user(
user.to_string(), user.to_string(),
[Membership.JOIN],
) )
for j in joins: for j in joins:

View File

@ -507,7 +507,7 @@ class RoomMemberHandler(BaseHandler):
defer.returnValue((is_remote_invite_join, room_host)) defer.returnValue((is_remote_invite_join, room_host))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]): def get_joined_rooms_for_user(self, user):
"""Returns a list of roomids that the user has any of the given """Returns a list of roomids that the user has any of the given
membership states in.""" membership states in."""
@ -517,8 +517,8 @@ class RoomMemberHandler(BaseHandler):
if app_service: if app_service:
rooms = yield self.store.get_app_service_rooms(app_service) rooms = yield self.store.get_app_service_rooms(app_service)
else: else:
rooms = yield self.store.get_rooms_for_user_where_membership_is( rooms = yield self.store.get_rooms_for_user(
user_id=user.to_string(), membership_list=membership_list user.to_string(),
) )
# For some reason the list of events contains duplicates # For some reason the list of events contains duplicates

View File

@ -96,7 +96,9 @@ class SyncHandler(BaseHandler):
return self.current_sync_for_user(sync_config, since_token) return self.current_sync_for_user(sync_config, since_token)
rm_handler = self.hs.get_handlers().room_member_handler rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) room_ids = yield rm_handler.get_joined_rooms_for_user(
sync_config.user
)
result = yield self.notifier.wait_for_events( result = yield self.notifier.wait_for_events(
sync_config.user, room_ids, sync_config.user, room_ids,
sync_config.filter, timeout, current_sync_callback sync_config.filter, timeout, current_sync_callback
@ -227,7 +229,7 @@ class SyncHandler(BaseHandler):
logger.debug("Typing %r", typing_by_room) logger.debug("Typing %r", typing_by_room)
rm_handler = self.hs.get_handlers().room_member_handler rm_handler = self.hs.get_handlers().room_member_handler
room_ids = yield rm_handler.get_rooms_for_user(sync_config.user) room_ids = yield rm_handler.get_joined_rooms_for_user(sync_config.user)
# TODO (mjark): Does public mean "published"? # TODO (mjark): Does public mean "published"?
published_rooms = yield self.store.get_rooms(is_public=True) published_rooms = yield self.store.get_rooms(is_public=True)

View File

@ -15,6 +15,7 @@
from synapse.api.errors import CodeMessageException from synapse.api.errors import CodeMessageException
from syutil.jsonutil import encode_canonical_json from syutil.jsonutil import encode_canonical_json
import synapse.metrics
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.web.client import ( from twisted.web.client import (
@ -31,6 +32,17 @@ import urllib
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
class SimpleHttpClient(object): class SimpleHttpClient(object):
""" """
@ -45,12 +57,30 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor) self.agent = Agent(reactor)
self.version_string = hs.version_string self.version_string = hs.version_string
def request(self, method, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
d = self.agent.request(method, *args, **kwargs)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
@defer.inlineCallbacks @defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}): def post_urlencoded_get_json(self, uri, args={}):
logger.debug("post_urlencoded_get_json args: %s", args) logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
response = yield self.agent.request( response = yield self.request(
"POST", "POST",
uri.encode("ascii"), uri.encode("ascii"),
headers=Headers({ headers=Headers({
@ -70,7 +100,7 @@ class SimpleHttpClient(object):
logger.info("HTTP POST %s -> %s", json_str, uri) logger.info("HTTP POST %s -> %s", json_str, uri)
response = yield self.agent.request( response = yield self.request(
"POST", "POST",
uri.encode("ascii"), uri.encode("ascii"),
headers=Headers({ headers=Headers({
@ -104,7 +134,7 @@ class SimpleHttpClient(object):
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes) uri = "%s?%s" % (uri, query_bytes)
response = yield self.agent.request( response = yield self.request(
"GET", "GET",
uri.encode("ascii"), uri.encode("ascii"),
headers=Headers({ headers=Headers({
@ -145,7 +175,7 @@ class SimpleHttpClient(object):
json_str = encode_canonical_json(json_body) json_str = encode_canonical_json(json_body)
response = yield self.agent.request( response = yield self.request(
"PUT", "PUT",
uri.encode("ascii"), uri.encode("ascii"),
headers=Headers({ headers=Headers({
@ -176,7 +206,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
def post_urlencoded_get_raw(self, url, args={}): def post_urlencoded_get_raw(self, url, args={}):
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
response = yield self.agent.request( response = yield self.request(
"POST", "POST",
url.encode("ascii"), url.encode("ascii"),
bodyProducer=FileBodyProducer(StringIO(query_bytes)), bodyProducer=FileBodyProducer(StringIO(query_bytes)),

View File

@ -23,6 +23,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http.endpoint import matrix_federation_endpoint from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
import synapse.metrics
from syutil.jsonutil import encode_canonical_json from syutil.jsonutil import encode_canonical_json
@ -40,6 +41,17 @@ import urlparse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
class MatrixFederationHttpAgent(_AgentBase): class MatrixFederationHttpAgent(_AgentBase):
@ -49,6 +61,8 @@ class MatrixFederationHttpAgent(_AgentBase):
def request(self, destination, endpoint, method, path, params, query, def request(self, destination, endpoint, method, path, params, query,
headers, body_producer): headers, body_producer):
outgoing_requests_counter.inc(method)
host = b"" host = b""
port = 0 port = 0
fragment = b"" fragment = b""
@ -59,10 +73,22 @@ class MatrixFederationHttpAgent(_AgentBase):
# Set the connection pool key to be the destination. # Set the connection pool key to be the destination.
key = destination key = destination
return self._requestWithEndpoint(key, endpoint, method, parsed_URI, d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer, headers, body_producer,
parsed_URI.originForm) parsed_URI.originForm)
def _cb(response):
incoming_responses_counter.inc(method, response.code)
return response
def _eb(failure):
incoming_responses_counter.inc(method, "ERR")
return failure
d.addCallbacks(_cb, _eb)
return d
class MatrixFederationHttpClient(object): class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation """HTTP client used to talk to other homeservers over the federation

View File

@ -18,6 +18,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
) )
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
import synapse.metrics
from syutil.jsonutil import ( from syutil.jsonutil import (
encode_canonical_json, encode_pretty_printed_json encode_canonical_json, encode_pretty_printed_json
@ -34,6 +35,17 @@ import urllib
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
incoming_requests_counter = metrics.register_counter(
"requests",
labels=["method", "servlet"],
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
class HttpServer(object): class HttpServer(object):
""" Interface for registering callbacks on a HTTP server """ Interface for registering callbacks on a HTTP server
@ -74,6 +86,7 @@ class JsonResource(HttpServer, resource.Resource):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.path_regexs = {} self.path_regexs = {}
self.version_string = hs.version_string self.version_string = hs.version_string
self.hs = hs
def register_path(self, method, path_pattern, callback): def register_path(self, method, path_pattern, callback):
self.path_regexs.setdefault(method, []).append( self.path_regexs.setdefault(method, []).append(
@ -87,7 +100,11 @@ class JsonResource(HttpServer, resource.Resource):
port (int): The port to listen on. port (int): The port to listen on.
""" """
reactor.listenTCP(port, server.Site(self)) reactor.listenTCP(
port,
server.Site(self),
interface=self.hs.config.bind_host
)
# Gets called by twisted # Gets called by twisted
def render(self, request): def render(self, request):
@ -131,6 +148,15 @@ class JsonResource(HttpServer, resource.Resource):
# returned response. We pass both the request and any # returned response. We pass both the request and any
# matched groups from the regex to the callback. # matched groups from the regex to the callback.
callback = path_entry.callback
servlet_instance = getattr(callback, "__self__", None)
if servlet_instance is not None:
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
incoming_requests_counter.inc(request.method, servlet_classname)
args = [ args = [
urllib.unquote(u).decode("UTF-8") for u in m.groups() urllib.unquote(u).decode("UTF-8") for u in m.groups()
] ]
@ -140,10 +166,7 @@ class JsonResource(HttpServer, resource.Resource):
request.method, request.path request.method, request.path
) )
code, response = yield path_entry.callback( code, response = yield callback(request, *args)
request,
*args
)
self._send_response(request, code, response) self._send_response(request, code, response)
return return
@ -190,6 +213,8 @@ class JsonResource(HttpServer, resource.Resource):
request) request)
return return
outgoing_responses_counter.inc(request.method, str(code))
# TODO: Only enable CORS for the requests that need it. # TODO: Only enable CORS for the requests that need it.
respond_with_json( respond_with_json(
request, code, response_json_object, request, code, response_json_object,

111
synapse/metrics/__init__.py Normal file
View File

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Because otherwise 'resource' collides with synapse.metrics.resource
from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
)
logger = logging.getLogger(__name__)
# We'll keep all the available metrics in a single toplevel dict, one shared
# for the entire process. We don't currently support per-HomeServer instances
# of metrics, because in practice any one python VM will host only one
# HomeServer anyway. This makes a lot of implementation neater
all_metrics = {}
class Metrics(object):
""" A single Metrics object gives a (mutable) slice view of the all_metrics
dict, allowing callers to easily register new metrics that are namespaced
nicely."""
def __init__(self, name):
self.name_prefix = name
def _register(self, metric_class, name, *args, **kwargs):
full_name = "%s_%s" % (self.name_prefix, name)
metric = metric_class(full_name, *args, **kwargs)
all_metrics[full_name] = metric
return metric
def register_counter(self, *args, **kwargs):
return self._register(CounterMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
return self._register(CallbackMetric, *args, **kwargs)
def register_distribution(self, *args, **kwargs):
return self._register(DistributionMetric, *args, **kwargs)
def register_cache(self, *args, **kwargs):
return self._register(CacheMetric, *args, **kwargs)
def get_metrics_for(pkg_name):
""" Returns a Metrics instance for conveniently creating metrics
namespaced with the given name prefix. """
# Convert a "package.name" to "package_name" because Prometheus doesn't
# let us use . in metric names
return Metrics(pkg_name.replace(".", "_"))
def render_all():
strs = []
# TODO(paul): Internal hack
update_resource_metrics()
for name in sorted(all_metrics.keys()):
try:
strs += all_metrics[name].render()
except Exception:
strs += ["# FAILED to render %s" % name]
logger.exception("Failed to render %s metric", name)
strs.append("") # to generate a final CRLF
return "\n".join(strs)
# Now register some standard process-wide state metrics, to give indications of
# process resource usage
rusage = None
PAGE_SIZE = getpagesize()
def update_resource_metrics():
global rusage
rusage = getrusage(RUSAGE_SELF)
resource_metrics = get_metrics_for("process.resource")
# msecs
resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000)
resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)
# pages
resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE)

155
synapse/metrics/metric.py Normal file
View File

@ -0,0 +1,155 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
# TODO(paul): I can't believe Python doesn't have one of these
def map_concat(func, items):
# flatten a list-of-lists
return list(chain.from_iterable(map(func, items)))
class BaseMetric(object):
def __init__(self, name, labels=[]):
self.name = name
self.labels = labels # OK not to clone as we never write it
def dimension(self):
return len(self.labels)
def is_scalar(self):
return not len(self.labels)
def _render_labelvalue(self, value):
# TODO: some kind of value escape
return '"%s"' % (value)
def _render_key(self, values):
if self.is_scalar():
return ""
return "{%s}" % (
",".join(["%s=%s" % (k, self._render_labelvalue(v))
for k, v in zip(self.labels, values)])
)
def render(self):
return map_concat(self.render_item, sorted(self.counts.keys()))
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
integer that counts events."""
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0
def inc_by(self, incr, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
if values not in self.counts:
self.counts[values] = incr
else:
self.counts[values] += incr
def inc(self, *values):
self.inc_by(1, *values)
def render_item(self, k):
return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
size or other state of some in-memory object by actively querying it."""
def __init__(self, name, callback, labels=[]):
super(CallbackMetric, self).__init__(name, labels=labels)
self.callback = callback
def render(self):
value = self.callback()
if self.is_scalar():
return ["%s %d" % (self.name, value)]
return ["%s%s %d" % (self.name, self._render_key(k), value[k])
for k in sorted(value.keys())]
class DistributionMetric(object):
"""A combination of an event counter and an accumulator, which counts
both the number of events and accumulates the total value. Typically this
could be used to keep track of method-running times, or other distributions
of values that occur in discrete occurances.
TODO(paul): Try to export some heatmap-style stats?
"""
def __init__(self, name, *args, **kwargs):
self.counts = CounterMetric(name + ":count", **kwargs)
self.totals = CounterMetric(name + ":total", **kwargs)
def inc_by(self, inc, *values):
self.counts.inc(*values)
self.totals.inc_by(inc, *values)
def render(self):
return self.counts.render() + self.totals.render()
class CacheMetric(object):
"""A combination of two CounterMetrics, one to count cache hits and one to
count a total, and a callback metric to yield the current size.
This metric generates standard metric name pairs, so that monitoring rules
can easily be applied to measure hit ratio."""
def __init__(self, name, size_callback, labels=[]):
self.name = name
self.hits = CounterMetric(name + ":hits", labels=labels)
self.total = CounterMetric(name + ":total", labels=labels)
self.size = CallbackMetric(
name + ":size",
callback=size_callback,
labels=labels,
)
def inc_hits(self, *values):
self.hits.inc(*values)
self.total.inc(*values)
def inc_misses(self, *values):
self.total.inc(*values)
def render(self):
return self.hits.render() + self.total.render() + self.size.render()

View File

@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
import synapse.metrics
METRICS_PREFIX = "/_synapse/metrics"
class MetricsResource(Resource):
isLeaf = True
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.hs = hs
def render_GET(self, request):
response = synapse.metrics.render_all()
request.setHeader("Content-Type", "text/plain")
request.setHeader("Content-Length", str(len(response)))
# Encode as UTF-8 (default)
return response.encode()

View File

@ -19,12 +19,27 @@ from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.types import StreamToken from synapse.types import StreamToken
import synapse.metrics
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = metrics.register_counter("notified_events")
# TODO(paul): Should be shared somewhere
def count(func, l):
"""Return the number of items in l for which func returns true."""
n = 0
for x in l:
if func(x):
n += 1
return n
class _NotificationListener(object): class _NotificationListener(object):
""" This represents a single client connection to the events stream. """ This represents a single client connection to the events stream.
@ -59,6 +74,7 @@ class _NotificationListener(object):
try: try:
self.deferred.callback(result) self.deferred.callback(result)
notified_events_counter.inc_by(len(events))
except defer.AlreadyCalledError: except defer.AlreadyCalledError:
pass pass
@ -95,6 +111,35 @@ class Notifier(object):
"user_joined_room", self._user_joined_room "user_joined_room", self._user_joined_room
) )
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
def count_listeners():
all_listeners = set()
for x in self.room_to_listeners.values():
all_listeners |= x
for x in self.user_to_listeners.values():
all_listeners |= x
for x in self.appservice_to_listeners.values():
all_listeners |= x
return len(all_listeners)
metrics.register_callback("listeners", count_listeners)
metrics.register_callback(
"rooms",
lambda: count(bool, self.room_to_listeners.values()),
)
metrics.register_callback(
"users",
lambda: count(bool, self.user_to_listeners.values()),
)
metrics.register_callback(
"appservices",
lambda: count(bool, self.appservice_to_listeners.values()),
)
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def on_new_room_event(self, event, extra_users=[]): def on_new_room_event(self, event, extra_users=[]):

View File

@ -32,7 +32,7 @@ class Pusher(object):
INITIAL_BACKOFF = 1000 INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000 MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000 GIVE_UP_AFTER = 24 * 60 * 60 * 1000
DEFAULT_ACTIONS = ['dont-notify'] DEFAULT_ACTIONS = ['dont_notify']
INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
@ -105,7 +105,11 @@ class Pusher(object):
room_member_count += 1 room_member_count += 1
for r in rules: for r in rules:
if r['rule_id'] in enabled_map and not enabled_map[r['rule_id']]: if r['rule_id'] in enabled_map:
r['enabled'] = enabled_map[r['rule_id']]
elif 'enabled' not in r:
r['enabled'] = True
if not r['enabled']:
continue continue
matches = True matches = True
@ -124,13 +128,21 @@ class Pusher(object):
# ignore rules with no actions (we have an explict 'dont_notify') # ignore rules with no actions (we have an explict 'dont_notify')
if len(actions) == 0: if len(actions) == 0:
logger.warn( logger.warn(
"Ignoring rule id %s with no actions for user %s" % "Ignoring rule id %s with no actions for user %s",
(r['rule_id'], r['user_name']) r['rule_id'], self.user_name
) )
continue continue
if matches: if matches:
logger.info(
"%s matches for user %s, event %s",
r['rule_id'], self.user_name, ev['event_id']
)
defer.returnValue(actions) defer.returnValue(actions)
logger.info(
"No rules match for user %s, event %s",
self.user_name, ev['event_id']
)
defer.returnValue(Pusher.DEFAULT_ACTIONS) defer.returnValue(Pusher.DEFAULT_ACTIONS)
@staticmethod @staticmethod

View File

@ -6,36 +6,51 @@ def list_with_base_rules(rawrules, user_name):
# shove the server default rules for each kind onto the end of each # shove the server default rules for each kind onto the end of each
current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
ruleslist.extend(make_base_prepend_rules(
user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
for r in rawrules: for r in rawrules:
if r['priority_class'] < current_prio_class: if r['priority_class'] < current_prio_class:
while r['priority_class'] < current_prio_class: while r['priority_class'] < current_prio_class:
ruleslist.extend(make_base_rules( ruleslist.extend(make_base_append_rules(
user_name, user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class] PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
)) ))
current_prio_class -= 1 current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
ruleslist.append(r) ruleslist.append(r)
while current_prio_class > 0: while current_prio_class > 0:
ruleslist.extend(make_base_rules( ruleslist.extend(make_base_append_rules(
user_name, user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class] PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
)) ))
current_prio_class -= 1 current_prio_class -= 1
if current_prio_class > 0:
ruleslist.extend(make_base_prepend_rules(
user_name,
PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
))
return ruleslist return ruleslist
def make_base_rules(user, kind): def make_base_append_rules(user, kind):
rules = [] rules = []
if kind == 'override': if kind == 'override':
rules = make_base_override_rules() rules = make_base_append_override_rules()
elif kind == 'underride': elif kind == 'underride':
rules = make_base_underride_rules(user) rules = make_base_append_underride_rules(user)
elif kind == 'content': elif kind == 'content':
rules = make_base_content_rules(user) rules = make_base_append_content_rules(user)
for r in rules: for r in rules:
r['priority_class'] = PRIORITY_CLASS_MAP[kind] r['priority_class'] = PRIORITY_CLASS_MAP[kind]
@ -44,7 +59,20 @@ def make_base_rules(user, kind):
return rules return rules
def make_base_content_rules(user): def make_base_prepend_rules(user, kind):
rules = []
if kind == 'override':
rules = make_base_prepend_override_rules()
for r in rules:
r['priority_class'] = PRIORITY_CLASS_MAP[kind]
r['default'] = True # Deprecated, left for backwards compat
return rules
def make_base_append_content_rules(user):
return [ return [
{ {
'rule_id': 'global/content/.m.rule.contains_user_name', 'rule_id': 'global/content/.m.rule.contains_user_name',
@ -68,10 +96,43 @@ def make_base_content_rules(user):
] ]
def make_base_override_rules(): def make_base_prepend_override_rules():
return [ return [
{ {
'rule_id': 'global/underride/.m.rule.suppress_notices', 'rule_id': 'global/override/.m.rule.master',
'enabled': False,
'conditions': [],
'actions': [
"dont_notify"
]
}
]
def make_base_append_override_rules():
return [
{
'rule_id': 'global/override/.m.rule.call',
'conditions': [
{
'kind': 'event_match',
'key': 'type',
'pattern': 'm.call.invite',
}
],
'actions': [
'notify',
{
'set_tweak': 'sound',
'value': 'ring'
}, {
'set_tweak': 'highlight',
'value': False
}
]
},
{
'rule_id': 'global/override/.m.rule.suppress_notices',
'conditions': [ 'conditions': [
{ {
'kind': 'event_match', 'kind': 'event_match',
@ -80,7 +141,7 @@ def make_base_override_rules():
} }
], ],
'actions': [ 'actions': [
'dont-notify', 'dont_notify',
] ]
}, },
{ {
@ -113,13 +174,16 @@ def make_base_override_rules():
{ {
'set_tweak': 'sound', 'set_tweak': 'sound',
'value': 'default' 'value': 'default'
}, {
'set_tweak': 'highlight',
'value': False
} }
] ]
} }
] ]
def make_base_underride_rules(user): def make_base_append_underride_rules(user):
return [ return [
{ {
'rule_id': 'global/underride/.m.rule.invite_for_me', 'rule_id': 'global/underride/.m.rule.invite_for_me',
@ -145,6 +209,9 @@ def make_base_underride_rules(user):
{ {
'set_tweak': 'sound', 'set_tweak': 'sound',
'value': 'default' 'value': 'default'
}, {
'set_tweak': 'highlight',
'value': False
} }
] ]
}, },
@ -158,7 +225,10 @@ def make_base_underride_rules(user):
} }
], ],
'actions': [ 'actions': [
'notify', 'notify', {
'set_tweak': 'highlight',
'value': False
}
] ]
}, },
{ {
@ -171,32 +241,10 @@ def make_base_underride_rules(user):
} }
], ],
'actions': [ 'actions': [
'notify', 'notify', {
] 'set_tweak': 'highlight',
}, 'value': False
{
'rule_id': 'global/underride/.m.rule.call',
'conditions': [
{
'kind': 'event_match',
'key': 'type',
'pattern': 'm.call.invite',
}
],
'actions': [
'notify',
{
'set_tweak': 'sound',
'value': 'ring'
} }
] ]
}, }
{
'rule_id': 'global/underride/.m.rule.fallback',
'conditions': [
],
'actions': [
'notify',
]
},
] ]

View File

@ -109,7 +109,7 @@ class HttpPusher(Pusher):
try: try:
resp = yield self.httpCli.post_json_get_json(self.url, notification_dict) resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
except: except:
logger.exception("Failed to push %s ", self.url) logger.warn("Failed to push %s ", self.url)
defer.returnValue(False) defer.returnValue(False)
rejected = [] rejected = []
if 'rejected' in resp: if 'rejected' in resp:

View File

@ -5,7 +5,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = { REQUIREMENTS = {
"syutil>=0.0.3": ["syutil"], "syutil>=0.0.3": ["syutil"],
"matrix_angular_sdk>=0.6.4": ["syweb>=0.6.4"], "matrix_angular_sdk>=0.6.5": ["syweb>=0.6.5"],
"Twisted==14.0.2": ["twisted==14.0.2"], "Twisted==14.0.2": ["twisted==14.0.2"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"], "service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyopenssl>=0.14": ["OpenSSL>=0.14"],
@ -36,8 +36,8 @@ DEPENDENCY_LINKS = [
), ),
github_link( github_link(
project="matrix-org/matrix-angular-sdk", project="matrix-org/matrix-angular-sdk",
version="v0.6.4", version="v0.6.5",
egg="matrix_angular_sdk-0.6.4", egg="matrix_angular_sdk-0.6.5",
), ),
] ]

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd # Copyright 2015 OpenMarket Ltd
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
# #
@ -89,7 +89,8 @@ def _parse_json(request):
if type(content) != dict: if type(content) != dict:
raise SynapseError(400, "Content must be a JSON object.") raise SynapseError(400, "Content must be a JSON object.")
return content return content
except ValueError: except ValueError as e:
logger.warn(e)
raise SynapseError(400, "Content not JSON.") raise SynapseError(400, "Content not JSON.")

View File

@ -156,9 +156,12 @@ class PushRuleRestServlet(ClientV1RestServlet):
template_rule = _rule_to_template(r) template_rule = _rule_to_template(r)
if template_rule: if template_rule:
template_rule['enabled'] = True
if r['rule_id'] in enabled_map: if r['rule_id'] in enabled_map:
template_rule['enabled'] = enabled_map[r['rule_id']] template_rule['enabled'] = enabled_map[r['rule_id']]
elif 'enabled' in r:
template_rule['enabled'] = r['enabled']
else:
template_rule['enabled'] = True
rulearray.append(template_rule) rulearray.append(template_rule)
path = request.postpath[1:] path = request.postpath[1:]

View File

@ -56,6 +56,7 @@ class BaseHomeServer(object):
""" """
DEPENDENCIES = [ DEPENDENCIES = [
'config',
'clock', 'clock',
'http_client', 'http_client',
'db_name', 'db_name',
@ -79,6 +80,7 @@ class BaseHomeServer(object):
'resource_for_server_key', 'resource_for_server_key',
'resource_for_media_repository', 'resource_for_media_repository',
'resource_for_app_services', 'resource_for_app_services',
'resource_for_metrics',
'event_sources', 'event_sources',
'ratelimiter', 'ratelimiter',
'keyring', 'keyring',

View File

@ -453,7 +453,7 @@ class DataStore(RoomMemberStore, RoomStore,
else: else:
args = (room_id, ) args = (room_id, )
results = yield self._execute_and_decode(sql, *args) results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results) events = yield self._parse_events(results)
defer.returnValue(events) defer.returnValue(events)
@ -478,7 +478,7 @@ class DataStore(RoomMemberStore, RoomStore,
sql += " OR s.type = 'm.room.aliases')" sql += " OR s.type = 'm.room.aliases')"
args = (room_id,) args = (room_id,)
results = yield self._execute_and_decode(sql, *args) results = yield self._execute_and_decode("get_current_state", sql, *args)
events = yield self._parse_events(results) events = yield self._parse_events(results)
@ -487,8 +487,10 @@ class DataStore(RoomMemberStore, RoomStore,
for e in events: for e in events:
if e.type == 'm.room.name': if e.type == 'm.room.name':
if 'name' in e.content:
name = e.content['name'] name = e.content['name']
elif e.type == 'm.room.aliases': elif e.type == 'm.room.aliases':
if 'aliases' in e.content:
aliases.extend(e.content['aliases']) aliases.extend(e.content['aliases'])
defer.returnValue((name, aliases)) defer.returnValue((name, aliases))
@ -496,8 +498,7 @@ class DataStore(RoomMemberStore, RoomStore,
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_min_token(self): def _get_min_token(self):
row = yield self._execute( row = yield self._execute(
None, "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
"SELECT MIN(stream_ordering) FROM events"
) )
self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 self.min_token = row[0][0] if row and row[0] and row[0][0] else -1

View File

@ -20,6 +20,7 @@ from synapse.events.utils import prune_event
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
from synapse.util.lrucache import LruCache from synapse.util.lrucache import LruCache
import synapse.metrics
from twisted.internet import defer from twisted.internet import defer
@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn") transaction_logger = logging.getLogger("synapse.storage.txn")
metrics = synapse.metrics.get_metrics_for("synapse.storage")
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
caches_by_name = {}
cache_counter = metrics.register_cache(
"cache",
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
# TODO(paul): # TODO(paul):
# * more generic key management # * more generic key management
# * export monitoring stats
# * consider other eviction strategies - LRU? # * consider other eviction strategies - LRU?
def cached(max_entries=1000): def cached(max_entries=1000):
""" A method decorator that applies a memoizing cache around the function. """ A method decorator that applies a memoizing cache around the function.
@ -55,6 +69,9 @@ def cached(max_entries=1000):
""" """
def wrap(orig): def wrap(orig):
cache = OrderedDict() cache = OrderedDict()
name = orig.__name__
caches_by_name[name] = cache
def prefill(key, value): def prefill(key, value):
while len(cache) > max_entries: while len(cache) > max_entries:
@ -65,8 +82,10 @@ def cached(max_entries=1000):
@defer.inlineCallbacks @defer.inlineCallbacks
def wrapped(self, key): def wrapped(self, key):
if key in cache: if key in cache:
cache_counter.inc_hits(name)
defer.returnValue(cache[key]) defer.returnValue(cache[key])
cache_counter.inc_misses(name)
ret = yield orig(self, key) ret = yield orig(self, key)
prefill(key, ret) prefill(key, ret)
defer.returnValue(ret) defer.returnValue(ret)
@ -83,7 +102,8 @@ def cached(max_entries=1000):
class LoggingTransaction(object): class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object """An object that almost-transparently proxies for the 'txn' object
passed to the constructor. Adds logging to the .execute() method.""" passed to the constructor. Adds logging and metrics to the .execute()
method."""
__slots__ = ["txn", "name"] __slots__ = ["txn", "name"]
def __init__(self, txn, name): def __init__(self, txn, name):
@ -99,6 +119,7 @@ class LoggingTransaction(object):
def execute(self, sql, *args, **kwargs): def execute(self, sql, *args, **kwargs):
# TODO(paul): Maybe use 'info' and 'debug' for values? # TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql) sql_logger.debug("[SQL] {%s} %s", self.name, sql)
try: try:
if args and args[0]: if args and args[0]:
values = args[0] values = args[0]
@ -120,8 +141,9 @@ class LoggingTransaction(object):
logger.exception("[SQL FAIL] {%s}", self.name) logger.exception("[SQL FAIL] {%s}", self.name)
raise raise
finally: finally:
end = time.time() * 1000 msecs = (time.time() * 1000) - start
sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
sql_query_timer.inc_by(msecs, sql.split()[0])
class PerformanceCounters(object): class PerformanceCounters(object):
@ -172,11 +194,18 @@ class SQLBaseStore(object):
self._previous_txn_total_time = 0 self._previous_txn_total_time = 0
self._current_txn_total_time = 0 self._current_txn_total_time = 0
self._previous_loop_ts = 0 self._previous_loop_ts = 0
# TODO(paul): These can eventually be removed once the metrics code
# is running in mainline, and we have some nice monitoring frontends
# to watch it
self._txn_perf_counters = PerformanceCounters() self._txn_perf_counters = PerformanceCounters()
self._get_event_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters()
self._get_event_cache = LruCache(hs.config.event_cache_size) self._get_event_cache = LruCache(hs.config.event_cache_size)
# Pretend the getEventCache is just another named cache
caches_by_name["*getEvent*"] = self._get_event_cache
def start_profiling(self): def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec() self._previous_loop_ts = self._clock.time_msec()
@ -231,13 +260,13 @@ class SQLBaseStore(object):
raise raise
finally: finally:
end = time.time() * 1000 end = time.time() * 1000
transaction_logger.debug( duration = end - start
"[TXN END] {%s} %f",
name, end - start
)
self._current_txn_total_time += end - start transaction_logger.debug("[TXN END] {%s} %f", name, duration)
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end) self._txn_perf_counters.update(desc, start, end)
sql_txn_timer.inc_by(duration, desc)
with PreserveLoggingContext(): with PreserveLoggingContext():
result = yield self._db_pool.runInteraction( result = yield self._db_pool.runInteraction(
@ -259,7 +288,7 @@ class SQLBaseStore(object):
) )
return results return results
def _execute(self, decoder, query, *args): def _execute(self, desc, decoder, query, *args):
"""Runs a single query for a result set. """Runs a single query for a result set.
Args: Args:
@ -277,10 +306,10 @@ class SQLBaseStore(object):
else: else:
return cursor.fetchall() return cursor.fetchall()
return self.runInteraction("_execute", interaction) return self.runInteraction(desc, interaction)
def _execute_and_decode(self, query, *args): def _execute_and_decode(self, desc, query, *args):
return self._execute(self.cursor_to_dict, query, *args) return self._execute(desc, self.cursor_to_dict, query, *args)
# "Simple" SQL API methods that operate on a single table with no JOINs, # "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns. # no complex WHERE clauses, just a dict of values for columns.
@ -638,14 +667,22 @@ class SQLBaseStore(object):
get_prev_content=False, allow_rejected=False): get_prev_content=False, allow_rejected=False):
start_time = time.time() * 1000 start_time = time.time() * 1000
update_counter = self._get_event_counters.update
def update_counter(desc, last_time):
curr_time = self._get_event_counters.update(desc, last_time)
sql_getevents_timer.inc_by(curr_time - last_time, desc)
return curr_time
cache = self._get_event_cache.setdefault(event_id, {}) cache = self._get_event_cache.setdefault(event_id, {})
try: try:
# Separate cache entries for each way to invoke _get_event_txn # Separate cache entries for each way to invoke _get_event_txn
return cache[(check_redacted, get_prev_content, allow_rejected)] ret = cache[(check_redacted, get_prev_content, allow_rejected)]
cache_counter.inc_hits("*getEvent*")
return ret
except KeyError: except KeyError:
cache_counter.inc_misses("*getEvent*")
pass pass
finally: finally:
start_time = update_counter("event_cache", start_time) start_time = update_counter("event_cache", start_time)
@ -685,7 +722,11 @@ class SQLBaseStore(object):
check_redacted=True, get_prev_content=False): check_redacted=True, get_prev_content=False):
start_time = time.time() * 1000 start_time = time.time() * 1000
update_counter = self._get_event_counters.update
def update_counter(desc, last_time):
curr_time = self._get_event_counters.update(desc, last_time)
sql_getevents_timer.inc_by(curr_time - last_time, desc)
return curr_time
d = json.loads(js) d = json.loads(js)
start_time = update_counter("decode_json", start_time) start_time = update_counter("decode_json", start_time)

View File

@ -341,7 +341,7 @@ class ApplicationServiceStore(SQLBaseStore):
sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN " sql = ("SELECT r.*, a.* FROM application_services AS a LEFT JOIN "
"application_services_regex AS r ON a.id = r.as_id") "application_services_regex AS r ON a.id = r.as_id")
results = yield self._execute_and_decode(sql) results = yield self._execute_and_decode("appservice_cache", sql)
services = self._parse_services_dict(results) services = self._parse_services_dict(results)
for service in services: for service in services:
@ -369,7 +369,9 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
"application_services AS a ON a.id=s.as_id LEFT JOIN " "application_services AS a ON a.id=s.as_id LEFT JOIN "
"application_services_regex AS r ON r.as_id=a.id WHERE state = ?" "application_services_regex AS r ON r.as_id=a.id WHERE state = ?"
) )
results = yield self._execute_and_decode(sql, state) results = yield self._execute_and_decode(
"get_appservices_by_state", sql, state
)
# NB: This assumes this class is linked with ApplicationServiceStore # NB: This assumes this class is linked with ApplicationServiceStore
defer.returnValue(self._parse_services_dict(results)) defer.returnValue(self._parse_services_dict(results))

View File

@ -37,7 +37,7 @@ class FeedbackStore(SQLBaseStore):
"WHERE feedback.target_event_id = ? " "WHERE feedback.target_event_id = ? "
) )
rows = yield self._execute_and_decode(sql, event_id) rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
defer.returnValue( defer.returnValue(
[ [

View File

@ -85,7 +85,9 @@ class KeyStore(SQLBaseStore):
" AND key_id in (" + ",".join("?" for key_id in key_ids) + ")" " AND key_id in (" + ",".join("?" for key_id in key_ids) + ")"
) )
rows = yield self._execute_and_decode(sql, server_name, *key_ids) rows = yield self._execute_and_decode(
"get_server_verify_keys", sql, server_name, *key_ids
)
keys = [] keys = []
for row in rows: for row in rows:

View File

@ -34,7 +34,7 @@ class PushRuleStore(SQLBaseStore):
"WHERE user_name = ? " "WHERE user_name = ? "
"ORDER BY priority_class DESC, priority DESC" "ORDER BY priority_class DESC, priority DESC"
) )
rows = yield self._execute(None, sql, user_name) rows = yield self._execute("get_push_rules_for_user", None, sql, user_name)
dicts = [] dicts = []
for r in rows: for r in rows:
@ -56,17 +56,6 @@ class PushRuleStore(SQLBaseStore):
{r['rule_id']: False if r['enabled'] == 0 else True for r in results} {r['rule_id']: False if r['enabled'] == 0 else True for r in results}
) )
@defer.inlineCallbacks
def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id):
results = yield self._simple_select_list(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id},
['enabled']
)
if not results:
defer.returnValue(True)
defer.returnValue(results[0])
@defer.inlineCallbacks @defer.inlineCallbacks
def add_push_rule(self, before, after, **kwargs): def add_push_rule(self, before, after, **kwargs):
vals = copy.copy(kwargs) vals = copy.copy(kwargs)
@ -217,16 +206,10 @@ class PushRuleStore(SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def set_push_rule_enabled(self, user_name, rule_id, enabled): def set_push_rule_enabled(self, user_name, rule_id, enabled):
if enabled:
yield self._simple_delete_one(
PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id}
)
else:
yield self._simple_upsert( yield self._simple_upsert(
PushRuleEnableTable.table_name, PushRuleEnableTable.table_name,
{'user_name': user_name, 'rule_id': rule_id}, {'user_name': user_name, 'rule_id': rule_id},
{'enabled': False} {'enabled': enabled}
) )

View File

@ -37,7 +37,8 @@ class PusherStore(SQLBaseStore):
) )
rows = yield self._execute( rows = yield self._execute(
None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1] "get_pushers_by_app_id_and_pushkey", None, sql,
app_id_and_pushkey[0], app_id_and_pushkey[1]
) )
ret = [ ret = [
@ -70,7 +71,7 @@ class PusherStore(SQLBaseStore):
"FROM pushers" "FROM pushers"
) )
rows = yield self._execute(None, sql) rows = yield self._execute("get_all_pushers", None, sql)
ret = [ ret = [
{ {

View File

@ -88,8 +88,7 @@ class RegistrationStore(SQLBaseStore):
query = ("SELECT users.name, users.password_hash FROM users" query = ("SELECT users.name, users.password_hash FROM users"
" WHERE users.name = ?") " WHERE users.name = ?")
return self._execute( return self._execute(
self.cursor_to_dict, "get_user_by_id", self.cursor_to_dict, query, user_id
query, user_id
) )
def get_user_by_token(self, token): def get_user_by_token(self, token):

View File

@ -68,7 +68,7 @@ class RoomStore(SQLBaseStore):
""" """
query = RoomsTable.select_statement("room_id=?") query = RoomsTable.select_statement("room_id=?")
return self._execute( return self._execute(
RoomsTable.decode_single_result, query, room_id, "get_room", RoomsTable.decode_single_result, query, room_id,
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -16,7 +16,6 @@
class LruCache(object): class LruCache(object):
"""Least-recently-used cache.""" """Least-recently-used cache."""
# TODO(mjark) Add hit/miss counters
# TODO(mjark) Add mutex for linked list for thread safety. # TODO(mjark) Add mutex for linked list for thread safety.
def __init__(self, max_size): def __init__(self, max_size):
cache = {} cache = {}

View File

@ -100,7 +100,7 @@ class PresenceTestCase(unittest.TestCase):
self.room_members = [] self.room_members = []
room_member_handler = handlers.room_member_handler = Mock(spec=[ room_member_handler = handlers.room_member_handler = Mock(spec=[
"get_rooms_for_user", "get_joined_rooms_for_user",
"get_room_members", "get_room_members",
"fetch_room_distributions_into", "fetch_room_distributions_into",
]) ])
@ -111,7 +111,7 @@ class PresenceTestCase(unittest.TestCase):
return defer.succeed([self.room_id]) return defer.succeed([self.room_id])
else: else:
return defer.succeed([]) return defer.succeed([])
room_member_handler.get_rooms_for_user = get_rooms_for_user room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
def get_room_members(room_id): def get_room_members(room_id):
if room_id == self.room_id: if room_id == self.room_id:

View File

@ -64,7 +64,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
"set_presence_state", "set_presence_state",
"is_presence_visible", "is_presence_visible",
"set_profile_displayname", "set_profile_displayname",
"get_rooms_for_user_where_membership_is", "get_rooms_for_user",
]), ]),
handlers=None, handlers=None,
resource_for_federation=Mock(), resource_for_federation=Mock(),
@ -124,9 +124,9 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.mock_update_client) self.mock_update_client)
hs.handlers.room_member_handler = Mock(spec=[ hs.handlers.room_member_handler = Mock(spec=[
"get_rooms_for_user", "get_joined_rooms_for_user",
]) ])
hs.handlers.room_member_handler.get_rooms_for_user = ( hs.handlers.room_member_handler.get_joined_rooms_for_user = (
lambda u: defer.succeed([])) lambda u: defer.succeed([]))
# Some local users to test with # Some local users to test with
@ -138,7 +138,7 @@ class PresenceProfilelikeDataTestCase(unittest.TestCase):
self.u_potato = UserID.from_string("@potato:remote") self.u_potato = UserID.from_string("@potato:remote")
self.mock_get_joined = ( self.mock_get_joined = (
self.datastore.get_rooms_for_user_where_membership_is self.datastore.get_rooms_for_user
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

View File

@ -0,0 +1,161 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
from synapse.metrics.metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
)
class CounterMetricTestCase(unittest.TestCase):
def test_scalar(self):
counter = CounterMetric("scalar")
self.assertEquals(counter.render(), [
'scalar 0',
])
counter.inc()
self.assertEquals(counter.render(), [
'scalar 1',
])
counter.inc_by(2)
self.assertEquals(counter.render(), [
'scalar 3'
])
def test_vector(self):
counter = CounterMetric("vector", labels=["method"])
# Empty counter doesn't yet know what values it has
self.assertEquals(counter.render(), [])
counter.inc("GET")
self.assertEquals(counter.render(), [
'vector{method="GET"} 1',
])
counter.inc("GET")
counter.inc("PUT")
self.assertEquals(counter.render(), [
'vector{method="GET"} 2',
'vector{method="PUT"} 1',
])
class CallbackMetricTestCase(unittest.TestCase):
def test_scalar(self):
d = dict()
metric = CallbackMetric("size", lambda: len(d))
self.assertEquals(metric.render(), [
'size 0',
])
d["key"] = "value"
self.assertEquals(metric.render(), [
'size 1',
])
def test_vector(self):
vals = dict()
metric = CallbackMetric("values", lambda: vals, labels=["type"])
self.assertEquals(metric.render(), [])
# Keys have to be tuples, even if they're 1-element
vals[("foo",)] = 1
vals[("bar",)] = 2
self.assertEquals(metric.render(), [
'values{type="bar"} 2',
'values{type="foo"} 1',
])
class DistributionMetricTestCase(unittest.TestCase):
def test_scalar(self):
metric = DistributionMetric("thing")
self.assertEquals(metric.render(), [
'thing:count 0',
'thing:total 0',
])
metric.inc_by(500)
self.assertEquals(metric.render(), [
'thing:count 1',
'thing:total 500',
])
def test_vector(self):
metric = DistributionMetric("queries", labels=["verb"])
self.assertEquals(metric.render(), [])
metric.inc_by(300, "SELECT")
metric.inc_by(200, "SELECT")
metric.inc_by(800, "INSERT")
self.assertEquals(metric.render(), [
'queries:count{verb="INSERT"} 1',
'queries:count{verb="SELECT"} 2',
'queries:total{verb="INSERT"} 800',
'queries:total{verb="SELECT"} 500',
])
class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
metric = CacheMetric("cache", lambda: len(d))
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 0',
'cache:size 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
'cache:hits 0',
'cache:total 1',
'cache:size 1',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
'cache:hits 1',
'cache:total 2',
'cache:size 1',
])

View File

@ -79,13 +79,13 @@ class PresenceStateTestCase(unittest.TestCase):
room_member_handler = hs.handlers.room_member_handler = Mock( room_member_handler = hs.handlers.room_member_handler = Mock(
spec=[ spec=[
"get_rooms_for_user", "get_joined_rooms_for_user",
] ]
) )
def get_rooms_for_user(user): def get_rooms_for_user(user):
return defer.succeed([]) return defer.succeed([])
room_member_handler.get_rooms_for_user = get_rooms_for_user room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
presence.register_servlets(hs, self.mock_resource) presence.register_servlets(hs, self.mock_resource)
@ -166,7 +166,7 @@ class PresenceListTestCase(unittest.TestCase):
hs.handlers.room_member_handler = Mock( hs.handlers.room_member_handler = Mock(
spec=[ spec=[
"get_rooms_for_user", "get_joined_rooms_for_user",
] ]
) )
@ -291,7 +291,7 @@ class PresenceEventStreamTestCase(unittest.TestCase):
return ["a-room"] return ["a-room"]
else: else:
return [] return []
hs.handlers.room_member_handler.get_rooms_for_user = get_rooms_for_user hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
self.mock_datastore = hs.get_datastore() self.mock_datastore = hs.get_datastore()
self.mock_datastore.get_app_service_by_token = Mock(return_value=None) self.mock_datastore.get_app_service_by_token = Mock(return_value=None)