Merge pull request #2016 from matrix-org/rav/queue_pdus_during_join

Queue up federation PDUs while a room join is in progress
This commit is contained in:
Richard van der Hoff 2017-03-17 11:32:44 +00:00 committed by GitHub
commit 2abe85d50e
2 changed files with 69 additions and 24 deletions

View File

@ -14,6 +14,7 @@
# limitations under the License.
"""Contains handlers for federation events."""
import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
@ -114,6 +115,14 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id) logger.debug("Already seen pdu %s", pdu.event_id)
return return
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
logger.info("Ignoring PDU %s for room %s from %s for now; join "
"in progress", pdu.event_id, pdu.room_id, origin)
self.room_queues[pdu.room_id].append((pdu, origin))
return
state = None state = None
auth_chain = [] auth_chain = []
@ -274,26 +283,13 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): def _process_received_pdu(self, origin, pdu, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it """ Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler. through the StateHandler.
auth_chain and state are None if we already have the necessary state
and prev_events in the db
""" """
event = pdu event = pdu
logger.debug("Got event: %s", event.event_id) logger.debug("Processing event: %s", event)
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
logger.debug("Event: %s", event)
# FIXME (erikj): Awful hack to make the case where we are not currently # FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work # in the room work
@ -862,8 +858,6 @@ class FederationHandler(BaseHandler):
""" """
logger.debug("Joining %s to %s", joinee, room_id) logger.debug("Joining %s to %s", joinee, room_id)
yield self.store.clean_room_for_join(room_id)
origin, event = yield self._make_and_verify_event( origin, event = yield self._make_and_verify_event(
target_hosts, target_hosts,
room_id, room_id,
@ -872,7 +866,15 @@ class FederationHandler(BaseHandler):
content, content,
) )
# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
# at a time - so this is just paranoia.
assert (room_id not in self.room_queues)
self.room_queues[room_id] = [] self.room_queues[room_id] = []
yield self.store.clean_room_for_join(room_id)
handled_events = set() handled_events = set()
try: try:
@ -925,17 +927,35 @@ class FederationHandler(BaseHandler):
room_queue = self.room_queues[room_id] room_queue = self.room_queues[room_id]
del self.room_queues[room_id] del self.room_queues[room_id]
for p, origin in room_queue: # we don't need to wait for the queued events to be processed -
if p.event_id in handled_events: # it's just a best-effort thing at this point. We do want to do
continue # them roughly in order, though, otherwise we'll end up making
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
try: synapse.util.logcontext.reset_context_after_deferred(
self._process_received_pdu(origin, p) self._handle_queued_pdus(room_queue))
except:
logger.exception("Couldn't handle pdu")
defer.returnValue(True) defer.returnValue(True)
@defer.inlineCallbacks
def _handle_queued_pdus(self, room_queue):
    """Process PDUs which got queued up while we were busy doing a join.

    This is best-effort: a failure on one PDU is logged and the rest of
    the queue is still processed, since missing events can be re-fetched
    later via the missing-prev_events path.

    Args:
        room_queue (list[(FrozenEvent, str)]): list of (pdu, origin)
            pairs: the PDUs that were queued during the join, and the
            servers that sent them.
    """
    for p, origin in room_queue:
        try:
            logger.info("Processing queued PDU %s which was received "
                        "while we were joining %s", p.event_id, p.room_id)
            yield self.on_receive_pdu(origin, p)
        except Exception as e:
            # Deliberately warn (without a traceback) rather than
            # re-raise: errors here are expected to be recoverable via
            # other federation paths. logger.warn is a deprecated alias,
            # so use logger.warning.
            logger.warning(
                "Error handling queued PDU %s from %s: %s",
                p.event_id, origin, e)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_make_join_request(self, room_id, user_id): def on_make_join_request(self, room_id, user_id):

View File

@ -308,6 +308,31 @@ def preserve_context_over_deferred(deferred, context=None):
return d return d
def reset_context_after_deferred(deferred):
    """If the deferred is incomplete, add a callback which will reset the
    context.

    This is useful when you want to fire off a deferred, but don't want to
    wait for it to complete. (The deferred will restore the current log
    context when it completes, so if you don't do anything, it will leak
    log context.)

    (If this feels asymmetric, consider it this way: we are effectively
    forking a new thread of execution. We are probably currently within a
    ``with LoggingContext()`` block, which is supposed to have a single
    entry and exit point. But by spawning off another deferred, we are
    effectively adding a new exit point.)

    Args:
        deferred (defer.Deferred): deferred
    """
    if deferred.called:
        # Already completed: its callbacks have run and restored the
        # context themselves, so there is nothing for us to clean up.
        return

    def _back_to_sentinel(result):
        # Drop to the sentinel context when the forked work finishes,
        # passing the result (or failure) through untouched.
        LoggingContext.set_current_context(LoggingContext.sentinel)
        return result

    deferred.addBoth(_back_to_sentinel)
def preserve_fn(f): def preserve_fn(f):
"""Ensures that function is called with correct context and that context is """Ensures that function is called with correct context and that context is
restored after return. Useful for wrapping functions that return a deferred restored after return. Useful for wrapping functions that return a deferred