Convert presence handler helpers to async/await. (#7939)

This commit is contained in:
Patrick Cloke 2020-07-23 16:47:36 -04:00 committed by GitHub
parent 83434df381
commit fefe9943ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 24 deletions

1
changelog.d/7939.misc Normal file
View File

@ -0,0 +1 @@
Convert presence handler helpers to async/await.

View File

@ -453,7 +453,9 @@ class FederationSender(object):
"""Given a list of states populate self.pending_presence_by_dest and """Given a list of states populate self.pending_presence_by_dest and
poke to send a new transaction to each destination poke to send a new transaction to each destination
""" """
hosts_and_states = yield get_interested_remotes(self.store, states, self.state) hosts_and_states = yield defer.ensureDeferred(
get_interested_remotes(self.store, states, self.state)
)
for destinations, states in hosts_and_states: for destinations, states in hosts_and_states:
for destination in destinations: for destination in destinations:

View File

@ -30,8 +30,6 @@ from typing import Dict, Iterable, List, Set, Tuple
from prometheus_client import Counter from prometheus_client import Counter
from typing_extensions import ContextManager from typing_extensions import ContextManager
from twisted.internet import defer
import synapse.metrics import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
@ -39,6 +37,8 @@ from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateHandler
from synapse.storage.data_stores.main import DataStore
from synapse.storage.presence import UserPresenceState from synapse.storage.presence import UserPresenceState
from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
@ -895,16 +895,9 @@ class PresenceHandler(BasePresenceHandler):
await self._on_user_joined_room(room_id, state_key) await self._on_user_joined_room(room_id, state_key)
async def _on_user_joined_room(self, room_id, user_id): async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
"""Called when we detect a user joining the room via the current state """Called when we detect a user joining the room via the current state
delta stream. delta stream.
Args:
room_id (str)
user_id (str)
Returns:
Deferred
""" """
if self.is_mine_id(user_id): if self.is_mine_id(user_id):
@ -1296,22 +1289,24 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
return new_state, persist_and_notify, federation_ping return new_state, persist_and_notify, federation_ping
@defer.inlineCallbacks async def get_interested_parties(
def get_interested_parties(store, states): store: DataStore, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
"""Given a list of states return which entities (rooms, users) """Given a list of states return which entities (rooms, users)
are interested in the given states. are interested in the given states.
Args: Args:
states (list(UserPresenceState)) store
states
Returns: Returns:
2-tuple: `(room_ids_to_states, users_to_states)`, A 2-tuple of `(room_ids_to_states, users_to_states)`,
with each item being a dict of `entity_name` -> `[UserPresenceState]` with each item being a dict of `entity_name` -> `[UserPresenceState]`
""" """
room_ids_to_states = {} # type: Dict[str, List[UserPresenceState]] room_ids_to_states = {} # type: Dict[str, List[UserPresenceState]]
users_to_states = {} # type: Dict[str, List[UserPresenceState]] users_to_states = {} # type: Dict[str, List[UserPresenceState]]
for state in states: for state in states:
room_ids = yield store.get_rooms_for_user(state.user_id) room_ids = await store.get_rooms_for_user(state.user_id)
for room_id in room_ids: for room_id in room_ids:
room_ids_to_states.setdefault(room_id, []).append(state) room_ids_to_states.setdefault(room_id, []).append(state)
@ -1321,20 +1316,22 @@ def get_interested_parties(store, states):
return room_ids_to_states, users_to_states return room_ids_to_states, users_to_states
@defer.inlineCallbacks async def get_interested_remotes(
def get_interested_remotes(store, states, state_handler): store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
) -> List[Tuple[List[str], List[UserPresenceState]]]:
"""Given a list of presence states figure out which remote servers """Given a list of presence states figure out which remote servers
should be sent which. should be sent which.
All the presence states should be for local users only. All the presence states should be for local users only.
Args: Args:
store (DataStore) store
states (list(UserPresenceState)) states
state_handler
Returns: Returns:
Deferred list of ([destinations], [UserPresenceState]), where for A list of 2-tuples of destinations and states, where for
each row the list of UserPresenceState should be sent to each each tuple the list of UserPresenceState should be sent to each
destination destination
""" """
hosts_and_states = [] hosts_and_states = []
@ -1342,10 +1339,10 @@ def get_interested_remotes(store, states, state_handler):
# First we look up the rooms each user is in (as well as any explicit # First we look up the rooms each user is in (as well as any explicit
# subscriptions), then for each distinct room we look up the remote # subscriptions), then for each distinct room we look up the remote
# hosts in those rooms. # hosts in those rooms.
room_ids_to_states, users_to_states = yield get_interested_parties(store, states) room_ids_to_states, users_to_states = await get_interested_parties(store, states)
for room_id, states in room_ids_to_states.items(): for room_id, states in room_ids_to_states.items():
hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts = await state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states)) hosts_and_states.append((hosts, states))
for user_id, states in users_to_states.items(): for user_id, states in users_to_states.items():