Merge pull request #2987 from matrix-org/erikj/split_room_member_handler

Split RoomMemberHandler into base and master class
This commit is contained in:
Erik Johnston 2018-03-13 17:40:00 +00:00 committed by GitHub
commit 1a69c6d590
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 189 additions and 100 deletions

View File

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import abc
import logging import logging
from signedjson.key import decode_verify_key_bytes from signedjson.key import decode_verify_key_bytes
@ -31,6 +31,7 @@ from synapse.types import UserID, RoomID
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.distributor import user_left_room, user_joined_room from synapse.util.distributor import user_left_room, user_joined_room
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
id_server_scheme = "https://" id_server_scheme = "https://"
@ -42,6 +43,8 @@ class RoomMemberHandler(object):
# API that takes ID strings and returns pagination chunks. These concerns # API that takes ID strings and returns pagination chunks. These concerns
# ought to be separated out a lot better. # ought to be separated out a lot better.
__metaclass__ = abc.ABCMeta
def __init__(self, hs): def __init__(self, hs):
self.hs = hs self.hs = hs
self.store = hs.get_datastore() self.store = hs.get_datastore()
@ -61,9 +64,87 @@ class RoomMemberHandler(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker() self.spam_checker = hs.get_spam_checker()
self.distributor = hs.get_distributor() @abc.abstractmethod
self.distributor.declare("user_joined_room") def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
self.distributor.declare("user_left_room") """Try and join a room that this server is not in
Args:
requester (Requester)
remote_room_hosts (list[str]): List of servers that can be used
to join via.
room_id (str): Room that we are trying to join
user (UserID): User who is trying to join
content (dict): A dict that should be used as the content of the
join event.
Returns:
Deferred
"""
raise NotImplementedError()
@abc.abstractmethod
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
Args:
requester (Requester)
remote_room_hosts (list[str]): List of servers to use to try and
reject invite
room_id (str)
target (UserID): The user rejecting the invite
Returns:
Deferred[dict]: A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
"""
raise NotImplementedError()
@abc.abstractmethod
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Get a guest access token for a 3PID, creating a guest account if
one doesn't already exist.
Args:
requester (Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
"""
raise NotImplementedError()
@abc.abstractmethod
def _user_joined_room(self, target, room_id):
"""Notifies distributor on master process that the user has joined the
room.
Args:
target (UserID)
room_id (str)
Returns:
Deferred|None
"""
raise NotImplementedError()
@abc.abstractmethod
def _user_left_room(self, target, room_id):
"""Notifies distributor on master process that the user has left the
room.
Args:
target (UserID)
room_id (str)
Returns:
Deferred|None
"""
raise NotImplementedError()
@defer.inlineCallbacks @defer.inlineCallbacks
def _local_membership_update( def _local_membership_update(
@ -127,82 +208,15 @@ class RoomMemberHandler(object):
prev_member_event = yield self.store.get_event(prev_member_event_id) prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined: if newly_joined:
yield user_joined_room(self.distributor, target, room_id) yield self._user_joined_room(target, room_id)
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
if prev_member_event_id: if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id) prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN: if prev_member_event.membership == Membership.JOIN:
user_left_room(self.distributor, target, room_id) yield self._user_left_room(target, room_id)
defer.returnValue(event) defer.returnValue(event)
@defer.inlineCallbacks
def _remote_join(self, remote_room_hosts, room_id, user, content):
"""Try and join a room that this server is not in
Args:
remote_room_hosts (list[str]): List of servers that can be used
to join via.
room_id (str): Room that we are trying to join
user (UserID): User who is trying to join
content (dict): A dict that should be used as the content of the
join event.
Returns:
Deferred
"""
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
# We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
content,
)
yield user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
Args:
remote_room_hosts (list[str]): List of servers to use to try and
reject invite
room_id (str)
target (UserID): The user rejecting the invite
Returns:
Deferred[dict]: A dictionary to be returned to the client, may
include event_id etc, or nothing if we locally rejected
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
room_id,
target.to_string(),
)
defer.returnValue(ret)
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warn("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(
target.to_string(), room_id
)
defer.returnValue({})
@defer.inlineCallbacks @defer.inlineCallbacks
def update_membership( def update_membership(
self, self,
@ -476,12 +490,12 @@ class RoomMemberHandler(object):
prev_member_event = yield self.store.get_event(prev_member_event_id) prev_member_event = yield self.store.get_event(prev_member_event_id)
newly_joined = prev_member_event.membership != Membership.JOIN newly_joined = prev_member_event.membership != Membership.JOIN
if newly_joined: if newly_joined:
yield user_joined_room(self.distributor, target_user, room_id) yield self._user_joined_room(target_user, room_id)
elif event.membership == Membership.LEAVE: elif event.membership == Membership.LEAVE:
if prev_member_event_id: if prev_member_event_id:
prev_member_event = yield self.store.get_event(prev_member_event_id) prev_member_event = yield self.store.get_event(prev_member_event_id)
if prev_member_event.membership == Membership.JOIN: if prev_member_event.membership == Membership.JOIN:
user_left_room(self.distributor, target_user, room_id) yield self._user_left_room(target_user, room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def _can_guest_join(self, current_state_ids): def _can_guest_join(self, current_state_ids):
@ -672,6 +686,7 @@ class RoomMemberHandler(object):
token, public_keys, fallback_public_key, display_name = ( token, public_keys, fallback_public_key, display_name = (
yield self._ask_id_server_for_third_party_invite( yield self._ask_id_server_for_third_party_invite(
requester=requester,
id_server=id_server, id_server=id_server,
medium=medium, medium=medium,
address=address, address=address,
@ -708,6 +723,7 @@ class RoomMemberHandler(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def _ask_id_server_for_third_party_invite( def _ask_id_server_for_third_party_invite(
self, self,
requester,
id_server, id_server,
medium, medium,
address, address,
@ -724,6 +740,7 @@ class RoomMemberHandler(object):
Asks an identity server for a third party invite. Asks an identity server for a third party invite.
Args: Args:
requester (Requester)
id_server (str): hostname + optional port for the identity server. id_server (str): hostname + optional port for the identity server.
medium (str): The literal string "email". medium (str): The literal string "email".
address (str): The third party address being invited. address (str): The third party address being invited.
@ -766,8 +783,8 @@ class RoomMemberHandler(object):
} }
if self.config.invite_3pid_guest: if self.config.invite_3pid_guest:
rh = self.registration_handler guest_access_token, guest_user_id = yield self.get_or_register_3pid_guest(
guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest( requester=requester,
medium=medium, medium=medium,
address=address, address=address,
inviter_user_id=inviter_user_id, inviter_user_id=inviter_user_id,
@ -800,27 +817,6 @@ class RoomMemberHandler(object):
display_name = data["display_name"] display_name = data["display_name"]
defer.returnValue((token, public_keys, fallback_public_key, display_name)) defer.returnValue((token, public_keys, fallback_public_key, display_name))
@defer.inlineCallbacks
def forget(self, user, room_id):
user_id = user.to_string()
member = yield self.state_handler.get_current_state(
room_id=room_id,
event_type=EventTypes.Member,
state_key=user_id
)
membership = member.membership if member else None
if membership is not None and membership not in [
Membership.LEAVE, Membership.BAN
]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
if membership:
yield self.store.forget(user_id, room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def _is_host_in_room(self, current_state_ids): def _is_host_in_room(self, current_state_ids):
# Have we just created the room, and is this about to be the very # Have we just created the room, and is this about to be the very
@ -842,3 +838,94 @@ class RoomMemberHandler(object):
defer.returnValue(True) defer.returnValue(True)
defer.returnValue(False) defer.returnValue(False)
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberMasterHandler, self).__init__(hs)
self.distributor = hs.get_distributor()
self.distributor.declare("user_joined_room")
self.distributor.declare("user_left_room")
@defer.inlineCallbacks
def _remote_join(self, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
# We don't do an auth check if we are doing an invite
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
yield self.federation_handler.do_invite_join(
remote_room_hosts,
room_id,
user.to_string(),
content,
)
yield self._user_joined_room(user, room_id)
@defer.inlineCallbacks
def _remote_reject_invite(self, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
ret = yield fed_handler.do_remotely_reject_invite(
remote_room_hosts,
room_id,
target.to_string(),
)
defer.returnValue(ret)
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warn("Failed to reject invite: %s", e)
yield self.store.locally_reject_invite(
target.to_string(), room_id
)
defer.returnValue({})
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
rg = self.registration_handler
return rg.get_or_register_3pid_guest(medium, address, inviter_user_id)
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
return user_joined_room(self.distributor, target, room_id)
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
return user_left_room(self.distributor, target, room_id)
@defer.inlineCallbacks
def forget(self, user, room_id):
user_id = user.to_string()
member = yield self.state_handler.get_current_state(
room_id=room_id,
event_type=EventTypes.Member,
state_key=user_id
)
membership = member.membership if member else None
if membership is not None and membership not in [
Membership.LEAVE, Membership.BAN
]:
raise SynapseError(400, "User %s in room %s" % (
user_id, room_id
))
if membership:
yield self.store.forget(user_id, room_id)

View File

@ -47,7 +47,7 @@ from synapse.handlers.device import DeviceHandler
from synapse.handlers.e2e_keys import E2eKeysHandler from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.presence import PresenceHandler from synapse.handlers.presence import PresenceHandler
from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberHandler from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.set_password import SetPasswordHandler from synapse.handlers.set_password import SetPasswordHandler
from synapse.handlers.sync import SyncHandler from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import TypingHandler from synapse.handlers.typing import TypingHandler
@ -392,7 +392,9 @@ class HomeServer(object):
return SpamChecker(self) return SpamChecker(self)
def build_room_member_handler(self): def build_room_member_handler(self):
return RoomMemberHandler(self) if self.config.worker_app:
raise Exception("Can't use RoomMemberHandler on workers")
return RoomMemberMasterHandler(self)
def build_federation_registry(self): def build_federation_registry(self):
return FederationHandlerRegistry() return FederationHandlerRegistry()