Merge pull request #3572 from matrix-org/rav/linearizer_cancellation

Test and fix support for cancellation in Linearizer
This commit is contained in:
Richard van der Hoff 2018-07-20 14:44:02 +01:00 committed by GitHub
commit ff48ab8527
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 54 additions and 6 deletions

1
changelog.d/3572.misc Normal file
View File

@ -0,0 +1 @@
Merge Linearizer and Limiter

View File

@ -184,13 +184,13 @@ class Linearizer(object):
# key_to_defer is a map from the key to a 2 element list where # key_to_defer is a map from the key to a 2 element list where
# the first element is the number of things executing, and # the first element is the number of things executing, and
# the second element is a deque of deferreds for the things blocked from # the second element is an OrderedDict, where the keys are deferreds for the
# executing. # things blocked from executing.
self.key_to_defer = {} self.key_to_defer = {}
@defer.inlineCallbacks @defer.inlineCallbacks
def queue(self, key): def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, collections.deque()]) entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
# If the number of things executing is greater than the maximum # If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items # then add a deferred to the list of blocked items
@ -198,12 +198,28 @@ class Linearizer(object):
# this item so that it can continue executing. # this item so that it can continue executing.
if entry[0] >= self.max_count: if entry[0] >= self.max_count:
new_defer = defer.Deferred() new_defer = defer.Deferred()
entry[1].append(new_defer) entry[1][new_defer] = 1
logger.info( logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key, "Waiting to acquire linearizer lock %r for key %r", self.name, key,
) )
yield make_deferred_yieldable(new_defer) try:
yield make_deferred_yieldable(new_defer)
except Exception as e:
if isinstance(e, CancelledError):
logger.info(
"Cancelling wait for linearizer lock %r for key %r",
self.name, key,
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
self.name, key,
)
# we just have to take ourselves back out of the queue.
del entry[1][new_defer]
raise
logger.info("Acquired linearizer lock %r for key %r", self.name, key) logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1 entry[0] += 1
@ -238,7 +254,7 @@ class Linearizer(object):
entry[0] -= 1 entry[0] -= 1
if entry[1]: if entry[1]:
next_def = entry[1].popleft() (next_def, _) = entry[1].popitem(last=False)
# we need to run the next thing in the sentinel context. # we need to run the next thing in the sentinel context.
with PreserveLoggingContext(): with PreserveLoggingContext():

View File

@ -17,6 +17,7 @@
from six.moves import range from six.moves import range
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.defer import CancelledError
from synapse.util import Clock, logcontext from synapse.util import Clock, logcontext
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
@ -112,3 +113,33 @@ class LinearizerTestCase(unittest.TestCase):
d6 = limiter.queue(key) d6 = limiter.queue(key)
with (yield d6): with (yield d6):
pass pass
@defer.inlineCallbacks
def test_cancellation(self):
linearizer = Linearizer()
key = object()
d1 = linearizer.queue(key)
cm1 = yield d1
d2 = linearizer.queue(key)
self.assertFalse(d2.called)
d3 = linearizer.queue(key)
self.assertFalse(d3.called)
d2.cancel()
with cm1:
pass
self.assertTrue(d2.called)
try:
yield d2
self.fail("Expected d2 to raise CancelledError")
except CancelledError:
pass
with (yield d3):
pass