Merge pull request #3256 from matrix-org/3218-official-prom

Switch to the Python Prometheus library
This commit is contained in:
Amber Brown 2018-05-28 23:30:09 +10:00 committed by GitHub
commit 81717e8515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 583 additions and 1319 deletions

3
.gitignore vendored
View File

@ -1,5 +1,6 @@
*.pyc
.*.swp
*~
.DS_Store
_trial_temp/
@ -13,6 +14,7 @@ docs/build/
cmdclient_config.json
homeserver*.db
homeserver*.log
homeserver*.log.*
homeserver*.pid
homeserver*.yaml
@ -40,6 +42,7 @@ media_store/
*.tac
build/
venv/
localhost-800*/
static/client/register/register_config.js

View File

@ -57,7 +57,7 @@ class Auth(object):
self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
self.token_cache = LruCache(CACHE_SIZE_FACTOR * 10000)
register_cache("token_cache", self.token_cache)
register_cache("cache", "token_cache", self.token_cache)
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):

View File

@ -34,8 +34,8 @@ from synapse.module_api import ModuleApi
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import register_memory_metrics
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
check_requirements
from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
@ -61,6 +61,8 @@ from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
from prometheus_client.twisted import MetricsResource
logger = logging.getLogger("synapse.app.homeserver")
@ -230,7 +232,7 @@ class SynapseHomeServer(HomeServer):
resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
if name == "metrics" and self.get_config().enable_metrics:
resources[METRICS_PREFIX] = MetricsResource(self)
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy())
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
@ -362,8 +364,6 @@ def setup(config_options):
hs.get_datastore().start_doing_background_updates()
hs.get_federation_client().start_get_pdu_cache()
register_memory_metrics(hs)
reactor.callWhenRunning(start)
return hs

View File

@ -32,20 +32,17 @@ from synapse.federation.federation_base import (
FederationBase,
event_from_pdu_json,
)
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
from prometheus_client import Counter
logger = logging.getLogger(__name__)
# synapse.federation.federation_client is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
PDU_RETRY_TIME_MS = 1 * 60 * 1000
@ -108,7 +105,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc(query_type)
sent_queries_counter.labels(query_type).inc()
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
@ -127,7 +124,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_device_keys")
sent_queries_counter.labels("client_device_keys").inc()
return self.transport_layer.query_client_keys(
destination, content, timeout
)
@ -137,7 +134,7 @@ class FederationClient(FederationBase):
"""Query the device keys for a list of user ids hosted on a remote
server.
"""
sent_queries_counter.inc("user_devices")
sent_queries_counter.labels("user_devices").inc()
return self.transport_layer.query_user_devices(
destination, user_id, timeout
)
@ -154,7 +151,7 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
sent_queries_counter.inc("client_one_time_keys")
sent_queries_counter.labels("client_one_time_keys").inc()
return self.transport_layer.claim_client_keys(
destination, content, timeout
)

View File

@ -27,12 +27,13 @@ from synapse.federation.federation_base import (
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
import synapse.metrics
from synapse.types import get_domain_from_id
from synapse.util import async
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logutils import log_function
from prometheus_client import Counter
from six import iteritems
# when processing incoming transactions, we try to handle multiple rooms in
@ -41,17 +42,17 @@ TRANSACTION_CONCURRENCY_LIMIT = 10
logger = logging.getLogger(__name__)
# synapse.federation.federation_server is a silly name
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
received_pdus_counter = Counter("synapse_federation_server_received_pdus", "")
received_pdus_counter = metrics.register_counter("received_pdus")
received_edus_counter = Counter("synapse_federation_server_received_edus", "")
received_edus_counter = metrics.register_counter("received_edus")
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
received_queries_counter = Counter(
"synapse_federation_server_received_queries", "", ["type"]
)
class FederationServer(FederationBase):
def __init__(self, hs):
super(FederationServer, self).__init__(hs)
@ -131,7 +132,7 @@ class FederationServer(FederationBase):
logger.debug("[%s] Transaction is new", transaction.transaction_id)
received_pdus_counter.inc_by(len(transaction.pdus))
received_pdus_counter.inc(len(transaction.pdus))
pdus_by_room = {}
@ -292,7 +293,7 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
received_queries_counter.inc(query_type)
received_queries_counter.labels(query_type).inc()
resp = yield self.registry.on_query(query_type, args)
defer.returnValue((200, resp))

View File

@ -33,7 +33,7 @@ from .units import Edu
from synapse.storage.presence import UserPresenceState
from synapse.util.metrics import Measure
import synapse.metrics
from synapse.metrics import LaterGauge
from blist import sorteddict
from collections import namedtuple
@ -45,9 +45,6 @@ from six import itervalues, iteritems
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
class FederationRemoteSendQueue(object):
"""A drop in replacement for TransactionQueue"""
@ -77,10 +74,8 @@ class FederationRemoteSendQueue(object):
# lambda binds to the queue rather than to the name of the queue which
# changes. ARGH.
def register(name, queue):
metrics.register_callback(
queue_name + "_size",
lambda: len(queue),
)
LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,),
"", lambda: len(queue))
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",

View File

@ -26,23 +26,23 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
from synapse.metrics import LaterGauge
from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
events_processed_counter,
)
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
sent_pdus_destination_dist = client_metrics.register_distribution(
"sent_pdu_destinations"
sent_pdus_destination_dist = Counter(
"synapse_federation_transaction_queue_sent_pdu_destinations", ""
)
sent_edus_counter = client_metrics.register_counter("sent_edus")
sent_transactions_counter = client_metrics.register_counter("sent_transactions")
events_processed_counter = client_metrics.register_counter("events_processed")
class TransactionQueue(object):
@ -69,8 +69,10 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
metrics.register_callback(
"pending_destinations",
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
"",
[],
lambda: len(self.pending_transactions),
)
@ -94,12 +96,16 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
metrics.register_callback(
"pending_pdus",
LaterGauge(
"synapse_federation_transaction_queue_pending_pdus",
"",
[],
lambda: sum(map(len, pdus.values())),
)
metrics.register_callback(
"pending_edus",
LaterGauge(
"synapse_federation_transaction_queue_pending_edus",
"",
[],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
@ -241,18 +247,15 @@ class TransactionQueue(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_lag.set(
now - ts, "federation_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "federation_sender",
)
synapse.metrics.event_processing_lag.labels(
"federation_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_positions.set(
next_token, "federation_sender",
)
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
finally:
self._is_processing = False
@ -275,7 +278,7 @@ class TransactionQueue(object):
if not destinations:
return
sent_pdus_destination_dist.inc_by(len(destinations))
sent_pdus_destination_dist.inc(len(destinations))
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(

View File

@ -21,14 +21,13 @@ from synapse.util.metrics import Measure
from synapse.util.logcontext import (
make_deferred_yieldable, run_in_background,
)
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
events_processed_counter = metrics.register_counter("events_processed")
events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
def log_failure(failure):
@ -128,18 +127,15 @@ class ApplicationServicesHandler(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
synapse.metrics.event_processing_positions.labels(
"appservice_sender").set(upper_bound)
events_processed_counter.inc_by(len(events))
events_processed_counter.inc(len(events))
synapse.metrics.event_processing_lag.set(
now - ts, "appservice_sender",
)
synapse.metrics.event_processing_last_ts.set(
ts, "appservice_sender",
)
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
"appservice_sender").set(ts)
finally:
self.is_processing = False

View File

@ -38,26 +38,29 @@ from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
import synapse.metrics
from synapse.metrics import LaterGauge
import logging
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_presence_counter = metrics.register_counter("notified_presence")
federation_presence_out_counter = metrics.register_counter("federation_presence_out")
presence_updates_counter = metrics.register_counter("presence_updates")
timers_fired_counter = metrics.register_counter("timers_fired")
federation_presence_counter = metrics.register_counter("federation_presence")
bump_active_time_counter = metrics.register_counter("bump_active_time")
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
"synapse_handler_presence_federation_presence_out", "")
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = metrics.register_counter("notify_reason", labels=["reason"])
state_transition_counter = metrics.register_counter(
"state_transition", labels=["from", "to"]
notify_reason_counter = Counter(
"synapse_handler_presence_notify_reason", "", ["reason"])
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["from", "to"]
)
@ -142,8 +145,9 @@ class PresenceHandler(object):
for state in active_presence
}
metrics.register_callback(
"user_to_current_state_size", lambda: len(self.user_to_current_state)
LaterGauge(
"synapse_handlers_presence_user_to_current_state_size", "", [],
lambda: len(self.user_to_current_state)
)
now = self.clock.time_msec()
@ -213,7 +217,8 @@ class PresenceHandler(object):
60 * 1000,
)
metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
lambda: len(self.wheel_timer))
@defer.inlineCallbacks
def _on_shutdown(self):
@ -316,10 +321,10 @@ class PresenceHandler(object):
# TODO: We should probably ensure there are no races hereafter
presence_updates_counter.inc_by(len(new_states))
presence_updates_counter.inc(len(new_states))
if to_notify:
notified_presence_counter.inc_by(len(to_notify))
notified_presence_counter.inc(len(to_notify))
yield self._persist_and_notify(to_notify.values())
self.unpersisted_users_changes |= set(s.user_id for s in new_states)
@ -330,7 +335,7 @@ class PresenceHandler(object):
if user_id not in to_notify
}
if to_federation_ping:
federation_presence_out_counter.inc_by(len(to_federation_ping))
federation_presence_out_counter.inc(len(to_federation_ping))
self._push_to_remotes(to_federation_ping.values())
@ -368,7 +373,7 @@ class PresenceHandler(object):
for user_id in users_to_check
]
timers_fired_counter.inc_by(len(states))
timers_fired_counter.inc(len(states))
changes = handle_timeouts(
states,
@ -657,7 +662,7 @@ class PresenceHandler(object):
updates.append(prev_state.copy_and_replace(**new_fields))
if updates:
federation_presence_counter.inc_by(len(updates))
federation_presence_counter.inc(len(updates))
yield self._update_states(updates)
@defer.inlineCallbacks
@ -932,28 +937,28 @@ def should_notify(old_state, new_state):
return False
if old_state.status_msg != new_state.status_msg:
notify_reason_counter.inc("status_msg_change")
notify_reason_counter.labels("status_msg_change").inc()
return True
if old_state.state != new_state.state:
notify_reason_counter.inc("state_change")
state_transition_counter.inc(old_state.state, new_state.state)
notify_reason_counter.labels("state_change").inc()
state_transition_counter.labels(old_state.state, new_state.state).inc()
return True
if old_state.state == PresenceState.ONLINE:
if new_state.currently_active != old_state.currently_active:
notify_reason_counter.inc("current_active_change")
notify_reason_counter.labels("current_active_change").inc()
return True
if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
notify_reason_counter.inc("last_active_change_online")
notify_reason_counter.labels("last_active_change_online").inc()
return True
elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# Always notify for a transition where last active gets bumped.
notify_reason_counter.inc("last_active_change_not_online")
notify_reason_counter.labels("last_active_change_not_online").inc()
return True
return False
@ -1027,14 +1032,14 @@ class PresenceEventSource(object):
if changed is not None and len(changed) < 500:
# For small deltas, its quicker to get all changes and then
# work out if we share a room or they're in our presence list
get_updates_counter.inc("stream")
get_updates_counter.labels("stream").inc()
for other_user_id in changed:
if other_user_id in users_interested_in:
user_ids_changed.add(other_user_id)
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.inc("full")
get_updates_counter.labels("full").inc()
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(

View File

@ -23,7 +23,6 @@ from synapse.http import cancelled_to_request_timed_out_error
from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint
from canonicaljson import encode_canonical_json
@ -42,6 +41,7 @@ from twisted.web._newclient import ResponseDone
from six import StringIO
from prometheus_client import Counter
import simplejson as json
import logging
import urllib
@ -49,16 +49,9 @@ import urllib
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter("synapse_http_client_responses", "",
["method", "code"])
class SimpleHttpClient(object):
@ -95,7 +88,7 @@ class SimpleHttpClient(object):
def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.inc(method)
outgoing_requests_counter.labels(method).inc()
logger.info("Sending request %s %s", method, uri)
@ -109,14 +102,14 @@ class SimpleHttpClient(object):
)
response = yield make_deferred_yieldable(request_deferred)
incoming_responses_counter.inc(method, response.code)
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.inc(method, "ERR")
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, type(e).__name__, e.message

View File

@ -45,19 +45,15 @@ from six.moves.urllib import parse as urlparse
from six import string_types
from prometheus_client import Counter
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
outgoing_requests_counter = metrics.register_counter(
"requests",
labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
)
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
"", ["method", "code"])
MAX_LONG_RETRIES = 10

View File

@ -16,137 +16,109 @@
import logging
import synapse.metrics
from prometheus_client.core import Counter, Histogram
from synapse.metrics import LaterGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for("synapse.http.server")
# total number of responses served, split by method/servlet/tag
response_count = metrics.register_counter(
"response_count",
labels=["method", "servlet", "tag"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_requests",
"_response_time:count",
"_response_ru_utime:count",
"_response_ru_stime:count",
"_response_db_txn_count:count",
"_response_db_txn_duration:count",
)
)
response_count = Counter(
"synapse_http_server_response_count", "", ["method", "servlet", "tag"]
)
requests_counter = metrics.register_counter(
"requests_received",
labels=["method", "servlet", ],
requests_counter = Counter(
"synapse_http_server_requests_received", "", ["method", "servlet"]
)
outgoing_responses_counter = metrics.register_counter(
"responses",
labels=["method", "code"],
outgoing_responses_counter = Counter(
"synapse_http_server_responses", "", ["method", "code"]
)
response_timer = metrics.register_counter(
"response_time_seconds",
labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_time:total",
),
response_timer = Histogram(
"synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_utime = metrics.register_counter(
"response_ru_utime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_utime:total",
),
response_ru_utime = Counter(
"synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
)
response_ru_stime = metrics.register_counter(
"response_ru_stime_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_ru_stime:total",
),
response_ru_stime = Counter(
"synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
)
response_db_txn_count = metrics.register_counter(
"response_db_txn_count", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_count:total",
),
response_db_txn_count = Counter(
"synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
response_db_txn_duration = metrics.register_counter(
"response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
alternative_names=(
metrics.name_prefix + "_response_db_txn_duration:total",
),
response_db_txn_duration = Counter(
"synapse_http_server_response_db_txn_duration_seconds",
"",
["method", "servlet", "tag"],
)
# seconds spent waiting for a db connection, when processing this request
response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
response_db_sched_duration = Counter(
"synapse_http_server_response_db_sched_duration_seconds",
"",
["method", "servlet", "tag"],
)
# size in bytes of the response written
response_size = metrics.register_counter(
"response_size", labels=["method", "servlet", "tag"]
response_size = Counter(
"synapse_http_server_response_size", "", ["method", "servlet", "tag"]
)
# In flight metrics are incremented while the requests are in flight, rather
# than when the response was written.
in_flight_requests_ru_utime = metrics.register_counter(
"in_flight_requests_ru_utime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_utime = Counter(
"synapse_http_server_in_flight_requests_ru_utime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_ru_stime = metrics.register_counter(
"in_flight_requests_ru_stime_seconds", labels=["method", "servlet"],
in_flight_requests_ru_stime = Counter(
"synapse_http_server_in_flight_requests_ru_stime_seconds",
"",
["method", "servlet"],
)
in_flight_requests_db_txn_count = metrics.register_counter(
"in_flight_requests_db_txn_count", labels=["method", "servlet"],
in_flight_requests_db_txn_count = Counter(
"synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
)
# seconds spent waiting for db txns, excluding scheduling time, when processing
# this request
in_flight_requests_db_txn_duration = metrics.register_counter(
"in_flight_requests_db_txn_duration_seconds", labels=["method", "servlet"],
in_flight_requests_db_txn_duration = Counter(
"synapse_http_server_in_flight_requests_db_txn_duration_seconds",
"",
["method", "servlet"],
)
# seconds spent waiting for a db connection, when processing this request
in_flight_requests_db_sched_duration = metrics.register_counter(
"in_flight_requests_db_sched_duration_seconds", labels=["method", "servlet"]
in_flight_requests_db_sched_duration = Counter(
"synapse_http_server_in_flight_requests_db_sched_duration_seconds",
"",
["method", "servlet"],
)
# The set of all in flight requests, set[RequestMetrics]
_in_flight_requests = set()
def _collect_in_flight():
"""Called just before metrics are collected, so we use it to update all
the in flight request metrics
"""
for rm in _in_flight_requests:
rm.update_metrics()
metrics.register_collector(_collect_in_flight)
def _get_in_flight_counts():
"""Returns a count of all in flight requests by (method, server_name)
Returns:
dict[tuple[str, str], int]
"""
for rm in _in_flight_requests:
rm.update_metrics()
# Map from (method, name) -> int, the number of in flight requests of that
# type
@ -158,16 +130,17 @@ def _get_in_flight_counts():
return counts
metrics.register_callback(
"in_flight_requests_count",
LaterGauge(
"synapse_http_request_metrics_in_flight_requests_count",
"",
["method", "servlet"],
_get_in_flight_counts,
labels=["method", "servlet"]
)
class RequestMetrics(object):
def start(self, time_msec, name, method):
self.start = time_msec
def start(self, time_sec, name, method):
self.start = time_sec
self.start_context = LoggingContext.current_context()
self.name = name
self.method = method
@ -176,7 +149,7 @@ class RequestMetrics(object):
_in_flight_requests.add(self)
def stop(self, time_msec, request):
def stop(self, time_sec, request):
_in_flight_requests.discard(self)
context = LoggingContext.current_context()
@ -192,34 +165,29 @@ class RequestMetrics(object):
)
return
outgoing_responses_counter.inc(request.method, str(request.code))
outgoing_responses_counter.labels(request.method, str(request.code)).inc()
response_count.inc(request.method, self.name, tag)
response_count.labels(request.method, self.name, tag).inc()
response_timer.inc_by(
time_msec - self.start, request.method,
self.name, tag
response_timer.labels(request.method, self.name, tag).observe(
time_sec - self.start
)
ru_utime, ru_stime = context.get_resource_usage()
response_ru_utime.inc_by(
ru_utime, request.method, self.name, tag
response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime)
response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime)
response_db_txn_count.labels(request.method, self.name, tag).inc(
context.db_txn_count
)
response_ru_stime.inc_by(
ru_stime, request.method, self.name, tag
response_db_txn_duration.labels(request.method, self.name, tag).inc(
context.db_txn_duration_sec
)
response_db_txn_count.inc_by(
context.db_txn_count, request.method, self.name, tag
)
response_db_txn_duration.inc_by(
context.db_txn_duration_ms / 1000., request.method, self.name, tag
)
response_db_sched_duration.inc_by(
context.db_sched_duration_ms / 1000., request.method, self.name, tag
response_db_sched_duration.labels(request.method, self.name, tag).inc(
context.db_sched_duration_sec
)
response_size.inc_by(request.sentLength, request.method, self.name, tag)
response_size.labels(request.method, self.name, tag).inc(request.sentLength)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
@ -229,27 +197,21 @@ class RequestMetrics(object):
def update_metrics(self):
"""Updates the in flight metrics with values from this request.
"""
diff = self._request_stats.update(self.start_context)
in_flight_requests_ru_utime.inc_by(
diff.ru_utime, self.method, self.name,
in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
diff.db_txn_count
)
in_flight_requests_ru_stime.inc_by(
diff.ru_stime, self.method, self.name,
in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
diff.db_txn_duration_sec
)
in_flight_requests_db_txn_count.inc_by(
diff.db_txn_count, self.method, self.name,
)
in_flight_requests_db_txn_duration.inc_by(
diff.db_txn_duration_ms / 1000., self.method, self.name,
)
in_flight_requests_db_sched_duration.inc_by(
diff.db_sched_duration_ms / 1000., self.method, self.name,
in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
diff.db_sched_duration_sec
)
@ -258,17 +220,21 @@ class _RequestStats(object):
"""
__slots__ = [
"ru_utime", "ru_stime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"ru_utime",
"ru_stime",
"db_txn_count",
"db_txn_duration_sec",
"db_sched_duration_sec",
]
def __init__(self, ru_utime, ru_stime, db_txn_count,
db_txn_duration_ms, db_sched_duration_ms):
def __init__(
self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec
):
self.ru_utime = ru_utime
self.ru_stime = ru_stime
self.db_txn_count = db_txn_count
self.db_txn_duration_ms = db_txn_duration_ms
self.db_sched_duration_ms = db_sched_duration_ms
self.db_txn_duration_sec = db_txn_duration_sec
self.db_sched_duration_sec = db_sched_duration_sec
@staticmethod
def from_context(context):
@ -277,8 +243,8 @@ class _RequestStats(object):
return _RequestStats(
ru_utime, ru_stime,
context.db_txn_count,
context.db_txn_duration_ms,
context.db_sched_duration_ms,
context.db_txn_duration_sec,
context.db_sched_duration_sec,
)
def update(self, context):
@ -294,14 +260,14 @@ class _RequestStats(object):
new.ru_utime - self.ru_utime,
new.ru_stime - self.ru_stime,
new.db_txn_count - self.db_txn_count,
new.db_txn_duration_ms - self.db_txn_duration_ms,
new.db_sched_duration_ms - self.db_sched_duration_ms,
new.db_txn_duration_sec - self.db_txn_duration_sec,
new.db_sched_duration_sec - self.db_sched_duration_sec,
)
self.ru_utime = new.ru_utime
self.ru_stime = new.ru_stime
self.db_txn_count = new.db_txn_count
self.db_txn_duration_ms = new.db_txn_duration_ms
self.db_sched_duration_ms = new.db_sched_duration_ms
self.db_txn_duration_sec = new.db_txn_duration_sec
self.db_sched_duration_sec = new.db_sched_duration_sec
return diff

View File

@ -210,8 +210,8 @@ def wrap_request_handler_with_logging(h):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.inc(request.method,
request.request_metrics.name)
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler

View File

@ -83,7 +83,7 @@ class SynapseRequest(Request):
return Request.render(self, resrc)
def _started_processing(self, servlet_name):
self.start_time = int(time.time() * 1000)
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
self.start_time, name=servlet_name, method=self.method,
@ -102,26 +102,26 @@ class SynapseRequest(Request):
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
db_txn_duration_ms = context.db_txn_duration_ms
db_sched_duration_ms = context.db_sched_duration_ms
db_txn_duration_sec = context.db_txn_duration_sec
db_sched_duration_sec = context.db_sched_duration_sec
except Exception:
ru_utime, ru_stime = (0, 0)
db_txn_count, db_txn_duration_ms = (0, 0)
db_txn_count, db_txn_duration_sec = (0, 0)
end_time = int(time.time() * 1000)
end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
" Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
end_time - self.start_time,
int(ru_utime * 1000),
int(ru_stime * 1000),
db_sched_duration_ms,
db_txn_duration_ms,
ru_utime,
ru_stime,
db_sched_duration_sec,
db_txn_duration_sec,
int(db_txn_count),
self.sentLength,
self.code,

View File

@ -17,165 +17,170 @@ import logging
import functools
import time
import gc
import os
import platform
import attr
from prometheus_client import Gauge, Histogram, Counter
from prometheus_client.core import GaugeMetricFamily, REGISTRY
from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
MemoryUsageMetric, GaugeMetric,
)
from .process_collector import register_process_collector
logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
running_on_pypy = platform.python_implementation() == "PyPy"
all_metrics = []
all_collectors = []
all_gauges = {}
class Metrics(object):
""" A single Metrics object gives a (mutable) slice view of the all_metrics
dict, allowing callers to easily register new metrics that are namespaced
nicely."""
class RegistryProxy(object):
def __init__(self, name):
self.name_prefix = name
def make_subspace(self, name):
return Metrics("%s_%s" % (self.name_prefix, name))
def register_collector(self, func):
all_collectors.append(func)
def _register(self, metric_class, name, *args, **kwargs):
full_name = "%s_%s" % (self.name_prefix, name)
metric = metric_class(full_name, *args, **kwargs)
all_metrics.append(metric)
return metric
def register_counter(self, *args, **kwargs):
"""
Returns:
CounterMetric
"""
return self._register(CounterMetric, *args, **kwargs)
def register_gauge(self, *args, **kwargs):
"""
Returns:
GaugeMetric
"""
return self._register(GaugeMetric, *args, **kwargs)
def register_callback(self, *args, **kwargs):
"""
Returns:
CallbackMetric
"""
return self._register(CallbackMetric, *args, **kwargs)
def register_distribution(self, *args, **kwargs):
"""
Returns:
DistributionMetric
"""
return self._register(DistributionMetric, *args, **kwargs)
def register_cache(self, *args, **kwargs):
"""
Returns:
CacheMetric
"""
return self._register(CacheMetric, *args, **kwargs)
def collect(self):
for metric in REGISTRY.collect():
if not metric.name.startswith("__"):
yield metric
def register_memory_metrics(hs):
try:
import psutil
process = psutil.Process()
process.memory_info().rss
except (ImportError, AttributeError):
logger.warn(
"psutil is not installed or incorrect version."
" Disabling memory metrics."
)
return
metric = MemoryUsageMetric(hs, psutil)
all_metrics.append(metric)
@attr.s(hash=True)
class LaterGauge(object):
name = attr.ib()
desc = attr.ib()
labels = attr.ib(hash=False)
caller = attr.ib()
def get_metrics_for(pkg_name):
""" Returns a Metrics instance for conveniently creating metrics
namespaced with the given name prefix. """
def collect(self):
# Convert a "package.name" to "package_name" because Prometheus doesn't
# let us use . in metric names
return Metrics(pkg_name.replace(".", "_"))
g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
def render_all():
strs = []
for collector in all_collectors:
collector()
for metric in all_metrics:
try:
strs += metric.render()
except Exception:
strs += ["# FAILED to render"]
logger.exception("Failed to render metric")
calls = self.caller()
except Exception as e:
print(e)
logger.err()
yield g
strs.append("") # to generate a final CRLF
if isinstance(calls, dict):
for k, v in calls.items():
g.add_metric(k, v)
else:
g.add_metric([], calls)
return "\n".join(strs)
yield g
def __attrs_post_init__(self):
self._register()
def _register(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
register_process_collector(get_metrics_for("process"))
#
# Detailed CPU metrics
#
class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
# Try and get the system config
ticks_per_sec = os.sysconf('SC_CLK_TCK')
except (ValueError, TypeError, AttributeError):
pass
self.ticks_per_sec = ticks_per_sec
def collect(self):
with open("/proc/self/stat") as s:
line = s.read()
raw_stats = line.split(") ", 1)[1].split(" ")
user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
yield user
sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
yield sys
python_metrics = get_metrics_for("python")
REGISTRY.register(CPUMetrics())
gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
python_metrics.register_callback(
"gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
#
# Python GC metrics
#
gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
)
reactor_metrics = get_metrics_for("python.twisted.reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
synapse_metrics = get_metrics_for("synapse")
class GCCounts(object):
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
cm.add_metric([str(n)], m)
yield cm
REGISTRY.register(GCCounts())
#
# Twisted reactor metrics
#
tick_time = Histogram(
"python_twisted_reactor_tick_time",
"Tick time of the Twisted reactor (sec)",
buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
)
pending_calls_metric = Histogram(
"python_twisted_reactor_pending_calls",
"Pending calls",
buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
)
#
# Federation Metrics
#
sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = synapse_metrics.register_gauge(
"event_processing_positions", labels=["name"],
)
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
# Used to track the current max events stream position
event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
event_persisted_position = Gauge("synapse_event_persisted_position", "")
# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = synapse_metrics.register_gauge(
"event_processing_last_ts", labels=["name"],
)
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = synapse_metrics.register_gauge(
"event_processing_lag", labels=["name"],
)
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
def runUntilCurrentTimer(func):
@ -197,17 +202,17 @@ def runUntilCurrentTimer(func):
num_pending += 1
num_pending += len(reactor.threadCallQueue)
start = time.time() * 1000
start = time.time()
ret = func(*args, **kwargs)
end = time.time() * 1000
end = time.time()
# record the amount of wallclock time spent running pending calls.
# This is a proxy for the actual amount of time between reactor polls,
# since about 25% of time is actually spent running things triggered by
# I/O events, but that is harder to capture without rewriting half the
# reactor.
tick_time.inc_by(end - start)
pending_calls_metric.inc_by(num_pending)
tick_time.observe(end - start)
pending_calls_metric.observe(num_pending)
if running_on_pypy:
return ret
@ -220,12 +225,12 @@ def runUntilCurrentTimer(func):
if threshold[i] < counts[i]:
logger.info("Collecting gc %d", i)
start = time.time() * 1000
start = time.time()
unreachable = gc.collect(i)
end = time.time() * 1000
end = time.time()
gc_time.inc_by(end - start, i)
gc_unreachable.inc_by(unreachable, i)
gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)
return ret

View File

@ -1,328 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from itertools import chain
import logging
import re
logger = logging.getLogger(__name__)
def flatten(items):
"""Flatten a list of lists
Args:
items: iterable[iterable[X]]
Returns:
list[X]: flattened list
"""
return list(chain.from_iterable(items))
class BaseMetric(object):
"""Base class for metrics which report a single value per label set
"""
def __init__(self, name, labels=[], alternative_names=[]):
"""
Args:
name (str): principal name for this metric
labels (list(str)): names of the labels which will be reported
for this metric
alternative_names (iterable(str)): list of alternative names for
this metric. This can be useful to provide a migration path
when renaming metrics.
"""
self._names = [name] + list(alternative_names)
self.labels = labels # OK not to clone as we never write it
def dimension(self):
return len(self.labels)
def is_scalar(self):
return not len(self.labels)
def _render_labelvalue(self, value):
return '"%s"' % (_escape_label_value(value),)
def _render_key(self, values):
if self.is_scalar():
return ""
return "{%s}" % (
",".join(["%s=%s" % (k, self._render_labelvalue(v))
for k, v in zip(self.labels, values)])
)
def _render_for_labels(self, label_values, value):
"""Render this metric for a single set of labels
Args:
label_values (list[object]): values for each of the labels,
(which get stringified).
value: value of the metric at with these labels
Returns:
iterable[str]: rendered metric
"""
rendered_labels = self._render_key(label_values)
return (
"%s%s %.12g" % (name, rendered_labels, value)
for name in self._names
)
def render(self):
"""Render this metric
Each metric is rendered as:
name{label1="val1",label2="val2"} value
https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details
Returns:
iterable[str]: rendered metrics
"""
raise NotImplementedError()
class CounterMetric(BaseMetric):
"""The simplest kind of metric; one that stores a monotonically-increasing
value that counts events or running totals.
Example use cases for Counters:
- Number of requests processed
- Number of items that were inserted into a queue
- Total amount of data that a system has processed
Counters can only go up (and be reset when the process restarts).
"""
def __init__(self, *args, **kwargs):
super(CounterMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.counts = {}
# Scalar metrics are never empty
if self.is_scalar():
self.counts[()] = 0.
def inc_by(self, incr, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
if values not in self.counts:
self.counts[values] = incr
else:
self.counts[values] += incr
def inc(self, *values):
self.inc_by(1, *values)
def render(self):
return flatten(
self._render_for_labels(k, self.counts[k])
for k in sorted(self.counts.keys())
)
class GaugeMetric(BaseMetric):
"""A metric that can go up or down
"""
def __init__(self, *args, **kwargs):
super(GaugeMetric, self).__init__(*args, **kwargs)
# dict[list[str]]: value for each set of label values. the keys are the
# label values, in the same order as the labels in self.labels.
#
# (if the metric is a scalar, the (single) key is the empty tuple).
self.guages = {}
def set(self, v, *values):
if len(values) != self.dimension():
raise ValueError(
"Expected as many values to inc() as labels (%d)" % (self.dimension())
)
# TODO: should assert that the tag values are all strings
self.guages[values] = v
def render(self):
return flatten(
self._render_for_labels(k, self.guages[k])
for k in sorted(self.guages.keys())
)
class CallbackMetric(BaseMetric):
"""A metric that returns the numeric value returned by a callback whenever
it is rendered. Typically this is used to implement gauges that yield the
size or other state of some in-memory object by actively querying it."""
def __init__(self, name, callback, labels=[]):
super(CallbackMetric, self).__init__(name, labels=labels)
self.callback = callback
def render(self):
try:
value = self.callback()
except Exception:
logger.exception("Failed to render %s", self.name)
return ["# FAILED to render " + self.name]
if self.is_scalar():
return list(self._render_for_labels([], value))
return flatten(
self._render_for_labels(k, value[k])
for k in sorted(value.keys())
)
class DistributionMetric(object):
"""A combination of an event counter and an accumulator, which counts
both the number of events and accumulates the total value. Typically this
could be used to keep track of method-running times, or other distributions
of values that occur in discrete occurances.
TODO(paul): Try to export some heatmap-style stats?
"""
def __init__(self, name, *args, **kwargs):
self.counts = CounterMetric(name + ":count", **kwargs)
self.totals = CounterMetric(name + ":total", **kwargs)
def inc_by(self, inc, *values):
self.counts.inc(*values)
self.totals.inc_by(inc, *values)
def render(self):
return self.counts.render() + self.totals.render()
class CacheMetric(object):
__slots__ = (
"name", "cache_name", "hits", "misses", "evicted_size", "size_callback",
)
def __init__(self, name, size_callback, cache_name):
self.name = name
self.cache_name = cache_name
self.hits = 0
self.misses = 0
self.evicted_size = 0
self.size_callback = size_callback
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def render(self):
size = self.size_callback()
hits = self.hits
total = self.misses + self.hits
return [
"""%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
"""%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
"""%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
"""%s:evicted_size{name="%s"} %d""" % (
self.name, self.cache_name, self.evicted_size
),
]
class MemoryUsageMetric(object):
"""Keeps track of the current memory usage, using psutil.
The class will keep the current min/max/sum/counts of rss over the last
WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second
"""
UPDATE_HZ = 2 # number of times to get memory per second
WINDOW_SIZE_SEC = 30 # the size of the window in seconds
def __init__(self, hs, psutil):
clock = hs.get_clock()
self.memory_snapshots = []
self.process = psutil.Process()
clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ)
def _update_curr_values(self):
max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC
self.memory_snapshots.append(self.process.memory_info().rss)
self.memory_snapshots[:] = self.memory_snapshots[-max_size:]
def render(self):
if not self.memory_snapshots:
return []
max_rss = max(self.memory_snapshots)
min_rss = min(self.memory_snapshots)
sum_rss = sum(self.memory_snapshots)
len_rss = len(self.memory_snapshots)
return [
"process_psutil_rss:max %d" % max_rss,
"process_psutil_rss:min %d" % min_rss,
"process_psutil_rss:total %d" % sum_rss,
"process_psutil_rss:count %d" % len_rss,
]
def _escape_character(m):
"""Replaces a single character with its escape sequence.
Args:
m (re.MatchObject): A match object whose first group is the single
character to replace
Returns:
str
"""
c = m.group(1)
if c == "\\":
return "\\\\"
elif c == "\"":
return "\\\""
elif c == "\n":
return "\\n"
return c
def _escape_label_value(value):
"""Takes a label value and escapes quotes, newlines and backslashes
"""
return re.sub(r"([\n\"\\])", _escape_character, str(value))

View File

@ -1,123 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from six import iteritems
TICKS_PER_SEC = 100
BYTES_PER_PAGE = 4096
HAVE_PROC_STAT = os.path.exists("/proc/stat")
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits")
HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd")
# Field indexes from /proc/self/stat, taken from the proc(5) manpage
STAT_FIELDS = {
"utime": 14,
"stime": 15,
"starttime": 22,
"vsize": 23,
"rss": 24,
}
stats = {}
# In order to report process_start_time_seconds we need to know the
# machine's boot time, because the value in /proc/self/stat is relative to
# this
boot_time = None
if HAVE_PROC_STAT:
with open("/proc/stat") as _procstat:
for line in _procstat:
if line.startswith("btime "):
boot_time = int(line.split()[1])
def update_resource_metrics():
if HAVE_PROC_SELF_STAT:
global stats
with open("/proc/self/stat") as s:
line = s.read()
# line is PID (command) more stats go here ...
raw_stats = line.split(") ", 1)[1].split(" ")
for (name, index) in iteritems(STAT_FIELDS):
# subtract 3 from the index, because proc(5) is 1-based, and
# we've lost the first two fields in PID and COMMAND above
stats[name] = int(raw_stats[index - 3])
def _count_fds():
# Not every OS will have a /proc/self/fd directory
if not HAVE_PROC_SELF_FD:
return 0
return len(os.listdir("/proc/self/fd"))
def register_process_collector(process_metrics):
process_metrics.register_collector(update_resource_metrics)
if HAVE_PROC_SELF_STAT:
process_metrics.register_callback(
"cpu_user_seconds_total",
lambda: float(stats["utime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_system_seconds_total",
lambda: float(stats["stime"]) / TICKS_PER_SEC
)
process_metrics.register_callback(
"cpu_seconds_total",
lambda: (float(stats["utime"] + stats["stime"])) / TICKS_PER_SEC
)
process_metrics.register_callback(
"virtual_memory_bytes",
lambda: int(stats["vsize"])
)
process_metrics.register_callback(
"resident_memory_bytes",
lambda: int(stats["rss"]) * BYTES_PER_PAGE
)
process_metrics.register_callback(
"start_time_seconds",
lambda: boot_time + int(stats["starttime"]) / TICKS_PER_SEC
)
if HAVE_PROC_SELF_FD:
process_metrics.register_callback(
"open_fds",
lambda: _count_fds()
)
if HAVE_PROC_SELF_LIMITS:
def _get_max_fds():
with open("/proc/self/limits") as limits:
for line in limits:
if not line.startswith("Max open files "):
continue
# Line is Max open files $SOFT $HARD
return int(line.split()[3])
return None
process_metrics.register_callback(
"max_fds",
lambda: _get_max_fds()
)

View File

@ -13,27 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.web.resource import Resource
import synapse.metrics
METRICS_PREFIX = "/_synapse/metrics"
class MetricsResource(Resource):
isLeaf = True
def __init__(self, hs):
Resource.__init__(self) # Resource is old-style, so no super()
self.hs = hs
def render_GET(self, request):
response = synapse.metrics.render_all()
request.setHeader("Content-Type", "text/plain")
request.setHeader("Content-Length", str(len(response)))
# Encode as UTF-8 (default)
return response.encode()

View File

@ -28,22 +28,20 @@ from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
import synapse.metrics
from synapse.metrics import LaterGauge
from collections import namedtuple
from prometheus_client import Counter
import logging
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
notified_events_counter = Counter("synapse_notifier_notified_events", "")
notified_events_counter = metrics.register_counter("notified_events")
users_woken_by_stream_counter = metrics.register_counter(
"users_woken_by_stream", labels=["stream"]
)
users_woken_by_stream_counter = Counter(
"synapse_notifier_users_woken_by_stream", "", ["stream"])
# TODO(paul): Should be shared somewhere
@ -108,7 +106,7 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
users_woken_by_stream_counter.inc(stream_key)
users_woken_by_stream_counter.labels(stream_key).inc()
with PreserveLoggingContext():
self.notify_deferred = ObservableDeferred(defer.Deferred())
@ -197,14 +195,14 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
metrics.register_callback(
"rooms",
LaterGauge(
"synapse_notifier_rooms", "", [],
lambda: count(bool, self.room_to_user_streams.values()),
)
metrics.register_callback(
"users",
LaterGauge(
"synapse_notifier_users", "", [],
lambda: len(self.user_to_user_stream),
)

View File

@ -22,14 +22,13 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent
from synapse.event_auth import get_user_power_level
from synapse.api.constants import EventTypes, Membership
from synapse.metrics import get_metrics_for
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches import register_cache
from synapse.util.caches.descriptors import cached
from synapse.util.async import Linearizer
from synapse.state import POWER_KEY
from collections import namedtuple
from prometheus_client import Counter
from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@ -37,21 +36,18 @@ logger = logging.getLogger(__name__)
rules_by_room = {}
push_metrics = get_metrics_for(__name__)
push_rules_invalidation_counter = push_metrics.register_counter(
"push_rules_invalidation_counter"
)
push_rules_state_size_counter = push_metrics.register_counter(
"push_rules_state_size_counter"
)
push_rules_invalidation_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
push_rules_state_size_counter = Counter(
"synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
# Measures whether we use the fast path of using state deltas, or if we have to
# recalculate from scratch
push_rules_delta_state_cache_metric = cache_metrics.register_cache(
push_rules_delta_state_cache_metric = register_cache(
"cache",
size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values
cache_name="push_rules_delta_state_cache_metric",
"push_rules_delta_state_cache_metric",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@ -65,10 +61,10 @@ class BulkPushRuleEvaluator(object):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
self.room_push_rule_cache_metrics = cache_metrics.register_cache(
self.room_push_rule_cache_metrics = register_cache(
"cache",
size_callback=lambda: 0, # There's not good value for this
cache_name="room_push_rule_cache",
"room_push_rule_cache",
cache=[], # Meaningless size, as this isn't a cache that stores values
)
@defer.inlineCallbacks
@ -310,7 +306,7 @@ class RulesForRoom(object):
current_state_ids = context.current_state_ids
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc_by(len(current_state_ids))
push_rules_state_size_counter.inc(len(current_state_ids))
logger.debug(
"Looking for member changes in %r %r", state_group, current_state_ids

View File

@ -20,22 +20,17 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
from . import push_tools
import synapse
from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
http_push_processed_counter = metrics.register_counter(
"http_pushes_processed",
)
http_push_failed_counter = metrics.register_counter(
"http_pushes_failed",
)
http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
class HttpPusher(object):

View File

@ -152,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
# Caches (glob, word_boundary) -> regex for push. See _glob_matches
regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
register_cache("regex_push_cache", regex_cache)
register_cache("cache", "regex_push_cache", regex_cache)
def _glob_matches(glob, value, word_boundary=False):

View File

@ -56,6 +56,7 @@ REQUIREMENTS = {
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
"six": ["six"],
"prometheus_client": ["prometheus_client"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {

View File

@ -60,22 +60,21 @@ from .commands import (
)
from .streams import STREAMS_MAP
from synapse.metrics import LaterGauge
from synapse.util.stringutils import random_string
from synapse.metrics.metric import CounterMetric
import logging
import synapse.metrics
import struct
import fcntl
from prometheus_client import Counter
from collections import defaultdict
from six import iterkeys, iteritems
metrics = synapse.metrics.get_metrics_for(__name__)
connection_close_counter = metrics.register_counter(
"close_reason", labels=["reason_type"],
)
import logging
import struct
import fcntl
connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
# A list of all connected protocols. This allows us to send metrics about the
# connections.
@ -137,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
self.inbound_commands_counter = CounterMetric(
"inbound_commands", labels=["command"],
)
self.outbound_commands_counter = CounterMetric(
"outbound_commands", labels=["command"],
)
self.inbound_commands_counter = defaultdict(int)
self.outbound_commands_counter = defaultdict(int)
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@ -202,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
self.inbound_commands_counter.inc(cmd_name)
self.inbound_commands_counter[cmd_name] = (
self.inbound_commands_counter[cmd_name] + 1)
cmd_cls = COMMAND_MAP[cmd_name]
try:
@ -252,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
self.outbound_commands_counter.inc(cmd.NAME)
self.outbound_commands_counter[cmd.NAME] = (
self.outbound_commands_counter[cmd.NAME] + 1)
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@ -318,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
if isinstance(reason, Failure):
connection_close_counter.inc(reason.type.__name__)
connection_close_counter.labels(reason.type.__name__).inc()
else:
connection_close_counter.inc(reason.__class__.__name__)
connection_close_counter.labels(reason.__class__.__name__).inc()
try:
# Remove us from list of connections to be monitored
@ -519,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_RDATA(self, cmd):
stream_name = cmd.stream_name
inbound_rdata_count.inc(stream_name)
inbound_rdata_count.labels(stream_name).inc()
try:
row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
@ -567,14 +563,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# The following simply registers metrics for the replication connections
metrics.register_callback(
"pending_commands",
pending_commands = LaterGauge(
"pending_commands", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): len(p.pending_commands)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_buffer_size(protocol):
@ -584,14 +578,12 @@ def transport_buffer_size(protocol):
return 0
metrics.register_callback(
"transport_send_buffer",
transport_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_buffer_size(p)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
def transport_kernel_read_buffer_size(protocol, read=True):
@ -609,48 +601,38 @@ def transport_kernel_read_buffer_size(protocol, read=True):
return 0
metrics.register_callback(
"transport_kernel_send_buffer",
tcp_transport_kernel_send_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"transport_kernel_read_buffer",
tcp_transport_kernel_read_buffer = LaterGauge(
"synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
lambda: {
(p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
for p in connected_connections
},
labels=["name", "conn_id"],
)
})
metrics.register_callback(
"inbound_commands",
tcp_inbound_commands = LaterGauge(
"synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter.counts)
},
labels=["command", "name", "conn_id"],
)
})
metrics.register_callback(
"outbound_commands",
tcp_outbound_commands = LaterGauge(
"synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
lambda: {
(k[0], p.name, p.conn_id): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter.counts)
},
labels=["command", "name", "conn_id"],
)
})
# number of updates received for each RDATA stream
inbound_rdata_count = metrics.register_counter(
"inbound_rdata_count",
labels=["stream_name"],
)
inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
["stream_name"])

View File

@ -22,21 +22,21 @@ from .streams import STREAMS_MAP, FederationStream
from .protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func
from synapse.metrics import LaterGauge
import logging
import synapse.metrics
from prometheus_client import Counter
from six import itervalues
metrics = synapse.metrics.get_metrics_for(__name__)
stream_updates_counter = metrics.register_counter(
"stream_updates", labels=["stream_name"]
)
user_sync_counter = metrics.register_counter("user_sync")
federation_ack_counter = metrics.register_counter("federation_ack")
remove_pusher_counter = metrics.register_counter("remove_pusher")
invalidate_cache_counter = metrics.register_counter("invalidate_cache")
user_ip_cache_counter = metrics.register_counter("user_ip_cache")
stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
"", ["stream_name"])
user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache",
"")
user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
logger = logging.getLogger(__name__)
@ -75,7 +75,8 @@ class ReplicationStreamer(object):
# Current connections.
self.connections = []
metrics.register_callback("total_connections", lambda: len(self.connections))
LaterGauge("synapse_replication_tcp_resource_total_connections", "", [],
lambda: len(self.connections))
# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
@ -87,17 +88,16 @@ class ReplicationStreamer(object):
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
metrics.register_callback(
"connections_per_stream",
LaterGauge(
"synapse_replication_tcp_resource_connections_per_stream", "",
["stream_name"],
lambda: {
(stream_name,): len([
conn for conn in self.connections
if stream_name in conn.replication_streams
])
for stream_name in self.streams_by_name
},
labels=["stream_name"],
)
})
self.federation_sender = None
if not hs.config.send_federation:
@ -177,7 +177,7 @@ class ReplicationStreamer(object):
logger.info(
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
)
stream_updates_counter.inc_by(len(updates), stream.NAME)
stream_updates_counter.labels(stream.NAME).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do

View File

@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
import synapse.metrics
from prometheus_client import Histogram
from twisted.internet import defer
@ -42,13 +42,10 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")
sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
metrics = synapse.metrics.get_metrics_for("synapse.storage")
sql_scheduling_timer = metrics.register_distribution("schedule_time")
sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
class LoggingTransaction(object):
@ -113,7 +110,7 @@ class LoggingTransaction(object):
# Don't let logging failures stop SQL from working
pass
start = time.time() * 1000
start = time.time()
try:
return func(
@ -123,9 +120,9 @@ class LoggingTransaction(object):
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
finally:
msecs = (time.time() * 1000) - start
sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
sql_query_timer.inc_by(msecs, sql.split()[0])
secs = time.time() - start
sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
sql_query_timer.labels(sql.split()[0]).observe(secs)
class PerformanceCounters(object):
@ -135,7 +132,7 @@ class PerformanceCounters(object):
def update(self, key, start_time, end_time=None):
if end_time is None:
end_time = time.time() * 1000
end_time = time.time()
duration = end_time - start_time
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
@ -225,7 +222,7 @@ class SQLBaseStore(object):
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
logging_context, func, *args, **kwargs):
start = time.time() * 1000
start = time.time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
@ -285,17 +282,17 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
end = time.time() * 1000
end = time.time()
duration = end - start
if logging_context is not None:
logging_context.add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f", name, duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
sql_txn_timer.inc_by(duration, desc)
sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
@ -352,13 +349,13 @@ class SQLBaseStore(object):
"""
current_context = LoggingContext.current_context()
start_time = time.time() * 1000
start_time = time.time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
sched_duration_ms = time.time() * 1000 - start_time
sql_scheduling_timer.inc_by(sched_duration_ms)
current_context.add_database_scheduled(sched_duration_ms)
sched_duration_sec = time.time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
current_context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")

View File

@ -40,30 +40,27 @@ import synapse.metrics
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from prometheus_client import Counter
logger = logging.getLogger(__name__)
metrics = synapse.metrics.get_metrics_for(__name__)
persist_event_counter = metrics.register_counter("persisted_events")
event_counter = metrics.register_counter(
"persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
)
persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
["type", "origin_type", "origin_entity"])
# The number of times we are recalculating the current state
state_delta_counter = metrics.register_counter(
"state_delta",
)
state_delta_counter = Counter("synapse_storage_events_state_delta", "")
# The number of times we are recalculating state when there is only a
# single forward extremity
state_delta_single_event_counter = metrics.register_counter(
"state_delta_single_event",
)
state_delta_single_event_counter = Counter(
"synapse_storage_events_state_delta_single_event", "")
# The number of times we are reculating state when we could have resonably
# calculated the delta when we calculated the state for an event we were
# persisting.
state_delta_reuse_delta_counter = metrics.register_counter(
"state_delta_reuse_delta",
)
state_delta_reuse_delta_counter = Counter(
"synapse_storage_events_state_delta_reuse_delta", "")
def encode_json(json_object):
@ -445,7 +442,7 @@ class EventsStore(EventsWorkerStore):
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
persist_event_counter.inc_by(len(chunk))
persist_event_counter.inc(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
@ -460,7 +457,7 @@ class EventsStore(EventsWorkerStore):
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
event_counter.inc(event.type, origin_type, origin_entity)
event_counter.labels(event.type, origin_type, origin_entity).inc()
for room_id, new_state in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(

View File

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import synapse.metrics
from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
import os
from six.moves import intern
@ -21,23 +22,68 @@ import six
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
caches_by_name = {}
# cache_counter = metrics.register_cache(
# "cache",
# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
# labels=["name"],
# )
collectors_by_name = {}
cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
response_cache_evicted = Gauge(
"synapse_util_caches_response_cache:evicted_size", "", ["name"]
)
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
def register_cache(name, cache):
caches_by_name[name] = cache
return metrics.register_cache(
"cache",
lambda: len(cache),
name,
)
def register_cache(cache_type, cache_name, cache):
# Check if the metric is already registered. Unregister it, if so.
# This usually happens during tests, as at runtime these caches are
# effectively singletons.
metric_name = "cache_%s_%s" % (cache_type, cache_name)
if metric_name in collectors_by_name.keys():
REGISTRY.unregister(collectors_by_name[metric_name])
class CacheMetric(object):
hits = 0
misses = 0
evicted_size = 0
def inc_hits(self):
self.hits += 1
def inc_misses(self):
self.misses += 1
def inc_evictions(self, size=1):
self.evicted_size += size
def describe(self):
return []
def collect(self):
if cache_type == "response_cache":
response_cache_size.labels(cache_name).set(len(cache))
response_cache_hits.labels(cache_name).set(self.hits)
response_cache_evicted.labels(cache_name).set(self.evicted_size)
response_cache_total.labels(cache_name).set(self.hits + self.misses)
else:
cache_size.labels(cache_name).set(len(cache))
cache_hits.labels(cache_name).set(self.hits)
cache_evicted.labels(cache_name).set(self.evicted_size)
cache_total.labels(cache_name).set(self.hits + self.misses)
yield GaugeMetricFamily("__unused", "")
metric = CacheMetric()
REGISTRY.register(metric)
caches_by_name[cache_name] = cache
collectors_by_name[metric_name] = metric
return metric
KNOWN_KEYS = {

View File

@ -80,7 +80,7 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.thread = None
self.metrics = register_cache(name, self.cache)
self.metrics = register_cache("cache", name, self.cache)
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)

View File

@ -55,7 +55,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
self.metrics = register_cache(name, self.cache)
self.metrics = register_cache("dictionary", name, self.cache)
def check_thread(self):
expected_thread = self.thread

View File

@ -52,12 +52,12 @@ class ExpiringCache(object):
self._cache = OrderedDict()
self.metrics = register_cache(cache_name, self)
self.iterable = iterable
self._size_estimate = 0
self.metrics = register_cache("expiring", cache_name, self)
def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire

View File

@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
from synapse.util.caches import metrics as cache_metrics
from synapse.util.caches import register_cache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@ -38,15 +38,16 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.
self._name = name
self._metrics = cache_metrics.register_cache(
"response_cache",
size_callback=lambda: self.size(),
cache_name=name,
self._metrics = register_cache(
"response_cache", name, self
)
def size(self):
return len(self.pending_result_cache)
def __len__(self):
return self.size()
def get(self, key):
"""Look up the given key.

View File

@ -38,7 +38,7 @@ class StreamChangeCache(object):
self._cache = sorteddict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
self.metrics = register_cache(self.name, self._cache)
self.metrics = register_cache("cache", self.name, self._cache)
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)

View File

@ -59,7 +59,7 @@ class LoggingContext(object):
__slots__ = [
"previous_context", "name", "ru_stime", "ru_utime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
"usage_start",
"main_thread", "alive",
"request", "tag",
@ -84,10 +84,10 @@ class LoggingContext(object):
def stop(self):
pass
def add_database_transaction(self, duration_ms):
def add_database_transaction(self, duration_sec):
pass
def add_database_scheduled(self, sched_ms):
def add_database_scheduled(self, sched_sec):
pass
def __nonzero__(self):
@ -103,11 +103,11 @@ class LoggingContext(object):
self.ru_utime = 0.
self.db_txn_count = 0
# ms spent waiting for db txns, excluding scheduling time
self.db_txn_duration_ms = 0
# sec spent waiting for db txns, excluding scheduling time
self.db_txn_duration_sec = 0
# ms spent waiting for db txns to be scheduled
self.db_sched_duration_ms = 0
# sec spent waiting for db txns to be scheduled
self.db_sched_duration_sec = 0
# If alive has the thread resource usage when the logcontext last
# became active.
@ -230,18 +230,18 @@ class LoggingContext(object):
return ru_utime, ru_stime
def add_database_transaction(self, duration_ms):
def add_database_transaction(self, duration_sec):
self.db_txn_count += 1
self.db_txn_duration_ms += duration_ms
self.db_txn_duration_sec += duration_sec
def add_database_scheduled(self, sched_ms):
def add_database_scheduled(self, sched_sec):
"""Record a use of the database pool
Args:
sched_ms (int): number of milliseconds it took us to get a
sched_sec (float): number of seconds it took us to get a
connection
"""
self.db_sched_duration_ms += sched_ms
self.db_sched_duration_sec += sched_sec
class LoggingContextFilter(logging.Filter):

View File

@ -96,7 +96,7 @@ def time_function(f):
id = _TIME_FUNC_ID
_TIME_FUNC_ID += 1
start = time.clock() * 1000
start = time.clock()
try:
_log_debug_as_f(
@ -107,10 +107,10 @@ def time_function(f):
r = f(*args, **kwargs)
finally:
end = time.clock() * 1000
end = time.clock()
_log_debug_as_f(
f,
"[FUNC END] {%s-%d} %f",
"[FUNC END] {%s-%d} %.3f sec",
(func_name, id, end - start,),
)

View File

@ -15,8 +15,8 @@
from twisted.internet import defer
from prometheus_client import Counter
from synapse.util.logcontext import LoggingContext
import synapse.metrics
from functools import wraps
import logging
@ -24,66 +24,26 @@ import logging
logger = logging.getLogger(__name__)
block_counter = Counter("synapse_util_metrics_block_count", "", ["block_name"])
metrics = synapse.metrics.get_metrics_for(__name__)
block_timer = Counter("synapse_util_metrics_block_time_seconds", "", ["block_name"])
# total number of times we have hit this block
block_counter = metrics.register_counter(
"block_count",
labels=["block_name"],
alternative_names=(
# the following are all deprecated aliases for the same metric
metrics.name_prefix + x for x in (
"_block_timer:count",
"_block_ru_utime:count",
"_block_ru_stime:count",
"_block_db_txn_count:count",
"_block_db_txn_duration:count",
)
)
)
block_ru_utime = Counter(
"synapse_util_metrics_block_ru_utime_seconds", "", ["block_name"])
block_timer = metrics.register_counter(
"block_time_seconds",
labels=["block_name"],
alternative_names=(
metrics.name_prefix + "_block_timer:total",
),
)
block_ru_stime = Counter(
"synapse_util_metrics_block_ru_stime_seconds", "", ["block_name"])
block_ru_utime = metrics.register_counter(
"block_ru_utime_seconds", labels=["block_name"],
alternative_names=(
metrics.name_prefix + "_block_ru_utime:total",
),
)
block_ru_stime = metrics.register_counter(
"block_ru_stime_seconds", labels=["block_name"],
alternative_names=(
metrics.name_prefix + "_block_ru_stime:total",
),
)
block_db_txn_count = metrics.register_counter(
"block_db_txn_count", labels=["block_name"],
alternative_names=(
metrics.name_prefix + "_block_db_txn_count:total",
),
)
block_db_txn_count = Counter(
"synapse_util_metrics_block_db_txn_count", "", ["block_name"])
# seconds spent waiting for db txns, excluding scheduling time, in this block
block_db_txn_duration = metrics.register_counter(
"block_db_txn_duration_seconds", labels=["block_name"],
alternative_names=(
metrics.name_prefix + "_block_db_txn_duration:total",
),
)
block_db_txn_duration = Counter(
"synapse_util_metrics_block_db_txn_duration_seconds", "", ["block_name"])
# seconds spent waiting for a db connection, in this block
block_db_sched_duration = metrics.register_counter(
"block_db_sched_duration_seconds", labels=["block_name"],
)
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
def measure_func(name):
@ -102,7 +62,7 @@ class Measure(object):
__slots__ = [
"clock", "name", "start_context", "start", "new_context", "ru_utime",
"ru_stime",
"db_txn_count", "db_txn_duration_ms", "db_sched_duration_ms",
"db_txn_count", "db_txn_duration_sec", "db_sched_duration_sec",
"created_context",
]
@ -114,7 +74,7 @@ class Measure(object):
self.created_context = False
def __enter__(self):
self.start = self.clock.time_msec()
self.start = self.clock.time()
self.start_context = LoggingContext.current_context()
if not self.start_context:
self.start_context = LoggingContext("Measure")
@ -123,17 +83,17 @@ class Measure(object):
self.ru_utime, self.ru_stime = self.start_context.get_resource_usage()
self.db_txn_count = self.start_context.db_txn_count
self.db_txn_duration_ms = self.start_context.db_txn_duration_ms
self.db_sched_duration_ms = self.start_context.db_sched_duration_ms
self.db_txn_duration_sec = self.start_context.db_txn_duration_sec
self.db_sched_duration_sec = self.start_context.db_sched_duration_sec
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
duration = self.clock.time_msec() - self.start
duration = self.clock.time() - self.start
block_counter.inc(self.name)
block_timer.inc_by(duration, self.name)
block_counter.labels(self.name).inc()
block_timer.labels(self.name).inc(duration)
context = LoggingContext.current_context()
@ -150,19 +110,13 @@ class Measure(object):
ru_utime, ru_stime = context.get_resource_usage()
block_ru_utime.inc_by(ru_utime - self.ru_utime, self.name)
block_ru_stime.inc_by(ru_stime - self.ru_stime, self.name)
block_db_txn_count.inc_by(
context.db_txn_count - self.db_txn_count, self.name
)
block_db_txn_duration.inc_by(
(context.db_txn_duration_ms - self.db_txn_duration_ms) / 1000.,
self.name
)
block_db_sched_duration.inc_by(
(context.db_sched_duration_ms - self.db_sched_duration_ms) / 1000.,
self.name
)
block_ru_utime.labels(self.name).inc(ru_utime - self.ru_utime)
block_ru_stime.labels(self.name).inc(ru_stime - self.ru_stime)
block_db_txn_count.labels(self.name).inc(context.db_txn_count - self.db_txn_count)
block_db_txn_duration.labels(self.name).inc(
context.db_txn_duration_sec - self.db_txn_duration_sec)
block_db_sched_duration.labels(self.name).inc(
context.db_sched_duration_sec - self.db_sched_duration_sec)
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)

View File

@ -12,3 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.trial import util
util.DEFAULT_TIMEOUT_DURATION = 10

View File

@ -1,192 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from tests import unittest
from synapse.metrics.metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
_escape_label_value,
)
class CounterMetricTestCase(unittest.TestCase):
def test_scalar(self):
counter = CounterMetric("scalar")
self.assertEquals(counter.render(), [
'scalar 0',
])
counter.inc()
self.assertEquals(counter.render(), [
'scalar 1',
])
counter.inc_by(2)
self.assertEquals(counter.render(), [
'scalar 3'
])
def test_vector(self):
counter = CounterMetric("vector", labels=["method"])
# Empty counter doesn't yet know what values it has
self.assertEquals(counter.render(), [])
counter.inc("GET")
self.assertEquals(counter.render(), [
'vector{method="GET"} 1',
])
counter.inc("GET")
counter.inc("PUT")
self.assertEquals(counter.render(), [
'vector{method="GET"} 2',
'vector{method="PUT"} 1',
])
class CallbackMetricTestCase(unittest.TestCase):
def test_scalar(self):
d = dict()
metric = CallbackMetric("size", lambda: len(d))
self.assertEquals(metric.render(), [
'size 0',
])
d["key"] = "value"
self.assertEquals(metric.render(), [
'size 1',
])
def test_vector(self):
vals = dict()
metric = CallbackMetric("values", lambda: vals, labels=["type"])
self.assertEquals(metric.render(), [])
# Keys have to be tuples, even if they're 1-element
vals[("foo",)] = 1
vals[("bar",)] = 2
self.assertEquals(metric.render(), [
'values{type="bar"} 2',
'values{type="foo"} 1',
])
class DistributionMetricTestCase(unittest.TestCase):
def test_scalar(self):
metric = DistributionMetric("thing")
self.assertEquals(metric.render(), [
'thing:count 0',
'thing:total 0',
])
metric.inc_by(500)
self.assertEquals(metric.render(), [
'thing:count 1',
'thing:total 500',
])
def test_vector(self):
metric = DistributionMetric("queries", labels=["verb"])
self.assertEquals(metric.render(), [])
metric.inc_by(300, "SELECT")
metric.inc_by(200, "SELECT")
metric.inc_by(800, "INSERT")
self.assertEquals(metric.render(), [
'queries:count{verb="INSERT"} 1',
'queries:count{verb="SELECT"} 2',
'queries:total{verb="INSERT"} 800',
'queries:total{verb="SELECT"} 500',
])
class CacheMetricTestCase(unittest.TestCase):
def test_cache(self):
d = dict()
metric = CacheMetric("cache", lambda: len(d), "cache_name")
self.assertEquals(metric.render(), [
'cache:hits{name="cache_name"} 0',
'cache:total{name="cache_name"} 0',
'cache:size{name="cache_name"} 0',
'cache:evicted_size{name="cache_name"} 0',
])
metric.inc_misses()
d["key"] = "value"
self.assertEquals(metric.render(), [
'cache:hits{name="cache_name"} 0',
'cache:total{name="cache_name"} 1',
'cache:size{name="cache_name"} 1',
'cache:evicted_size{name="cache_name"} 0',
])
metric.inc_hits()
self.assertEquals(metric.render(), [
'cache:hits{name="cache_name"} 1',
'cache:total{name="cache_name"} 2',
'cache:size{name="cache_name"} 1',
'cache:evicted_size{name="cache_name"} 0',
])
metric.inc_evictions(2)
self.assertEquals(metric.render(), [
'cache:hits{name="cache_name"} 1',
'cache:total{name="cache_name"} 2',
'cache:size{name="cache_name"} 1',
'cache:evicted_size{name="cache_name"} 2',
])
class LabelValueEscapeTestCase(unittest.TestCase):
def test_simple(self):
string = "safjhsdlifhyskljfksdfh"
self.assertEqual(string, _escape_label_value(string))
def test_escape(self):
self.assertEqual(
"abc\\\"def\\nghi\\\\",
_escape_label_value("abc\"def\nghi\\"),
)
def test_sequence_of_escapes(self):
self.assertEqual(
"abc\\\"def\\nghi\\\\\\n",
_escape_label_value("abc\"def\nghi\\\n"),
)

View File

@ -51,7 +51,7 @@ usedevelop=true
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/metrics tests/config} \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config} \
{env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m}