diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 2c811906d..ac716a811 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,13 +15,11 @@ from twisted.internet import defer -from synapse.api.errors import LimitExceededError, SynapseError, AuthError -from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.api.errors import LimitExceededError from synapse.api.constants import Membership, EventTypes -from synapse.types import UserID, RoomAlias, Requester, get_domian_from_id -from synapse.push.action_generator import ActionGenerator +from synapse.types import UserID, Requester -from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import preserve_fn import logging @@ -65,7 +63,6 @@ class BaseHandler(object): self.clock = hs.get_clock() self.hs = hs - self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.event_builder_factory = hs.get_event_builder_factory() @@ -248,56 +245,6 @@ class BaseHandler(object): retry_after_ms=int(1000 * (time_allowed - time_now)), ) - @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): - if prev_event_ids: - prev_events = yield self.store.add_event_hashes(prev_event_ids) - prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) - depth = prev_max_depth + 1 - else: - latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( - builder.room_id, - ) - - if latest_ret: - depth = max([d for _, _, d in latest_ret]) + 1 - else: - depth = 1 - - prev_events = [ - (event_id, prev_hashes) - for event_id, prev_hashes, _ in latest_ret - ] - - builder.prev_events = prev_events - builder.depth = depth - - state_handler = self.state_handler - - context = yield state_handler.compute_event_context(builder) - - if builder.is_state(): - builder.prev_state = yield self.store.add_event_hashes( - context.prev_state_events - ) - - yield self.auth.add_auth_events(builder, context) - - add_hashes_and_signatures( - builder, self.server_name, self.signing_key - ) - - event = builder.build() - - logger.debug( - "Created event %s with current state: %s", - event.event_id, context.current_state, - ) - - defer.returnValue( - (event, context,) - ) - def is_host_in_room(self, current_state): room_members = [ (state_key, event.membership) @@ -318,145 +265,6 @@ class BaseHandler(object): return True return False - @defer.inlineCallbacks - def handle_new_client_event( - self, - requester, - event, - context, - ratelimit=True, - extra_users=[] - ): - # We now need to go and hit out to wherever we need to hit out to. - - if ratelimit: - self.ratelimit(requester) - - try: - self.auth.check(event, auth_events=context.current_state) - except AuthError as err: - logger.warn("Denying new event %r because %s", event, err) - raise err - - yield self.maybe_kick_guest_users(event, context.current_state.values()) - - if event.type == EventTypes.CanonicalAlias: - # Check the alias is acually valid (at this time at least) - room_alias_str = event.content.get("alias", None) - if room_alias_str: - room_alias = RoomAlias.from_string(room_alias_str) - directory_handler = self.hs.get_handlers().directory_handler - mapping = yield directory_handler.get_association(room_alias) - - if mapping["room_id"] != event.room_id: - raise SynapseError( - 400, - "Room alias %s does not point to the room" % ( - room_alias_str, - ) - ) - - federation_handler = self.hs.get_handlers().federation_handler - - if event.type == EventTypes.Member: - if event.content["membership"] == Membership.INVITE: - def is_inviter_member_event(e): - return ( - e.type == EventTypes.Member and - e.sender == event.sender - ) - - event.unsigned["invite_room_state"] = [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for k, e in context.current_state.items() - if e.type in self.hs.config.room_invite_state_types - or is_inviter_member_event(e) - ] - - invitee = UserID.from_string(event.state_key) - if not self.hs.is_mine(invitee): - # TODO: Can we add signature from remote server in a nicer - # way? If we have been invited by a remote server, we need - # to get them to sign the event. - - returned_invite = yield federation_handler.send_invite( - invitee.domain, - event, - ) - - event.unsigned.pop("room_state", None) - - # TODO: Make sure the signatures actually are correct. - event.signatures.update( - returned_invite.signatures - ) - - if event.type == EventTypes.Redaction: - if self.auth.check_redaction(event, auth_events=context.current_state): - original_event = yield self.store.get_event( - event.redacts, - check_redacted=False, - get_prev_content=False, - allow_rejected=False, - allow_none=False - ) - if event.user_id != original_event.user_id: - raise AuthError( - 403, - "You don't have permission to redact events" - ) - - if event.type == EventTypes.Create and context.current_state: - raise AuthError( - 403, - "Changing the room create event is forbidden", - ) - - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, context, self - ) - - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - - # this intentionally does not yield: we don't care about the result - # and don't need to wait for it. - preserve_fn(self.hs.get_pusherpool().on_new_notifications)( - event_stream_id, max_stream_id - ) - - destinations = set() - for k, s in context.current_state.items(): - try: - if k[0] == EventTypes.Member: - if s.content["membership"] == Membership.JOIN: - destinations.add(get_domian_from_id(s.state_key)) - except SynapseError: - logger.warn( - "Failed to get destination from event %s", s.event_id - ) - - with PreserveLoggingContext(): - # Don't block waiting on waking up all the listeners. - self.notifier.on_new_room_event( - event, event_stream_id, max_stream_id, - extra_users=extra_users - ) - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) - - federation_handler.handle_new_event( - event, destinations=destinations, - ) - @defer.inlineCallbacks def maybe_kick_guest_users(self, event, current_state): # Technically this function invalidates current_state by changing it. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f38c6a871..4a65b246e 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -682,7 +682,8 @@ class FederationHandler(BaseHandler): }) try: - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) except AuthError as e: @@ -913,7 +914,8 @@ class FederationHandler(BaseHandler): "state_key": user_id, }) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1688,7 +1690,10 @@ class FederationHandler(BaseHandler): if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)): builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( + builder=builder + ) event, context = yield self.add_display_name_to_third_party_invite( event_dict, event, context @@ -1716,7 +1721,8 @@ class FederationHandler(BaseHandler): def on_exchange_third_party_invite_request(self, origin, room_id, event_dict): builder = self.event_builder_factory.new(event_dict) - event, context = yield self._create_new_client_event( + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event( builder=builder, ) @@ -1755,7 +1761,8 @@ class FederationHandler(BaseHandler): event_dict["content"]["third_party_invite"]["display_name"] = display_name builder = self.event_builder_factory.new(event_dict) EventValidator().validate_new(builder) - event, context = yield self._create_new_client_event(builder=builder) + message_handler = self.hs.get_handlers().message_handler + event, context = yield message_handler._create_new_client_event(builder=builder) defer.returnValue((event, context)) @defer.inlineCallbacks diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 7d9e3cf36..45d3d47fc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -17,13 +17,18 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.streams.config import PaginationConfig +from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.push.action_generator import ActionGenerator +from synapse.streams.config import PaginationConfig +from synapse.types import ( + UserID, RoomAlias, RoomStreamToken, StreamToken, get_domian_from_id +) from synapse.util import unwrapFirstError from synapse.util.async import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.types import UserID, RoomStreamToken, StreamToken +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn from ._base import BaseHandler @@ -43,6 +48,7 @@ class MessageHandler(BaseHandler): self.clock = hs.get_clock() self.validator = EventValidator() self.snapshot_cache = SnapshotCache() + self.signing_key = hs.config.signing_key[0] @defer.inlineCallbacks def get_messages(self, requester, room_id=None, pagin_config=None, @@ -724,3 +730,192 @@ class MessageHandler(BaseHandler): ret["membership"] = membership defer.returnValue(ret) + + @defer.inlineCallbacks + def _create_new_client_event(self, builder, prev_event_ids=None): + if prev_event_ids: + prev_events = yield self.store.add_event_hashes(prev_event_ids) + prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) + depth = prev_max_depth + 1 + else: + latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room( + builder.room_id, + ) + + if latest_ret: + depth = max([d for _, _, d in latest_ret]) + 1 + else: + depth = 1 + + prev_events = [ + (event_id, prev_hashes) + for event_id, prev_hashes, _ in latest_ret + ] + + builder.prev_events = prev_events + builder.depth = depth + + state_handler = self.state_handler + + context = yield state_handler.compute_event_context(builder) + + if builder.is_state(): + builder.prev_state = yield self.store.add_event_hashes( + context.prev_state_events + ) + + yield self.auth.add_auth_events(builder, context) + + add_hashes_and_signatures( + builder, self.server_name, self.signing_key + ) + + event = builder.build() + + logger.debug( + "Created event %s with current state: %s", + event.event_id, context.current_state, + ) + + defer.returnValue( + (event, context,) + ) + + @defer.inlineCallbacks + def handle_new_client_event( + self, + requester, + event, + context, + ratelimit=True, + extra_users=[] + ): + # We now need to go and hit out to wherever we need to hit out to. + + if ratelimit: + self.ratelimit(requester) + + try: + self.auth.check(event, auth_events=context.current_state) + except AuthError as err: + logger.warn("Denying new event %r because %s", event, err) + raise err + + yield self.maybe_kick_guest_users(event, context.current_state.values()) + + if event.type == EventTypes.CanonicalAlias: + # Check the alias is acually valid (at this time at least) + room_alias_str = event.content.get("alias", None) + if room_alias_str: + room_alias = RoomAlias.from_string(room_alias_str) + directory_handler = self.hs.get_handlers().directory_handler + mapping = yield directory_handler.get_association(room_alias) + + if mapping["room_id"] != event.room_id: + raise SynapseError( + 400, + "Room alias %s does not point to the room" % ( + room_alias_str, + ) + ) + + federation_handler = self.hs.get_handlers().federation_handler + + if event.type == EventTypes.Member: + if event.content["membership"] == Membership.INVITE: + def is_inviter_member_event(e): + return ( + e.type == EventTypes.Member and + e.sender == event.sender + ) + + event.unsigned["invite_room_state"] = [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "sender": e.sender, + } + for k, e in context.current_state.items() + if e.type in self.hs.config.room_invite_state_types + or is_inviter_member_event(e) + ] + + invitee = UserID.from_string(event.state_key) + if not self.hs.is_mine(invitee): + # TODO: Can we add signature from remote server in a nicer + # way? If we have been invited by a remote server, we need + # to get them to sign the event. + + returned_invite = yield federation_handler.send_invite( + invitee.domain, + event, + ) + + event.unsigned.pop("room_state", None) + + # TODO: Make sure the signatures actually are correct. + event.signatures.update( + returned_invite.signatures + ) + + if event.type == EventTypes.Redaction: + if self.auth.check_redaction(event, auth_events=context.current_state): + original_event = yield self.store.get_event( + event.redacts, + check_redacted=False, + get_prev_content=False, + allow_rejected=False, + allow_none=False + ) + if event.user_id != original_event.user_id: + raise AuthError( + 403, + "You don't have permission to redact events" + ) + + if event.type == EventTypes.Create and context.current_state: + raise AuthError( + 403, + "Changing the room create event is forbidden", + ) + + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context + ) + + # this intentionally does not yield: we don't care about the result + # and don't need to wait for it. + preserve_fn(self.hs.get_pusherpool().on_new_notifications)( + event_stream_id, max_stream_id + ) + + destinations = set() + for k, s in context.current_state.items(): + try: + if k[0] == EventTypes.Member: + if s.content["membership"] == Membership.JOIN: + destinations.add(get_domian_from_id(s.state_key)) + except SynapseError: + logger.warn( + "Failed to get destination from event %s", s.event_id + ) + + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + self.notifier.on_new_room_event( + event, event_stream_id, max_stream_id, + extra_users=extra_users + ) + + # If invite, remove room_state from unsigned before sending. + event.unsigned.pop("invite_room_state", None) + + federation_handler.handle_new_event( + event, destinations=destinations, + ) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index ed2cda837..69de145c6 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -113,7 +113,7 @@ class RoomMemberHandler(BaseHandler): prev_event_ids=prev_event_ids, ) - yield self.handle_new_client_event( + yield self.msg_handler.handle_new_client_event( requester, event, context, @@ -357,7 +357,7 @@ class RoomMemberHandler(BaseHandler): # so don't really fit into the general auth process. raise AuthError(403, "Guest access not allowed") - yield self.handle_new_client_event( + yield message_handler.handle_new_client_event( requester, event, context,