make FederationClient.get_pdu async

This commit is contained in:
Richard van der Hoff 2020-02-03 20:41:54 +00:00
parent 0536d0c9be
commit 0cb0c7bcd5

View File

@ -17,7 +17,7 @@
import copy import copy
import itertools import itertools
import logging import logging
from typing import Dict, Iterable, List, Tuple from typing import Dict, Iterable, List, Optional, Tuple
from prometheus_client import Counter from prometheus_client import Counter
@ -211,11 +211,14 @@ class FederationClient(FederationBase):
return pdus return pdus
@defer.inlineCallbacks async def get_pdu(
@log_function self,
def get_pdu( destinations: Iterable[str],
self, destinations, event_id, room_version, outlier=False, timeout=None event_id: str,
): room_version: str,
outlier: bool = False,
timeout: Optional[int] = None,
) -> Optional[EventBase]:
"""Requests the PDU with given origin and ID from the remote home """Requests the PDU with given origin and ID from the remote home
servers. servers.
@ -223,18 +226,17 @@ class FederationClient(FederationBase):
one succeeds. one succeeds.
Args: Args:
destinations (list): Which homeservers to query destinations: Which homeservers to query
event_id (str): event to fetch event_id: event to fetch
room_version (str): version of the room room_version: version of the room
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if outlier: Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False` of the current block of PDUs. Defaults to `False`
timeout (int): How long to try (in ms) each destination for before timeout: How long to try (in ms) each destination for before
moving to the next destination. None indicates no timeout. moving to the next destination. None indicates no timeout.
Returns: Returns:
Deferred: Results in the requested PDU, or None if we were unable to find The requested PDU, or None if we were unable to find it.
it.
""" """
# TODO: Rate limit the number of times we try and get the same event. # TODO: Rate limit the number of times we try and get the same event.
@ -255,7 +257,7 @@ class FederationClient(FederationBase):
continue continue
try: try:
transaction_data = yield self.transport_layer.get_event( transaction_data = await self.transport_layer.get_event(
destination, event_id, timeout=timeout destination, event_id, timeout=timeout
) )
@ -275,7 +277,7 @@ class FederationClient(FederationBase):
pdu = pdu_list[0] pdu = pdu_list[0]
# Check signatures are correct. # Check signatures are correct.
signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
break break