Merge pull request #700 from matrix-org/erikj/deduplicate_joins

Deduplicate membership changes
This commit is contained in:
Erik Johnston 2016-04-07 16:35:40 +01:00
commit a294b04bf0
4 changed files with 125 additions and 2 deletions

View file

@ -24,6 +24,7 @@ from synapse.api.constants import (
)
from synapse.api.errors import AuthError, SynapseError, Codes
from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.async import Linearizer
from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
@ -60,6 +61,8 @@ class RoomMemberHandler(BaseHandler):
def __init__(self, hs):
super(RoomMemberHandler, self).__init__(hs)
self.member_linearizer = Linearizer()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
@ -182,6 +185,34 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
):
key = (target, room_id,)
with (yield self.member_linearizer.queue(key)):
result = yield self._update_membership(
requester,
target,
room_id,
action,
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
)
defer.returnValue(result)
@defer.inlineCallbacks
def _update_membership(
self,
requester,
target,
room_id,
action,
txn_id=None,
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
):
effective_membership_state = action
if action in ["kick", "unban"]:

View file

@ -16,9 +16,13 @@
from twisted.internet import defer, reactor
from .logcontext import PreserveLoggingContext, preserve_fn
from .logcontext import (
PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
)
from synapse.util import unwrapFirstError
from contextlib import contextmanager
@defer.inlineCallbacks
def sleep(seconds):
@ -137,3 +141,47 @@ def concurrently_execute(func, args, limit):
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True).addErrback(unwrapFirstError)
class Linearizer(object):
"""Linearizes access to resources based on a key. Useful to ensure only one
thing is happening at a time on a given resource.
Example:
with (yield linearizer.queue("test_key")):
# do some work.
"""
def __init__(self):
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
# If there is already a deferred in the queue, we pull it out so that
# we can wait on it later.
# Then we replace it with a deferred that we resolve *after* the
# context manager has exited.
# We only return the context manager after the previous deferred has
# resolved.
# This all has the net effect of creating a chain of deferreds that
# wait for the previous deferred before starting their work.
current_defer = self.key_to_defer.get(key)
new_defer = defer.Deferred()
self.key_to_defer[key] = new_defer
if current_defer:
yield preserve_context_over_deferred(current_defer)
@contextmanager
def _ctx_manager():
try:
yield
finally:
new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
defer.returnValue(_ctx_manager())

View file

@ -35,7 +35,7 @@ class ResponseCache(object):
return None
def set(self, key, deferred):
result = ObservableDeferred(deferred)
result = ObservableDeferred(deferred, consumeErrors=True)
self.pending_result_cache[key] = result
def remove(r):