mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-18 14:57:08 -05:00
Merge pull request #3871 from matrix-org/erikj/in_flight_block_metrics
Add in flight real time metrics for Measure blocks
This commit is contained in:
commit
8b3652831c
1
changelog.d/3871.misc
Normal file
1
changelog.d/3871.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add in flight real time metrics for Measure blocks
|
@ -43,6 +43,7 @@ from synapse.api.errors import (
|
|||||||
from synapse.http.endpoint import matrix_federation_endpoint
|
from synapse.http.endpoint import matrix_federation_endpoint
|
||||||
from synapse.util import logcontext
|
from synapse.util import logcontext
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
|
from synapse.util.metrics import Measure
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
outbound_logger = logging.getLogger("synapse.http.outbound")
|
outbound_logger = logging.getLogger("synapse.http.outbound")
|
||||||
@ -224,9 +225,11 @@ class MatrixFederationHttpClient(object):
|
|||||||
reactor=self.hs.get_reactor()
|
reactor=self.hs.get_reactor()
|
||||||
)
|
)
|
||||||
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
|
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
|
||||||
response = yield make_deferred_yieldable(
|
|
||||||
request_deferred,
|
with Measure(self.clock, "outbound_request"):
|
||||||
)
|
response = yield make_deferred_yieldable(
|
||||||
|
request_deferred,
|
||||||
|
)
|
||||||
|
|
||||||
log_result = "%d %s" % (response.code, response.phrase,)
|
log_result = "%d %s" % (response.code, response.phrase,)
|
||||||
break
|
break
|
||||||
|
@ -18,8 +18,11 @@ import gc
|
|||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
import attr
|
import attr
|
||||||
from prometheus_client import Counter, Gauge, Histogram
|
from prometheus_client import Counter, Gauge, Histogram
|
||||||
from prometheus_client.core import REGISTRY, GaugeMetricFamily
|
from prometheus_client.core import REGISTRY, GaugeMetricFamily
|
||||||
@ -68,7 +71,7 @@ class LaterGauge(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
if isinstance(calls, dict):
|
if isinstance(calls, dict):
|
||||||
for k, v in calls.items():
|
for k, v in six.iteritems(calls):
|
||||||
g.add_metric(k, v)
|
g.add_metric(k, v)
|
||||||
else:
|
else:
|
||||||
g.add_metric([], calls)
|
g.add_metric([], calls)
|
||||||
@ -87,6 +90,109 @@ class LaterGauge(object):
|
|||||||
all_gauges[self.name] = self
|
all_gauges[self.name] = self
|
||||||
|
|
||||||
|
|
||||||
|
class InFlightGauge(object):
|
||||||
|
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
|
||||||
|
at any given time.
|
||||||
|
|
||||||
|
Each InFlightGauge will create a metric called `<name>_total` that counts
|
||||||
|
the number of in flight blocks, as well as a metrics for each item in the
|
||||||
|
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
|
||||||
|
callbacks.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name (str)
|
||||||
|
desc (str)
|
||||||
|
labels (list[str])
|
||||||
|
sub_metrics (list[str]): A list of sub metrics that the callbacks
|
||||||
|
will update.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, name, desc, labels, sub_metrics):
|
||||||
|
self.name = name
|
||||||
|
self.desc = desc
|
||||||
|
self.labels = labels
|
||||||
|
self.sub_metrics = sub_metrics
|
||||||
|
|
||||||
|
# Create a class which have the sub_metrics values as attributes, which
|
||||||
|
# default to 0 on initialization. Used to pass to registered callbacks.
|
||||||
|
self._metrics_class = attr.make_class(
|
||||||
|
"_MetricsEntry",
|
||||||
|
attrs={x: attr.ib(0) for x in sub_metrics},
|
||||||
|
slots=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Counts number of in flight blocks for a given set of label values
|
||||||
|
self._registrations = {}
|
||||||
|
|
||||||
|
# Protects access to _registrations
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
self._register_with_collector()
|
||||||
|
|
||||||
|
def register(self, key, callback):
|
||||||
|
"""Registers that we've entered a new block with labels `key`.
|
||||||
|
|
||||||
|
`callback` gets called each time the metrics are collected. The same
|
||||||
|
value must also be given to `unregister`.
|
||||||
|
|
||||||
|
`callback` gets called with an object that has an attribute per
|
||||||
|
sub_metric, which should be updated with the necessary values. Note that
|
||||||
|
the metrics object is shared between all callbacks registered with the
|
||||||
|
same key.
|
||||||
|
|
||||||
|
Note that `callback` may be called on a separate thread.
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
self._registrations.setdefault(key, set()).add(callback)
|
||||||
|
|
||||||
|
def unregister(self, key, callback):
|
||||||
|
"""Registers that we've exited a block with labels `key`.
|
||||||
|
"""
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
self._registrations.setdefault(key, set()).discard(callback)
|
||||||
|
|
||||||
|
def collect(self):
|
||||||
|
"""Called by prometheus client when it reads metrics.
|
||||||
|
|
||||||
|
Note: may be called by a separate thread.
|
||||||
|
"""
|
||||||
|
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
|
||||||
|
|
||||||
|
metrics_by_key = {}
|
||||||
|
|
||||||
|
# We copy so that we don't mutate the list while iterating
|
||||||
|
with self._lock:
|
||||||
|
keys = list(self._registrations)
|
||||||
|
|
||||||
|
for key in keys:
|
||||||
|
with self._lock:
|
||||||
|
callbacks = set(self._registrations[key])
|
||||||
|
|
||||||
|
in_flight.add_metric(key, len(callbacks))
|
||||||
|
|
||||||
|
metrics = self._metrics_class()
|
||||||
|
metrics_by_key[key] = metrics
|
||||||
|
for callback in callbacks:
|
||||||
|
callback(metrics)
|
||||||
|
|
||||||
|
yield in_flight
|
||||||
|
|
||||||
|
for name in self.sub_metrics:
|
||||||
|
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
|
||||||
|
for key, metrics in six.iteritems(metrics_by_key):
|
||||||
|
gauge.add_metric(key, getattr(metrics, name))
|
||||||
|
yield gauge
|
||||||
|
|
||||||
|
def _register_with_collector(self):
|
||||||
|
if self.name in all_gauges.keys():
|
||||||
|
logger.warning("%s already registered, reregistering" % (self.name,))
|
||||||
|
REGISTRY.unregister(all_gauges.pop(self.name))
|
||||||
|
|
||||||
|
REGISTRY.register(self)
|
||||||
|
all_gauges[self.name] = self
|
||||||
|
|
||||||
|
|
||||||
#
|
#
|
||||||
# Detailed CPU metrics
|
# Detailed CPU metrics
|
||||||
#
|
#
|
||||||
|
@ -20,6 +20,7 @@ from prometheus_client import Counter
|
|||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.metrics import InFlightGauge
|
||||||
from synapse.util.logcontext import LoggingContext
|
from synapse.util.logcontext import LoggingContext
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
|
|||||||
block_db_sched_duration = Counter(
|
block_db_sched_duration = Counter(
|
||||||
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
|
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
|
||||||
|
|
||||||
|
# Tracks the number of blocks currently active
|
||||||
|
in_flight = InFlightGauge(
|
||||||
|
"synapse_util_metrics_block_in_flight", "",
|
||||||
|
labels=["block_name"],
|
||||||
|
sub_metrics=["real_time_max", "real_time_sum"],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def measure_func(name):
|
def measure_func(name):
|
||||||
def wrapper(func):
|
def wrapper(func):
|
||||||
@ -82,10 +90,14 @@ class Measure(object):
|
|||||||
|
|
||||||
self.start_usage = self.start_context.get_resource_usage()
|
self.start_usage = self.start_context.get_resource_usage()
|
||||||
|
|
||||||
|
in_flight.register((self.name,), self._update_in_flight)
|
||||||
|
|
||||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||||
if isinstance(exc_type, Exception) or not self.start_context:
|
if isinstance(exc_type, Exception) or not self.start_context:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
in_flight.unregister((self.name,), self._update_in_flight)
|
||||||
|
|
||||||
duration = self.clock.time() - self.start
|
duration = self.clock.time() - self.start
|
||||||
|
|
||||||
block_counter.labels(self.name).inc()
|
block_counter.labels(self.name).inc()
|
||||||
@ -120,3 +132,13 @@ class Measure(object):
|
|||||||
|
|
||||||
if self.created_context:
|
if self.created_context:
|
||||||
self.start_context.__exit__(exc_type, exc_val, exc_tb)
|
self.start_context.__exit__(exc_type, exc_val, exc_tb)
|
||||||
|
|
||||||
|
def _update_in_flight(self, metrics):
|
||||||
|
"""Gets called when processing in flight metrics
|
||||||
|
"""
|
||||||
|
duration = self.clock.time() - self.start
|
||||||
|
|
||||||
|
metrics.real_time_max = max(metrics.real_time_max, duration)
|
||||||
|
metrics.real_time_sum += duration
|
||||||
|
|
||||||
|
# TODO: Add other in flight metrics.
|
||||||
|
81
tests/test_metrics.py
Normal file
81
tests/test_metrics.py
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from synapse.metrics import InFlightGauge
|
||||||
|
|
||||||
|
from tests import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestMauLimit(unittest.TestCase):
|
||||||
|
def test_basic(self):
|
||||||
|
gauge = InFlightGauge(
|
||||||
|
"test1", "",
|
||||||
|
labels=["test_label"],
|
||||||
|
sub_metrics=["foo", "bar"],
|
||||||
|
)
|
||||||
|
|
||||||
|
def handle1(metrics):
|
||||||
|
metrics.foo += 2
|
||||||
|
metrics.bar = max(metrics.bar, 5)
|
||||||
|
|
||||||
|
def handle2(metrics):
|
||||||
|
metrics.foo += 3
|
||||||
|
metrics.bar = max(metrics.bar, 7)
|
||||||
|
|
||||||
|
gauge.register(("key1",), handle1)
|
||||||
|
|
||||||
|
self.assert_dict({
|
||||||
|
"test1_total": {("key1",): 1},
|
||||||
|
"test1_foo": {("key1",): 2},
|
||||||
|
"test1_bar": {("key1",): 5},
|
||||||
|
}, self.get_metrics_from_gauge(gauge))
|
||||||
|
|
||||||
|
gauge.unregister(("key1",), handle1)
|
||||||
|
|
||||||
|
self.assert_dict({
|
||||||
|
"test1_total": {("key1",): 0},
|
||||||
|
"test1_foo": {("key1",): 0},
|
||||||
|
"test1_bar": {("key1",): 0},
|
||||||
|
}, self.get_metrics_from_gauge(gauge))
|
||||||
|
|
||||||
|
gauge.register(("key1",), handle1)
|
||||||
|
gauge.register(("key2",), handle2)
|
||||||
|
|
||||||
|
self.assert_dict({
|
||||||
|
"test1_total": {("key1",): 1, ("key2",): 1},
|
||||||
|
"test1_foo": {("key1",): 2, ("key2",): 3},
|
||||||
|
"test1_bar": {("key1",): 5, ("key2",): 7},
|
||||||
|
}, self.get_metrics_from_gauge(gauge))
|
||||||
|
|
||||||
|
gauge.unregister(("key2",), handle2)
|
||||||
|
gauge.register(("key1",), handle2)
|
||||||
|
|
||||||
|
self.assert_dict({
|
||||||
|
"test1_total": {("key1",): 2, ("key2",): 0},
|
||||||
|
"test1_foo": {("key1",): 5, ("key2",): 0},
|
||||||
|
"test1_bar": {("key1",): 7, ("key2",): 0},
|
||||||
|
}, self.get_metrics_from_gauge(gauge))
|
||||||
|
|
||||||
|
def get_metrics_from_gauge(self, gauge):
|
||||||
|
results = {}
|
||||||
|
|
||||||
|
for r in gauge.collect():
|
||||||
|
results[r.name] = {
|
||||||
|
tuple(labels[x] for x in gauge.labels): value
|
||||||
|
for _, labels, value in r.samples
|
||||||
|
}
|
||||||
|
|
||||||
|
return results
|
Loading…
Reference in New Issue
Block a user