Merge pull request #3571 from matrix-org/rav/limiter_fixes

A set of improvements to the Limiter
This commit is contained in:
Richard van der Hoff 2018-07-20 13:53:03 +01:00 committed by GitHub
commit 4f67623674
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 95 additions and 171 deletions

1
changelog.d/3570.bugfix Normal file
View File

@ -0,0 +1 @@
Fix potential stack overflow and deadlock under heavy load

1
changelog.d/3571.misc Normal file
View File

@ -0,0 +1 @@
Merge Linearizer and Limiter

View File

@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
from synapse.util.async import Limiter, ReadWriteLock
from synapse.util.async import Linearizer, ReadWriteLock
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@ -427,7 +427,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import logging
from contextlib import contextmanager
@ -156,91 +157,8 @@ def concurrently_execute(func, args, limit):
class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
Example:
with (yield linearizer.queue("test_key")):
# do some work.
"""
def __init__(self, name=None, clock=None):
if name is None:
self.name = id(self)
else:
self.name = name
self.key_to_defer = {}
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
# we can wait on it later.
# Then we replace it with a deferred that we resolve *after* the
# context manager has exited.
# We only return the context manager after the previous deferred has
# resolved.
# This all has the net effect of creating a chain of deferreds that
# wait for the previous deferred before starting their work.
current_defer = self.key_to_defer.get(key)
new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer
if current_defer:
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
)
try:
with PreserveLoggingContext():
yield current_defer
except Exception:
logger.exception("Unexpected exception in Linearizer")
logger.info("Acquired linearizer lock %r for key %r", self.name,
key)
# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (There's no particular need for it to happen before we return
# the context manager, but it needs to happen while we hold the
# lock, and the context manager's exit code must be synchronous,
# so actually this is the only sensible place.
yield self._clock.sleep(0)
else:
logger.info("Acquired uncontended linearizer lock %r for key %r",
self.name, key)
@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
with PreserveLoggingContext():
new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
defer.returnValue(_ctx_manager())
class Limiter(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
only a few thing happen at a time on a given resource.
only a few things happen at a time on a given resource.
Example:
@ -248,22 +166,31 @@ class Limiter(object):
# do some work.
"""
def __init__(self, max_count):
def __init__(self, name=None, max_count=1, clock=None):
"""
Args:
max_count(int): The maximum number of concurrent access
max_count(int): The maximum number of concurrent accesses
"""
if name is None:
self.name = id(self)
else:
self.name = name
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
self.max_count = max_count
# key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing
# the second element is a list of deferreds for the things blocked from
# the first element is the number of things executing, and
# the second element is a deque of deferreds for the things blocked from
# executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, []])
entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
@ -273,29 +200,47 @@ class Limiter(object):
new_defer = defer.Deferred()
entry[1].append(new_defer)
logger.info("Waiting to acquire limiter lock for key %r", key)
with PreserveLoggingContext():
yield new_defer
logger.info("Acquired limiter lock for key %r", key)
else:
logger.info("Acquired uncontended limiter lock for key %r", key)
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
yield make_deferred_yieldable(new_defer)
entry[0] += 1
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
# relatively rapidly lead to stack exhaustion. This is essentially
# the same problem as http://twistedmatrix.com/trac/ticket/9304.
#
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
# (This needs to happen while we hold the lock, and the context manager's exit
# code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)
else:
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
@contextmanager
def _ctx_manager():
try:
yield
finally:
logger.info("Releasing limiter lock for key %r", key)
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
if entry[1]:
next_def = entry[1].pop(0)
next_def = entry[1].popleft()
# we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:

View File

@ -1,70 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer
from synapse.util.async import Limiter
from tests import unittest
class LimiterTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_limiter(self):
limiter = Limiter(3)
key = object()
d1 = limiter.queue(key)
cm1 = yield d1
d2 = limiter.queue(key)
cm2 = yield d2
d3 = limiter.queue(key)
cm3 = yield d3
d4 = limiter.queue(key)
self.assertFalse(d4.called)
d5 = limiter.queue(key)
self.assertFalse(d5.called)
with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)
self.assertTrue(d4.called)
self.assertFalse(d5.called)
with cm3:
self.assertFalse(d5.called)
self.assertTrue(d5.called)
with cm2:
pass
with (yield d4):
pass
with (yield d5):
pass
d6 = limiter.queue(key)
with (yield d6):
pass

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -65,3 +66,49 @@ class LinearizerTestCase(unittest.TestCase):
func(i)
return func(1000)
@defer.inlineCallbacks
def test_multiple_entries(self):
limiter = Linearizer(max_count=3)
key = object()
d1 = limiter.queue(key)
cm1 = yield d1
d2 = limiter.queue(key)
cm2 = yield d2
d3 = limiter.queue(key)
cm3 = yield d3
d4 = limiter.queue(key)
self.assertFalse(d4.called)
d5 = limiter.queue(key)
self.assertFalse(d5.called)
with cm1:
self.assertFalse(d4.called)
self.assertFalse(d5.called)
cm4 = yield d4
self.assertFalse(d5.called)
with cm3:
self.assertFalse(d5.called)
cm5 = yield d5
with cm2:
pass
with cm4:
pass
with cm5:
pass
d6 = limiter.queue(key)
with (yield d6):
pass