mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-10-01 11:49:51 -04:00
Fix rate limit metrics registering twice and misreporting (#13649)
* Fix rate limit metrics registering twice and misreporting Fix https://github.com/matrix-org/synapse/issues/13641 * Fix lints * Add changelog * Document `metrics_name=None`.
This commit is contained in:
parent
ea85a2bf6c
commit
1eea73b413
1
changelog.d/13649.bugfix
Normal file
1
changelog.d/13649.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix rate limit gauge metrics registering twice and misreporting (`synapse_rate_limit_sleep_affected_hosts`, `synapse_rate_limit_reject_affected_hosts`).
|
@ -756,7 +756,9 @@ class HomeServer(metaclass=abc.ABCMeta):
|
|||||||
@cache_in_self
|
@cache_in_self
|
||||||
def get_federation_ratelimiter(self) -> FederationRateLimiter:
|
def get_federation_ratelimiter(self) -> FederationRateLimiter:
|
||||||
return FederationRateLimiter(
|
return FederationRateLimiter(
|
||||||
self.get_clock(), config=self.config.ratelimiting.rc_federation
|
self.get_clock(),
|
||||||
|
config=self.config.ratelimiting.rc_federation,
|
||||||
|
metrics_name="federation_servlets",
|
||||||
)
|
)
|
||||||
|
|
||||||
@cache_in_self
|
@cache_in_self
|
||||||
|
@ -15,10 +15,23 @@
|
|||||||
import collections
|
import collections
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
import typing
|
import typing
|
||||||
from typing import Any, DefaultDict, Iterator, List, Set
|
from typing import (
|
||||||
|
Any,
|
||||||
|
Callable,
|
||||||
|
DefaultDict,
|
||||||
|
Dict,
|
||||||
|
Iterator,
|
||||||
|
List,
|
||||||
|
Mapping,
|
||||||
|
Optional,
|
||||||
|
Set,
|
||||||
|
Tuple,
|
||||||
|
)
|
||||||
|
|
||||||
from prometheus_client.core import Counter
|
from prometheus_client.core import Counter
|
||||||
|
from typing_extensions import ContextManager
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
@ -40,12 +53,20 @@ logger = logging.getLogger(__name__)
|
|||||||
|
|
||||||
|
|
||||||
# Track how much the ratelimiter is affecting requests
|
# Track how much the ratelimiter is affecting requests
|
||||||
rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
|
rate_limit_sleep_counter = Counter(
|
||||||
rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
|
"synapse_rate_limit_sleep",
|
||||||
|
"Number of requests slept by the rate limiter",
|
||||||
|
["rate_limiter_name"],
|
||||||
|
)
|
||||||
|
rate_limit_reject_counter = Counter(
|
||||||
|
"synapse_rate_limit_reject",
|
||||||
|
"Number of requests rejected by the rate limiter",
|
||||||
|
["rate_limiter_name"],
|
||||||
|
)
|
||||||
queue_wait_timer = Histogram(
|
queue_wait_timer = Histogram(
|
||||||
"synapse_rate_limit_queue_wait_time_seconds",
|
"synapse_rate_limit_queue_wait_time_seconds",
|
||||||
"sec",
|
"Amount of time spent waiting for the rate limiter to let our request through.",
|
||||||
[],
|
["rate_limiter_name"],
|
||||||
buckets=(
|
buckets=(
|
||||||
0.005,
|
0.005,
|
||||||
0.01,
|
0.01,
|
||||||
@ -65,35 +86,92 @@ queue_wait_timer = Histogram(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_rate_limiter_instances: Set["FederationRateLimiter"] = set()
|
||||||
|
# Protects the _rate_limiter_instances set from concurrent access
|
||||||
|
_rate_limiter_instances_lock = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_counts_from_rate_limiter_instance(
|
||||||
|
count_func: Callable[["FederationRateLimiter"], int]
|
||||||
|
) -> Mapping[Tuple[str, ...], int]:
|
||||||
|
"""Returns a count of something (slept/rejected hosts) by (metrics_name)"""
|
||||||
|
# Cast to a list to prevent it changing while the Prometheus
|
||||||
|
# thread is collecting metrics
|
||||||
|
with _rate_limiter_instances_lock:
|
||||||
|
rate_limiter_instances = list(_rate_limiter_instances)
|
||||||
|
|
||||||
|
# Map from (metrics_name,) -> int, the number of something like slept hosts
|
||||||
|
# or rejected hosts. The key type is Tuple[str], but we leave the length
|
||||||
|
# unspecified for compatability with LaterGauge's annotations.
|
||||||
|
counts: Dict[Tuple[str, ...], int] = {}
|
||||||
|
for rate_limiter_instance in rate_limiter_instances:
|
||||||
|
# Only track metrics if they provided a `metrics_name` to
|
||||||
|
# differentiate this instance of the rate limiter.
|
||||||
|
if rate_limiter_instance.metrics_name:
|
||||||
|
key = (rate_limiter_instance.metrics_name,)
|
||||||
|
counts[key] = count_func(rate_limiter_instance)
|
||||||
|
|
||||||
|
return counts
|
||||||
|
|
||||||
|
|
||||||
|
# We track the number of affected hosts per time-period so we can
|
||||||
|
# differentiate one really noisy homeserver from a general
|
||||||
|
# ratelimit tuning problem across the federation.
|
||||||
|
LaterGauge(
|
||||||
|
"synapse_rate_limit_sleep_affected_hosts",
|
||||||
|
"Number of hosts that had requests put to sleep",
|
||||||
|
["rate_limiter_name"],
|
||||||
|
lambda: _get_counts_from_rate_limiter_instance(
|
||||||
|
lambda rate_limiter_instance: sum(
|
||||||
|
ratelimiter.should_sleep()
|
||||||
|
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||||
|
)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
LaterGauge(
|
||||||
|
"synapse_rate_limit_reject_affected_hosts",
|
||||||
|
"Number of hosts that had requests rejected",
|
||||||
|
["rate_limiter_name"],
|
||||||
|
lambda: _get_counts_from_rate_limiter_instance(
|
||||||
|
lambda rate_limiter_instance: sum(
|
||||||
|
ratelimiter.should_reject()
|
||||||
|
for ratelimiter in rate_limiter_instance.ratelimiters.values()
|
||||||
|
)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class FederationRateLimiter:
|
class FederationRateLimiter:
|
||||||
def __init__(self, clock: Clock, config: FederationRatelimitSettings):
|
"""Used to rate limit request per-host."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
clock: Clock,
|
||||||
|
config: FederationRatelimitSettings,
|
||||||
|
metrics_name: Optional[str] = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Args:
|
||||||
|
clock
|
||||||
|
config
|
||||||
|
metrics_name: The name of the rate limiter so we can differentiate it
|
||||||
|
from the rest in the metrics. If `None`, we don't track metrics
|
||||||
|
for this rate limiter.
|
||||||
|
|
||||||
|
"""
|
||||||
|
self.metrics_name = metrics_name
|
||||||
|
|
||||||
def new_limiter() -> "_PerHostRatelimiter":
|
def new_limiter() -> "_PerHostRatelimiter":
|
||||||
return _PerHostRatelimiter(clock=clock, config=config)
|
return _PerHostRatelimiter(
|
||||||
|
clock=clock, config=config, metrics_name=metrics_name
|
||||||
|
)
|
||||||
|
|
||||||
self.ratelimiters: DefaultDict[
|
self.ratelimiters: DefaultDict[
|
||||||
str, "_PerHostRatelimiter"
|
str, "_PerHostRatelimiter"
|
||||||
] = collections.defaultdict(new_limiter)
|
] = collections.defaultdict(new_limiter)
|
||||||
|
|
||||||
# We track the number of affected hosts per time-period so we can
|
with _rate_limiter_instances_lock:
|
||||||
# differentiate one really noisy homeserver from a general
|
_rate_limiter_instances.add(self)
|
||||||
# ratelimit tuning problem across the federation.
|
|
||||||
LaterGauge(
|
|
||||||
"synapse_rate_limit_sleep_affected_hosts",
|
|
||||||
"Number of hosts that had requests put to sleep",
|
|
||||||
[],
|
|
||||||
lambda: sum(
|
|
||||||
ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
|
|
||||||
),
|
|
||||||
)
|
|
||||||
LaterGauge(
|
|
||||||
"synapse_rate_limit_reject_affected_hosts",
|
|
||||||
"Number of hosts that had requests rejected",
|
|
||||||
[],
|
|
||||||
lambda: sum(
|
|
||||||
ratelimiter.should_reject()
|
|
||||||
for ratelimiter in self.ratelimiters.values()
|
|
||||||
),
|
|
||||||
)
|
|
||||||
|
|
||||||
def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
|
def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
|
||||||
"""Used to ratelimit an incoming request from a given host
|
"""Used to ratelimit an incoming request from a given host
|
||||||
@ -114,13 +192,23 @@ class FederationRateLimiter:
|
|||||||
|
|
||||||
|
|
||||||
class _PerHostRatelimiter:
|
class _PerHostRatelimiter:
|
||||||
def __init__(self, clock: Clock, config: FederationRatelimitSettings):
|
def __init__(
|
||||||
|
self,
|
||||||
|
clock: Clock,
|
||||||
|
config: FederationRatelimitSettings,
|
||||||
|
metrics_name: Optional[str] = None,
|
||||||
|
):
|
||||||
"""
|
"""
|
||||||
Args:
|
Args:
|
||||||
clock
|
clock
|
||||||
config
|
config
|
||||||
|
metrics_name: The name of the rate limiter so we can differentiate it
|
||||||
|
from the rest in the metrics. If `None`, we don't track metrics
|
||||||
|
for this rate limiter.
|
||||||
|
from the rest in the metrics
|
||||||
"""
|
"""
|
||||||
self.clock = clock
|
self.clock = clock
|
||||||
|
self.metrics_name = metrics_name
|
||||||
|
|
||||||
self.window_size = config.window_size
|
self.window_size = config.window_size
|
||||||
self.sleep_limit = config.sleep_limit
|
self.sleep_limit = config.sleep_limit
|
||||||
@ -178,7 +266,10 @@ class _PerHostRatelimiter:
|
|||||||
return len(self.request_times) > self.sleep_limit
|
return len(self.request_times) > self.sleep_limit
|
||||||
|
|
||||||
async def _on_enter_with_tracing(self, request_id: object) -> None:
|
async def _on_enter_with_tracing(self, request_id: object) -> None:
|
||||||
with start_active_span("ratelimit wait"), queue_wait_timer.time():
|
maybe_metrics_cm: ContextManager = contextlib.nullcontext()
|
||||||
|
if self.metrics_name:
|
||||||
|
maybe_metrics_cm = queue_wait_timer.labels(self.metrics_name).time()
|
||||||
|
with start_active_span("ratelimit wait"), maybe_metrics_cm:
|
||||||
await self._on_enter(request_id)
|
await self._on_enter(request_id)
|
||||||
|
|
||||||
def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
|
def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
|
||||||
@ -193,7 +284,8 @@ class _PerHostRatelimiter:
|
|||||||
# sleeping or in the ready queue).
|
# sleeping or in the ready queue).
|
||||||
if self.should_reject():
|
if self.should_reject():
|
||||||
logger.debug("Ratelimiter(%s): rejecting request", self.host)
|
logger.debug("Ratelimiter(%s): rejecting request", self.host)
|
||||||
rate_limit_reject_counter.inc()
|
if self.metrics_name:
|
||||||
|
rate_limit_reject_counter.labels(self.metrics_name).inc()
|
||||||
raise LimitExceededError(
|
raise LimitExceededError(
|
||||||
retry_after_ms=int(self.window_size / self.sleep_limit)
|
retry_after_ms=int(self.window_size / self.sleep_limit)
|
||||||
)
|
)
|
||||||
@ -228,7 +320,8 @@ class _PerHostRatelimiter:
|
|||||||
id(request_id),
|
id(request_id),
|
||||||
self.sleep_sec,
|
self.sleep_sec,
|
||||||
)
|
)
|
||||||
rate_limit_sleep_counter.inc()
|
if self.metrics_name:
|
||||||
|
rate_limit_sleep_counter.labels(self.metrics_name).inc()
|
||||||
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
|
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
|
||||||
|
|
||||||
self.sleeping_requests.add(request_id)
|
self.sleeping_requests.add(request_id)
|
||||||
|
Loading…
Reference in New Issue
Block a user