mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Add prometheus metrics for the number of active pushers (#7103)
This commit is contained in:
parent
443162e577
commit
8c75667ad7
1
changelog.d/7103.feature
Normal file
1
changelog.d/7103.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add prometheus metrics for the number of active pushers.
|
@ -20,7 +20,7 @@ import os
|
|||||||
import platform
|
import platform
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
from typing import Dict, Union
|
from typing import Callable, Dict, Iterable, Optional, Tuple, Union
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -59,10 +59,12 @@ class RegistryProxy(object):
|
|||||||
@attr.s(hash=True)
|
@attr.s(hash=True)
|
||||||
class LaterGauge(object):
|
class LaterGauge(object):
|
||||||
|
|
||||||
name = attr.ib()
|
name = attr.ib(type=str)
|
||||||
desc = attr.ib()
|
desc = attr.ib(type=str)
|
||||||
labels = attr.ib(hash=False)
|
labels = attr.ib(hash=False, type=Optional[Iterable[str]])
|
||||||
caller = attr.ib()
|
# callback: should either return a value (if there are no labels for this metric),
|
||||||
|
# or dict mapping from a label tuple to a value
|
||||||
|
caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
|
||||||
|
|
||||||
def collect(self):
|
def collect(self):
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
from asyncio import iscoroutine
|
from asyncio import iscoroutine
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
from typing import Dict, Set
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
|
|||||||
# map from description to a counter, so that we can name our logcontexts
|
# map from description to a counter, so that we can name our logcontexts
|
||||||
# incrementally. (It actually duplicates _background_process_start_count, but
|
# incrementally. (It actually duplicates _background_process_start_count, but
|
||||||
# it's much simpler to do so than to try to combine them.)
|
# it's much simpler to do so than to try to combine them.)
|
||||||
_background_process_counts = {} # type: dict[str, int]
|
_background_process_counts = {} # type: Dict[str, int]
|
||||||
|
|
||||||
# map from description to the currently running background processes.
|
# map from description to the currently running background processes.
|
||||||
#
|
#
|
||||||
# it's kept as a dict of sets rather than a big set so that we can keep track
|
# it's kept as a dict of sets rather than a big set so that we can keep track
|
||||||
# of process descriptions that no longer have any active processes.
|
# of process descriptions that no longer have any active processes.
|
||||||
_background_processes = {} # type: dict[str, set[_BackgroundProcess]]
|
_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
|
||||||
|
|
||||||
# A lock that covers the above dicts
|
# A lock that covers the above dicts
|
||||||
_bg_metrics_lock = threading.Lock()
|
_bg_metrics_lock = threading.Lock()
|
||||||
|
@ -15,11 +15,16 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
from collections import defaultdict
|
||||||
|
from typing import Dict, Tuple, Union
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
from synapse.metrics import LaterGauge
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.push import PusherConfigException
|
from synapse.push import PusherConfigException
|
||||||
|
from synapse.push.emailpusher import EmailPusher
|
||||||
|
from synapse.push.httppusher import HttpPusher
|
||||||
from synapse.push.pusher import PusherFactory
|
from synapse.push.pusher import PusherFactory
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
|
|
||||||
@ -47,7 +52,24 @@ class PusherPool:
|
|||||||
self._should_start_pushers = _hs.config.start_pushers
|
self._should_start_pushers = _hs.config.start_pushers
|
||||||
self.store = self.hs.get_datastore()
|
self.store = self.hs.get_datastore()
|
||||||
self.clock = self.hs.get_clock()
|
self.clock = self.hs.get_clock()
|
||||||
self.pushers = {}
|
|
||||||
|
# map from user id to app_id:pushkey to pusher
|
||||||
|
self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
|
||||||
|
|
||||||
|
def count_pushers():
|
||||||
|
results = defaultdict(int) # type: Dict[Tuple[str, str], int]
|
||||||
|
for pushers in self.pushers.values():
|
||||||
|
for pusher in pushers.values():
|
||||||
|
k = (type(pusher).__name__, pusher.app_id)
|
||||||
|
results[k] += 1
|
||||||
|
return results
|
||||||
|
|
||||||
|
LaterGauge(
|
||||||
|
name="synapse_pushers",
|
||||||
|
desc="the number of active pushers",
|
||||||
|
labels=["kind", "app_id"],
|
||||||
|
caller=count_pushers,
|
||||||
|
)
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Starts the pushers off in a background process.
|
"""Starts the pushers off in a background process.
|
||||||
|
2
tox.ini
2
tox.ini
@ -191,7 +191,9 @@ commands = mypy \
|
|||||||
synapse/handlers/sync.py \
|
synapse/handlers/sync.py \
|
||||||
synapse/handlers/ui_auth \
|
synapse/handlers/ui_auth \
|
||||||
synapse/logging/ \
|
synapse/logging/ \
|
||||||
|
synapse/metrics \
|
||||||
synapse/module_api \
|
synapse/module_api \
|
||||||
|
synapse/push/pusherpool.py \
|
||||||
synapse/replication \
|
synapse/replication \
|
||||||
synapse/rest \
|
synapse/rest \
|
||||||
synapse/spam_checker_api \
|
synapse/spam_checker_api \
|
||||||
|
Loading…
Reference in New Issue
Block a user