Add in flight real time metrics for Measure blocks

This commit is contained in:
Erik Johnston 2018-09-14 14:39:59 +01:00
parent ad9198cc34
commit 0a81038ea0
2 changed files with 131 additions and 1 deletions

View File

@ -18,8 +18,11 @@ import gc
import logging import logging
import os import os
import platform import platform
import threading
import time import time
import six
import attr import attr
from prometheus_client import Counter, Gauge, Histogram from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily from prometheus_client.core import REGISTRY, GaugeMetricFamily
@ -68,7 +71,7 @@ class LaterGauge(object):
return return
if isinstance(calls, dict): if isinstance(calls, dict):
for k, v in calls.items(): for k, v in six.iteritems(calls):
g.add_metric(k, v) g.add_metric(k, v)
else: else:
g.add_metric([], calls) g.add_metric([], calls)
@ -87,6 +90,111 @@ class LaterGauge(object):
all_gauges[self.name] = self all_gauges[self.name] = self
class InFlightGauge(object):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Each InFlightGauge will create a metric called `<name>_total` that counts
the number of in flight blocks, as well as a metrics for each item in the
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
"""
# TODO: Expand this to
def __init__(self, name, desc, labels, sub_metrics):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
)
# Counts number of in flight blocks for a given set of label values
self._registrations = {}
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
def register(self, key, callback):
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
value must also be given to `unregister`.
`callback` gets called with an object that has an attribute per
sub_metric, which should be updated with the necessary values. Note that
the metrics object is shared between all callbacks registered with the
same key.
Note that `callback` may be called on a separate thread.
"""
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
def unregister(self, key, callback):
"""Registers that we've exited a block with labels `key`.
"""
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
def collect(self):
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
"""
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
metrics_by_key = {}
# We copy so that we don't mutate the list while iterating
with self._lock:
keys = list(self._registrations)
for key in keys:
with self._lock:
callbacks = set(self._registrations[key])
in_flight.add_metric(key, len(callbacks))
metrics = self._metrics_class()
metrics_by_key[key] = metrics
for callback in callbacks:
callback(metrics)
yield in_flight
for name in self.sub_metrics:
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
def _register_with_collector(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
# #
# Detailed CPU metrics # Detailed CPU metrics
# #

View File

@ -20,6 +20,7 @@ from prometheus_client import Counter
from twisted.internet import defer from twisted.internet import defer
from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
block_db_sched_duration = Counter( block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
# Tracks the number of blocks currently active
in_flight = InFlightGauge(
"synapse_util_metrics_block_in_flight", "",
labels=["block_name"],
sub_metrics=["real_time_max", "real_time_sum"],
)
def measure_func(name): def measure_func(name):
def wrapper(func): def wrapper(func):
@ -82,10 +90,14 @@ class Measure(object):
self.start_usage = self.start_context.get_resource_usage() self.start_usage = self.start_context.get_resource_usage()
in_flight.register((self.name,), self._update_in_flight)
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context: if isinstance(exc_type, Exception) or not self.start_context:
return return
in_flight.unregister((self.name,), self._update_in_flight)
duration = self.clock.time() - self.start duration = self.clock.time() - self.start
block_counter.labels(self.name).inc() block_counter.labels(self.name).inc()
@ -120,3 +132,13 @@ class Measure(object):
if self.created_context: if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb) self.start_context.__exit__(exc_type, exc_val, exc_tb)
def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics
"""
duration = self.clock.time() - self.start
metrics.real_time_max = max(metrics.real_time_max, duration)
metrics.real_time_sum += duration
# TODO: Add other in flight metrics.