diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3649406ef..aa7c722ef 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -47,6 +47,7 @@ from synapse.crypto import context_factory from synapse.util.logcontext import LoggingContext from synapse.rest.client.v1 import ClientV1RestResource from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from daemonize import Daemonize import twisted.manhole.telnet @@ -100,6 +101,12 @@ class SynapseHomeServer(HomeServer): def build_resource_for_server_key(self): return LocalKey(self) + def build_resource_for_metrics(self): + if self.get_config().enable_metrics: + return MetricsResource(self) + else: + return None + def build_db_pool(self): return adbapi.ConnectionPool( "sqlite3", self.get_db_name(), @@ -110,7 +117,7 @@ class SynapseHomeServer(HomeServer): # so that :memory: sqlite works ) - def create_resource_tree(self, web_client, redirect_root_to_web_client): + def create_resource_tree(self, redirect_root_to_web_client): """Create the resource tree for this Home Server. This in unduly complicated because Twisted does not support putting @@ -122,6 +129,9 @@ class SynapseHomeServer(HomeServer): location of the web client. This does nothing if web_client is not True. """ + config = self.get_config() + web_client = config.webclient + # list containing (path_str, Resource) e.g: # [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ] desired_tree = [ @@ -145,6 +155,10 @@ class SynapseHomeServer(HomeServer): else: self.root_resource = Resource() + metrics_resource = self.get_resource_for_metrics() + if config.metrics_port is None and metrics_resource is not None: + desired_tree.append((METRICS_PREFIX, metrics_resource)) + # ideally we'd just use getChild and putChild but getChild doesn't work # unless you give it a Request object IN ADDITION to the name :/ So # instead, we'll store a copy of this mapping so we can actually add @@ -206,17 +220,27 @@ class SynapseHomeServer(HomeServer): """ return "%s-%s" % (resource, path_seg) - def start_listening(self, secure_port, unsecure_port): - if secure_port is not None: + def start_listening(self): + config = self.get_config() + + if not config.no_tls and config.bind_port is not None: reactor.listenSSL( - secure_port, Site(self.root_resource), self.tls_context_factory + config.bind_port, Site(self.root_resource), self.tls_context_factory ) - logger.info("Synapse now listening on port %d", secure_port) - if unsecure_port is not None: + logger.info("Synapse now listening on port %d", config.bind_port) + + if config.unsecure_port is not None: reactor.listenTCP( - unsecure_port, Site(self.root_resource) + config.unsecure_port, Site(self.root_resource) ) - logger.info("Synapse now listening on port %d", unsecure_port) + logger.info("Synapse now listening on port %d", config.unsecure_port) + + metrics_resource = self.get_resource_for_metrics() + if metrics_resource and config.metrics_port is not None: + reactor.listenTCP( + config.metrics_port, Site(metrics_resource), interface="127.0.0.1", + ) + logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port) def get_version_string(): @@ -340,7 +364,6 @@ def setup(config_options): ) hs.create_resource_tree( - web_client=config.webclient, redirect_root_to_web_client=True, ) @@ -369,11 +392,7 @@ def setup(config_options): f.namespace['hs'] = hs reactor.listenTCP(config.manhole, f, interface='127.0.0.1') - bind_port = config.bind_port - if config.no_tls: - bind_port = None - - hs.start_listening(bind_port, config.unsecure_port) + hs.start_listening() hs.get_pusherpool().start() hs.get_state_handler().start_caching() diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index c024535f5..241afdf87 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -23,11 +23,13 @@ from .captcha import CaptchaConfig from .email import EmailConfig from .voip import VoipConfig from .registration import RegistrationConfig +from .metrics import MetricsConfig class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig, RatelimitConfig, ContentRepositoryConfig, CaptchaConfig, - EmailConfig, VoipConfig, RegistrationConfig,): + EmailConfig, VoipConfig, RegistrationConfig, + MetricsConfig,): pass diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py new file mode 100644 index 000000000..901a429c7 --- /dev/null +++ b/synapse/config/metrics.py @@ -0,0 +1,36 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import Config + + +class MetricsConfig(Config): + def __init__(self, args): + super(MetricsConfig, self).__init__(args) + self.enable_metrics = args.enable_metrics + self.metrics_port = args.metrics_port + + @classmethod + def add_arguments(cls, parser): + super(MetricsConfig, cls).add_arguments(parser) + metrics_group = parser.add_argument_group("metrics") + metrics_group.add_argument( + '--enable-metrics', dest="enable_metrics", action="store_true", + help="Enable collection and rendering of performance metrics" + ) + metrics_group.add_argument( + '--metrics-port', metavar="PORT", type=int, + help="Separate port to accept metrics requests on (on localhost)" + ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f131941f4..6811a0e3d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,6 +25,7 @@ from synapse.api.errors import ( from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent +import synapse.metrics from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination @@ -36,9 +37,17 @@ import random logger = logging.getLogger(__name__) +# synapse.federation.federation_client is a silly name +metrics = synapse.metrics.get_metrics_for("synapse.federation.client") + +sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations") + +sent_edus_counter = metrics.register_counter("sent_edus") + +sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) + + class FederationClient(FederationBase): - def __init__(self): - self._get_pdu_cache = None def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( @@ -68,6 +77,8 @@ class FederationClient(FederationBase): order = self._order self._order += 1 + sent_pdus_destination_dist.inc_by(len(destinations)) + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) # TODO, add errback, etc. @@ -87,6 +98,8 @@ class FederationClient(FederationBase): content=content, ) + sent_edus_counter.inc() + # TODO, add errback, etc. self._transaction_queue.enqueue_edu(edu) return defer.succeed(None) @@ -113,6 +126,8 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ + sent_queries_counter.inc(query_type) + return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9c7dcdba9..25c0014f9 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,6 +22,7 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent +import synapse.metrics from synapse.api.errors import FederationError, SynapseError @@ -32,6 +33,15 @@ import logging logger = logging.getLogger(__name__) +# synapse.federation.federation_server is a silly name +metrics = synapse.metrics.get_metrics_for("synapse.federation.server") + +received_pdus_counter = metrics.register_counter("received_pdus") + +received_edus_counter = metrics.register_counter("received_edus") + +received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) + class FederationServer(FederationBase): def set_handler(self, handler): @@ -84,6 +94,8 @@ class FederationServer(FederationBase): def on_incoming_transaction(self, transaction_data): transaction = Transaction(**transaction_data) + received_pdus_counter.inc_by(len(transaction.pdus)) + for p in transaction.pdus: if "unsigned" in p: unsigned = p["unsigned"] @@ -153,6 +165,8 @@ class FederationServer(FederationBase): defer.returnValue((200, response)) def received_edu(self, origin, edu_type, content): + received_edus_counter.inc() + if edu_type in self.edu_handlers: self.edu_handlers[edu_type](origin, content) else: @@ -204,6 +218,8 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): + received_queries_counter.inc(query_type) + if query_type in self.query_handlers: response = yield self.query_handlers[query_type](args) defer.returnValue((200, response)) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9dc7849b1..4dccd93d0 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) +import synapse.metrics import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -54,11 +57,25 @@ class TransactionQueue(object): # done self.pending_transactions = {} + metrics.register_callback( + "pending_destinations", + lambda: len(self.pending_transactions), + ) + # Is a mapping from destination -> list of # tuple(pending pdus, deferred, order) - self.pending_pdus_by_dest = {} + self.pending_pdus_by_dest = pdus = {} # destination -> list of tuple(edu, deferred) - self.pending_edus_by_dest = {} + self.pending_edus_by_dest = edus = {} + + metrics.register_callback( + "pending_pdus", + lambda: sum(map(len, pdus.values())), + ) + metrics.register_callback( + "pending_edus", + lambda: sum(map(len, edus.values())), + ) # destination -> list of tuple(failure, deferred) self.pending_failures_by_dest = {} diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 6c624977d..7838a8136 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -148,6 +148,10 @@ class BaseFederationServlet(object): logger.exception("authenticate_request failed") raise defer.returnValue(response) + + # Extra logic that functools.wraps() doesn't finish + new_code.__self__ = code.__self__ + return new_code def register(self, server): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 28e922f79..731df0064 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -21,6 +21,7 @@ from synapse.api.constants import PresenceState from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID +import synapse.metrics from ._base import BaseHandler @@ -29,6 +30,8 @@ import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + # TODO(paul): Maybe there's one of these I can steal from somewhere def partition(l, func): @@ -133,6 +136,11 @@ class PresenceHandler(BaseHandler): self._user_cachemap = {} self._user_cachemap_latest_serial = 0 + metrics.register_callback( + "userCachemap:size", + lambda: len(self._user_cachemap), + ) + def _get_or_make_usercache(self, user): """If the cache entry doesn't exist, initialise a new one.""" if user not in self._user_cachemap: diff --git a/synapse/http/client.py b/synapse/http/client.py index b53a07aa2..2ae1c4d3a 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -15,6 +15,7 @@ from synapse.api.errors import CodeMessageException from syutil.jsonutil import encode_canonical_json +import synapse.metrics from twisted.internet import defer, reactor from twisted.web.client import ( @@ -31,6 +32,17 @@ import urllib logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +outgoing_requests_counter = metrics.register_counter( + "requests", + labels=["method"], +) +incoming_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], +) + class SimpleHttpClient(object): """ @@ -45,12 +57,30 @@ class SimpleHttpClient(object): self.agent = Agent(reactor) self.version_string = hs.version_string + def request(self, method, *args, **kwargs): + # A small wrapper around self.agent.request() so we can easily attach + # counters to it + outgoing_requests_counter.inc(method) + d = self.agent.request(method, *args, **kwargs) + + def _cb(response): + incoming_responses_counter.inc(method, response.code) + return response + + def _eb(failure): + incoming_responses_counter.inc(method, "ERR") + return failure + + d.addCallbacks(_cb, _eb) + + return d + @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}): logger.debug("post_urlencoded_get_json args: %s", args) query_bytes = urllib.urlencode(args, True) - response = yield self.agent.request( + response = yield self.request( "POST", uri.encode("ascii"), headers=Headers({ @@ -70,7 +100,7 @@ class SimpleHttpClient(object): logger.info("HTTP POST %s -> %s", json_str, uri) - response = yield self.agent.request( + response = yield self.request( "POST", uri.encode("ascii"), headers=Headers({ @@ -104,7 +134,7 @@ class SimpleHttpClient(object): query_bytes = urllib.urlencode(args, True) uri = "%s?%s" % (uri, query_bytes) - response = yield self.agent.request( + response = yield self.request( "GET", uri.encode("ascii"), headers=Headers({ @@ -145,7 +175,7 @@ class SimpleHttpClient(object): json_str = encode_canonical_json(json_body) - response = yield self.agent.request( + response = yield self.request( "PUT", uri.encode("ascii"), headers=Headers({ @@ -176,7 +206,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): def post_urlencoded_get_raw(self, url, args={}): query_bytes = urllib.urlencode(args, True) - response = yield self.agent.request( + response = yield self.request( "POST", url.encode("ascii"), bodyProducer=FileBodyProducer(StringIO(query_bytes)), diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7db001cc6..7fa295cad 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -23,6 +23,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep from synapse.util.logcontext import PreserveLoggingContext +import synapse.metrics from syutil.jsonutil import encode_canonical_json @@ -40,6 +41,17 @@ import urlparse logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +outgoing_requests_counter = metrics.register_counter( + "requests", + labels=["method"], +) +incoming_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], +) + class MatrixFederationHttpAgent(_AgentBase): @@ -49,6 +61,8 @@ class MatrixFederationHttpAgent(_AgentBase): def request(self, destination, endpoint, method, path, params, query, headers, body_producer): + outgoing_requests_counter.inc(method) + host = b"" port = 0 fragment = b"" @@ -59,9 +73,21 @@ class MatrixFederationHttpAgent(_AgentBase): # Set the connection pool key to be the destination. key = destination - return self._requestWithEndpoint(key, endpoint, method, parsed_URI, - headers, body_producer, - parsed_URI.originForm) + d = self._requestWithEndpoint(key, endpoint, method, parsed_URI, + headers, body_producer, + parsed_URI.originForm) + + def _cb(response): + incoming_responses_counter.inc(method, response.code) + return response + + def _eb(failure): + incoming_responses_counter.inc(method, "ERR") + return failure + + d.addCallbacks(_cb, _eb) + + return d class MatrixFederationHttpClient(object): diff --git a/synapse/http/server.py b/synapse/http/server.py index 767c3ef79..d77cb7779 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,6 +18,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError ) from synapse.util.logcontext import LoggingContext +import synapse.metrics from syutil.jsonutil import ( encode_canonical_json, encode_pretty_printed_json @@ -34,6 +35,17 @@ import urllib logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +incoming_requests_counter = metrics.register_counter( + "requests", + labels=["method", "servlet"], +) +outgoing_responses_counter = metrics.register_counter( + "responses", + labels=["method", "code"], +) + class HttpServer(object): """ Interface for registering callbacks on a HTTP server @@ -131,6 +143,15 @@ class JsonResource(HttpServer, resource.Resource): # returned response. We pass both the request and any # matched groups from the regex to the callback. + callback = path_entry.callback + + servlet_instance = getattr(callback, "__self__", None) + if servlet_instance is not None: + servlet_classname = servlet_instance.__class__.__name__ + else: + servlet_classname = "%r" % callback + incoming_requests_counter.inc(request.method, servlet_classname) + args = [ urllib.unquote(u).decode("UTF-8") for u in m.groups() ] @@ -140,10 +161,7 @@ class JsonResource(HttpServer, resource.Resource): request.method, request.path ) - code, response = yield path_entry.callback( - request, - *args - ) + code, response = yield callback(request, *args) self._send_response(request, code, response) return @@ -190,6 +208,8 @@ class JsonResource(HttpServer, resource.Resource): request) return + outgoing_responses_counter.inc(request.method, str(code)) + # TODO: Only enable CORS for the requests that need it. respond_with_json( request, code, response_json_object, diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py new file mode 100644 index 000000000..dffb8a486 --- /dev/null +++ b/synapse/metrics/__init__.py @@ -0,0 +1,111 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Because otherwise 'resource' collides with synapse.metrics.resource +from __future__ import absolute_import + +import logging +from resource import getrusage, getpagesize, RUSAGE_SELF + +from .metric import ( + CounterMetric, CallbackMetric, DistributionMetric, CacheMetric +) + + +logger = logging.getLogger(__name__) + + +# We'll keep all the available metrics in a single toplevel dict, one shared +# for the entire process. We don't currently support per-HomeServer instances +# of metrics, because in practice any one python VM will host only one +# HomeServer anyway. This makes a lot of implementation neater +all_metrics = {} + + +class Metrics(object): + """ A single Metrics object gives a (mutable) slice view of the all_metrics + dict, allowing callers to easily register new metrics that are namespaced + nicely.""" + + def __init__(self, name): + self.name_prefix = name + + def _register(self, metric_class, name, *args, **kwargs): + full_name = "%s_%s" % (self.name_prefix, name) + + metric = metric_class(full_name, *args, **kwargs) + + all_metrics[full_name] = metric + return metric + + def register_counter(self, *args, **kwargs): + return self._register(CounterMetric, *args, **kwargs) + + def register_callback(self, *args, **kwargs): + return self._register(CallbackMetric, *args, **kwargs) + + def register_distribution(self, *args, **kwargs): + return self._register(DistributionMetric, *args, **kwargs) + + def register_cache(self, *args, **kwargs): + return self._register(CacheMetric, *args, **kwargs) + + +def get_metrics_for(pkg_name): + """ Returns a Metrics instance for conveniently creating metrics + namespaced with the given name prefix. """ + + # Convert a "package.name" to "package_name" because Prometheus doesn't + # let us use . in metric names + return Metrics(pkg_name.replace(".", "_")) + + +def render_all(): + strs = [] + + # TODO(paul): Internal hack + update_resource_metrics() + + for name in sorted(all_metrics.keys()): + try: + strs += all_metrics[name].render() + except Exception: + strs += ["# FAILED to render %s" % name] + logger.exception("Failed to render %s metric", name) + + strs.append("") # to generate a final CRLF + + return "\n".join(strs) + + +# Now register some standard process-wide state metrics, to give indications of +# process resource usage + +rusage = None +PAGE_SIZE = getpagesize() + + +def update_resource_metrics(): + global rusage + rusage = getrusage(RUSAGE_SELF) + +resource_metrics = get_metrics_for("process.resource") + +# msecs +resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000) +resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000) + +# pages +resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * PAGE_SIZE) diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py new file mode 100644 index 000000000..21b37748f --- /dev/null +++ b/synapse/metrics/metric.py @@ -0,0 +1,155 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from itertools import chain + + +# TODO(paul): I can't believe Python doesn't have one of these +def map_concat(func, items): + # flatten a list-of-lists + return list(chain.from_iterable(map(func, items))) + + +class BaseMetric(object): + + def __init__(self, name, labels=[]): + self.name = name + self.labels = labels # OK not to clone as we never write it + + def dimension(self): + return len(self.labels) + + def is_scalar(self): + return not len(self.labels) + + def _render_labelvalue(self, value): + # TODO: some kind of value escape + return '"%s"' % (value) + + def _render_key(self, values): + if self.is_scalar(): + return "" + return "{%s}" % ( + ",".join(["%s=%s" % (k, self._render_labelvalue(v)) + for k, v in zip(self.labels, values)]) + ) + + def render(self): + return map_concat(self.render_item, sorted(self.counts.keys())) + + +class CounterMetric(BaseMetric): + """The simplest kind of metric; one that stores a monotonically-increasing + integer that counts events.""" + + def __init__(self, *args, **kwargs): + super(CounterMetric, self).__init__(*args, **kwargs) + + self.counts = {} + + # Scalar metrics are never empty + if self.is_scalar(): + self.counts[()] = 0 + + def inc_by(self, incr, *values): + if len(values) != self.dimension(): + raise ValueError( + "Expected as many values to inc() as labels (%d)" % (self.dimension()) + ) + + # TODO: should assert that the tag values are all strings + + if values not in self.counts: + self.counts[values] = incr + else: + self.counts[values] += incr + + def inc(self, *values): + self.inc_by(1, *values) + + def render_item(self, k): + return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])] + + +class CallbackMetric(BaseMetric): + """A metric that returns the numeric value returned by a callback whenever + it is rendered. Typically this is used to implement gauges that yield the + size or other state of some in-memory object by actively querying it.""" + + def __init__(self, name, callback, labels=[]): + super(CallbackMetric, self).__init__(name, labels=labels) + + self.callback = callback + + def render(self): + value = self.callback() + + if self.is_scalar(): + return ["%s %d" % (self.name, value)] + + return ["%s%s %d" % (self.name, self._render_key(k), value[k]) + for k in sorted(value.keys())] + + +class DistributionMetric(object): + """A combination of an event counter and an accumulator, which counts + both the number of events and accumulates the total value. Typically this + could be used to keep track of method-running times, or other distributions + of values that occur in discrete occurances. + + TODO(paul): Try to export some heatmap-style stats? + """ + + def __init__(self, name, *args, **kwargs): + self.counts = CounterMetric(name + ":count", **kwargs) + self.totals = CounterMetric(name + ":total", **kwargs) + + def inc_by(self, inc, *values): + self.counts.inc(*values) + self.totals.inc_by(inc, *values) + + def render(self): + return self.counts.render() + self.totals.render() + + +class CacheMetric(object): + """A combination of two CounterMetrics, one to count cache hits and one to + count a total, and a callback metric to yield the current size. + + This metric generates standard metric name pairs, so that monitoring rules + can easily be applied to measure hit ratio.""" + + def __init__(self, name, size_callback, labels=[]): + self.name = name + + self.hits = CounterMetric(name + ":hits", labels=labels) + self.total = CounterMetric(name + ":total", labels=labels) + + self.size = CallbackMetric( + name + ":size", + callback=size_callback, + labels=labels, + ) + + def inc_hits(self, *values): + self.hits.inc(*values) + self.total.inc(*values) + + def inc_misses(self, *values): + self.total.inc(*values) + + def render(self): + return self.hits.render() + self.total.render() + self.size.render() diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py new file mode 100644 index 000000000..0af4b3eb5 --- /dev/null +++ b/synapse/metrics/resource.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.web.resource import Resource + +import synapse.metrics + + +METRICS_PREFIX = "/_synapse/metrics" + + +class MetricsResource(Resource): + isLeaf = True + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.hs = hs + + def render_GET(self, request): + response = synapse.metrics.render_all() + + request.setHeader("Content-Type", "text/plain") + request.setHeader("Content-Length", str(len(response))) + + # Encode as UTF-8 (default) + return response.encode() diff --git a/synapse/notifier.py b/synapse/notifier.py index df13e8ddb..7121d659d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,12 +19,27 @@ from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.async import run_on_reactor from synapse.types import StreamToken +import synapse.metrics import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +notified_events_counter = metrics.register_counter("notified_events") + + +# TODO(paul): Should be shared somewhere +def count(func, l): + """Return the number of items in l for which func returns true.""" + n = 0 + for x in l: + if func(x): + n += 1 + return n + class _NotificationListener(object): """ This represents a single client connection to the events stream. @@ -59,6 +74,7 @@ class _NotificationListener(object): try: self.deferred.callback(result) + notified_events_counter.inc_by(len(events)) except defer.AlreadyCalledError: pass @@ -95,6 +111,35 @@ class Notifier(object): "user_joined_room", self._user_joined_room ) + # This is not a very cheap test to perform, but it's only executed + # when rendering the metrics page, which is likely once per minute at + # most when scraping it. + def count_listeners(): + all_listeners = set() + + for x in self.room_to_listeners.values(): + all_listeners |= x + for x in self.user_to_listeners.values(): + all_listeners |= x + for x in self.appservice_to_listeners.values(): + all_listeners |= x + + return len(all_listeners) + metrics.register_callback("listeners", count_listeners) + + metrics.register_callback( + "rooms", + lambda: count(bool, self.room_to_listeners.values()), + ) + metrics.register_callback( + "users", + lambda: count(bool, self.user_to_listeners.values()), + ) + metrics.register_callback( + "appservices", + lambda: count(bool, self.appservice_to_listeners.values()), + ) + @log_function @defer.inlineCallbacks def on_new_room_event(self, event, extra_users=[]): diff --git a/synapse/server.py b/synapse/server.py index cb8610a1b..c7772244b 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -56,6 +56,7 @@ class BaseHomeServer(object): """ DEPENDENCIES = [ + 'config', 'clock', 'http_client', 'db_name', @@ -79,6 +80,7 @@ class BaseHomeServer(object): 'resource_for_server_key', 'resource_for_media_repository', 'resource_for_app_services', + 'resource_for_metrics', 'event_sources', 'ratelimiter', 'keyring', diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3ea738276..40f2fc6d7 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -20,6 +20,7 @@ from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache +import synapse.metrics from twisted.internet import defer @@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +metrics = synapse.metrics.get_metrics_for("synapse.storage") + +sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) +sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) + +caches_by_name = {} +cache_counter = metrics.register_cache( + "cache", + lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, + labels=["name"], +) + + # TODO(paul): # * more generic key management -# * export monitoring stats # * consider other eviction strategies - LRU? def cached(max_entries=1000): """ A method decorator that applies a memoizing cache around the function. @@ -55,6 +69,9 @@ def cached(max_entries=1000): """ def wrap(orig): cache = OrderedDict() + name = orig.__name__ + + caches_by_name[name] = cache def prefill(key, value): while len(cache) > max_entries: @@ -65,8 +82,10 @@ def cached(max_entries=1000): @defer.inlineCallbacks def wrapped(self, key): if key in cache: + cache_counter.inc_hits(name) defer.returnValue(cache[key]) + cache_counter.inc_misses(name) ret = yield orig(self, key) prefill(key, ret) defer.returnValue(ret) @@ -83,7 +102,8 @@ def cached(max_entries=1000): class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object - passed to the constructor. Adds logging to the .execute() method.""" + passed to the constructor. Adds logging and metrics to the .execute() + method.""" __slots__ = ["txn", "name"] def __init__(self, txn, name): @@ -99,6 +119,7 @@ class LoggingTransaction(object): def execute(self, sql, *args, **kwargs): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) + try: if args and args[0]: values = args[0] @@ -120,8 +141,9 @@ class LoggingTransaction(object): logger.exception("[SQL FAIL] {%s}", self.name) raise finally: - end = time.time() * 1000 - sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) + msecs = (time.time() * 1000) - start + sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) + sql_query_timer.inc_by(msecs, sql.split()[0]) class PerformanceCounters(object): @@ -172,11 +194,18 @@ class SQLBaseStore(object): self._previous_txn_total_time = 0 self._current_txn_total_time = 0 self._previous_loop_ts = 0 + + # TODO(paul): These can eventually be removed once the metrics code + # is running in mainline, and we have some nice monitoring frontends + # to watch it self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() self._get_event_cache = LruCache(hs.config.event_cache_size) + # Pretend the getEventCache is just another named cache + caches_by_name["*getEvent*"] = self._get_event_cache + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -231,13 +260,13 @@ class SQLBaseStore(object): raise finally: end = time.time() * 1000 - transaction_logger.debug( - "[TXN END] {%s} %f", - name, end - start - ) + duration = end - start - self._current_txn_total_time += end - start + transaction_logger.debug("[TXN END] {%s} %f", name, duration) + + self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) + sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( @@ -638,14 +667,22 @@ class SQLBaseStore(object): get_prev_content=False, allow_rejected=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time cache = self._get_event_cache.setdefault(event_id, {}) try: # Separate cache entries for each way to invoke _get_event_txn - return cache[(check_redacted, get_prev_content, allow_rejected)] + ret = cache[(check_redacted, get_prev_content, allow_rejected)] + + cache_counter.inc_hits("*getEvent*") + return ret except KeyError: + cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -685,7 +722,11 @@ class SQLBaseStore(object): check_redacted=True, get_prev_content=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time d = json.loads(js) start_time = update_counter("decode_json", start_time) diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py index f115f50e5..65d579290 100644 --- a/synapse/util/lrucache.py +++ b/synapse/util/lrucache.py @@ -16,7 +16,6 @@ class LruCache(object): """Least-recently-used cache.""" - # TODO(mjark) Add hit/miss counters # TODO(mjark) Add mutex for linked list for thread safety. def __init__(self, max_size): cache = {} diff --git a/tests/metrics/__init__.py b/tests/metrics/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/metrics/test_metric.py b/tests/metrics/test_metric.py new file mode 100644 index 000000000..600901429 --- /dev/null +++ b/tests/metrics/test_metric.py @@ -0,0 +1,161 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from tests import unittest + +from synapse.metrics.metric import ( + CounterMetric, CallbackMetric, DistributionMetric, CacheMetric +) + + +class CounterMetricTestCase(unittest.TestCase): + + def test_scalar(self): + counter = CounterMetric("scalar") + + self.assertEquals(counter.render(), [ + 'scalar 0', + ]) + + counter.inc() + + self.assertEquals(counter.render(), [ + 'scalar 1', + ]) + + counter.inc_by(2) + + self.assertEquals(counter.render(), [ + 'scalar 3' + ]) + + def test_vector(self): + counter = CounterMetric("vector", labels=["method"]) + + # Empty counter doesn't yet know what values it has + self.assertEquals(counter.render(), []) + + counter.inc("GET") + + self.assertEquals(counter.render(), [ + 'vector{method="GET"} 1', + ]) + + counter.inc("GET") + counter.inc("PUT") + + self.assertEquals(counter.render(), [ + 'vector{method="GET"} 2', + 'vector{method="PUT"} 1', + ]) + + +class CallbackMetricTestCase(unittest.TestCase): + + def test_scalar(self): + d = dict() + + metric = CallbackMetric("size", lambda: len(d)) + + self.assertEquals(metric.render(), [ + 'size 0', + ]) + + d["key"] = "value" + + self.assertEquals(metric.render(), [ + 'size 1', + ]) + + def test_vector(self): + vals = dict() + + metric = CallbackMetric("values", lambda: vals, labels=["type"]) + + self.assertEquals(metric.render(), []) + + # Keys have to be tuples, even if they're 1-element + vals[("foo",)] = 1 + vals[("bar",)] = 2 + + self.assertEquals(metric.render(), [ + 'values{type="bar"} 2', + 'values{type="foo"} 1', + ]) + + +class DistributionMetricTestCase(unittest.TestCase): + + def test_scalar(self): + metric = DistributionMetric("thing") + + self.assertEquals(metric.render(), [ + 'thing:count 0', + 'thing:total 0', + ]) + + metric.inc_by(500) + + self.assertEquals(metric.render(), [ + 'thing:count 1', + 'thing:total 500', + ]) + + def test_vector(self): + metric = DistributionMetric("queries", labels=["verb"]) + + self.assertEquals(metric.render(), []) + + metric.inc_by(300, "SELECT") + metric.inc_by(200, "SELECT") + metric.inc_by(800, "INSERT") + + self.assertEquals(metric.render(), [ + 'queries:count{verb="INSERT"} 1', + 'queries:count{verb="SELECT"} 2', + 'queries:total{verb="INSERT"} 800', + 'queries:total{verb="SELECT"} 500', + ]) + + +class CacheMetricTestCase(unittest.TestCase): + + def test_cache(self): + d = dict() + + metric = CacheMetric("cache", lambda: len(d)) + + self.assertEquals(metric.render(), [ + 'cache:hits 0', + 'cache:total 0', + 'cache:size 0', + ]) + + metric.inc_misses() + d["key"] = "value" + + self.assertEquals(metric.render(), [ + 'cache:hits 0', + 'cache:total 1', + 'cache:size 1', + ]) + + metric.inc_hits() + + self.assertEquals(metric.render(), [ + 'cache:hits 1', + 'cache:total 2', + 'cache:size 1', + ])