diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 39d772477..bcb093ba3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.replication.http.send_event import send_event_to_master +from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.types import RoomAlias, UserID from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder @@ -171,7 +171,7 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config - self.http_client = hs.get_simple_http_client() + self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -559,12 +559,9 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. if self.config.worker_app: - yield send_event_to_master( - clock=self.hs.get_clock(), + yield self.send_event_to_master( + event_id=event.event_id, store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, requester=requester, event=event, context=context, diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 5227bc333..50810d94c 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -14,90 +14,26 @@ # limitations under the License. import logging -import re from twisted.internet import defer -from synapse.api.errors import ( - CodeMessageException, - MatrixCodeMessageException, - SynapseError, -) from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID -from synapse.util.caches.response_cache import ResponseCache from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def send_event_to_master(clock, store, client, host, port, requester, event, context, - ratelimit, extra_users): - """Send event to be handled on the master - - Args: - clock (synapse.util.Clock) - store (DataStore) - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - event (FrozenEvent) - context (EventContext) - ratelimit (bool) - extra_users (list(UserID)): Any extra users to notify about event - """ - uri = "http://%s:%s/_synapse/replication/send_event/%s" % ( - host, port, event.event_id, - ) - - serialized_context = yield context.serialize(event, store) - - payload = { - "event": event.get_pdu_json(), - "internal_metadata": event.internal_metadata.get_dict(), - "rejected_reason": event.rejected_reason, - "context": serialized_context, - "requester": requester.serialize(), - "ratelimit": ratelimit, - "extra_users": [u.to_string() for u in extra_users], - } - - try: - # We keep retrying the same request for timeouts. This is so that we - # have a good idea that the request has either succeeded or failed on - # the master, and so whether we should clean up or not. - while True: - try: - result = yield client.put_json(uri, payload) - break - except CodeMessageException as e: - if e.code != 504: - raise - - logger.warn("send_event request timed out") - - # If we timed out we probably don't need to worry about backing - # off too much, but lets just wait a little anyway. - yield clock.sleep(1) - except MatrixCodeMessageException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise SynapseError(e.code, e.msg, e.errcode) - defer.returnValue(result) - - -class ReplicationSendEventRestServlet(RestServlet): +class ReplicationSendEventRestServlet(ReplicationEndpoint): """Handles events newly created on workers, including persisting and notifying. The API looks like: - POST /_synapse/replication/send_event/:event_id + POST /_synapse/replication/send_event/:event_id/:txn_id { "event": { .. serialized event .. }, @@ -109,27 +45,48 @@ class ReplicationSendEventRestServlet(RestServlet): "extra_users": [], } """ - PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P[^/]+)$")] + NAME = "send_event" + PATH_ARGS = ("event_id",) + POST = True def __init__(self, hs): - super(ReplicationSendEventRestServlet, self).__init__() + super(ReplicationSendEventRestServlet, self).__init__(hs) self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() - # The responses are tiny, so we may as well cache them for a while - self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) + @staticmethod + @defer.inlineCallbacks + def _serialize_payload(event_id, store, event, context, requester, + ratelimit, extra_users): + """ + Args: + event_id (str) + store (DataStore) + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(UserID)): Any extra users to notify about event + """ - def on_PUT(self, request, event_id): - return self.response_cache.wrap( - event_id, - self._handle_request, - request - ) + serialized_context = yield context.serialize(event, store) + + payload = { + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + "requester": requester.serialize(), + "ratelimit": ratelimit, + "extra_users": [u.to_string() for u in extra_users], + } + + defer.returnValue(payload) @defer.inlineCallbacks - def _handle_request(self, request): + def _handle_request(self, request, event_id): with Measure(self.clock, "repl_send_event_parse"): content = parse_json_object_from_request(request)