Make FederationRateLimiter queue requests properly

popitem removes the *most recent* item by default [1]. We want the oldest.

Fixes #3524

[1]: https://docs.python.org/2/library/collections.html#collections.OrderedDict.popitem
This commit is contained in:
Richard van der Hoff 2018-07-13 16:19:40 +01:00
parent 6dff49b8a9
commit 33b40d0a25

View File

@ -92,13 +92,22 @@ class _PerHostRatelimiter(object):
self.window_size = window_size
self.sleep_limit = sleep_limit
self.sleep_msec = sleep_msec
self.sleep_sec = sleep_msec / 1000.0
self.reject_limit = reject_limit
self.concurrent_requests = concurrent_requests
# request_id objects for requests which have been slept
self.sleeping_requests = set()
# map from request_id object to Deferred for requests which are ready
# for processing but have been queued
self.ready_request_queue = collections.OrderedDict()
# request id objects for requests which are in progress
self.current_processing = set()
# times at which we have recently (within the last window_size ms)
# received requests.
self.request_times = []
@contextlib.contextmanager
@ -117,11 +126,15 @@ class _PerHostRatelimiter(object):
def _on_enter(self, request_id):
time_now = self.clock.time_msec()
# remove any entries from request_times which aren't within the window
self.request_times[:] = [
r for r in self.request_times
if time_now - r < self.window_size
]
# reject the request if we already have too many queued up (either
# sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
raise LimitExceededError(
@ -134,9 +147,13 @@ class _PerHostRatelimiter(object):
def queue_request():
if len(self.current_processing) > self.concurrent_requests:
logger.debug("Ratelimit [%s]: Queue req", id(request_id))
queue_defer = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
logger.info(
"Ratelimiter: queueing request (queue now %i items)",
len(self.ready_request_queue),
)
return queue_defer
else:
return defer.succeed(None)
@ -148,10 +165,9 @@ class _PerHostRatelimiter(object):
if len(self.request_times) > self.sleep_limit:
logger.debug(
"Ratelimit [%s]: sleeping req",
id(request_id),
"Ratelimiter: sleeping request for %f sec", self.sleep_sec,
)
ret_defer = run_in_background(self.clock.sleep, self.sleep_msec / 1000.0)
ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
self.sleeping_requests.add(request_id)
@ -200,11 +216,8 @@ class _PerHostRatelimiter(object):
)
self.current_processing.discard(request_id)
try:
request_id, deferred = self.ready_request_queue.popitem()
# XXX: why do we do the following? the on_start callback above will
# do it for us.
self.current_processing.add(request_id)
# start processing the next item on the queue.
_, deferred = self.ready_request_queue.popitem(last=False)
with PreserveLoggingContext():
deferred.callback(None)