Queue up federation PDUs while a room join is in progress

This just takes the existing `room_queues` logic and moves it out to
`on_receive_pdu` instead of `_process_received_pdu`, which ensures that we
don't start trying to fetch prev_events and whathaveyou until the join has
completed.
This commit is contained in:
Richard van der Hoff 2017-03-14 11:26:57 +00:00
parent b5d1c68beb
commit 9ce53a3861

View File

@ -14,6 +14,7 @@
# limitations under the License. # limitations under the License.
"""Contains handlers for federation events.""" """Contains handlers for federation events."""
import synapse.util.logcontext
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64 from unpaddedbase64 import decode_base64
@ -114,6 +115,14 @@ class FederationHandler(BaseHandler):
logger.debug("Already seen pdu %s", pdu.event_id) logger.debug("Already seen pdu %s", pdu.event_id)
return return
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if pdu.room_id in self.room_queues:
logger.info("Ignoring PDU %s for room %s from %s for now; join "
"in progress", pdu.event_id, pdu.room_id, origin)
self.room_queues[pdu.room_id].append((pdu, origin))
return
state = None state = None
auth_chain = [] auth_chain = []
@ -274,26 +283,13 @@ class FederationHandler(BaseHandler):
@log_function @log_function
@defer.inlineCallbacks @defer.inlineCallbacks
def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None): def _process_received_pdu(self, origin, pdu, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it """ Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler. through the StateHandler.
auth_chain and state are None if we already have the necessary state
and prev_events in the db
""" """
event = pdu event = pdu
logger.debug("Got event: %s", event.event_id) logger.debug("Processing event: %s", event)
# If we are currently in the process of joining this room, then we
# queue up events for later processing.
if event.room_id in self.room_queues:
self.room_queues[event.room_id].append((pdu, origin))
return
logger.debug("Processing event: %s", event.event_id)
logger.debug("Event: %s", event)
# FIXME (erikj): Awful hack to make the case where we are not currently # FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work # in the room work
@ -862,8 +858,6 @@ class FederationHandler(BaseHandler):
""" """
logger.debug("Joining %s to %s", joinee, room_id) logger.debug("Joining %s to %s", joinee, room_id)
yield self.store.clean_room_for_join(room_id)
origin, event = yield self._make_and_verify_event( origin, event = yield self._make_and_verify_event(
target_hosts, target_hosts,
room_id, room_id,
@ -872,7 +866,15 @@ class FederationHandler(BaseHandler):
content, content,
) )
# This shouldn't happen, because the RoomMemberHandler has a
# linearizer lock which only allows one operation per user per room
# at a time - so this is just paranoia.
assert (room_id not in self.room_queues)
self.room_queues[room_id] = [] self.room_queues[room_id] = []
yield self.store.clean_room_for_join(room_id)
handled_events = set() handled_events = set()
try: try:
@ -925,17 +927,35 @@ class FederationHandler(BaseHandler):
room_queue = self.room_queues[room_id] room_queue = self.room_queues[room_id]
del self.room_queues[room_id] del self.room_queues[room_id]
for p, origin in room_queue: # we don't need to wait for the queued events to be processed -
if p.event_id in handled_events: # it's just a best-effort thing at this point. We do want to do
continue # them roughly in order, though, otherwise we'll end up making
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
try: synapse.util.logcontext.reset_context_after_deferred(
self._process_received_pdu(origin, p) self._handle_queued_pdus(room_queue))
except:
logger.exception("Couldn't handle pdu")
defer.returnValue(True) defer.returnValue(True)
@defer.inlineCallbacks
def _handle_queued_pdus(self, room_queue):
"""Process PDUs which got queued up while we were busy send_joining.
Args:
room_queue (list[FrozenEvent, str]): list of PDUs to be processed
and the servers that sent them
"""
for p, origin in room_queue:
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
yield self.on_receive_pdu(origin, p)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
p.event_id, origin, e)
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def on_make_join_request(self, room_id, user_id): def on_make_join_request(self, room_id, user_id):