From 93d90765c4b09bc870fc91c6ddcf21fd4389659d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Feb 2015 16:15:26 +0000 Subject: [PATCH 1/5] Initial implementation of federation server rate limiting --- synapse/federation/transport/__init__.py | 12 +- synapse/federation/transport/server.py | 175 ++++++++++++++++++++++- 2 files changed, 182 insertions(+), 5 deletions(-) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 6800ac46c..7028ca694 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,7 +21,7 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer +from .server import TransportLayerServer, FederationRateLimiter from .client import TransportLayerClient @@ -55,8 +55,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): send requests """ self.keyring = homeserver.get_keyring() + self.clock = homeserver.get_clock() self.server_name = server_name self.server = server self.client = client self.request_handler = None self.received_handler = None + + self.ratelimiter = FederationRateLimiter( + self.clock, + window_size=10000, + sleep_limit=10, + sleep_msec=500, + reject_limit=50, + concurrent_requests=3, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa1..a9e625f12 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,9 +16,11 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, LimitExceededError +from synapse.util.async import sleep from synapse.util.logutils import log_function +import collections import logging import simplejson as json import re @@ -27,6 +29,163 @@ import re logger = logging.getLogger(__name__) +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + return self.ratelimiters.setdefault( + host, + PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + + if len(self.request_times) > self.sleep_limit: + logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) + + class TransportLayerServer(object): """Handles incoming federation HTTP requests""" @@ -98,15 +257,23 @@ class TransportLayerServer(object): def new_handler(request, *args, **kwargs): try: (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + with self.ratelimiter.ratelimit(origin) as d: + yield d + response = yield handler( + origin, content, request.args, *args, **kwargs + ) except: logger.exception("_authenticate_request failed") raise defer.returnValue(response) return new_handler + def rate_limit_origin(self, handler): + def new_handler(origin, *args, **kwargs): + response = yield handler(origin, *args, **kwargs) + defer.returnValue(response) + return new_handler() + @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. From 9dc9118e552bfddaa7579b4ded8b1a0da7eff0e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:16:47 +0000 Subject: [PATCH 2/5] Document FederationRateLimiter --- synapse/federation/transport/server.py | 59 ++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a9e625f12..390e54b9f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -32,6 +32,21 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ self.clock = clock self.window_size = window_size @@ -43,9 +58,23 @@ class FederationRateLimiter(object): self.ratelimiters = {} def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ return self.ratelimiters.setdefault( host, - PerHostRatelimiter( + _PerHostRatelimiter( clock=self.clock, window_size=self.window_size, sleep_limit=self.sleep_limit, @@ -56,7 +85,7 @@ class FederationRateLimiter(object): ).ratelimit() -class PerHostRatelimiter(object): +class _PerHostRatelimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): self.clock = clock @@ -123,17 +152,25 @@ class PerHostRatelimiter(object): else: return defer.succeed(None) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) ret_defer = sleep(self.sleep_msec/1000.0) self.sleeping_requests.add(request_id) def on_wait_finished(_): - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -143,7 +180,10 @@ class PerHostRatelimiter(object): ret_defer = queue_request() def on_start(r): - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) self.current_processing.add(request_id) return r @@ -162,7 +202,10 @@ class PerHostRatelimiter(object): return ret_defer def _on_exit(self, request_id): - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) self.current_processing.discard(request_id) try: request_id, deferred = self.ready_request_queue.popitem() From 0554d0708225afe13d141bd00e3aaca2509f3f78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:41:52 +0000 Subject: [PATCH 3/5] Move federation rate limiting out of transport layer --- synapse/federation/transport/__init__.py | 4 +- synapse/federation/transport/server.py | 204 +------------------- synapse/util/ratelimitutils.py | 226 +++++++++++++++++++++++ 3 files changed, 230 insertions(+), 204 deletions(-) create mode 100644 synapse/util/ratelimitutils.py diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 7028ca694..f0283b510 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,9 +21,11 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer, FederationRateLimiter +from .server import TransportLayerServer from .client import TransportLayerClient +from synapse.util.ratelimitutils import FederationRateLimiter + class TransportLayer(TransportLayerServer, TransportLayerClient): """This is a basic implementation of the transport layer that translates diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 390e54b9f..fce9c0195 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,11 +16,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError, LimitExceededError -from synapse.util.async import sleep +from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function -import collections import logging import simplejson as json import re @@ -29,206 +27,6 @@ import re logger = logging.getLogger(__name__) -class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - """ - Args: - clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. - """ - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.ratelimiters = {} - - def ratelimit(self, host): - """Used to ratelimit an incoming request from given host - - Example usage: - - with rate_limiter.ratelimit(origin) as wait_deferred: - yield wait_deferred - # Handle request ... - - Args: - host (str): Origin of incoming request. - - Returns: - _PerHostRatelimiter - """ - return self.ratelimiters.setdefault( - host, - _PerHostRatelimiter( - clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, - ) - ).ratelimit() - - -class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.sleeping_requests = set() - self.ready_request_queue = collections.OrderedDict() - self.current_processing = set() - self.request_times = [] - - def is_empty(self): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - return not ( - self.ready_request_queue - or self.sleeping_requests - or self.current_processing - or self.request_times - ) - - def ratelimit(self): - request_id = object() - - def on_enter(): - return self._on_enter(request_id) - - def on_exit(exc_type, exc_val, exc_tb): - return self._on_exit(request_id) - - return ContextManagerFunction(on_enter, on_exit) - - def _on_enter(self, request_id): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) - if queue_size > self.reject_limit: - raise LimitExceededError( - retry_after_ms=int( - self.window_size / self.sleep_limit - ), - ) - - self.request_times.append(time_now) - - def queue_request(): - if len(self.current_processing) > self.concurrent_requests: - logger.debug("Ratelimit [%s]: Queue req", id(request_id)) - queue_defer = defer.Deferred() - self.ready_request_queue[request_id] = queue_defer - return queue_defer - else: - return defer.succeed(None) - - logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", - id(request_id), len(self.request_times), - ) - - if len(self.request_times) > self.sleep_limit: - logger.debug( - "Ratelimit [%s]: sleeping req", - id(request_id), - ) - ret_defer = sleep(self.sleep_msec/1000.0) - - self.sleeping_requests.add(request_id) - - def on_wait_finished(_): - logger.debug( - "Ratelimit [%s]: Finished sleeping", - id(request_id), - ) - self.sleeping_requests.discard(request_id) - queue_defer = queue_request() - return queue_defer - - ret_defer.addBoth(on_wait_finished) - else: - ret_defer = queue_request() - - def on_start(r): - logger.debug( - "Ratelimit [%s]: Processing req", - id(request_id), - ) - self.current_processing.add(request_id) - return r - - def on_err(r): - self.current_processing.discard(request_id) - return r - - def on_both(r): - # Ensure that we've properly cleaned up. - self.sleeping_requests.discard(request_id) - self.ready_request_queue.pop(request_id, None) - return r - - ret_defer.addCallbacks(on_start, on_err) - ret_defer.addBoth(on_both) - return ret_defer - - def _on_exit(self, request_id): - logger.debug( - "Ratelimit [%s]: Processed req", - id(request_id), - ) - self.current_processing.discard(request_id) - try: - request_id, deferred = self.ready_request_queue.popitem() - self.current_processing.add(request_id) - deferred.callback(None) - except KeyError: - pass - - -class ContextManagerFunction(object): - def __init__(self, on_enter, on_exit): - self.on_enter = on_enter - self.on_exit = on_exit - - def __enter__(self): - if self.on_enter: - return self.on_enter() - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.on_exit: - return self.on_exit(exc_type, exc_val, exc_tb) - - class TransportLayerServer(object): """Handles incoming federation HTTP requests""" diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py new file mode 100644 index 000000000..259d5f6f8 --- /dev/null +++ b/synapse/util/ratelimitutils.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import LimitExceededError + +from synapse.util.async import sleep + +import collections +import logging + + +logger = logging.getLogger(__name__) + + +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ + return self.ratelimiters.setdefault( + host, + _PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class _PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) + + if len(self.request_times) > self.sleep_limit: + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) From 9d9b230501915c326136567349b0995623c48a21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:33:45 +0000 Subject: [PATCH 4/5] Make the federation server ratelimiting configurable. --- synapse/config/ratelimiting.py | 36 ++++++++++++++++++++++++ synapse/federation/transport/__init__.py | 10 +++---- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 17c7e64ce..862c07ef8 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -22,6 +22,12 @@ class RatelimitConfig(Config): self.rc_messages_per_second = args.rc_messages_per_second self.rc_message_burst_count = args.rc_message_burst_count + self.federation_rc_window_size = args.federation_rc_window_size + self.federation_rc_sleep_limit = args.federation_rc_sleep_limit + self.federation_rc_sleep_delay = args.federation_rc_sleep_delay + self.federation_rc_reject_limit = args.federation_rc_reject_limit + self.federation_rc_concurrent = args.federation_rc_concurrent + @classmethod def add_arguments(cls, parser): super(RatelimitConfig, cls).add_arguments(parser) @@ -34,3 +40,33 @@ class RatelimitConfig(Config): "--rc-message-burst-count", type=float, default=10, help="number of message a client can send before being throttled" ) + + rc_group.add_argument( + "--federation-rc-window-size", type=int, default=10000, + help="The federation window size in milliseconds", + ) + + rc_group.add_argument( + "--federation-rc-sleep-limit", type=int, default=10, + help="The number of federation requests from a single server" + " in a window before the server will delay processing the" + " request.", + ) + + rc_group.add_argument( + "--federation-rc-sleep-delay", type=int, default=500, + help="The duration in milliseconds to delay processing events from" + " remote servers by if they go over the sleep limit.", + ) + + rc_group.add_argument( + "--federation-rc-reject-limit", type=int, default=50, + help="The maximum number of concurrent federation requests allowed" + " from a single server", + ) + + rc_group.add_argument( + "--federation-rc-concurrent", type=int, default=3, + help="The number of federation requests to concurrently process" + " from a single server", + ) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index f0283b510..2a671b9ae 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -66,9 +66,9 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): self.ratelimiter = FederationRateLimiter( self.clock, - window_size=10000, - sleep_limit=10, - sleep_msec=500, - reject_limit=50, - concurrent_requests=3, + window_size=homeserver.config.federation_rc_window_size, + sleep_limit=homeserver.config.federation_rc_sleep_limit, + sleep_msec=homeserver.config.federation_rc_sleep_delay, + reject_limit=homeserver.config.federation_rc_reject_limit, + concurrent_requests=homeserver.config.federation_rc_concurrent, ) From 3077cb291590a7ba3b24a3d1a9985d65980924fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 13:32:44 +0000 Subject: [PATCH 5/5] Use contextlib.contextmanager instead of a custom class --- synapse/util/ratelimitutils.py | 34 ++++++++++++---------------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 259d5f6f8..d4457af95 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -20,6 +20,7 @@ from synapse.api.errors import LimitExceededError from synapse.util.async import sleep import collections +import contextlib import logging @@ -112,16 +113,19 @@ class _PerHostRatelimiter(object): or self.request_times ) + @contextlib.contextmanager def ratelimit(self): + # `contextlib.contextmanager` takes a generator and turns it into a + # context manager. The generator should only yield once with a value + # to be returned by manager. + # Exceptions will be reraised at the yield. + request_id = object() - - def on_enter(): - return self._on_enter(request_id) - - def on_exit(exc_type, exc_val, exc_tb): - return self._on_exit(request_id) - - return ContextManagerFunction(on_enter, on_exit) + ret = self._on_enter(request_id) + try: + yield ret + finally: + self._on_exit(request_id) def _on_enter(self, request_id): time_now = self.clock.time_msec() @@ -210,17 +214,3 @@ class _PerHostRatelimiter(object): deferred.callback(None) except KeyError: pass - - -class ContextManagerFunction(object): - def __init__(self, on_enter, on_exit): - self.on_enter = on_enter - self.on_exit = on_exit - - def __enter__(self): - if self.on_enter: - return self.on_enter() - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.on_exit: - return self.on_exit(exc_type, exc_val, exc_tb)