Rewrite BucketCollector

This was a bit unwieldy for what I wanted: in particular, I wanted to assign
each measurement straight into a bucket, rather than storing an intermediate
Counter which didn't do any bucketing at all.

I've replaced it with something that is hopefully a bit easier to use.

(I'm not entirely sure what the difference between a HistogramMetricFamily and
a GaugeHistogramMetricFamily is, but given that our counters can go down as
well as up, the latter *sounds* more accurate?)
Richard van der Hoff 2020-09-29 22:26:28 +01:00
parent 1c8ca2c543
commit 6d2d42f8fb
3 changed files with 88 additions and 70 deletions
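
For context, here is a minimal sketch of how the new collector is meant to be
used (based on the code in the first file's diff below; the metric name and
sample values are made up):

    from synapse.metrics import GaugeBucketCollector

    # Registration happens in the constructor; an "+Inf" bound is appended
    # automatically if not given.
    example_collector = GaugeBucketCollector(
        "example_metric",
        "An example distribution",
        buckets=[1, 5, 10],
    )

    # Each measurement is assigned straight to a bucket; the previous data is
    # replaced wholesale rather than accumulated in an intermediate Counter.
    example_collector.update_data([0.5, 3, 3, 12])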

View File

@@ -15,6 +15,7 @@
 import functools
 import gc
+import itertools
 import logging
 import os
 import platform
@@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import (
     REGISTRY,
     CounterMetricFamily,
+    GaugeHistogramMetricFamily,
     GaugeMetricFamily,
-    HistogramMetricFamily,
 )
 
 from twisted.internet import reactor
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"
 
 running_on_pypy = platform.python_implementation() == "PyPy"
-all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
+all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge]]
 
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@@ -205,63 +206,83 @@ class InFlightGauge:
         all_gauges[self.name] = self
 
 
-@attr.s(slots=True, hash=True)
-class BucketCollector:
-    """
-    Like a Histogram, but allows buckets to be point-in-time instead of
-    incrementally added to.
-
-    Args:
-        name (str): Base name of metric to be exported to Prometheus.
-        data_collector (callable -> dict): A synchronous callable that
-            returns a dict mapping bucket to number of items in the
-            bucket. If these buckets are not the same as the buckets
-            given to this class, they will be remapped into them.
-        buckets (list[float]): List of floats/ints of the buckets to
-            give to Prometheus. +Inf is ignored, if given.
-    """
-
-    name = attr.ib()
-    data_collector = attr.ib()
-    buckets = attr.ib()
-
-    def collect(self):
-
-        # Fetch the data -- this must be synchronous!
-        data = self.data_collector()
-
-        buckets = {}  # type: Dict[float, int]
-
-        res = []
-        for x in data.keys():
-            for i, bound in enumerate(self.buckets):
-                if x <= bound:
-                    buckets[bound] = buckets.get(bound, 0) + data[x]
-
-        for i in self.buckets:
-            res.append([str(i), buckets.get(i, 0)])
-
-        res.append(["+Inf", sum(data.values())])
-
-        metric = HistogramMetricFamily(
-            self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
-        )
-        yield metric
-
-    def __attrs_post_init__(self):
-        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
-        if self.buckets != sorted(self.buckets):
-            raise ValueError("Buckets not sorted")
-        self.buckets = tuple(self.buckets)
-
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering" % (self.name,))
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
+class GaugeBucketCollector:
+    """Like a Histogram, but the buckets are Gauges which are updated atomically.
+
+    The data is updated by calling `update_data` with an iterable of measurements.
+
+    We assume that the data is updated less frequently than it is reported to
+    Prometheus, and optimise for that case.
+    """
+
+    __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
+
+    def __init__(
+        self,
+        name: str,
+        documentation: str,
+        buckets: Iterable[float],
+        registry=REGISTRY,
+    ):
+        """
+        Args:
+            name: base name of metric to be exported to Prometheus. (a _bucket suffix
+                will be added.)
+            documentation: help text for the metric
+            buckets: The top bounds of the buckets to report
+            registry: metric registry to register with
+        """
+        self._name = name
+        self._documentation = documentation
+
+        # the tops of the buckets
+        self._bucket_bounds = [float(b) for b in buckets]
+        if self._bucket_bounds != sorted(self._bucket_bounds):
+            raise ValueError("Buckets not in sorted order")
+        if self._bucket_bounds[-1] != float("inf"):
+            self._bucket_bounds.append(float("inf"))
+
+        self._metric = self._values_to_metric([])
+        registry.register(self)
+
+    def collect(self):
+        yield self._metric
+
+    def update_data(self, values: Iterable[float]):
+        """Update the data to be reported by the metric
+
+        The existing data is cleared, and each measurement in the input is assigned
+        to the relevant bucket.
+        """
+        self._metric = self._values_to_metric(values)
+
+    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
+        total = 0.0
+        bucket_values = [0 for _ in self._bucket_bounds]
+
+        for v in values:
+            # assign each value to a bucket
+            for i, bound in enumerate(self._bucket_bounds):
+                if v <= bound:
+                    bucket_values[i] += 1
+                    break
+
+            # ... and increment the sum
+            total += v
+
+        # now, aggregate the bucket values so that they count the number of entries in
+        # that bucket or below.
+        accumulated_values = itertools.accumulate(bucket_values)
+
+        return GaugeHistogramMetricFamily(
+            self._name,
+            self._documentation,
+            buckets=list(
+                zip((str(b) for b in self._bucket_bounds), accumulated_values)
+            ),
+            gsum_value=total,
+        )
 
 
 #
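
To make the bucketing step concrete, here is a small worked example of what
_values_to_metric does, with made-up numbers: each value is counted into the
first bucket whose bound it does not exceed, and itertools.accumulate then
turns those per-bucket counts into the cumulative "this bucket or below"
counts that the histogram exposes:

    import itertools

    bucket_bounds = [1.0, 2.0, 5.0, float("inf")]
    values = [0.5, 2, 2, 7]

    bucket_values = [0 for _ in bucket_bounds]
    for v in values:
        for i, bound in enumerate(bucket_bounds):
            if v <= bound:
                bucket_values[i] += 1
                break

    print(bucket_values)
    # [1, 2, 0, 1] -- per-bucket counts
    print(list(itertools.accumulate(bucket_values)))
    # [1, 3, 3, 4] -- cumulative counts, as reported to Prometheus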

View File

@@ -12,10 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import typing
-from collections import Counter
 
-from synapse.metrics import BucketCollector
+from synapse.metrics import GaugeBucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
@@ -23,6 +21,14 @@ from synapse.storage.databases.main.event_push_actions import (
     EventPushActionsWorkerStore,
 )
 
+# Collect metrics on the number of forward extremities that exist.
+_extremities_collecter = GaugeBucketCollector(
+    "synapse_forward_extremities",
+    "Number of rooms on the server with the given number of forward extremities"
+    " or fewer",
+    buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
+)
+
 
 class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
     """Functions to pull various metrics from the DB, for e.g. phone home
@@ -32,18 +38,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
-        # Collect metrics on the number of forward extremities that exist.
-        # Counter of number of extremities to count
-        self._current_forward_extremities_amount = (
-            Counter()
-        )  # type: typing.Counter[int]
-
-        BucketCollector(
-            "synapse_forward_extremities",
-            lambda: self._current_forward_extremities_amount,
-            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
-        )
-
         # Read the extrems every 60 minutes
         def read_forward_extremities():
             # run as a background process to make sure that the database transactions
@@ -65,7 +59,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
             return txn.fetchall()
 
         res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
-        self._current_forward_extremities_amount = Counter([x[0] for x in res])
+        _extremities_collecter.update_data(x[0] for x in res)
 
     async def count_daily_messages(self):
         """

View File

@@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
 
         self.reactor.advance(60 * 60 * 1000)
         self.pump(1)
 
-        items = set(
+        items = list(
             filter(
                 lambda x: b"synapse_forward_extremities_" in x,
-                generate_latest(REGISTRY).split(b"\n"),
+                generate_latest(REGISTRY, emit_help=False).split(b"\n"),
             )
         )
 
-        expected = {
+        expected = [
            b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
            b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
            b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
            b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
            b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
            b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
-           b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
-           b"synapse_forward_extremities_count 3.0",
-           b"synapse_forward_extremities_sum 10.0",
-       }
+           # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
+           # "inf" is valid: "this includes variants such as inf"
+           b'synapse_forward_extremities_bucket{le="inf"} 3.0',
+           b"# TYPE synapse_forward_extremities_gcount gauge",
+           b"synapse_forward_extremities_gcount 3.0",
+           b"# TYPE synapse_forward_extremities_gsum gauge",
+           b"synapse_forward_extremities_gsum 10.0",
+       ]
 
         self.assertEqual(items, expected)