Generate real events when we reject invites (#7804)

Fixes #2181. 

The basic premise is that, when we
fail to reject an invite via the remote server, we can generate our own
out-of-band leave event and persist it as an outlier, so that we have something
to send to the client.
This commit is contained in:
Richard van der Hoff 2020-07-09 10:40:19 +01:00 committed by GitHub
parent 67593b1728
commit 2ab0b021f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 184 additions and 169 deletions

1
changelog.d/7804.bugfix Normal file
View File

@ -0,0 +1 @@
Fix 'stuck invites' which happen when we are unable to reject a room invite received over federation.

View File

@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import logging import logging
from typing import Optional, Tuple from typing import TYPE_CHECKING, Optional, Tuple
from canonicaljson import encode_canonical_json, json from canonicaljson import encode_canonical_json, json
@ -55,6 +55,9 @@ from synapse.visibility import filter_events_for_client
from ._base import BaseHandler from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -349,7 +352,7 @@ _DUMMY_EVENT_ROOM_EXCLUSION_EXPIRY = 7 * 24 * 60 * 60 * 1000
class EventCreationHandler(object): class EventCreationHandler(object):
def __init__(self, hs): def __init__(self, hs: "HomeServer"):
self.hs = hs self.hs = hs
self.auth = hs.get_auth() self.auth = hs.get_auth()
self.store = hs.get_datastore() self.store = hs.get_datastore()
@ -814,11 +817,17 @@ class EventCreationHandler(object):
403, "This event is not allowed in this context", Codes.FORBIDDEN 403, "This event is not allowed in this context", Codes.FORBIDDEN
) )
try: if event.internal_metadata.is_out_of_band_membership():
await self.auth.check_from_context(room_version, event, context) # the only sort of out-of-band-membership events we expect to see here
except AuthError as err: # are invite rejections we have generated ourselves.
logger.warning("Denying new event %r because %s", event, err) assert event.type == EventTypes.Member
raise err assert event.content["membership"] == Membership.LEAVE
else:
try:
await self.auth.check_from_context(room_version, event, context)
except AuthError as err:
logger.warning("Denying new event %r because %s", event, err)
raise err
# Ensure that we can round trip before trying to persist in db # Ensure that we can round trip before trying to persist in db
try: try:

View File

@ -1,7 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd # Copyright 2016-2020 The Matrix.org Foundation C.I.C.
# Copyright 2018 New Vector Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -18,17 +16,21 @@
import abc import abc
import logging import logging
from http import HTTPStatus from http import HTTPStatus
from typing import Dict, Iterable, List, Optional, Tuple from typing import Dict, Iterable, List, Optional, Tuple, Union
from unpaddedbase64 import encode_base64
from synapse import types from synapse import types
from synapse.api.constants import EventTypes, Membership from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.api.room_versions import EventFormatVersions
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase from synapse.events import EventBase
from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.replication.http.membership import ( from synapse.events.validator import EventValidator
ReplicationLocallyRejectInviteRestServlet, from synapse.storage.roommember import RoomsForUser
) from synapse.types import Collection, JsonDict, Requester, RoomAlias, RoomID, UserID
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room from synapse.util.distributor import user_joined_room, user_left_room
@ -74,10 +76,6 @@ class RoomMemberHandler(object):
) )
if self._is_on_event_persistence_instance: if self._is_on_event_persistence_instance:
self.persist_event_storage = hs.get_storage().persistence self.persist_event_storage = hs.get_storage().persistence
else:
self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
hs
)
# This is only used to get at ratelimit function, and # This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as # maybe_kick_guest_users. It's fine there are multiple of these as
@ -105,46 +103,28 @@ class RoomMemberHandler(object):
raise NotImplementedError() raise NotImplementedError()
@abc.abstractmethod @abc.abstractmethod
async def _remote_reject_invite( async def remote_reject_invite(
self, self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester, requester: Requester,
remote_room_hosts: List[str], content: JsonDict,
room_id: str,
target: UserID,
content: dict,
) -> Tuple[Optional[str], int]: ) -> Tuple[Optional[str], int]:
"""Attempt to reject an invite for a room this server is not in. If we """
fail to do so we locally mark the invite as rejected. Rejects an out-of-band invite we have received from a remote server
Args: Args:
requester invite_event_id: ID of the invite to be rejected
remote_room_hosts: List of servers to use to try and reject invite txn_id: optional transaction ID supplied by the client
room_id requester: user making the rejection request, according to the access token
target: The user rejecting the invite content: additional content to include in the rejection event.
content: The content for the rejection event Normally an empty dict.
Returns: Returns:
A dictionary to be returned to the client, may event id, stream_id of the leave event
include event_id etc, or nothing if we locally rejected
""" """
raise NotImplementedError() raise NotImplementedError()
async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
if self._is_on_event_persistence_instance:
return await self.persist_event_storage.locally_reject_invite(
user_id, room_id
)
else:
result = await self._locally_reject_client(
instance_name=self._event_stream_writer_instance,
user_id=user_id,
room_id=room_id,
)
return result["stream_id"]
@abc.abstractmethod @abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None: async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the """Notifies distributor on master process that the user has joined the
@ -485,11 +465,17 @@ class RoomMemberHandler(object):
elif effective_membership_state == Membership.LEAVE: elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room: if not is_host_in_room:
# perhaps we've been invited # perhaps we've been invited
inviter = await self._get_inviter(target.to_string(), room_id) invite = await self.store.get_invite_for_local_user_in_room(
if not inviter: user_id=target.to_string(), room_id=room_id
) # type: Optional[RoomsForUser]
if not invite:
raise SynapseError(404, "Not a known room") raise SynapseError(404, "Not a known room")
if self.hs.is_mine(inviter): logger.info(
"%s rejects invite to %s from %s", target, room_id, invite.sender
)
if self.hs.is_mine_id(invite.sender):
# the inviter was on our server, but has now left. Carry on # the inviter was on our server, but has now left. Carry on
# with the normal rejection codepath. # with the normal rejection codepath.
# #
@ -497,10 +483,10 @@ class RoomMemberHandler(object):
# active on other servers. # active on other servers.
pass pass
else: else:
# send the rejection to the inviter's HS. # send the rejection to the inviter's HS (with fallback to
remote_room_hosts = remote_room_hosts + [inviter.domain] # local event)
return await self._remote_reject_invite( return await self.remote_reject_invite(
requester, remote_room_hosts, room_id, target, content, invite.event_id, txn_id, requester, content,
) )
return await self._local_membership_update( return await self._local_membership_update(
@ -1014,33 +1000,119 @@ class RoomMemberMasterHandler(RoomMemberHandler):
return event_id, stream_id return event_id, stream_id
async def _remote_reject_invite( async def remote_reject_invite(
self, self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester, requester: Requester,
remote_room_hosts: List[str], content: JsonDict,
room_id: str,
target: UserID,
content: dict,
) -> Tuple[Optional[str], int]: ) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
""" """
Rejects an out-of-band invite received from a remote user
Implements RoomMemberHandler.remote_reject_invite
"""
invite_event = await self.store.get_event(invite_event_id)
room_id = invite_event.room_id
target_user = invite_event.state_key
# first of all, try doing a rejection via the inviting server
fed_handler = self.federation_handler fed_handler = self.federation_handler
try: try:
inviter_id = UserID.from_string(invite_event.sender)
event, stream_id = await fed_handler.do_remotely_reject_invite( event, stream_id = await fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content, [inviter_id.domain], room_id, target_user, content=content
) )
return event.event_id, stream_id return event.event_id, stream_id
except Exception as e: except Exception as e:
# if we were unable to reject the exception, just mark # if we were unable to reject the invite, we will generate our own
# it as rejected on our end and plough ahead. # leave event.
# #
# The 'except' clause is very broad, but we need to # The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards # capture everything from DNS failures upwards
# #
logger.warning("Failed to reject invite: %s", e) logger.warning("Failed to reject invite: %s", e)
stream_id = await self.locally_reject_invite(target.to_string(), room_id) return await self._locally_reject_invite(
return None, stream_id invite_event, txn_id, requester, content
)
async def _locally_reject_invite(
self,
invite_event: EventBase,
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
) -> Tuple[str, int]:
"""Generate a local invite rejection
This is called after we fail to reject an invite via a remote server. It
generates an out-of-band membership event locally.
Args:
invite_event: the invite to be rejected
txn_id: optional transaction ID supplied by the client
requester: user making the rejection request, according to the access token
content: additional content to include in the rejection event.
Normally an empty dict.
"""
room_id = invite_event.room_id
target_user = invite_event.state_key
room_version = await self.store.get_room_version(room_id)
content["membership"] = Membership.LEAVE
# the auth events for the new event are the same as that of the invite, plus
# the invite itself.
#
# the prev_events are just the invite.
invite_hash = invite_event.event_id # type: Union[str, Tuple]
if room_version.event_format == EventFormatVersions.V1:
alg, h = compute_event_reference_hash(invite_event)
invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
auth_events = invite_event.auth_events + (invite_hash,)
prev_events = (invite_hash,)
# we cap depth of generated events, to ensure that they are not
# rejected by other servers (and so that they can be persisted in
# the db)
depth = min(invite_event.depth + 1, MAX_DEPTH)
event_dict = {
"depth": depth,
"auth_events": auth_events,
"prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
"content": content,
"state_key": target_user,
}
event = create_local_event_from_event_dict(
clock=self.clock,
hostname=self.hs.hostname,
signing_key=self.hs.signing_key,
room_version=room_version,
event_dict=event_dict,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
if txn_id is not None:
event.internal_metadata.txn_id = txn_id
if requester.access_token_id is not None:
event.internal_metadata.token_id = requester.access_token_id
EventValidator().validate_new(event, self.config)
context = await self.state_handler.compute_event_context(event)
context.app_service = requester.app_service
stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
return event.event_id, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None: async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room """Implements RoomMemberHandler._user_joined_room

View File

@ -61,21 +61,22 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
return ret["event_id"], ret["stream_id"] return ret["event_id"], ret["stream_id"]
async def _remote_reject_invite( async def remote_reject_invite(
self, self,
invite_event_id: str,
txn_id: Optional[str],
requester: Requester, requester: Requester,
remote_room_hosts: List[str],
room_id: str,
target: UserID,
content: dict, content: dict,
) -> Tuple[Optional[str], int]: ) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite """
Rejects an out-of-band invite received from a remote user
Implements RoomMemberHandler.remote_reject_invite
""" """
ret = await self._remote_reject_client( ret = await self._remote_reject_client(
invite_event_id=invite_event_id,
txn_id=txn_id,
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content, content=content,
) )
return ret["event_id"], ret["stream_id"] return ret["event_id"], ret["stream_id"]

View File

@ -14,11 +14,11 @@
# limitations under the License. # limitations under the License.
import logging import logging
from typing import TYPE_CHECKING from typing import TYPE_CHECKING, Optional
from synapse.http.servlet import parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import JsonDict, Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room from synapse.util.distributor import user_joined_room, user_left_room
if TYPE_CHECKING: if TYPE_CHECKING:
@ -88,49 +88,54 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
"""Rejects the invite for the user and room. """Rejects an out-of-band invite we have received from a remote server
Request format: Request format:
POST /_synapse/replication/remote_reject_invite/:room_id/:user_id POST /_synapse/replication/remote_reject_invite/:event_id
{ {
"txn_id": ...,
"requester": ..., "requester": ...,
"remote_room_hosts": [...],
"content": { ... } "content": { ... }
} }
""" """
NAME = "remote_reject_invite" NAME = "remote_reject_invite"
PATH_ARGS = ("room_id", "user_id") PATH_ARGS = ("invite_event_id",)
def __init__(self, hs): def __init__(self, hs: "HomeServer"):
super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs) super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.member_handler = hs.get_room_member_handler() self.member_handler = hs.get_room_member_handler()
@staticmethod @staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts, content): def _serialize_payload( # type: ignore
invite_event_id: str,
txn_id: Optional[str],
requester: Requester,
content: JsonDict,
):
""" """
Args: Args:
requester(Requester) invite_event_id: ID of the invite to be rejected
room_id (str) txn_id: optional transaction ID supplied by the client
user_id (str) requester: user making the rejection request, according to the access token
remote_room_hosts (list[str]): Servers to try and reject via content: additional content to include in the rejection event.
Normally an empty dict.
""" """
return { return {
"txn_id": txn_id,
"requester": requester.serialize(), "requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"content": content, "content": content,
} }
async def _handle_request(self, request, room_id, user_id): async def _handle_request(self, request, invite_event_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] txn_id = content["txn_id"]
event_content = content["content"] event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -138,60 +143,14 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
if requester.user: if requester.user:
request.authenticated_entity = requester.user.to_string() request.authenticated_entity = requester.user.to_string()
logger.info("remote_reject_invite: %s out of room: %s", user_id, room_id) # hopefully we're now on the master, so this won't recurse!
event_id, stream_id = await self.member_handler.remote_reject_invite(
try: invite_event_id, txn_id, requester, event_content,
event, stream_id = await self.federation_handler.do_remotely_reject_invite( )
remote_room_hosts, room_id, user_id, event_content,
)
event_id = event.event_id
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
#
# The 'except' clause is very broad, but we need to
# capture everything from DNS failures upwards
#
logger.warning("Failed to reject invite: %s", e)
stream_id = await self.member_handler.locally_reject_invite(
user_id, room_id
)
event_id = None
return 200, {"event_id": event_id, "stream_id": stream_id} return 200, {"event_id": event_id, "stream_id": stream_id}
class ReplicationLocallyRejectInviteRestServlet(ReplicationEndpoint):
"""Rejects the invite for the user and room locally.
Request format:
POST /_synapse/replication/locally_reject_invite/:room_id/:user_id
{}
"""
NAME = "locally_reject_invite"
PATH_ARGS = ("room_id", "user_id")
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.member_handler = hs.get_room_member_handler()
@staticmethod
def _serialize_payload(room_id, user_id):
return {}
async def _handle_request(self, request, room_id, user_id):
logger.info("locally_reject_invite: %s out of room: %s", user_id, room_id)
stream_id = await self.member_handler.locally_reject_invite(user_id, room_id)
return 200, {"stream_id": stream_id}
class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
"""Notifies that a user has joined or left the room """Notifies that a user has joined or left the room
@ -245,4 +204,3 @@ def register_servlets(hs, http_server):
ReplicationRemoteJoinRestServlet(hs).register(http_server) ReplicationRemoteJoinRestServlet(hs).register(http_server)
ReplicationRemoteRejectInviteRestServlet(hs).register(http_server) ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server) ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
ReplicationLocallyRejectInviteRestServlet(hs).register(http_server)

View File

@ -1541,23 +1541,3 @@ class PersistEventsStore:
if not ev.internal_metadata.is_outlier() if not ev.internal_metadata.is_outlier()
], ],
) )
async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
def f(txn, stream_ordering):
# Clear this entry from `local_current_membership`.
# Ideally we'd point to a leave event, but we don't have one, so
# nevermind.
self.db.simple_delete_txn(
txn,
table="local_current_membership",
keyvalues={"room_id": room_id, "user_id": user_id},
)
with self._stream_id_gen.get_next() as stream_ordering:
await self.db.runInteraction("locally_reject_invite", f, stream_ordering)
return stream_ordering

View File

@ -783,9 +783,3 @@ class EventsPersistenceStorage(object):
for user_id in left_users: for user_id in left_users:
await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id) await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
"""Mark the invite has having been rejected even though we failed to
create a leave event for it.
"""
return await self.persist_events_store.locally_reject_invite(user_id, room_id)