Merge pull request #3632 from matrix-org/erikj/refactor_repl_servlet

Add helper base class for generating new replication endpoints
This commit is contained in:
Erik Johnston 2018-08-09 10:06:23 +01:00 committed by GitHub
commit 2bdafaf3c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 396 additions and 274 deletions

1
changelog.d/3632.misc Normal file
View File

@ -0,0 +1 @@
Refactor HTTP replication endpoints to reduce code duplication

View File

@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder from synapse.util.frozenutils import frozendict_json_encoder
@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier() self.notifier = hs.get_notifier()
self.config = hs.config self.config = hs.config
self.http_client = hs.get_simple_http_client() self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users # This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs) self.base_handler = BaseHandler(hs)
@ -559,12 +559,9 @@ class EventCreationHandler(object):
try: try:
# If we're a worker we need to hit out to the master. # If we're a worker we need to hit out to the master.
if self.config.worker_app: if self.config.worker_app:
yield send_event_to_master( yield self.send_event_to_master(
clock=self.hs.get_clock(), event_id=event.event_id,
store=self.store, store=self.store,
client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
event=event, event=event,
context=context, context=context,

View File

@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import ( from synapse.replication.http.membership import (
get_or_register_3pid_guest, ReplicationRegister3PIDGuestRestServlet as Repl3PID,
notify_user_membership_change, ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
remote_join, ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
remote_reject_invite, ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler): class RoomMemberWorkerHandler(RoomMemberHandler):
def __init__(self, hs):
super(RoomMemberWorkerHandler, self).__init__(hs)
self._get_register_3pid_client = Repl3PID.make_client(hs)
self._remote_join_client = ReplRemoteJoin.make_client(hs)
self._remote_reject_client = ReplRejectInvite.make_client(hs)
self._notify_change_client = ReplJoinedLeft.make_client(hs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content): def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join """Implements RoomMemberHandler._remote_join
@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0: if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers") raise SynapseError(404, "No known servers")
ret = yield remote_join( ret = yield self._remote_join_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite """Implements RoomMemberHandler._remote_reject_invite
""" """
return remote_reject_invite( return self._remote_reject_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
remote_room_hosts=remote_room_hosts, remote_room_hosts=remote_room_hosts,
room_id=room_id, room_id=room_id,
@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id): def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room """Implements RoomMemberHandler._user_joined_room
""" """
return notify_user_membership_change( return self._notify_change_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
user_id=target.to_string(), user_id=target.to_string(),
room_id=room_id, room_id=room_id,
change="joined", change="joined",
@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id): def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room """Implements RoomMemberHandler._user_left_room
""" """
return notify_user_membership_change( return self._notify_change_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
user_id=target.to_string(), user_id=target.to_string(),
room_id=room_id, room_id=room_id,
change="left", change="left",
@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest """Implements RoomMemberHandler.get_or_register_3pid_guest
""" """
return get_or_register_3pid_guest( return self._get_register_3pid_client(
self.simple_http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester, requester=requester,
medium=medium, medium=medium,
address=address, address=address,

View File

@ -0,0 +1,215 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import re
from six.moves import urllib
from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string
logger = logging.getLogger(__name__)
class ReplicationEndpoint(object):
"""Helper base class for defining new replication HTTP endpoints.
This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
(with an `/:txn_id` prefix for cached requests.), where NAME is a name,
PATH_ARGS are a tuple of parameters to be encoded in the URL.
For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
with `CACHE` set to true then this generates an endpoint:
/_synapse/replication/send_event/:event_id/:txn_id
For POST/PUT requests the payload is serialized to json and sent as the
body, while for GET requests the payload is added as query parameters. See
`_serialize_payload` for details.
Incoming requests are handled by overriding `_handle_request`. Servers
must call `register` to register the path with the HTTP server.
Requests can be sent by calling the client returned by `make_client`.
Attributes:
NAME (str): A name for the endpoint, added to the path as well as used
in logging and metrics.
PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
Adding parameters to the path (rather than payload) can make it
easier to follow along in the log files.
METHOD (str): The method of the HTTP request, defaults to POST. Can be
one of POST, PUT or GET. If GET then the payload is sent as query
parameters rather than a JSON body.
CACHE (bool): Whether server should cache the result of the request/
If true then transparently adds a txn_id to all requests, and
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
"""
__metaclass__ = abc.ABCMeta
NAME = abc.abstractproperty()
PATH_ARGS = abc.abstractproperty()
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
def __init__(self, hs):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME,
timeout_ms=30 * 60 * 1000,
)
assert self.METHOD in ("PUT", "POST", "GET")
@abc.abstractmethod
def _serialize_payload(**kwargs):
"""Static method that is called when creating a request.
Concrete implementations should have explicit parameters (rather than
kwargs) so that an appropriate exception is raised if the client is
called with unexpected parameters. All PATH_ARGS must appear in
argument list.
Returns:
Deferred[dict]|dict: If POST/PUT request then dictionary must be
JSON serialisable, otherwise must be appropriate for adding as
query args.
"""
return {}
@abc.abstractmethod
def _handle_request(self, request, **kwargs):
"""Handle incoming request.
This is called with the request object and PATH_ARGS.
Returns:
Deferred[dict]: A JSON serialisable dict to be used as response
body of request.
"""
pass
@classmethod
def make_client(cls, hs):
"""Create a client that makes requests.
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
host = hs.config.worker_replication_host
port = hs.config.worker_replication_http_port
client = hs.get_simple_http_client()
@defer.inlineCallbacks
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
if cls.CACHE:
txn_id = random_string(10)
url_args.append(txn_id)
if cls.METHOD == "POST":
request_func = client.post_json_get_json
elif cls.METHOD == "PUT":
request_func = client.put_json
elif cls.METHOD == "GET":
request_func = client.get_json
else:
# We have already asserted in the constructor that a
# compatible was picked, but lets be paranoid.
raise Exception(
"Unknown METHOD on %s replication endpoint" % (cls.NAME,)
)
uri = "http://%s:%s/_synapse/replication/%s/%s" % (
host, port, cls.NAME, "/".join(url_args)
)
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield request_func(uri, data)
break
except CodeMessageException as e:
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
raise
logger.warn("%s request timed out", cls.NAME)
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
return send_request
def register(self, http_server):
"""Called by the server to register this as a handler to the
appropriate path.
"""
url_args = list(self.PATH_ARGS)
handler = self._handle_request
method = self.METHOD
if self.CACHE:
handler = self._cached_handler
url_args.append("txn_id")
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
pattern = re.compile("^/_synapse/replication/%s/%s$" % (
self.NAME,
args
))
http_server.register_paths(method, [pattern], handler)
def _cached_handler(self, request, txn_id, **kwargs):
"""Called on new incoming requests when caching is enabled. Checks
if there is a cached response for the request and returns that,
otherwise calls `_handle_request` and caches its response.
"""
# We just use the txn_id here, but we probably also want to use the
# other PATH_ARGS as well.
assert self.CACHE
return self.response_cache.wrap(
txn_id,
self._handle_request,
request, **kwargs
)

View File

@ -14,182 +14,63 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import HttpResponseException from synapse.http.servlet import parse_json_object_from_request
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@defer.inlineCallbacks class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
def remote_join(client, host, port, requester, remote_room_hosts, """Does a remote join for the given user to the given room
room_id, user_id, content):
"""Ask the master to do a remote join for the given user to the given room
Args: Request format:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and join via
room_id (str)
user_id (str)
content (dict): The event content to use for the join event
Returns: POST /_synapse/replication/remote_join/:room_id/:user_id
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
payload = { {
"requester": requester.serialize(), "requester": ...,
"remote_room_hosts": remote_room_hosts, "remote_room_hosts": [...],
"room_id": room_id, "content": { ... }
"user_id": user_id, }
"content": content,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def remote_reject_invite(client, host, port, requester, remote_room_hosts,
room_id, user_id):
"""Ask master to reject the invite for the user and room.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
remote_room_hosts (list[str]): Servers to try and reject via
room_id (str)
user_id (str)
Returns:
Deferred
"""
uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
payload = {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"room_id": room_id,
"user_id": user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def get_or_register_3pid_guest(client, host, port, requester,
medium, address, inviter_user_id):
"""Ask the master to get/create a guest account for given 3PID.
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
Returns:
Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
3PID guest account.
""" """
uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) NAME = "remote_join"
PATH_ARGS = ("room_id", "user_id",)
payload = {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
@defer.inlineCallbacks
def notify_user_membership_change(client, host, port, user_id, room_id, change):
"""Notify master that a user has joined or left the room
Args:
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication.
user_id (str)
room_id (str)
change (str): Either "join" or "left"
Returns:
Deferred
"""
assert change in ("joined", "left")
uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
payload = {
"user_id": user_id,
"room_id": room_id,
}
try:
result = yield client.post_json_get_json(uri, payload)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationRemoteJoinRestServlet(RestServlet):
PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRemoteJoinRestServlet, self).__init__() super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
content):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and join via
content(dict): The event content to use for the join event
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
"content": content,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
event_content = content["content"] event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
defer.returnValue((200, {})) defer.returnValue((200, {}))
class ReplicationRemoteRejectInviteRestServlet(RestServlet): class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] """Rejects the invite for the user and room.
Request format:
POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
{
"requester": ...,
"remote_room_hosts": [...],
}
"""
NAME = "remote_reject_invite"
PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRemoteRejectInviteRestServlet, self).__init__() super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
"""
Args:
requester(Requester)
room_id (str)
user_id (str)
remote_room_hosts (list[str]): Servers to try and reject via
"""
return {
"requester": requester.serialize(),
"remote_room_hosts": remote_room_hosts,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"] remote_room_hosts = content["remote_room_hosts"]
room_id = content["room_id"]
user_id = content["user_id"]
requester = Requester.deserialize(self.store, content["requester"]) requester = Requester.deserialize(self.store, content["requester"])
@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
defer.returnValue((200, ret)) defer.returnValue((200, ret))
class ReplicationRegister3PIDGuestRestServlet(RestServlet): class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] """Gets/creates a guest account for given 3PID.
Request format:
POST /_synapse/replication/get_or_register_3pid_guest/
{
"requester": ...,
"medium": ...,
"address": ...,
"inviter_user_id": ...
}
"""
NAME = "get_or_register_3pid_guest"
PATH_ARGS = ()
def __init__(self, hs): def __init__(self, hs):
super(ReplicationRegister3PIDGuestRestServlet, self).__init__() super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
@staticmethod
def _serialize_payload(requester, medium, address, inviter_user_id):
"""
Args:
requester(Requester)
medium (str)
address (str)
inviter_user_id (str): The user ID who is trying to invite the
3PID
"""
return {
"requester": requester.serialize(),
"medium": medium,
"address": address,
"inviter_user_id": inviter_user_id,
}
@defer.inlineCallbacks @defer.inlineCallbacks
def on_POST(self, request): def _handle_request(self, request):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)
medium = content["medium"] medium = content["medium"]
@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
defer.returnValue((200, ret)) defer.returnValue((200, ret))
class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")] """Notifies that a user has joined or left the room
Request format:
POST /_synapse/replication/membership_change/:room_id/:user_id/:change
{}
"""
NAME = "membership_change"
PATH_ARGS = ("room_id", "user_id", "change")
CACHE = False # No point caching as should return instantly.
def __init__(self, hs): def __init__(self, hs):
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
self.distributor = hs.get_distributor() self.distributor = hs.get_distributor()
def on_POST(self, request, change): @staticmethod
content = parse_json_object_from_request(request) def _serialize_payload(room_id, user_id, change):
"""
Args:
room_id (str)
user_id (str)
change (str): Either "joined" or "left"
"""
assert change in ("joined", "left",)
user_id = content["user_id"] return {}
room_id = content["room_id"]
def _handle_request(self, request, room_id, user_id, change):
logger.info("user membership change: %s in %s", user_id, room_id) logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id) user = UserID.from_string(user_id)

View File

@ -14,86 +14,26 @@
# limitations under the License. # limitations under the License.
import logging import logging
import re
from twisted.internet import defer from twisted.internet import defer
from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext from synapse.events.snapshot import EventContext
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID from synapse.types import Requester, UserID
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@defer.inlineCallbacks class ReplicationSendEventRestServlet(ReplicationEndpoint):
def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
host, port, event.event_id,
)
serialized_context = yield context.serialize(event, store)
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
try:
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed on
# the master, and so whether we should clean up or not.
while True:
try:
result = yield client.put_json(uri, payload)
break
except CodeMessageException as e:
if e.code != 504:
raise
logger.warn("send_event request timed out")
# If we timed out we probably don't need to worry about backing
# off too much, but lets just wait a little anyway.
yield clock.sleep(1)
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the master process that we should send to the client. (And
# importantly, not stack traces everywhere)
raise e.to_synapse_error()
defer.returnValue(result)
class ReplicationSendEventRestServlet(RestServlet):
"""Handles events newly created on workers, including persisting and """Handles events newly created on workers, including persisting and
notifying. notifying.
The API looks like: The API looks like:
POST /_synapse/replication/send_event/:event_id POST /_synapse/replication/send_event/:event_id/:txn_id
{ {
"event": { .. serialized event .. }, "event": { .. serialized event .. },
@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [], "extra_users": [],
} }
""" """
PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")] NAME = "send_event"
PATH_ARGS = ("event_id",)
def __init__(self, hs): def __init__(self, hs):
super(ReplicationSendEventRestServlet, self).__init__() super(ReplicationSendEventRestServlet, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler() self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore() self.store = hs.get_datastore()
self.clock = hs.get_clock() self.clock = hs.get_clock()
# The responses are tiny, so we may as well cache them for a while @staticmethod
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) @defer.inlineCallbacks
def _serialize_payload(event_id, store, event, context, requester,
ratelimit, extra_users):
"""
Args:
event_id (str)
store (DataStore)
requester (Requester)
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
"""
def on_PUT(self, request, event_id): serialized_context = yield context.serialize(event, store)
return self.response_cache.wrap(
event_id, payload = {
self._handle_request, "event": event.get_pdu_json(),
request "internal_metadata": event.internal_metadata.get_dict(),
) "rejected_reason": event.rejected_reason,
"context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
}
defer.returnValue(payload)
@defer.inlineCallbacks @defer.inlineCallbacks
def _handle_request(self, request): def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"): with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request) content = parse_json_object_from_request(request)