forked-synapse/synapse/metrics/__init__.py

264 lines
7.3 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2016-01-07 04:26:29 +00:00
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
2015-08-13 10:38:59 +00:00
import functools
import time
import gc
2018-05-23 18:03:56 +00:00
import os
import platform
2018-05-22 00:47:37 +00:00
import attr
2015-08-13 10:38:59 +00:00
2018-05-22 00:47:37 +00:00
from prometheus_client import Gauge, Histogram, Counter
2018-05-22 22:36:20 +00:00
from prometheus_client.core import GaugeMetricFamily, REGISTRY
2018-05-22 00:47:37 +00:00
from twisted.internet import reactor
logger = logging.getLogger(__name__)

# Whether "/proc/self/stat" exists on this host; CPUMetrics.collect() yields
# nothing when it does not.
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

# True when the interpreter reports itself as PyPy. The manual GC handling
# further down in this module is skipped in that case.
running_on_pypy = platform.python_implementation() == "PyPy"

# Maps a LaterGauge name to the currently registered LaterGauge instance.
all_gauges = {}

# NOTE(review): neither list is referenced in the visible part of this
# module -- presumably kept for other importers; confirm before removing.
all_metrics = []
all_collectors = []
2018-05-22 21:28:23 +00:00
2018-05-29 01:32:15 +00:00
2018-05-22 21:28:23 +00:00
class RegistryProxy(object):
    """Collector facade over the global prometheus ``REGISTRY``.

    Re-yields everything ``REGISTRY.collect()`` produces, except metrics
    whose name begins with a double underscore.
    """

    @staticmethod
    def collect():
        """Yield each metric from ``REGISTRY`` whose name does not start with "__"."""
        for family in REGISTRY.collect():
            if family.name.startswith("__"):
                # Double-underscore names are filtered out of the output.
                continue
            yield family
2018-05-22 00:47:37 +00:00
@attr.s(hash=True)
class LaterGauge(object):
    """A gauge whose value is computed by a callback at collection time.

    Instances register themselves with the global prometheus ``REGISTRY`` as
    soon as they are constructed (see ``__attrs_post_init__``). Creating a
    second ``LaterGauge`` with the same name replaces the earlier one.
    """

    # Metric name; also the key used in the module-level ``all_gauges`` dict.
    name = attr.ib()
    # Description passed to GaugeMetricFamily.
    desc = attr.ib()
    # Label names passed to GaugeMetricFamily (excluded from the hash).
    labels = attr.ib(hash=False)
    # Zero-argument callable producing the value(s). It may return either a
    # dict or a single value; see collect() for how each is handled.
    caller = attr.ib()

    def collect(self):
        """Yield one GaugeMetricFamily populated from ``self.caller()``.

        If the callback raises, the exception is logged and an empty metric
        family is yielded instead of propagating the error.
        """
        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

        try:
            calls = self.caller()
        except Exception:
            logger.exception(
                "Exception running callback for LaterGauge(%s)",
                self.name,
            )
            yield g
            return

        if isinstance(calls, dict):
            # Each key is handed to add_metric() as the label values and each
            # value as the sample value.
            for k, v in calls.items():
                g.add_metric(k, v)
        else:
            # A non-dict result is reported as a single unlabelled sample.
            g.add_metric([], calls)

        yield g

    def __attrs_post_init__(self):
        """Register with the prometheus registry once attrs has set the fields."""
        self._register()

    def _register(self):
        """Register this gauge, replacing any existing gauge of the same name."""
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering", self.name)
            REGISTRY.unregister(all_gauges.pop(self.name))

        REGISTRY.register(self)
        all_gauges[self.name] = self
2018-05-23 18:03:56 +00:00
#
# Detailed CPU metrics
#
class CPUMetrics(object):
    """Prometheus collector reporting this process's CPU time.

    Values are read from "/proc/self/stat" on every collection and converted
    from clock ticks to seconds.
    """

    def __init__(self):
        # Fallback used when the system clock tick rate cannot be queried.
        ticks_per_sec = 100
        try:
            # Try and get the system config
            ticks_per_sec = os.sysconf('SC_CLK_TCK')
        except (ValueError, TypeError, AttributeError, OSError):
            # ValueError: unknown name; OSError: name not supported by the
            # host; AttributeError: os.sysconf missing on this platform.
            pass

        self.ticks_per_sec = ticks_per_sec

    def collect(self):
        """Yield user and system CPU-seconds gauges for the current process.

        Yields nothing when "/proc/self/stat" is not available.
        """
        if not HAVE_PROC_SELF_STAT:
            return

        with open("/proc/self/stat") as s:
            line = s.read()

        # The second field is the executable name wrapped in parentheses and
        # may itself contain ") ", so split on the *last* occurrence to be
        # sure we skip past it.
        raw_stats = line.rsplit(") ", 1)[1].split(" ")

        # With pid and comm stripped, indexes 11 and 12 are the user-mode and
        # kernel-mode tick counters respectively.
        user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
        user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
        yield user

        system = GaugeMetricFamily("process_cpu_system_seconds_total", "")
        system.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
        yield system
2018-05-23 18:08:59 +00:00
2018-05-23 18:03:56 +00:00
REGISTRY.register(CPUMetrics())

#
# Python GC metrics
#

# Number of unreachable objects reported by the most recent manual
# gc.collect() of each generation.
gc_unreachable = Gauge(
    "python_gc_unreachable_total",
    "Unreachable GC objects",
    labelnames=["gen"],
)

# Wall-clock duration of each manual gc.collect(), labelled by generation.
gc_time = Histogram(
    "python_gc_time",
    "Time taken to GC (sec)",
    labelnames=["gen"],
    buckets=[
        0.0025, 0.005, 0.01, 0.025,
        0.05, 0.10, 0.25, 0.50,
        1.00, 2.50, 5.00, 7.50,
        15.00, 30.00, 45.00, 60.00,
    ],
)
2018-05-22 00:47:37 +00:00
class GCCounts(object):
    """Prometheus collector exposing the values of ``gc.get_count()``."""

    def collect(self):
        """Yield a single gauge family with one sample per GC generation.

        Each sample is labelled with the generation's index (as a string)
        under the "gen" label.
        """
        family = GaugeMetricFamily(
            "python_gc_counts", "GC cycle counts", labels=["gen"]
        )

        generation = 0
        for count in gc.get_count():
            family.add_metric([str(generation)], count)
            generation += 1

        yield family
2018-05-22 22:32:57 +00:00
2018-05-22 00:47:37 +00:00
REGISTRY.register(GCCounts())

#
# Twisted reactor metrics
#

# Wall-clock time spent inside each wrapped reactor.runUntilCurrent() call.
tick_time = Histogram(
    "python_twisted_reactor_tick_time",
    "Tick time of the Twisted reactor (sec)",
    buckets=[
        0.001, 0.002, 0.005,
        0.01, 0.025, 0.05,
        0.1, 0.2, 0.5,
        1, 2, 5,
    ],
)

# Number of calls that were due to run at the start of each reactor tick.
pending_calls_metric = Histogram(
    "python_twisted_reactor_pending_calls",
    "Pending calls",
    buckets=[
        1, 2, 5,
        10, 25, 50,
        100, 250, 500,
        1000,
    ],
)
2015-08-13 10:38:59 +00:00
2018-05-22 00:47:37 +00:00
#
# Federation Metrics
#
2015-08-13 10:38:59 +00:00
2018-05-22 00:47:37 +00:00
sent_edus_counter = Counter(
    "synapse_federation_client_sent_edus",
    "",
)

sent_transactions_counter = Counter(
    "synapse_federation_client_sent_transactions",
    "",
)

events_processed_counter = Counter(
    "synapse_federation_client_events_processed",
    "",
)

# Tracks how far through the event stream each component (federation
# sending, appservice sending, etc.) has processed.
event_processing_positions = Gauge(
    "synapse_event_processing_positions",
    "",
    labelnames=["name"],
)

# Tracks the current maximum position of the events stream.
event_persisted_position = Gauge(
    "synapse_event_persisted_position",
    "",
)

# Tracks the received_ts of the last event each component processed.
event_processing_last_ts = Gauge(
    "synapse_event_processing_last_ts",
    "",
    labelnames=["name"],
)

# Tracks event-processing lag: the difference between the last processed
# event's received_ts and the time at which its processing finished.
event_processing_lag = Gauge(
    "synapse_event_processing_lag",
    "",
    labelnames=["name"],
)
2015-08-13 10:38:59 +00:00
2018-05-22 22:32:57 +00:00
2015-08-13 10:38:59 +00:00
def runUntilCurrentTimer(func):
    """Wrap the reactor's ``runUntilCurrent`` so each call is instrumented.

    The returned wrapper, on every invocation:

    * counts the calls that are due to run and records the count in
      ``pending_calls_metric``;
    * times ``func`` and records the duration in ``tick_time``;
    * unless running on PyPy, performs a manual garbage collection of any
      generation whose count exceeds its threshold, recording the time
      taken and the number of unreachable objects found.

    Args:
        func: the callable to wrap (the reactor's original method).

    Returns:
        A wrapper that accepts the same arguments as ``func`` and returns
        whatever ``func`` returns.
    """

    @functools.wraps(func)
    def timed_run(*args, **kwargs):
        current = reactor.seconds()
        pending = 0

        # _newTimedCalls holds *all* pending calls in one long list; this
        # walk mirrors the implementation of reactor.runUntilCurrent.
        for call in reactor._newTimedCalls:
            if call.time > current:
                break
            if not (call.delayed_time > 0):
                pending += 1

        pending += len(reactor.threadCallQueue)

        tick_began = time.time()
        result = func(*args, **kwargs)
        tick_ended = time.time()

        # This is the wallclock time spent running pending calls. It is only
        # a proxy for the real time between reactor polls: roughly 25% of
        # the time goes on work triggered by I/O events, which cannot be
        # captured without rewriting half the reactor.
        tick_time.observe(tick_ended - tick_began)
        pending_calls_metric.observe(pending)

        if not running_on_pypy:
            # Automatic GC has been disabled, so decide here whether a
            # manual collection is due and run it if so.
            limits = gc.get_threshold()
            tallies = gc.get_count()
            for generation in (2, 1, 0):
                if tallies[generation] > limits[generation]:
                    logger.info("Collecting gc %d", generation)

                    gc_began = time.time()
                    found = gc.collect(generation)
                    gc_ended = time.time()

                    gc_time.labels(generation).observe(gc_ended - gc_began)
                    gc_unreachable.labels(generation).set(found)

        return result

    return timed_run
try:
    # Probe every reactor attribute the timing wrapper relies on; a reactor
    # missing any of them raises AttributeError and is left unpatched.
    (reactor.runUntilCurrent, reactor._newTimedCalls, reactor.threadCallQueue)

    # runUntilCurrent is called when we have pending calls. It runs once per
    # iteration, after fd polling.
    reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)

    # The wrapper runs the GC by hand on each reactor tick so that time spent
    # in GC can be measured, so the automatic collector is switched off
    # (except on PyPy, where the wrapper does no manual collection).
    if not running_on_pypy:
        gc.disable()
except AttributeError:
    pass