Update the room member handler to use async/await. (#7507)

This commit is contained in:
Patrick Cloke 2020-05-15 09:32:13 -04:00 committed by GitHub
parent 08bc80ef09
commit e9f3de0bab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 59 additions and 74 deletions

1
changelog.d/7507.misc Normal file
View File

@ -0,0 +1 @@
Convert the room member handler to async/await.

View File

@ -20,8 +20,6 @@ import logging
from six.moves import http_client from six.moves import http_client
from twisted.internet import defer
from synapse import types from synapse import types
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
@ -76,7 +74,7 @@ class RoomMemberHandler(object):
self.base_handler = BaseHandler(hs) self.base_handler = BaseHandler(hs)
@abc.abstractmethod @abc.abstractmethod
def _remote_join(self, requester, remote_room_hosts, room_id, user, content): async def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Try and join a room that this server is not in """Try and join a room that this server is not in
Args: Args:
@ -94,7 +92,7 @@ class RoomMemberHandler(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def _remote_reject_invite( async def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content self, requester, remote_room_hosts, room_id, target, content
): ):
"""Attempt to reject an invite for a room this server is not in. If we """Attempt to reject an invite for a room this server is not in. If we
@ -115,7 +113,7 @@ class RoomMemberHandler(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def _user_joined_room(self, target, room_id): async def _user_joined_room(self, target, room_id):
"""Notifies distributor on master process that the user has joined the """Notifies distributor on master process that the user has joined the
room. room.
@ -124,12 +122,12 @@ class RoomMemberHandler(object):
room_id (str) room_id (str)
Returns: Returns:
Deferred|None None
""" """
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
def _user_left_room(self, target, room_id): async def _user_left_room(self, target, room_id):
"""Notifies distributor on master process that the user has left the """Notifies distributor on master process that the user has left the
room. room.
@ -138,7 +136,7 @@ class RoomMemberHandler(object):
room_id (str) room_id (str)
Returns: Returns:
Deferred|None None
""" """
raise NotImplementedError() raise NotImplementedError()
@ -214,8 +212,9 @@ class RoomMemberHandler(object):
return event return event
@defer.inlineCallbacks async def copy_room_tags_and_direct_to_room(
def copy_room_tags_and_direct_to_room(self, old_room_id, new_room_id, user_id): self, old_room_id, new_room_id, user_id
):
"""Copies the tags and direct room state from one room to another. """Copies the tags and direct room state from one room to another.
Args: Args:
@ -227,7 +226,7 @@ class RoomMemberHandler(object):
Deferred[None] Deferred[None]
""" """
# Retrieve user account data for predecessor room # Retrieve user account data for predecessor room
user_account_data, _ = yield self.store.get_account_data_for_user(user_id) user_account_data, _ = await self.store.get_account_data_for_user(user_id)
# Copy direct message state if applicable # Copy direct message state if applicable
direct_rooms = user_account_data.get("m.direct", {}) direct_rooms = user_account_data.get("m.direct", {})
@ -240,17 +239,17 @@ class RoomMemberHandler(object):
direct_rooms[key].append(new_room_id) direct_rooms[key].append(new_room_id)
# Save back to user's m.direct account data # Save back to user's m.direct account data
yield self.store.add_account_data_for_user( await self.store.add_account_data_for_user(
user_id, "m.direct", direct_rooms user_id, "m.direct", direct_rooms
) )
break break
# Copy room tags if applicable # Copy room tags if applicable
room_tags = yield self.store.get_tags_for_room(user_id, old_room_id) room_tags = await self.store.get_tags_for_room(user_id, old_room_id)
# Copy each room tag to the new room # Copy each room tag to the new room
for tag, tag_content in room_tags.items(): for tag, tag_content in room_tags.items():
yield self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content) await self.store.add_tag_to_room(user_id, new_room_id, tag, tag_content)
async def update_membership( async def update_membership(
self, self,
@ -487,8 +486,7 @@ class RoomMemberHandler(object):
) )
return res return res
@defer.inlineCallbacks async def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
def transfer_room_state_on_room_upgrade(self, old_room_id, room_id):
"""Upon our server becoming aware of an upgraded room, either by upgrading a room """Upon our server becoming aware of an upgraded room, either by upgrading a room
ourselves or joining one, we can transfer over information from the previous room. ourselves or joining one, we can transfer over information from the previous room.
@ -506,30 +504,29 @@ class RoomMemberHandler(object):
logger.info("Transferring room state from %s to %s", old_room_id, room_id) logger.info("Transferring room state from %s to %s", old_room_id, room_id)
# Find all local users that were in the old room and copy over each user's state # Find all local users that were in the old room and copy over each user's state
users = yield self.store.get_users_in_room(old_room_id) users = await self.store.get_users_in_room(old_room_id)
yield self.copy_user_state_on_room_upgrade(old_room_id, room_id, users) await self.copy_user_state_on_room_upgrade(old_room_id, room_id, users)
# Add new room to the room directory if the old room was there # Add new room to the room directory if the old room was there
# Remove old room from the room directory # Remove old room from the room directory
old_room = yield self.store.get_room(old_room_id) old_room = await self.store.get_room(old_room_id)
if old_room and old_room["is_public"]: if old_room and old_room["is_public"]:
yield self.store.set_room_is_public(old_room_id, False) await self.store.set_room_is_public(old_room_id, False)
yield self.store.set_room_is_public(room_id, True) await self.store.set_room_is_public(room_id, True)
# Transfer alias mappings in the room directory # Transfer alias mappings in the room directory
yield self.store.update_aliases_for_room(old_room_id, room_id) await self.store.update_aliases_for_room(old_room_id, room_id)
# Check if any groups we own contain the predecessor room # Check if any groups we own contain the predecessor room
local_group_ids = yield self.store.get_local_groups_for_room(old_room_id) local_group_ids = await self.store.get_local_groups_for_room(old_room_id)
for group_id in local_group_ids: for group_id in local_group_ids:
# Add new the new room to those groups # Add new the new room to those groups
yield self.store.add_room_to_group(group_id, room_id, old_room["is_public"]) await self.store.add_room_to_group(group_id, room_id, old_room["is_public"])
# Remove the old room from those groups # Remove the old room from those groups
yield self.store.remove_room_from_group(group_id, old_room_id) await self.store.remove_room_from_group(group_id, old_room_id)
@defer.inlineCallbacks async def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
def copy_user_state_on_room_upgrade(self, old_room_id, new_room_id, user_ids):
"""Copy user-specific information when they join a new room when that new room is the """Copy user-specific information when they join a new room when that new room is the
result of a room upgrade result of a room upgrade
@ -552,11 +549,11 @@ class RoomMemberHandler(object):
for user_id in user_ids: for user_id in user_ids:
try: try:
# It is an upgraded room. Copy over old tags # It is an upgraded room. Copy over old tags
yield self.copy_room_tags_and_direct_to_room( await self.copy_room_tags_and_direct_to_room(
old_room_id, new_room_id, user_id old_room_id, new_room_id, user_id
) )
# Copy over push rules # Copy over push rules
yield self.store.copy_push_rules_from_room_to_room_for_user( await self.store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id old_room_id, new_room_id, user_id
) )
except Exception: except Exception:
@ -639,8 +636,7 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN: if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target_user, room_id) await self._user_left_room(target_user, room_id)
@defer.inlineCallbacks async def _can_guest_join(self, current_state_ids):
def _can_guest_join(self, current_state_ids):
""" """
Returns whether a guest can join a room based on its current state. Returns whether a guest can join a room based on its current state.
""" """
@ -648,7 +644,7 @@ class RoomMemberHandler(object):
if not guest_access_id: if not guest_access_id:
return False return False
guest_access = yield self.store.get_event(guest_access_id) guest_access = await self.store.get_event(guest_access_id)
return ( return (
guest_access guest_access
@ -657,8 +653,7 @@ class RoomMemberHandler(object):
and guest_access.content["guest_access"] == "can_join" and guest_access.content["guest_access"] == "can_join"
) )
@defer.inlineCallbacks async def lookup_room_alias(self, room_alias):
def lookup_room_alias(self, room_alias):
""" """
Get the room ID associated with a room alias. Get the room ID associated with a room alias.
@ -672,7 +667,7 @@ class RoomMemberHandler(object):
SynapseError if room alias could not be found. SynapseError if room alias could not be found.
""" """
directory_handler = self.directory_handler directory_handler = self.directory_handler
mapping = yield directory_handler.get_association(room_alias) mapping = await directory_handler.get_association(room_alias)
if not mapping: if not mapping:
raise SynapseError(404, "No such room alias") raise SynapseError(404, "No such room alias")
@ -687,9 +682,8 @@ class RoomMemberHandler(object):
return RoomID.from_string(room_id), servers return RoomID.from_string(room_id), servers
@defer.inlineCallbacks async def _get_inviter(self, user_id, room_id):
def _get_inviter(self, user_id, room_id): invite = await self.store.get_invite_for_local_user_in_room(
invite = yield self.store.get_invite_for_local_user_in_room(
user_id=user_id, room_id=room_id user_id=user_id, room_id=room_id
) )
if invite: if invite:
@ -836,8 +830,7 @@ class RoomMemberHandler(object):
txn_id=txn_id, txn_id=txn_id,
) )
@defer.inlineCallbacks async def _is_host_in_room(self, current_state_ids):
def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very # Have we just created the room, and is this about to be the very
# first member event? # first member event?
create_event_id = current_state_ids.get(("m.room.create", "")) create_event_id = current_state_ids.get(("m.room.create", ""))
@ -850,7 +843,7 @@ class RoomMemberHandler(object):
continue continue
event_id = current_state_ids[(etype, state_key)] event_id = current_state_ids[(etype, state_key)]
event = yield self.store.get_event(event_id, allow_none=True) event = await self.store.get_event(event_id, allow_none=True)
if not event: if not event:
continue continue
@ -859,11 +852,10 @@ class RoomMemberHandler(object):
return False return False
@defer.inlineCallbacks async def _is_server_notice_room(self, room_id):
def _is_server_notice_room(self, room_id):
if self._server_notices_mxid is None: if self._server_notices_mxid is None:
return False return False
user_ids = yield self.store.get_users_in_room(room_id) user_ids = await self.store.get_users_in_room(room_id)
return self._server_notices_mxid in user_ids return self._server_notices_mxid in user_ids
@ -895,8 +887,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return complexity["v1"] > max_complexity return complexity["v1"] > max_complexity
return None return None
@defer.inlineCallbacks async def _is_local_room_too_complex(self, room_id):
def _is_local_room_too_complex(self, room_id):
""" """
Check if the complexity of a local room is too great. Check if the complexity of a local room is too great.
@ -906,7 +897,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
Returns: bool Returns: bool
""" """
max_complexity = self.hs.config.limit_remote_rooms.complexity max_complexity = self.hs.config.limit_remote_rooms.complexity
complexity = yield self.store.get_room_complexity(room_id) complexity = await self.store.get_room_complexity(room_id)
return complexity["v1"] > max_complexity return complexity["v1"] > max_complexity
@ -969,18 +960,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
errcode=Codes.RESOURCE_LIMIT_EXCEEDED, errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
) )
@defer.inlineCallbacks async def _remote_reject_invite(
def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content self, requester, remote_room_hosts, room_id, target, content
): ):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
fed_handler = self.federation_handler fed_handler = self.federation_handler
try: try:
ret = yield defer.ensureDeferred( ret = await fed_handler.do_remotely_reject_invite(
fed_handler.do_remotely_reject_invite( remote_room_hosts, room_id, target.to_string(), content=content,
remote_room_hosts, room_id, target.to_string(), content=content,
)
) )
return ret return ret
except Exception as e: except Exception as e:
@ -992,24 +980,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# #
logger.warning("Failed to reject invite: %s", e) logger.warning("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(target.to_string(), room_id) await self.store.locally_reject_invite(target.to_string(), room_id)
return {} return {}
def _user_joined_room(self, target, room_id): async def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room """Implements RoomMemberHandler._user_joined_room
""" """
return defer.succeed(user_joined_room(self.distributor, target, room_id)) return user_joined_room(self.distributor, target, room_id)
def _user_left_room(self, target, room_id): async def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room """Implements RoomMemberHandler._user_left_room
""" """
return defer.succeed(user_left_room(self.distributor, target, room_id)) return user_left_room(self.distributor, target, room_id)
@defer.inlineCallbacks async def forget(self, user, room_id):
def forget(self, user, room_id):
user_id = user.to_string() user_id = user.to_string()
member = yield self.state_handler.get_current_state( member = await self.state_handler.get_current_state(
room_id=room_id, event_type=EventTypes.Member, state_key=user_id room_id=room_id, event_type=EventTypes.Member, state_key=user_id
) )
membership = member.membership if member else None membership = member.membership if member else None
@ -1021,4 +1008,4 @@ class RoomMemberMasterHandler(RoomMemberHandler):
raise SynapseError(400, "User %s in room %s" % (user_id, room_id)) raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
if membership: if membership:
yield self.store.forget(user_id, room_id) await self.store.forget(user_id, room_id)

View File

@ -15,8 +15,6 @@
import logging import logging
from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import ( from synapse.replication.http.membership import (
@ -36,14 +34,13 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
self._remote_reject_client = ReplRejectInvite.make_client(hs) self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs) self._notify_change_client = ReplJoinedLeft.make_client(hs)
@defer.inlineCallbacks async def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join """Implements RoomMemberHandler._remote_join
""" """
if len(remote_room_hosts) == 0: if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers") raise SynapseError(404, "No known servers")
ret = yield self._remote_join_client( ret = await self._remote_join_client(
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -51,16 +48,16 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
content=content, content=content,
) )
yield self._user_joined_room(user, room_id) await self._user_joined_room(user, room_id)
return ret return ret
def _remote_reject_invite( async def _remote_reject_invite(
self, requester, remote_room_hosts, room_id, target, content self, requester, remote_room_hosts, room_id, target, content
): ):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
return self._remote_reject_client( return await self._remote_reject_client(
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -68,16 +65,16 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
content=content, content=content,
) )
def _user_joined_room(self, target, room_id): async def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room """Implements RoomMemberHandler._user_joined_room
""" """
return self._notify_change_client( return await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="joined" user_id=target.to_string(), room_id=room_id, change="joined"
) )
def _user_left_room(self, target, room_id): async def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room """Implements RoomMemberHandler._user_left_room
""" """
return self._notify_change_client( return await self._notify_change_client(
user_id=target.to_string(), room_id=room_id, change="left" user_id=target.to_string(), room_id=room_id, change="left"
) )