Convert federation client to async/await. (#7975)

This commit is contained in:
Patrick Cloke 2020-07-30 08:01:33 -04:00 committed by GitHub
parent 4cce8ef74e
commit c978f6c451
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 209 additions and 221 deletions

View file

@ -288,8 +288,7 @@ class FederationSender(object):
for destination in destinations:
self._get_per_destination_queue(destination).send_pdu(pdu, order)
@defer.inlineCallbacks
def send_read_receipt(self, receipt: ReadReceipt):
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room
Args:
@ -330,9 +329,7 @@ class FederationSender(object):
room_id = receipt.room_id
# Work out which remote servers should be poked and poke them.
domains = yield defer.ensureDeferred(
self.state.get_current_hosts_in_room(room_id)
)
domains = await self.state.get_current_hosts_in_room(room_id)
domains = [
d
for d in domains
@ -387,8 +384,7 @@ class FederationSender(object):
queue.flush_read_receipts_for_room(room_id)
@preserve_fn # the caller should not yield on this
@defer.inlineCallbacks
def send_presence(self, states: List[UserPresenceState]):
async def send_presence(self, states: List[UserPresenceState]):
"""Send the new presence states to the appropriate destinations.
This actually queues up the presence states ready for sending and
@ -423,7 +419,7 @@ class FederationSender(object):
if not states_map:
break
yield self._process_presence_inner(list(states_map.values()))
await self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
@ -450,14 +446,11 @@ class FederationSender(object):
self._get_per_destination_queue(destination).send_presence(states)
@measure_func("txnqueue._process_presence")
@defer.inlineCallbacks
def _process_presence_inner(self, states: List[UserPresenceState]):
async def _process_presence_inner(self, states: List[UserPresenceState]):
"""Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination
"""
hosts_and_states = yield defer.ensureDeferred(
get_interested_remotes(self.store, states, self.state)
)
hosts_and_states = await get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations: