mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
23740eaa3d
During the migration the automated script to update the copyright headers accidentally got rid of some of the existing copyright lines. Reinstate them.
487 lines
15 KiB
Python
487 lines
15 KiB
Python
#
|
|
# This file is licensed under the Affero General Public License (AGPL) version 3.
|
|
#
|
|
# Copyright 2022 The Matrix.org Foundation C.I.C.
|
|
# Copyright 2015, 2016 OpenMarket Ltd
|
|
# Copyright (C) 2023 New Vector, Ltd
|
|
#
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as
|
|
# published by the Free Software Foundation, either version 3 of the
|
|
# License, or (at your option) any later version.
|
|
#
|
|
# See the GNU Affero General Public License for more details:
|
|
# <https://www.gnu.org/licenses/agpl-3.0.html>.
|
|
#
|
|
# Originally licensed under the Apache License, Version 2.0:
|
|
# <http://www.apache.org/licenses/LICENSE-2.0>.
|
|
#
|
|
# [This file includes modifications made by New Vector Limited]
|
|
#
|
|
#
|
|
|
|
import itertools
|
|
import logging
|
|
import os
|
|
import platform
|
|
import threading
|
|
from typing import (
|
|
Callable,
|
|
Dict,
|
|
Generic,
|
|
Iterable,
|
|
Mapping,
|
|
Optional,
|
|
Set,
|
|
Tuple,
|
|
Type,
|
|
TypeVar,
|
|
Union,
|
|
cast,
|
|
)
|
|
|
|
import attr
|
|
from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
|
|
from prometheus_client.core import (
|
|
REGISTRY,
|
|
GaugeHistogramMetricFamily,
|
|
GaugeMetricFamily,
|
|
)
|
|
|
|
from twisted.python.threadpool import ThreadPool
|
|
|
|
# This module is imported for its side effects; flake8 needn't warn that it's unused.
|
|
import synapse.metrics._reactor_metrics # noqa: F401
|
|
from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
|
|
from synapse.metrics._twisted_exposition import MetricsResource, generate_latest
|
|
from synapse.metrics._types import Collector
|
|
from synapse.types import StrSequence
|
|
from synapse.util import SYNAPSE_VERSION
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# NOTE(review): presumably the URL path prefix the metrics resource is served
# under; it is not used anywhere in this module, so confirm at the call sites.
METRICS_PREFIX = "/_synapse/metrics"

# Maps a gauge name to the collector currently registered under that name.
# LaterGauge and InFlightGauge consult it so that creating a gauge with a name
# that is already present replaces the earlier registration.
all_gauges: Dict[str, Collector] = {}

# Whether /proc/self/stat existed when this module was imported. CPUMetrics
# yields no metrics when this is False.
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
|
|
|
|
|
|
class _RegistryProxy:
    """Filtered view of the global ``REGISTRY``.

    Metrics whose name starts with a double underscore are hidden; everything
    else is passed through untouched.
    """

    @staticmethod
    def collect() -> Iterable[Metric]:
        """Yield each metric from ``REGISTRY.collect()`` whose name does not
        begin with ``"__"``."""
        yield from (
            family for family in REGISTRY.collect() if not family.name.startswith("__")
        )
|
|
|
|
|
|
# A little bit nasty, but collect() above is static so a Protocol doesn't work.
# _RegistryProxy matches the signature of a CollectorRegistry instance enough
# for it to be usable in the contexts in which we use it.
# Note that `cast` only informs the type checker: at runtime `RegistryProxy` is
# the `_RegistryProxy` class object itself, not a `CollectorRegistry` instance.
# TODO Do something nicer about this.
RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
|
|
|
|
|
|
@attr.s(slots=True, hash=True, auto_attribs=True)
class LaterGauge(Collector):
    """A gauge whose value is produced by a user-supplied callback each time
    the metric is collected.

    Constructing an instance registers it with the global ``REGISTRY``,
    replacing any gauge previously recorded in ``all_gauges`` under the same
    name.
    """

    # Metric name and help text handed to GaugeMetricFamily.
    name: str
    desc: str
    # Label names for the metric; excluded from the generated hash.
    labels: Optional[StrSequence] = attr.ib(hash=False)
    # The callback. It returns either a bare number (when the metric has no
    # labels) or a mapping from a tuple of label values to a number.
    caller: Callable[
        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
    ]

    def collect(self) -> Iterable[Metric]:
        """Run the callback and yield a single gauge family with its result.

        If the callback raises, the exception is logged and the family is
        yielded without any samples.
        """
        family = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

        try:
            result = self.caller()
        except Exception:
            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
        else:
            if isinstance(result, (int, float)):
                # Unlabelled metric: one sample with no label values.
                family.add_metric([], result)
            else:
                for label_values, value in result.items():
                    family.add_metric(label_values, value)

        yield family

    def __attrs_post_init__(self) -> None:
        """Register the gauge as soon as attrs has finished initialising it."""
        self._register()

    def _register(self) -> None:
        """Register this gauge with ``REGISTRY`` and record it in ``all_gauges``,
        first unregistering any gauge already recorded under the same name."""
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering" % (self.name,))
            stale = all_gauges.pop(self.name)
            REGISTRY.unregister(stale)

        REGISTRY.register(self)
        all_gauges[self.name] = self
|
|
|
|
|
|
# `MetricsEntry` only makes sense when it is a `Protocol`,
# but `Protocol` can't be used as a `TypeVar` bound.
# It is therefore declared without a bound. InFlightGauge is generic over it
# and uses it as the type of the object passed to registered callbacks.
MetricsEntry = TypeVar("MetricsEntry")
|
|
|
|
|
|
class InFlightGauge(Generic[MetricsEntry], Collector):
    """Tracks the number of things (e.g. requests, Measure blocks, etc) that
    are in flight at any given time.

    Each InFlightGauge exposes a metric called `<name>_total` that counts the
    in-flight blocks per set of label values, as well as one metric per item in
    the given `sub_metrics`, named `<name>_<sub_metric>`, whose values are
    filled in by the registered callbacks.

    Args:
        name: base name of the exported metrics
        desc: help text used for the `<name>_total` metric
        labels: label names shared by all the exported metrics
        sub_metrics: A list of sub metrics that the callbacks will update.
    """

    def __init__(
        self,
        name: str,
        desc: str,
        labels: StrSequence,
        sub_metrics: StrSequence,
    ):
        self.name = name
        self.desc = desc
        self.labels = labels
        self.sub_metrics = sub_metrics

        # Build a class with one attribute per sub metric, each defaulting to 0
        # on initialisation. Instances of it are handed to registered callbacks.
        entry_fields = {sub_metric: attr.ib(default=0) for sub_metric in sub_metrics}
        self._metrics_class: Type[MetricsEntry] = attr.make_class(
            "_MetricsEntry",
            attrs=entry_fields,
            slots=True,
        )

        # The callbacks currently registered for each set of label values; the
        # size of each set is the number of in-flight blocks for that key.
        self._registrations: Dict[
            Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
        ] = {}

        # Protects access to _registrations
        self._lock = threading.Lock()

        self._register_with_collector()

    def register(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Records that we've entered a new block with labels `key`.

        `callback` is invoked every time the metrics are collected. The same
        value must later be passed to `unregister`.

        `callback` receives an object that has one attribute per sub_metric,
        and should update those attributes with the necessary values. Note that
        this object is shared between all callbacks registered with the same
        key.

        Note that `callback` may be called on a separate thread.
        """
        with self._lock:
            callbacks_for_key = self._registrations.setdefault(key, set())
            callbacks_for_key.add(callback)

    def unregister(
        self,
        key: Tuple[str, ...],
        callback: Callable[[MetricsEntry], None],
    ) -> None:
        """Records that we've exited a block with labels `key`."""
        with self._lock:
            callbacks_for_key = self._registrations.setdefault(key, set())
            callbacks_for_key.discard(callback)

    def collect(self) -> Iterable[Metric]:
        """Called by prometheus client when it reads metrics.

        Note: may be called by a separate thread.
        """
        total_family = GaugeMetricFamily(
            self.name + "_total", self.desc, labels=self.labels
        )

        entries_by_key = {}

        # Snapshot the keys so that concurrent registrations cannot change the
        # dict while we are iterating over it.
        with self._lock:
            label_sets = list(self._registrations)

        for label_values in label_sets:
            # Likewise copy the callbacks, so they run outside the lock.
            with self._lock:
                pending = set(self._registrations[label_values])

            total_family.add_metric(label_values, len(pending))

            # One fresh entry per key, shared by all of that key's callbacks.
            entry = self._metrics_class()
            entries_by_key[label_values] = entry
            for update in pending:
                update(entry)

        yield total_family

        for sub_metric in self.sub_metrics:
            family = GaugeMetricFamily(
                "_".join((self.name, sub_metric)), "", labels=self.labels
            )
            for label_values, entry in entries_by_key.items():
                family.add_metric(label_values, getattr(entry, sub_metric))
            yield family

    def _register_with_collector(self) -> None:
        """Register this gauge with ``REGISTRY`` and record it in ``all_gauges``,
        first unregistering any gauge already recorded under the same name."""
        if self.name in all_gauges:
            logger.warning("%s already registered, reregistering" % (self.name,))
            stale = all_gauges.pop(self.name)
            REGISTRY.unregister(stale)

        REGISTRY.register(self)
        all_gauges[self.name] = self
|
|
|
|
|
|
class GaugeBucketCollector(Collector):
    """Like a Histogram, but the buckets are Gauges which are updated atomically.

    The data is updated by calling `update_data` with an iterable of measurements.

    We assume that the data is updated less frequently than it is reported to
    Prometheus, and optimise for that case.
    """

    __slots__ = (
        "_name",
        "_documentation",
        "_bucket_bounds",
        "_metric",
    )

    def __init__(
        self,
        name: str,
        documentation: str,
        buckets: Iterable[float],
        registry: CollectorRegistry = REGISTRY,
    ):
        """
        Args:
            name: base name of metric to be exported to Prometheus. (a _bucket suffix
                will be added.)
            documentation: help text for the metric
            buckets: The top bounds of the buckets to report. Must be in
                ascending order. A final +Inf bucket is appended if not already
                present; an empty iterable yields just the +Inf bucket.
            registry: metric registry to register with

        Raises:
            ValueError: if `buckets` is not in sorted order.
        """
        self._name = name
        self._documentation = documentation

        # the tops of the buckets
        self._bucket_bounds = [float(b) for b in buckets]
        if self._bucket_bounds != sorted(self._bucket_bounds):
            raise ValueError("Buckets not in sorted order")

        # Ensure the last bucket is +Inf so every measurement lands somewhere.
        # The emptiness check avoids an IndexError when no bounds were given.
        if not self._bucket_bounds or self._bucket_bounds[-1] != float("inf"):
            self._bucket_bounds.append(float("inf"))

        # We initially set this to None. We won't report metrics until
        # this has been initialised after a successful data update
        self._metric: Optional[GaugeHistogramMetricFamily] = None

        registry.register(self)

    def collect(self) -> Iterable[Metric]:
        """Yield the most recently computed metric, or nothing if `update_data`
        has not been called yet."""
        # Don't report metrics unless we've already collected some data
        if self._metric is not None:
            yield self._metric

    def update_data(self, values: Iterable[float]) -> None:
        """Update the data to be reported by the metric

        The existing data is cleared, and each measurement in the input is assigned
        to the relevant bucket.
        """
        # Build the new metric fully before swapping it in with one assignment,
        # so `collect` never sees a half-populated metric.
        self._metric = self._values_to_metric(values)

    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
        """Bucket `values` and return the corresponding gauge histogram.

        Each reported bucket counts the values less than or equal to its bound
        (i.e. counts are cumulative), and the sum of all values is reported as
        the histogram's gsum.
        """
        total = 0.0
        bucket_values = [0] * len(self._bucket_bounds)

        for v in values:
            # assign each value to the first bucket whose bound it fits under
            for i, bound in enumerate(self._bucket_bounds):
                if v <= bound:
                    bucket_values[i] += 1
                    break

            # ... and increment the sum
            total += v

        # now, aggregate the bucket values so that they count the number of entries in
        # that bucket or below.
        accumulated_values = itertools.accumulate(bucket_values)

        return GaugeHistogramMetricFamily(
            self._name,
            self._documentation,
            buckets=list(
                zip((str(b) for b in self._bucket_bounds), accumulated_values)
            ),
            gsum_value=total,
        )
|
|
|
|
|
|
#
|
|
# Detailed CPU metrics
|
|
#
|
|
|
|
|
|
class CPUMetrics(Collector):
    """Collector reporting the user and system CPU time consumed by the
    current process, as read from ``/proc/self/stat``."""

    def __init__(self) -> None:
        # Fallback tick rate, used when the system value cannot be queried.
        ticks_per_sec = 100
        try:
            # Try and get the system config
            ticks_per_sec = os.sysconf("SC_CLK_TCK")
        except (ValueError, TypeError, AttributeError, OSError):
            # Deliberately best-effort: `os.sysconf` may be missing on this
            # platform (AttributeError), or the name may be unknown or
            # unsupported (ValueError / OSError). Keep the fallback.
            pass

        self.ticks_per_sec = ticks_per_sec

    def collect(self) -> Iterable[Metric]:
        """Yield `process_cpu_user_seconds_total` and
        `process_cpu_system_seconds_total`, or nothing at all if
        ``/proc/self/stat`` was not available at import time."""
        if not HAVE_PROC_SELF_STAT:
            return

        with open("/proc/self/stat") as s:
            line = s.read()

        # The second field is the executable name wrapped in parentheses, and
        # may itself contain spaces and ")" characters. Split on the LAST ") "
        # so that such names cannot shift the numeric fields that follow.
        raw_stats = line.rsplit(") ", 1)[1].split(" ")

        # With pid and comm stripped, index 0 is field 3 (state), so utime
        # (field 14) is index 11 and stime (field 15) is index 12. Both are
        # measured in clock ticks.
        user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
        user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
        yield user

        system = GaugeMetricFamily("process_cpu_system_seconds_total", "")
        system.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
        yield system
|
|
|
|
|
|
# Register a CPUMetrics collector with the global REGISTRY when this module is
# imported.
REGISTRY.register(CPUMetrics())
|
|
|
|
|
|
#
|
|
# Federation Metrics
|
|
#
|
|
|
|
# Counters with an empty help string.
# NOTE(review): judging by the metric names these count federation
# transactions sent and events processed by the federation client; they are not
# updated in this module, so confirm against the call sites.
sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")

events_processed_counter = Counter("synapse_federation_client_events_processed", "")

# Counts event processing loop iterations, labelled by "name".
event_processing_loop_counter = Counter(
    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)

# Counts rooms seen per event processing loop iteration, labelled by "name".
event_processing_loop_room_count = Counter(
    "synapse_event_processing_loop_room_count",
    "Rooms seen per event processing loop iteration",
    ["name"],
)


# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])

# Used to track the current max events stream position
event_persisted_position = Gauge("synapse_event_persisted_position", "")

# Used to track the received_ts of the last event processed by various
# components
event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])

# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])

# Histogram counterpart of the lag gauge above, labelled by "name"; see its
# help text for what is measured.
event_processing_lag_by_event = Histogram(
    "synapse_event_processing_lag_by_event",
    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
    ["name"],
)

# Build info of the running server.
build_info = Gauge(
    "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
)
# The information is carried entirely by the label values (Python
# implementation and version, Synapse version, OS name and release); the sample
# value is a constant 1.
build_info.labels(
    " ".join([platform.python_implementation(), platform.python_version()]),
    SYNAPSE_VERSION,
    " ".join([platform.system(), platform.release()]),
).set(1)
|
|
|
|
# 3PID send info
# Histogram of 3pid token requests bucketed by try count, labelled by "type"
# and "reason".
threepid_send_requests = Histogram(
    "synapse_threepid_send_requests_with_tries",
    documentation="Number of requests for a 3pid token by try count. Note if"
    " there is a request with try count of 4, then there would have been one"
    " each for 1, 2 and 3",
    buckets=(1, 2, 3, 4, 5, 10),
    labelnames=("type", "reason"),
)

# Threadpool gauges, each labelled by the threadpool's "name". They are
# populated by `register_threadpool`.
threadpool_total_threads = Gauge(
    "synapse_threadpool_total_threads",
    "Total number of threads currently in the threadpool",
    ["name"],
)

threadpool_total_working_threads = Gauge(
    "synapse_threadpool_working_threads",
    "Number of threads currently working in the threadpool",
    ["name"],
)

threadpool_total_min_threads = Gauge(
    "synapse_threadpool_min_threads",
    "Minimum number of threads configured in the threadpool",
    ["name"],
)

threadpool_total_max_threads = Gauge(
    "synapse_threadpool_max_threads",
    "Maximum number of threads configured in the threadpool",
    ["name"],
)
|
|
|
|
|
|
def register_threadpool(name: str, threadpool: ThreadPool) -> None:
    """Expose metrics for `threadpool`, labelled with `name`.

    The configured minimum and maximum sizes are recorded once, at the time of
    the call. The total and working thread counts are computed from the
    threadpool each time the gauges are read.
    """

    def _count_threads() -> int:
        return len(threadpool.threads)

    def _count_working() -> int:
        return len(threadpool.working)

    # Static configuration, captured now.
    threadpool_total_min_threads.labels(name).set(threadpool.min)
    threadpool_total_max_threads.labels(name).set(threadpool.max)

    # Live values, evaluated at read time.
    threadpool_total_threads.labels(name).set_function(_count_threads)
    threadpool_total_working_threads.labels(name).set_function(_count_working)
|
|
|
|
|
|
# Names exported by `from synapse.metrics import *`. This includes names that
# are only imported into this module (Collector, MetricsResource,
# generate_latest, MIN_TIME_BETWEEN_GCS, install_gc_manager) as well as the
# collector classes defined here.
__all__ = [
    "Collector",
    "MetricsResource",
    "generate_latest",
    "LaterGauge",
    "InFlightGauge",
    "GaugeBucketCollector",
    "MIN_TIME_BETWEEN_GCS",
    "install_gc_manager",
]
|