convert to async: FederationHandler._process_received_pdu

also fix user_joined_room to consistently return deferreds
This commit is contained in:
Richard van der Hoff 2019-12-10 17:27:13 +00:00
parent 4db394a4b3
commit 6637d90d77

View File

@ -670,8 +670,7 @@ class FederationHandler(BaseHandler):
return fetched_events return fetched_events
@defer.inlineCallbacks async def _process_received_pdu(self, origin, event, state, auth_chain):
def _process_received_pdu(self, origin, event, state, auth_chain):
""" Called when we have a new pdu. We need to do auth checks and put it """ Called when we have a new pdu. We need to do auth checks and put it
through the StateHandler. through the StateHandler.
""" """
@ -686,7 +685,7 @@ class FederationHandler(BaseHandler):
if auth_chain: if auth_chain:
event_ids |= {e.event_id for e in auth_chain} event_ids |= {e.event_id for e in auth_chain}
seen_ids = yield self.store.have_seen_events(event_ids) seen_ids = await self.store.have_seen_events(event_ids)
if state and auth_chain is not None: if state and auth_chain is not None:
# If we have any state or auth_chain given to us by the replication # If we have any state or auth_chain given to us by the replication
@ -713,18 +712,18 @@ class FederationHandler(BaseHandler):
event_id, event_id,
[e.event.event_id for e in event_infos], [e.event.event_id for e in event_infos],
) )
yield self._handle_new_events(origin, event_infos) await self._handle_new_events(origin, event_infos)
try: try:
context = yield self._handle_new_event(origin, event, state=state) context = await self._handle_new_event(origin, event, state=state)
except AuthError as e: except AuthError as e:
raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
room = yield self.store.get_room(room_id) room = await self.store.get_room(room_id)
if not room: if not room:
try: try:
yield self.store.store_room( await self.store.store_room(
room_id=room_id, room_creator_user_id="", is_public=False room_id=room_id, room_creator_user_id="", is_public=False
) )
except StoreError: except StoreError:
@ -737,11 +736,11 @@ class FederationHandler(BaseHandler):
# changing their profile info. # changing their profile info.
newly_joined = True newly_joined = True
prev_state_ids = yield context.get_prev_state_ids(self.store) prev_state_ids = await context.get_prev_state_ids(self.store)
prev_state_id = prev_state_ids.get((event.type, event.state_key)) prev_state_id = prev_state_ids.get((event.type, event.state_key))
if prev_state_id: if prev_state_id:
prev_state = yield self.store.get_event( prev_state = await self.store.get_event(
prev_state_id, allow_none=True prev_state_id, allow_none=True
) )
if prev_state and prev_state.membership == Membership.JOIN: if prev_state and prev_state.membership == Membership.JOIN:
@ -749,7 +748,7 @@ class FederationHandler(BaseHandler):
if newly_joined: if newly_joined:
user = UserID.from_string(event.state_key) user = UserID.from_string(event.state_key)
yield self.user_joined_room(user, room_id) await self.user_joined_room(user, room_id)
@log_function @log_function
async def backfill(self, dest, room_id, limit, extremities): async def backfill(self, dest, room_id, limit, extremities):
@ -2899,7 +2898,7 @@ class FederationHandler(BaseHandler):
room_id=room_id, user_id=user.to_string(), change="joined" room_id=room_id, user_id=user.to_string(), change="joined"
) )
else: else:
return user_joined_room(self.distributor, user, room_id) return defer.succeed(user_joined_room(self.distributor, user, room_id))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_complexity(self, remote_room_hosts, room_id): def get_room_complexity(self, remote_room_hosts, room_id):