mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-09-28 18:09:30 -04:00
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/public_rooms_federate_develop
This commit is contained in:
commit
802884d4ee
391 changed files with 18834 additions and 6352 deletions
|
@ -20,10 +20,10 @@ import six
|
|||
from twisted.internet import defer
|
||||
from twisted.internet.defer import DeferredList
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
|
||||
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership, RoomVersions
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
from synapse.crypto.event_signing import check_event_content_hash
|
||||
from synapse.events import FrozenEvent
|
||||
from synapse.events import event_type_from_format_version
|
||||
from synapse.events.utils import prune_event
|
||||
from synapse.http.servlet import assert_params_in_dict
|
||||
from synapse.types import get_domain_from_id
|
||||
|
@ -43,8 +43,8 @@ class FederationBase(object):
|
|||
self._clock = hs.get_clock()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False,
|
||||
include_none=False):
|
||||
def _check_sigs_and_hash_and_fetch(self, origin, pdus, room_version,
|
||||
outlier=False, include_none=False):
|
||||
"""Takes a list of PDUs and checks the signatures and hashs of each
|
||||
one. If a PDU fails its signature check then we check if we have it in
|
||||
the database and if not then request if from the originating server of
|
||||
|
@ -56,13 +56,17 @@ class FederationBase(object):
|
|||
a new list.
|
||||
|
||||
Args:
|
||||
origin (str)
|
||||
pdu (list)
|
||||
outlier (bool)
|
||||
room_version (str)
|
||||
outlier (bool): Whether the events are outliers or not
|
||||
include_none (str): Whether to include None in the returned list
|
||||
for events that have failed their checks
|
||||
|
||||
Returns:
|
||||
Deferred : A list of PDUs that have valid signatures and hashes.
|
||||
"""
|
||||
deferreds = self._check_sigs_and_hashes(pdus)
|
||||
deferreds = self._check_sigs_and_hashes(room_version, pdus)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def handle_check_result(pdu, deferred):
|
||||
|
@ -84,6 +88,7 @@ class FederationBase(object):
|
|||
res = yield self.get_pdu(
|
||||
destinations=[pdu.origin],
|
||||
event_id=pdu.event_id,
|
||||
room_version=room_version,
|
||||
outlier=outlier,
|
||||
timeout=10000,
|
||||
)
|
||||
|
@ -116,16 +121,17 @@ class FederationBase(object):
|
|||
else:
|
||||
defer.returnValue([p for p in valid_pdus if p])
|
||||
|
||||
def _check_sigs_and_hash(self, pdu):
|
||||
def _check_sigs_and_hash(self, room_version, pdu):
|
||||
return logcontext.make_deferred_yieldable(
|
||||
self._check_sigs_and_hashes([pdu])[0],
|
||||
self._check_sigs_and_hashes(room_version, [pdu])[0],
|
||||
)
|
||||
|
||||
def _check_sigs_and_hashes(self, pdus):
|
||||
def _check_sigs_and_hashes(self, room_version, pdus):
|
||||
"""Checks that each of the received events is correctly signed by the
|
||||
sending server.
|
||||
|
||||
Args:
|
||||
room_version (str): The room version of the PDUs
|
||||
pdus (list[FrozenEvent]): the events to be checked
|
||||
|
||||
Returns:
|
||||
|
@ -136,7 +142,7 @@ class FederationBase(object):
|
|||
* throws a SynapseError if the signature check failed.
|
||||
The deferreds run their callbacks in the sentinel logcontext.
|
||||
"""
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, pdus)
|
||||
deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus)
|
||||
|
||||
ctx = logcontext.LoggingContext.current_context()
|
||||
|
||||
|
@ -198,16 +204,17 @@ class FederationBase(object):
|
|||
|
||||
|
||||
class PduToCheckSig(namedtuple("PduToCheckSig", [
|
||||
"pdu", "redacted_pdu_json", "event_id_domain", "sender_domain", "deferreds",
|
||||
"pdu", "redacted_pdu_json", "sender_domain", "deferreds",
|
||||
])):
|
||||
pass
|
||||
|
||||
|
||||
def _check_sigs_on_pdus(keyring, pdus):
|
||||
def _check_sigs_on_pdus(keyring, room_version, pdus):
|
||||
"""Check that the given events are correctly signed
|
||||
|
||||
Args:
|
||||
keyring (synapse.crypto.Keyring): keyring object to do the checks
|
||||
room_version (str): the room version of the PDUs
|
||||
pdus (Collection[EventBase]): the events to be checked
|
||||
|
||||
Returns:
|
||||
|
@ -220,9 +227,7 @@ def _check_sigs_on_pdus(keyring, pdus):
|
|||
|
||||
# we want to check that the event is signed by:
|
||||
#
|
||||
# (a) the server which created the event_id
|
||||
#
|
||||
# (b) the sender's server.
|
||||
# (a) the sender's server
|
||||
#
|
||||
# - except in the case of invites created from a 3pid invite, which are exempt
|
||||
# from this check, because the sender has to match that of the original 3pid
|
||||
|
@ -236,34 +241,26 @@ def _check_sigs_on_pdus(keyring, pdus):
|
|||
# and signatures are *supposed* to be valid whether or not an event has been
|
||||
# redacted. But this isn't the worst of the ways that 3pid invites are broken.
|
||||
#
|
||||
# (b) for V1 and V2 rooms, the server which created the event_id
|
||||
#
|
||||
# let's start by getting the domain for each pdu, and flattening the event back
|
||||
# to JSON.
|
||||
|
||||
pdus_to_check = [
|
||||
PduToCheckSig(
|
||||
pdu=p,
|
||||
redacted_pdu_json=prune_event(p).get_pdu_json(),
|
||||
event_id_domain=get_domain_from_id(p.event_id),
|
||||
sender_domain=get_domain_from_id(p.sender),
|
||||
deferreds=[],
|
||||
)
|
||||
for p in pdus
|
||||
]
|
||||
|
||||
# first make sure that the event is signed by the event_id's domain
|
||||
deferreds = keyring.verify_json_objects_for_server([
|
||||
(p.event_id_domain, p.redacted_pdu_json)
|
||||
for p in pdus_to_check
|
||||
])
|
||||
|
||||
for p, d in zip(pdus_to_check, deferreds):
|
||||
p.deferreds.append(d)
|
||||
|
||||
# now let's look for events where the sender's domain is different to the
|
||||
# event id's domain (normally only the case for joins/leaves), and add additional
|
||||
# checks.
|
||||
# First we check that the sender event is signed by the sender's domain
|
||||
# (except if its a 3pid invite, in which case it may be sent by any server)
|
||||
pdus_to_check_sender = [
|
||||
p for p in pdus_to_check
|
||||
if p.sender_domain != p.event_id_domain and not _is_invite_via_3pid(p.pdu)
|
||||
if not _is_invite_via_3pid(p.pdu)
|
||||
]
|
||||
|
||||
more_deferreds = keyring.verify_json_objects_for_server([
|
||||
|
@ -274,19 +271,43 @@ def _check_sigs_on_pdus(keyring, pdus):
|
|||
for p, d in zip(pdus_to_check_sender, more_deferreds):
|
||||
p.deferreds.append(d)
|
||||
|
||||
# now let's look for events where the sender's domain is different to the
|
||||
# event id's domain (normally only the case for joins/leaves), and add additional
|
||||
# checks. Only do this if the room version has a concept of event ID domain
|
||||
if room_version in (
|
||||
RoomVersions.V1, RoomVersions.V2, RoomVersions.STATE_V2_TEST,
|
||||
):
|
||||
pdus_to_check_event_id = [
|
||||
p for p in pdus_to_check
|
||||
if p.sender_domain != get_domain_from_id(p.pdu.event_id)
|
||||
]
|
||||
|
||||
more_deferreds = keyring.verify_json_objects_for_server([
|
||||
(get_domain_from_id(p.pdu.event_id), p.redacted_pdu_json)
|
||||
for p in pdus_to_check_event_id
|
||||
])
|
||||
|
||||
for p, d in zip(pdus_to_check_event_id, more_deferreds):
|
||||
p.deferreds.append(d)
|
||||
elif room_version in (RoomVersions.V3,):
|
||||
pass # No further checks needed, as event IDs are hashes here
|
||||
else:
|
||||
raise RuntimeError("Unrecognized room version %s" % (room_version,))
|
||||
|
||||
# replace lists of deferreds with single Deferreds
|
||||
return [_flatten_deferred_list(p.deferreds) for p in pdus_to_check]
|
||||
|
||||
|
||||
def _flatten_deferred_list(deferreds):
|
||||
"""Given a list of one or more deferreds, either return the single deferred, or
|
||||
combine into a DeferredList.
|
||||
"""Given a list of deferreds, either return the single deferred,
|
||||
combine into a DeferredList, or return an already resolved deferred.
|
||||
"""
|
||||
if len(deferreds) > 1:
|
||||
return DeferredList(deferreds, fireOnOneErrback=True, consumeErrors=True)
|
||||
else:
|
||||
assert len(deferreds) == 1
|
||||
elif len(deferreds) == 1:
|
||||
return deferreds[0]
|
||||
else:
|
||||
return defer.succeed(None)
|
||||
|
||||
|
||||
def _is_invite_via_3pid(event):
|
||||
|
@ -297,11 +318,12 @@ def _is_invite_via_3pid(event):
|
|||
)
|
||||
|
||||
|
||||
def event_from_pdu_json(pdu_json, outlier=False):
|
||||
def event_from_pdu_json(pdu_json, event_format_version, outlier=False):
|
||||
"""Construct a FrozenEvent from an event json received over federation
|
||||
|
||||
Args:
|
||||
pdu_json (object): pdu as received over federation
|
||||
event_format_version (int): The event format version
|
||||
outlier (bool): True to mark this event as an outlier
|
||||
|
||||
Returns:
|
||||
|
@ -313,7 +335,7 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
|||
"""
|
||||
# we could probably enforce a bunch of other fields here (room_id, sender,
|
||||
# origin, etc etc)
|
||||
assert_params_in_dict(pdu_json, ('event_id', 'type', 'depth'))
|
||||
assert_params_in_dict(pdu_json, ('type', 'depth'))
|
||||
|
||||
depth = pdu_json['depth']
|
||||
if not isinstance(depth, six.integer_types):
|
||||
|
@ -325,8 +347,8 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
|||
elif depth > MAX_DEPTH:
|
||||
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
|
||||
|
||||
event = FrozenEvent(
|
||||
pdu_json
|
||||
event = event_type_from_format_version(event_format_version)(
|
||||
pdu_json,
|
||||
)
|
||||
|
||||
event.internal_metadata.outlier = outlier
|
||||
|
|
|
@ -25,14 +25,20 @@ from prometheus_client import Counter
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
|
||||
from synapse.api.constants import (
|
||||
KNOWN_ROOM_VERSIONS,
|
||||
EventTypes,
|
||||
Membership,
|
||||
RoomVersions,
|
||||
)
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
Codes,
|
||||
FederationDeniedError,
|
||||
HttpResponseException,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.events import builder
|
||||
from synapse.events import builder, room_version_to_event_format
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.util import logcontext, unwrapFirstError
|
||||
from synapse.util.caches.expiringcache import ExpiringCache
|
||||
|
@ -66,6 +72,9 @@ class FederationClient(FederationBase):
|
|||
self.state = hs.get_state_handler()
|
||||
self.transport_layer = hs.get_federation_transport_client()
|
||||
|
||||
self.hostname = hs.hostname
|
||||
self.signing_key = hs.config.signing_key[0]
|
||||
|
||||
self._get_pdu_cache = ExpiringCache(
|
||||
cache_name="get_pdu_cache",
|
||||
clock=self._clock,
|
||||
|
@ -162,13 +171,13 @@ class FederationClient(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def backfill(self, dest, context, limit, extremities):
|
||||
def backfill(self, dest, room_id, limit, extremities):
|
||||
"""Requests some more historic PDUs for the given context from the
|
||||
given destination server.
|
||||
|
||||
Args:
|
||||
dest (str): The remote home server to ask.
|
||||
context (str): The context to backfill.
|
||||
room_id (str): The room_id to backfill.
|
||||
limit (int): The maximum number of PDUs to return.
|
||||
extremities (list): List of PDU id and origins of the first pdus
|
||||
we have seen from the context
|
||||
|
@ -183,18 +192,21 @@ class FederationClient(FederationBase):
|
|||
return
|
||||
|
||||
transaction_data = yield self.transport_layer.backfill(
|
||||
dest, context, extremities, limit)
|
||||
dest, room_id, extremities, limit)
|
||||
|
||||
logger.debug("backfill transaction_data=%s", repr(transaction_data))
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
pdus = [
|
||||
event_from_pdu_json(p, outlier=False)
|
||||
event_from_pdu_json(p, format_ver, outlier=False)
|
||||
for p in transaction_data["pdus"]
|
||||
]
|
||||
|
||||
# FIXME: We should handle signature failures more gracefully.
|
||||
pdus[:] = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
||||
self._check_sigs_and_hashes(pdus),
|
||||
self._check_sigs_and_hashes(room_version, pdus),
|
||||
consumeErrors=True,
|
||||
).addErrback(unwrapFirstError))
|
||||
|
||||
|
@ -202,7 +214,8 @@ class FederationClient(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
|
||||
def get_pdu(self, destinations, event_id, room_version, outlier=False,
|
||||
timeout=None):
|
||||
"""Requests the PDU with given origin and ID from the remote home
|
||||
servers.
|
||||
|
||||
|
@ -212,6 +225,7 @@ class FederationClient(FederationBase):
|
|||
Args:
|
||||
destinations (list): Which home servers to query
|
||||
event_id (str): event to fetch
|
||||
room_version (str): version of the room
|
||||
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
|
||||
it's from an arbitary point in the context as opposed to part
|
||||
of the current block of PDUs. Defaults to `False`
|
||||
|
@ -230,6 +244,8 @@ class FederationClient(FederationBase):
|
|||
|
||||
pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
|
||||
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
signed_pdu = None
|
||||
for destination in destinations:
|
||||
now = self._clock.time_msec()
|
||||
|
@ -245,7 +261,7 @@ class FederationClient(FederationBase):
|
|||
logger.debug("transaction_data %r", transaction_data)
|
||||
|
||||
pdu_list = [
|
||||
event_from_pdu_json(p, outlier=outlier)
|
||||
event_from_pdu_json(p, format_ver, outlier=outlier)
|
||||
for p in transaction_data["pdus"]
|
||||
]
|
||||
|
||||
|
@ -253,7 +269,7 @@ class FederationClient(FederationBase):
|
|||
pdu = pdu_list[0]
|
||||
|
||||
# Check signatures are correct.
|
||||
signed_pdu = yield self._check_sigs_and_hash(pdu)
|
||||
signed_pdu = yield self._check_sigs_and_hash(room_version, pdu)
|
||||
|
||||
break
|
||||
|
||||
|
@ -339,12 +355,16 @@ class FederationClient(FederationBase):
|
|||
destination, room_id, event_id=event_id,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
pdus = [
|
||||
event_from_pdu_json(p, outlier=True) for p in result["pdus"]
|
||||
event_from_pdu_json(p, format_ver, outlier=True)
|
||||
for p in result["pdus"]
|
||||
]
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
event_from_pdu_json(p, format_ver, outlier=True)
|
||||
for p in result.get("auth_chain", [])
|
||||
]
|
||||
|
||||
|
@ -355,7 +375,8 @@ class FederationClient(FederationBase):
|
|||
signed_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination,
|
||||
[p for p in pdus if p.event_id not in seen_events],
|
||||
outlier=True
|
||||
outlier=True,
|
||||
room_version=room_version,
|
||||
)
|
||||
signed_pdus.extend(
|
||||
seen_events[p.event_id] for p in pdus if p.event_id in seen_events
|
||||
|
@ -364,7 +385,8 @@ class FederationClient(FederationBase):
|
|||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination,
|
||||
[p for p in auth_chain if p.event_id not in seen_events],
|
||||
outlier=True
|
||||
outlier=True,
|
||||
room_version=room_version,
|
||||
)
|
||||
signed_auth.extend(
|
||||
seen_events[p.event_id] for p in auth_chain if p.event_id in seen_events
|
||||
|
@ -411,6 +433,8 @@ class FederationClient(FederationBase):
|
|||
random.shuffle(srvs)
|
||||
return srvs
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
|
||||
batch_size = 20
|
||||
missing_events = list(missing_events)
|
||||
for i in range(0, len(missing_events), batch_size):
|
||||
|
@ -421,6 +445,7 @@ class FederationClient(FederationBase):
|
|||
self.get_pdu,
|
||||
destinations=random_server_list(),
|
||||
event_id=e_id,
|
||||
room_version=room_version,
|
||||
)
|
||||
for e_id in batch
|
||||
]
|
||||
|
@ -445,13 +470,17 @@ class FederationClient(FederationBase):
|
|||
destination, room_id, event_id,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
event_from_pdu_json(p, format_ver, outlier=True)
|
||||
for p in res["auth_chain"]
|
||||
]
|
||||
|
||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, auth_chain, outlier=True
|
||||
destination, auth_chain,
|
||||
outlier=True, room_version=room_version,
|
||||
)
|
||||
|
||||
signed_auth.sort(key=lambda e: e.depth)
|
||||
|
@ -522,6 +551,8 @@ class FederationClient(FederationBase):
|
|||
Does so by asking one of the already participating servers to create an
|
||||
event with proper context.
|
||||
|
||||
Returns a fully signed and hashed event.
|
||||
|
||||
Note that this does not append any events to any graphs.
|
||||
|
||||
Args:
|
||||
|
@ -536,8 +567,10 @@ class FederationClient(FederationBase):
|
|||
params (dict[str, str|Iterable[str]]): Query parameters to include in the
|
||||
request.
|
||||
Return:
|
||||
Deferred: resolves to a tuple of (origin (str), event (object))
|
||||
where origin is the remote homeserver which generated the event.
|
||||
Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
|
||||
`(origin, event, event_format)` where origin is the remote
|
||||
homeserver which generated the event, and event_format is one of
|
||||
`synapse.api.constants.EventFormatVersions`.
|
||||
|
||||
Fails with a ``SynapseError`` if the chosen remote server
|
||||
returns a 300/400 code.
|
||||
|
@ -557,6 +590,11 @@ class FederationClient(FederationBase):
|
|||
destination, room_id, user_id, membership, params,
|
||||
)
|
||||
|
||||
# Note: If not supplied, the room version may be either v1 or v2,
|
||||
# however either way the event format version will be v1.
|
||||
room_version = ret.get("room_version", RoomVersions.V1)
|
||||
event_format = room_version_to_event_format(room_version)
|
||||
|
||||
pdu_dict = ret.get("event", None)
|
||||
if not isinstance(pdu_dict, dict):
|
||||
raise InvalidResponseError("Bad 'event' field in response")
|
||||
|
@ -571,17 +609,20 @@ class FederationClient(FederationBase):
|
|||
if "prev_state" not in pdu_dict:
|
||||
pdu_dict["prev_state"] = []
|
||||
|
||||
ev = builder.EventBuilder(pdu_dict)
|
||||
ev = builder.create_local_event_from_event_dict(
|
||||
self._clock, self.hostname, self.signing_key,
|
||||
format_version=event_format, event_dict=pdu_dict,
|
||||
)
|
||||
|
||||
defer.returnValue(
|
||||
(destination, ev)
|
||||
(destination, ev, event_format)
|
||||
)
|
||||
|
||||
return self._try_destination_list(
|
||||
"make_" + membership, destinations, send_request,
|
||||
)
|
||||
|
||||
def send_join(self, destinations, pdu):
|
||||
def send_join(self, destinations, pdu, event_format_version):
|
||||
"""Sends a join event to one of a list of homeservers.
|
||||
|
||||
Doing so will cause the remote server to add the event to the graph,
|
||||
|
@ -591,6 +632,7 @@ class FederationClient(FederationBase):
|
|||
destinations (str): Candidate homeservers which are probably
|
||||
participating in the room.
|
||||
pdu (BaseEvent): event to be sent
|
||||
event_format_version (int): The event format version
|
||||
|
||||
Return:
|
||||
Deferred: resolves to a dict with members ``origin`` (a string
|
||||
|
@ -636,12 +678,12 @@ class FederationClient(FederationBase):
|
|||
logger.debug("Got content: %s", content)
|
||||
|
||||
state = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
event_from_pdu_json(p, event_format_version, outlier=True)
|
||||
for p in content.get("state", [])
|
||||
]
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(p, outlier=True)
|
||||
event_from_pdu_json(p, event_format_version, outlier=True)
|
||||
for p in content.get("auth_chain", [])
|
||||
]
|
||||
|
||||
|
@ -650,9 +692,21 @@ class FederationClient(FederationBase):
|
|||
for p in itertools.chain(state, auth_chain)
|
||||
}
|
||||
|
||||
room_version = None
|
||||
for e in state:
|
||||
if (e.type, e.state_key) == (EventTypes.Create, ""):
|
||||
room_version = e.content.get("room_version", RoomVersions.V1)
|
||||
break
|
||||
|
||||
if room_version is None:
|
||||
# If the state doesn't have a create event then the room is
|
||||
# invalid, and it would fail auth checks anyway.
|
||||
raise SynapseError(400, "No create event in state")
|
||||
|
||||
valid_pdus = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, list(pdus.values()),
|
||||
outlier=True,
|
||||
room_version=room_version,
|
||||
)
|
||||
|
||||
valid_pdus_map = {
|
||||
|
@ -690,32 +744,90 @@ class FederationClient(FederationBase):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def send_invite(self, destination, room_id, event_id, pdu):
|
||||
time_now = self._clock.time_msec()
|
||||
try:
|
||||
code, content = yield self.transport_layer.send_invite(
|
||||
destination=destination,
|
||||
room_id=room_id,
|
||||
event_id=event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if e.code == 403:
|
||||
raise e.to_synapse_error()
|
||||
raise
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
|
||||
content = yield self._do_send_invite(destination, pdu, room_version)
|
||||
|
||||
pdu_dict = content["event"]
|
||||
|
||||
logger.debug("Got response to send_invite: %s", pdu_dict)
|
||||
|
||||
pdu = event_from_pdu_json(pdu_dict)
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
pdu = event_from_pdu_json(pdu_dict, format_ver)
|
||||
|
||||
# Check signatures are correct.
|
||||
pdu = yield self._check_sigs_and_hash(pdu)
|
||||
pdu = yield self._check_sigs_and_hash(room_version, pdu)
|
||||
|
||||
# FIXME: We should handle signature failures more gracefully.
|
||||
|
||||
defer.returnValue(pdu)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_send_invite(self, destination, pdu, room_version):
|
||||
"""Actually sends the invite, first trying v2 API and falling back to
|
||||
v1 API if necessary.
|
||||
|
||||
Args:
|
||||
destination (str): Target server
|
||||
pdu (FrozenEvent)
|
||||
room_version (str)
|
||||
|
||||
Returns:
|
||||
dict: The event as a dict as returned by the remote server
|
||||
"""
|
||||
time_now = self._clock.time_msec()
|
||||
|
||||
try:
|
||||
content = yield self.transport_layer.send_invite_v2(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content={
|
||||
"event": pdu.get_pdu_json(time_now),
|
||||
"room_version": room_version,
|
||||
"invite_room_state": pdu.unsigned.get("invite_room_state", []),
|
||||
},
|
||||
)
|
||||
defer.returnValue(content)
|
||||
except HttpResponseException as e:
|
||||
if e.code in [400, 404]:
|
||||
err = e.to_synapse_error()
|
||||
|
||||
# If we receive an error response that isn't a generic error, we
|
||||
# assume that the remote understands the v2 invite API and this
|
||||
# is a legitimate error.
|
||||
if err.errcode != Codes.UNKNOWN:
|
||||
raise err
|
||||
|
||||
# Otherwise, we assume that the remote server doesn't understand
|
||||
# the v2 invite API.
|
||||
|
||||
if room_version in (RoomVersions.V1, RoomVersions.V2):
|
||||
pass # We'll fall through
|
||||
else:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"User's homeserver does not support this room version",
|
||||
Codes.UNSUPPORTED_ROOM_VERSION,
|
||||
)
|
||||
elif e.code == 403:
|
||||
raise e.to_synapse_error()
|
||||
else:
|
||||
raise
|
||||
|
||||
# Didn't work, try v1 API.
|
||||
# Note the v1 API returns a tuple of `(200, content)`
|
||||
|
||||
_, content = yield self.transport_layer.send_invite_v1(
|
||||
destination=destination,
|
||||
room_id=pdu.room_id,
|
||||
event_id=pdu.event_id,
|
||||
content=pdu.get_pdu_json(time_now),
|
||||
)
|
||||
defer.returnValue(content)
|
||||
|
||||
def send_leave(self, destinations, pdu):
|
||||
"""Sends a leave event to one of a list of homeservers.
|
||||
|
||||
|
@ -785,13 +897,16 @@ class FederationClient(FederationBase):
|
|||
content=send_content,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(e)
|
||||
event_from_pdu_json(e, format_ver)
|
||||
for e in content["auth_chain"]
|
||||
]
|
||||
|
||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, auth_chain, outlier=True
|
||||
destination, auth_chain, outlier=True, room_version=room_version,
|
||||
)
|
||||
|
||||
signed_auth.sort(key=lambda e: e.depth)
|
||||
|
@ -833,13 +948,16 @@ class FederationClient(FederationBase):
|
|||
timeout=timeout,
|
||||
)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
events = [
|
||||
event_from_pdu_json(e)
|
||||
event_from_pdu_json(e, format_ver)
|
||||
for e in content.get("events", [])
|
||||
]
|
||||
|
||||
signed_events = yield self._check_sigs_and_hash_and_fetch(
|
||||
destination, events, outlier=False
|
||||
destination, events, outlier=False, room_version=room_version,
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
if not e.code == 400:
|
||||
|
|
|
@ -25,15 +25,17 @@ from twisted.internet import defer
|
|||
from twisted.internet.abstract import isIPAddress
|
||||
from twisted.python import failure
|
||||
|
||||
from synapse.api.constants import EventTypes
|
||||
from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
FederationError,
|
||||
IncompatibleRoomVersionError,
|
||||
NotFoundError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.crypto.event_signing import compute_event_signature
|
||||
from synapse.events import room_version_to_event_format
|
||||
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Edu, Transaction
|
||||
|
@ -147,6 +149,22 @@ class FederationServer(FederationBase):
|
|||
|
||||
logger.debug("[%s] Transaction is new", transaction.transaction_id)
|
||||
|
||||
# Reject if PDU count > 50 and EDU count > 100
|
||||
if (len(transaction.pdus) > 50
|
||||
or (hasattr(transaction, "edus") and len(transaction.edus) > 100)):
|
||||
|
||||
logger.info(
|
||||
"Transaction PDU or EDU count too large. Returning 400",
|
||||
)
|
||||
|
||||
response = {}
|
||||
yield self.transaction_actions.set_response(
|
||||
origin,
|
||||
transaction,
|
||||
400, response
|
||||
)
|
||||
defer.returnValue((400, response))
|
||||
|
||||
received_pdus_counter.inc(len(transaction.pdus))
|
||||
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
|
@ -162,8 +180,29 @@ class FederationServer(FederationBase):
|
|||
p["age_ts"] = request_time - int(p["age"])
|
||||
del p["age"]
|
||||
|
||||
event = event_from_pdu_json(p)
|
||||
room_id = event.room_id
|
||||
# We try and pull out an event ID so that if later checks fail we
|
||||
# can log something sensible. We don't mandate an event ID here in
|
||||
# case future event formats get rid of the key.
|
||||
possible_event_id = p.get("event_id", "<Unknown>")
|
||||
|
||||
# Now we get the room ID so that we can check that we know the
|
||||
# version of the room.
|
||||
room_id = p.get("room_id")
|
||||
if not room_id:
|
||||
logger.info(
|
||||
"Ignoring PDU as does not have a room_id. Event ID: %s",
|
||||
possible_event_id,
|
||||
)
|
||||
continue
|
||||
|
||||
try:
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
except NotFoundError:
|
||||
logger.info("Ignoring PDU for unknown room_id: %s", room_id)
|
||||
continue
|
||||
|
||||
event = event_from_pdu_json(p, format_ver)
|
||||
pdus_by_room.setdefault(room_id, []).append(event)
|
||||
|
||||
pdu_results = {}
|
||||
|
@ -201,8 +240,9 @@ class FederationServer(FederationBase):
|
|||
f = failure.Failure()
|
||||
pdu_results[event_id] = {"error": str(e)}
|
||||
logger.error(
|
||||
"Failed to handle PDU %s: %s",
|
||||
event_id, f.getTraceback().rstrip(),
|
||||
"Failed to handle PDU %s",
|
||||
event_id,
|
||||
exc_info=(f.type, f.value, f.getTracebackObject()),
|
||||
)
|
||||
|
||||
yield concurrently_execute(
|
||||
|
@ -300,7 +340,7 @@ class FederationServer(FederationBase):
|
|||
if self.hs.is_mine_id(event.event_id):
|
||||
event.signatures.update(
|
||||
compute_event_signature(
|
||||
event,
|
||||
event.get_pdu_json(),
|
||||
self.hs.hostname,
|
||||
self.hs.config.signing_key[0]
|
||||
)
|
||||
|
@ -323,11 +363,6 @@ class FederationServer(FederationBase):
|
|||
else:
|
||||
defer.returnValue((404, ""))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_pull_request(self, origin, versions):
|
||||
raise NotImplementedError("Pull transactions not implemented")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_query_request(self, query_type, args):
|
||||
received_queries_counter.labels(query_type).inc()
|
||||
|
@ -352,18 +387,30 @@ class FederationServer(FederationBase):
|
|||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_invite_request(self, origin, content):
|
||||
pdu = event_from_pdu_json(content)
|
||||
def on_invite_request(self, origin, content, room_version):
|
||||
if room_version not in KNOWN_ROOM_VERSIONS:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"Homeserver does not support this room version",
|
||||
Codes.UNSUPPORTED_ROOM_VERSION,
|
||||
)
|
||||
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
pdu = event_from_pdu_json(content, format_ver)
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, pdu.room_id)
|
||||
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
||||
time_now = self._clock.time_msec()
|
||||
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
||||
defer.returnValue({"event": ret_pdu.get_pdu_json(time_now)})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_send_join_request(self, origin, content):
|
||||
def on_send_join_request(self, origin, content, room_id):
|
||||
logger.debug("on_send_join_request: content: %s", content)
|
||||
pdu = event_from_pdu_json(content)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
pdu = event_from_pdu_json(content, format_ver)
|
||||
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, pdu.room_id)
|
||||
|
@ -383,13 +430,22 @@ class FederationServer(FederationBase):
|
|||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, room_id)
|
||||
pdu = yield self.handler.on_make_leave_request(room_id, user_id)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
|
||||
time_now = self._clock.time_msec()
|
||||
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
||||
defer.returnValue({
|
||||
"event": pdu.get_pdu_json(time_now),
|
||||
"room_version": room_version,
|
||||
})
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_send_leave_request(self, origin, content):
|
||||
def on_send_leave_request(self, origin, content, room_id):
|
||||
logger.debug("on_send_leave_request: content: %s", content)
|
||||
pdu = event_from_pdu_json(content)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
pdu = event_from_pdu_json(content, format_ver)
|
||||
|
||||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, pdu.room_id)
|
||||
|
@ -435,13 +491,16 @@ class FederationServer(FederationBase):
|
|||
origin_host, _ = parse_server_name(origin)
|
||||
yield self.check_server_matches_acl(origin_host, room_id)
|
||||
|
||||
room_version = yield self.store.get_room_version(room_id)
|
||||
format_ver = room_version_to_event_format(room_version)
|
||||
|
||||
auth_chain = [
|
||||
event_from_pdu_json(e)
|
||||
event_from_pdu_json(e, format_ver)
|
||||
for e in content["auth_chain"]
|
||||
]
|
||||
|
||||
signed_auth = yield self._check_sigs_and_hash_and_fetch(
|
||||
origin, auth_chain, outlier=True
|
||||
origin, auth_chain, outlier=True, room_version=room_version,
|
||||
)
|
||||
|
||||
ret = yield self.handler.on_query_auth(
|
||||
|
@ -586,16 +645,19 @@ class FederationServer(FederationBase):
|
|||
"""
|
||||
# check that it's actually being sent from a valid destination to
|
||||
# workaround bug #1753 in 0.18.5 and 0.18.6
|
||||
if origin != get_domain_from_id(pdu.event_id):
|
||||
if origin != get_domain_from_id(pdu.sender):
|
||||
# We continue to accept join events from any server; this is
|
||||
# necessary for the federation join dance to work correctly.
|
||||
# (When we join over federation, the "helper" server is
|
||||
# responsible for sending out the join event, rather than the
|
||||
# origin. See bug #1893).
|
||||
# origin. See bug #1893. This is also true for some third party
|
||||
# invites).
|
||||
if not (
|
||||
pdu.type == 'm.room.member' and
|
||||
pdu.content and
|
||||
pdu.content.get("membership", None) == 'join'
|
||||
pdu.content.get("membership", None) in (
|
||||
Membership.JOIN, Membership.INVITE,
|
||||
)
|
||||
):
|
||||
logger.info(
|
||||
"Discarding PDU %s from invalid origin %s",
|
||||
|
@ -608,9 +670,12 @@ class FederationServer(FederationBase):
|
|||
pdu.event_id, origin
|
||||
)
|
||||
|
||||
# We've already checked that we know the room version by this point
|
||||
room_version = yield self.store.get_room_version(pdu.room_id)
|
||||
|
||||
# Check signature.
|
||||
try:
|
||||
pdu = yield self._check_sigs_and_hash(pdu)
|
||||
pdu = yield self._check_sigs_and_hash(room_version, pdu)
|
||||
except SynapseError as e:
|
||||
raise FederationError(
|
||||
"ERROR",
|
||||
|
|
|
@ -22,14 +22,17 @@ from prometheus_client import Counter
|
|||
from twisted.internet import defer
|
||||
|
||||
import synapse.metrics
|
||||
from synapse.api.errors import FederationDeniedError, HttpResponseException
|
||||
from synapse.api.errors import (
|
||||
FederationDeniedError,
|
||||
HttpResponseException,
|
||||
RequestSendFailed,
|
||||
)
|
||||
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
|
||||
from synapse.metrics import (
|
||||
LaterGauge,
|
||||
event_processing_loop_counter,
|
||||
event_processing_loop_room_count,
|
||||
events_processed_counter,
|
||||
sent_edus_counter,
|
||||
sent_transactions_counter,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
|
@ -43,10 +46,24 @@ from .units import Edu, Transaction
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
sent_pdus_destination_dist_count = Counter(
|
||||
"synapse_federation_client_sent_pdu_destinations:count", ""
|
||||
"synapse_federation_client_sent_pdu_destinations:count",
|
||||
"Number of PDUs queued for sending to one or more destinations",
|
||||
)
|
||||
|
||||
sent_pdus_destination_dist_total = Counter(
|
||||
"synapse_federation_client_sent_pdu_destinations:total", ""
|
||||
"Total number of PDUs queued for sending across all destinations",
|
||||
)
|
||||
|
||||
sent_edus_counter = Counter(
|
||||
"synapse_federation_client_sent_edus",
|
||||
"Total number of EDUs successfully sent",
|
||||
)
|
||||
|
||||
sent_edus_by_type = Counter(
|
||||
"synapse_federation_client_sent_edus_by_type",
|
||||
"Number of sent EDUs successfully sent, by event type",
|
||||
["type"],
|
||||
)
|
||||
|
||||
|
||||
|
@ -171,7 +188,7 @@ class TransactionQueue(object):
|
|||
def handle_event(event):
|
||||
# Only send events for this server.
|
||||
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
|
||||
is_mine = self.is_mine_id(event.event_id)
|
||||
is_mine = self.is_mine_id(event.sender)
|
||||
if not is_mine and send_on_behalf_of is None:
|
||||
return
|
||||
|
||||
|
@ -183,9 +200,7 @@ class TransactionQueue(object):
|
|||
# banned then it won't receive the event because it won't
|
||||
# be in the room after the ban.
|
||||
destinations = yield self.state.get_current_hosts_in_room(
|
||||
event.room_id, latest_event_ids=[
|
||||
prev_id for prev_id, _ in event.prev_events
|
||||
],
|
||||
event.room_id, latest_event_ids=event.prev_event_ids(),
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
|
@ -358,8 +373,6 @@ class TransactionQueue(object):
|
|||
logger.info("Not sending EDU to ourselves")
|
||||
return
|
||||
|
||||
sent_edus_counter.inc()
|
||||
|
||||
if key:
|
||||
self.pending_edus_keyed_by_dest.setdefault(
|
||||
destination, {}
|
||||
|
@ -494,6 +507,9 @@ class TransactionQueue(object):
|
|||
)
|
||||
if success:
|
||||
sent_transactions_counter.inc()
|
||||
sent_edus_counter.inc(len(pending_edus))
|
||||
for edu in pending_edus:
|
||||
sent_edus_by_type.labels(edu.edu_type).inc()
|
||||
# Remove the acknowledged device messages from the database
|
||||
# Only bother if we actually sent some device messages
|
||||
if device_message_edus:
|
||||
|
@ -520,11 +536,21 @@ class TransactionQueue(object):
|
|||
)
|
||||
except FederationDeniedError as e:
|
||||
logger.info(e)
|
||||
except Exception as e:
|
||||
logger.warn(
|
||||
"TX [%s] Failed to send transaction: %s",
|
||||
except HttpResponseException as e:
|
||||
logger.warning(
|
||||
"TX [%s] Received %d response to transaction: %s",
|
||||
destination, e.code, e,
|
||||
)
|
||||
except RequestSendFailed as e:
|
||||
logger.warning("TX [%s] Failed to send transaction: %s", destination, e)
|
||||
|
||||
for p, _ in pending_pdus:
|
||||
logger.info("Failed to send event %s to %s", p.event_id,
|
||||
destination)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"TX [%s] Failed to send transaction",
|
||||
destination,
|
||||
e,
|
||||
)
|
||||
for p, _ in pending_pdus:
|
||||
logger.info("Failed to send event %s to %s", p.event_id,
|
||||
|
|
|
@ -21,7 +21,7 @@ from six.moves import urllib
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.constants import Membership
|
||||
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
|
||||
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
|
||||
from synapse.util.logutils import log_function
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -51,7 +51,7 @@ class TransportLayerClient(object):
|
|||
logger.debug("get_room_state dest=%s, room=%s",
|
||||
destination, room_id)
|
||||
|
||||
path = _create_path(PREFIX, "/state/%s/", room_id)
|
||||
path = _create_v1_path("/state/%s/", room_id)
|
||||
return self.client.get_json(
|
||||
destination, path=path, args={"event_id": event_id},
|
||||
)
|
||||
|
@ -73,7 +73,7 @@ class TransportLayerClient(object):
|
|||
logger.debug("get_room_state_ids dest=%s, room=%s",
|
||||
destination, room_id)
|
||||
|
||||
path = _create_path(PREFIX, "/state_ids/%s/", room_id)
|
||||
path = _create_v1_path("/state_ids/%s/", room_id)
|
||||
return self.client.get_json(
|
||||
destination, path=path, args={"event_id": event_id},
|
||||
)
|
||||
|
@ -95,7 +95,7 @@ class TransportLayerClient(object):
|
|||
logger.debug("get_pdu dest=%s, event_id=%s",
|
||||
destination, event_id)
|
||||
|
||||
path = _create_path(PREFIX, "/event/%s/", event_id)
|
||||
path = _create_v1_path("/event/%s/", event_id)
|
||||
return self.client.get_json(destination, path=path, timeout=timeout)
|
||||
|
||||
@log_function
|
||||
|
@ -121,7 +121,7 @@ class TransportLayerClient(object):
|
|||
# TODO: raise?
|
||||
return
|
||||
|
||||
path = _create_path(PREFIX, "/backfill/%s/", room_id)
|
||||
path = _create_v1_path("/backfill/%s/", room_id)
|
||||
|
||||
args = {
|
||||
"v": event_tuples,
|
||||
|
@ -167,7 +167,7 @@ class TransportLayerClient(object):
|
|||
# generated by the json_data_callback.
|
||||
json_data = transaction.get_dict()
|
||||
|
||||
path = _create_path(PREFIX, "/send/%s/", transaction.transaction_id)
|
||||
path = _create_v1_path("/send/%s/", transaction.transaction_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
transaction.destination,
|
||||
|
@ -184,7 +184,7 @@ class TransportLayerClient(object):
|
|||
@log_function
|
||||
def make_query(self, destination, query_type, args, retry_on_dns_fail,
|
||||
ignore_backoff=False):
|
||||
path = _create_path(PREFIX, "/query/%s", query_type)
|
||||
path = _create_v1_path("/query/%s", query_type)
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -231,7 +231,7 @@ class TransportLayerClient(object):
|
|||
"make_membership_event called with membership='%s', must be one of %s" %
|
||||
(membership, ",".join(valid_memberships))
|
||||
)
|
||||
path = _create_path(PREFIX, "/make_%s/%s/%s", membership, room_id, user_id)
|
||||
path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id)
|
||||
|
||||
ignore_backoff = False
|
||||
retry_on_dns_fail = False
|
||||
|
@ -258,7 +258,7 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_join(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/send_join/%s/%s", room_id, event_id)
|
||||
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
|
@ -271,7 +271,7 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_leave(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/send_leave/%s/%s", room_id, event_id)
|
||||
path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
|
@ -289,8 +289,22 @@ class TransportLayerClient(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_invite(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/invite/%s/%s", room_id, event_id)
|
||||
def send_invite_v1(self, destination, room_id, event_id, content):
|
||||
path = _create_v1_path("/invite/%s/%s", room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
path=path,
|
||||
data=content,
|
||||
ignore_backoff=True,
|
||||
)
|
||||
|
||||
defer.returnValue(response)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_invite_v2(self, destination, room_id, event_id, content):
|
||||
path = _create_v2_path("/invite/%s/%s", room_id, event_id)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
|
@ -306,7 +320,7 @@ class TransportLayerClient(object):
|
|||
def get_public_rooms(self, remote_server, limit, since_token,
|
||||
search_filter=None, include_all_networks=False,
|
||||
third_party_instance_id=None):
|
||||
path = PREFIX + "/publicRooms"
|
||||
path = _create_v1_path("/publicRooms")
|
||||
|
||||
args = {
|
||||
"include_all_networks": "true" if include_all_networks else "false",
|
||||
|
@ -332,7 +346,7 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def exchange_third_party_invite(self, destination, room_id, event_dict):
|
||||
path = _create_path(PREFIX, "/exchange_third_party_invite/%s", room_id,)
|
||||
path = _create_v1_path("/exchange_third_party_invite/%s", room_id,)
|
||||
|
||||
response = yield self.client.put_json(
|
||||
destination=destination,
|
||||
|
@ -345,7 +359,7 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def get_event_auth(self, destination, room_id, event_id):
|
||||
path = _create_path(PREFIX, "/event_auth/%s/%s", room_id, event_id)
|
||||
path = _create_v1_path("/event_auth/%s/%s", room_id, event_id)
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -357,7 +371,7 @@ class TransportLayerClient(object):
|
|||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def send_query_auth(self, destination, room_id, event_id, content):
|
||||
path = _create_path(PREFIX, "/query_auth/%s/%s", room_id, event_id)
|
||||
path = _create_v1_path("/query_auth/%s/%s", room_id, event_id)
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -392,7 +406,7 @@ class TransportLayerClient(object):
|
|||
Returns:
|
||||
A dict containg the device keys.
|
||||
"""
|
||||
path = PREFIX + "/user/keys/query"
|
||||
path = _create_v1_path("/user/keys/query")
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -419,7 +433,7 @@ class TransportLayerClient(object):
|
|||
Returns:
|
||||
A dict containg the device keys.
|
||||
"""
|
||||
path = _create_path(PREFIX, "/user/devices/%s", user_id)
|
||||
path = _create_v1_path("/user/devices/%s", user_id)
|
||||
|
||||
content = yield self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -455,7 +469,7 @@ class TransportLayerClient(object):
|
|||
A dict containg the one-time keys.
|
||||
"""
|
||||
|
||||
path = PREFIX + "/user/keys/claim"
|
||||
path = _create_v1_path("/user/keys/claim")
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -469,7 +483,7 @@ class TransportLayerClient(object):
|
|||
@log_function
|
||||
def get_missing_events(self, destination, room_id, earliest_events,
|
||||
latest_events, limit, min_depth, timeout):
|
||||
path = _create_path(PREFIX, "/get_missing_events/%s", room_id,)
|
||||
path = _create_v1_path("/get_missing_events/%s", room_id,)
|
||||
|
||||
content = yield self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -489,7 +503,7 @@ class TransportLayerClient(object):
|
|||
def get_group_profile(self, destination, group_id, requester_user_id):
|
||||
"""Get a group profile
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
|
||||
path = _create_v1_path("/groups/%s/profile", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -508,7 +522,7 @@ class TransportLayerClient(object):
|
|||
requester_user_id (str)
|
||||
content (dict): The new profile of the group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/profile", group_id,)
|
||||
path = _create_v1_path("/groups/%s/profile", group_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -522,7 +536,7 @@ class TransportLayerClient(object):
|
|||
def get_group_summary(self, destination, group_id, requester_user_id):
|
||||
"""Get a group summary
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/summary", group_id,)
|
||||
path = _create_v1_path("/groups/%s/summary", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -535,7 +549,7 @@ class TransportLayerClient(object):
|
|||
def get_rooms_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get all rooms in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/rooms", group_id,)
|
||||
path = _create_v1_path("/groups/%s/rooms", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -548,7 +562,7 @@ class TransportLayerClient(object):
|
|||
content):
|
||||
"""Add a room to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
|
||||
path = _create_v1_path("/groups/%s/room/%s", group_id, room_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -562,8 +576,8 @@ class TransportLayerClient(object):
|
|||
config_key, content):
|
||||
"""Update room in group
|
||||
"""
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/room/%s/config/%s",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/room/%s/config/%s",
|
||||
group_id, room_id, config_key,
|
||||
)
|
||||
|
||||
|
@ -578,7 +592,7 @@ class TransportLayerClient(object):
|
|||
def remove_room_from_group(self, destination, group_id, requester_user_id, room_id):
|
||||
"""Remove a room from a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/room/%s", group_id, room_id,)
|
||||
path = _create_v1_path("/groups/%s/room/%s", group_id, room_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
|
@ -591,7 +605,7 @@ class TransportLayerClient(object):
|
|||
def get_users_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get users in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users", group_id,)
|
||||
path = _create_v1_path("/groups/%s/users", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -604,7 +618,7 @@ class TransportLayerClient(object):
|
|||
def get_invited_users_in_group(self, destination, group_id, requester_user_id):
|
||||
"""Get users that have been invited to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/invited_users", group_id,)
|
||||
path = _create_v1_path("/groups/%s/invited_users", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -617,8 +631,8 @@ class TransportLayerClient(object):
|
|||
def accept_group_invite(self, destination, group_id, user_id, content):
|
||||
"""Accept a group invite
|
||||
"""
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/users/%s/accept_invite",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/users/%s/accept_invite",
|
||||
group_id, user_id,
|
||||
)
|
||||
|
||||
|
@ -633,7 +647,7 @@ class TransportLayerClient(object):
|
|||
def join_group(self, destination, group_id, user_id, content):
|
||||
"""Attempts to join a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/join", group_id, user_id)
|
||||
path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -646,7 +660,7 @@ class TransportLayerClient(object):
|
|||
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
|
||||
"""Invite a user to a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/invite", group_id, user_id)
|
||||
path = _create_v1_path("/groups/%s/users/%s/invite", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -662,7 +676,7 @@ class TransportLayerClient(object):
|
|||
invited.
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/local/%s/users/%s/invite", group_id, user_id)
|
||||
path = _create_v1_path("/groups/local/%s/users/%s/invite", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -676,7 +690,7 @@ class TransportLayerClient(object):
|
|||
user_id, content):
|
||||
"""Remove a user fron a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/users/%s/remove", group_id, user_id)
|
||||
path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -693,7 +707,7 @@ class TransportLayerClient(object):
|
|||
kicked from the group.
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/local/%s/users/%s/remove", group_id, user_id)
|
||||
path = _create_v1_path("/groups/local/%s/users/%s/remove", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -708,7 +722,7 @@ class TransportLayerClient(object):
|
|||
the attestations
|
||||
"""
|
||||
|
||||
path = _create_path(PREFIX, "/groups/%s/renew_attestation/%s", group_id, user_id)
|
||||
path = _create_v1_path("/groups/%s/renew_attestation/%s", group_id, user_id)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -723,12 +737,12 @@ class TransportLayerClient(object):
|
|||
"""Update a room entry in a group summary
|
||||
"""
|
||||
if category_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/categories/%s/rooms/%s",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/summary/categories/%s/rooms/%s",
|
||||
group_id, category_id, room_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -744,12 +758,12 @@ class TransportLayerClient(object):
|
|||
"""Delete a room entry in a group summary
|
||||
"""
|
||||
if category_id:
|
||||
path = _create_path(
|
||||
PREFIX + "/groups/%s/summary/categories/%s/rooms/%s",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/summary/categories/%s/rooms/%s",
|
||||
group_id, category_id, room_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
|
@ -762,7 +776,7 @@ class TransportLayerClient(object):
|
|||
def get_group_categories(self, destination, group_id, requester_user_id):
|
||||
"""Get all categories in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories", group_id,)
|
||||
path = _create_v1_path("/groups/%s/categories", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -775,7 +789,7 @@ class TransportLayerClient(object):
|
|||
def get_group_category(self, destination, group_id, requester_user_id, category_id):
|
||||
"""Get category info in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -789,7 +803,7 @@ class TransportLayerClient(object):
|
|||
content):
|
||||
"""Update a category in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -804,7 +818,7 @@ class TransportLayerClient(object):
|
|||
category_id):
|
||||
"""Delete a category in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/categories/%s", group_id, category_id,)
|
||||
path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
|
@ -817,7 +831,7 @@ class TransportLayerClient(object):
|
|||
def get_group_roles(self, destination, group_id, requester_user_id):
|
||||
"""Get all roles in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles", group_id,)
|
||||
path = _create_v1_path("/groups/%s/roles", group_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -830,7 +844,7 @@ class TransportLayerClient(object):
|
|||
def get_group_role(self, destination, group_id, requester_user_id, role_id):
|
||||
"""Get a roles info
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,)
|
||||
|
||||
return self.client.get_json(
|
||||
destination=destination,
|
||||
|
@ -844,7 +858,7 @@ class TransportLayerClient(object):
|
|||
content):
|
||||
"""Update a role in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -858,7 +872,7 @@ class TransportLayerClient(object):
|
|||
def delete_group_role(self, destination, group_id, requester_user_id, role_id):
|
||||
"""Delete a role in a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/roles/%s", group_id, role_id,)
|
||||
path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
|
@ -873,12 +887,12 @@ class TransportLayerClient(object):
|
|||
"""Update a users entry in a group
|
||||
"""
|
||||
if role_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/summary/roles/%s/users/%s",
|
||||
group_id, role_id, user_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
|
||||
return self.client.post_json(
|
||||
destination=destination,
|
||||
|
@ -893,7 +907,7 @@ class TransportLayerClient(object):
|
|||
content):
|
||||
"""Sets the join policy for a group
|
||||
"""
|
||||
path = _create_path(PREFIX, "/groups/%s/settings/m.join_policy", group_id,)
|
||||
path = _create_v1_path("/groups/%s/settings/m.join_policy", group_id,)
|
||||
|
||||
return self.client.put_json(
|
||||
destination=destination,
|
||||
|
@ -909,12 +923,12 @@ class TransportLayerClient(object):
|
|||
"""Delete a users entry in a group
|
||||
"""
|
||||
if role_id:
|
||||
path = _create_path(
|
||||
PREFIX, "/groups/%s/summary/roles/%s/users/%s",
|
||||
path = _create_v1_path(
|
||||
"/groups/%s/summary/roles/%s/users/%s",
|
||||
group_id, role_id, user_id,
|
||||
)
|
||||
else:
|
||||
path = _create_path(PREFIX, "/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id,)
|
||||
|
||||
return self.client.delete_json(
|
||||
destination=destination,
|
||||
|
@ -927,7 +941,7 @@ class TransportLayerClient(object):
|
|||
"""Get the groups a list of users are publicising
|
||||
"""
|
||||
|
||||
path = PREFIX + "/get_groups_publicised"
|
||||
path = _create_v1_path("/get_groups_publicised")
|
||||
|
||||
content = {"user_ids": user_ids}
|
||||
|
||||
|
@ -939,20 +953,43 @@ class TransportLayerClient(object):
|
|||
)
|
||||
|
||||
|
||||
def _create_path(prefix, path, *args):
|
||||
"""Creates a path from the prefix, path template and args. Ensures that
|
||||
all args are url encoded.
|
||||
def _create_v1_path(path, *args):
|
||||
"""Creates a path against V1 federation API from the path template and
|
||||
args. Ensures that all args are url encoded.
|
||||
|
||||
Example:
|
||||
|
||||
_create_path(PREFIX, "/event/%s/", event_id)
|
||||
_create_v1_path("/event/%s/", event_id)
|
||||
|
||||
Args:
|
||||
prefix (str)
|
||||
path (str): String template for the path
|
||||
args: ([str]): Args to insert into path. Each arg will be url encoded
|
||||
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
return prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
|
||||
return (
|
||||
FEDERATION_V1_PREFIX
|
||||
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
|
||||
)
|
||||
|
||||
|
||||
def _create_v2_path(path, *args):
|
||||
"""Creates a path against V2 federation API from the path template and
|
||||
args. Ensures that all args are url encoded.
|
||||
|
||||
Example:
|
||||
|
||||
_create_v2_path("/event/%s/", event_id)
|
||||
|
||||
Args:
|
||||
path (str): String template for the path
|
||||
args: ([str]): Args to insert into path. Each arg will be url encoded
|
||||
|
||||
Returns:
|
||||
str
|
||||
"""
|
||||
return (
|
||||
FEDERATION_V2_PREFIX
|
||||
+ path % tuple(urllib.parse.quote(arg, "") for arg in args)
|
||||
)
|
||||
|
|
|
@ -21,8 +21,9 @@ import re
|
|||
from twisted.internet import defer
|
||||
|
||||
import synapse
|
||||
from synapse.api.constants import RoomVersions
|
||||
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
|
||||
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
|
||||
from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
|
||||
from synapse.http.endpoint import parse_and_validate_server_name
|
||||
from synapse.http.server import JsonResource
|
||||
from synapse.http.servlet import (
|
||||
|
@ -42,9 +43,20 @@ logger = logging.getLogger(__name__)
|
|||
class TransportLayerServer(JsonResource):
|
||||
"""Handles incoming federation HTTP requests"""
|
||||
|
||||
def __init__(self, hs):
|
||||
def __init__(self, hs, servlet_groups=None):
|
||||
"""Initialize the TransportLayerServer
|
||||
|
||||
Will by default register all servlets. For custom behaviour, pass in
|
||||
a list of servlet_groups to register.
|
||||
|
||||
Args:
|
||||
hs (synapse.server.HomeServer): homeserver
|
||||
servlet_groups (list[str], optional): List of servlet groups to register.
|
||||
Defaults to ``DEFAULT_SERVLET_GROUPS``.
|
||||
"""
|
||||
self.hs = hs
|
||||
self.clock = hs.get_clock()
|
||||
self.servlet_groups = servlet_groups
|
||||
|
||||
super(TransportLayerServer, self).__init__(hs, canonical_json=False)
|
||||
|
||||
|
@ -66,6 +78,7 @@ class TransportLayerServer(JsonResource):
|
|||
resource=self,
|
||||
ratelimiter=self.ratelimiter,
|
||||
authenticator=self.authenticator,
|
||||
servlet_groups=self.servlet_groups,
|
||||
)
|
||||
|
||||
|
||||
|
@ -227,6 +240,8 @@ class BaseFederationServlet(object):
|
|||
"""
|
||||
REQUIRE_AUTH = True
|
||||
|
||||
PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version
|
||||
|
||||
def __init__(self, handler, authenticator, ratelimiter, server_name):
|
||||
self.handler = handler
|
||||
self.authenticator = authenticator
|
||||
|
@ -286,7 +301,7 @@ class BaseFederationServlet(object):
|
|||
return new_func
|
||||
|
||||
def register(self, server):
|
||||
pattern = re.compile("^" + PREFIX + self.PATH + "$")
|
||||
pattern = re.compile("^" + self.PREFIX + self.PATH + "$")
|
||||
|
||||
for method in ("GET", "PUT", "POST"):
|
||||
code = getattr(self, "on_%s" % (method), None)
|
||||
|
@ -362,14 +377,6 @@ class FederationSendServlet(BaseFederationServlet):
|
|||
defer.returnValue((code, response))
|
||||
|
||||
|
||||
class FederationPullServlet(BaseFederationServlet):
|
||||
PATH = "/pull/"
|
||||
|
||||
# This is for when someone asks us for everything since version X
|
||||
def on_GET(self, origin, content, query):
|
||||
return self.handler.on_pull_request(query["origin"][0], query["v"])
|
||||
|
||||
|
||||
class FederationEventServlet(BaseFederationServlet):
|
||||
PATH = "/event/(?P<event_id>[^/]*)/"
|
||||
|
||||
|
@ -474,7 +481,7 @@ class FederationSendLeaveServlet(BaseFederationServlet):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, origin, content, query, room_id, event_id):
|
||||
content = yield self.handler.on_send_leave_request(origin, content)
|
||||
content = yield self.handler.on_send_leave_request(origin, content, room_id)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
|
||||
|
@ -492,18 +499,50 @@ class FederationSendJoinServlet(BaseFederationServlet):
|
|||
def on_PUT(self, origin, content, query, context, event_id):
|
||||
# TODO(paul): assert that context/event_id parsed from path actually
|
||||
# match those given in content
|
||||
content = yield self.handler.on_send_join_request(origin, content)
|
||||
content = yield self.handler.on_send_join_request(origin, content, context)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
|
||||
class FederationInviteServlet(BaseFederationServlet):
|
||||
class FederationV1InviteServlet(BaseFederationServlet):
|
||||
PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, origin, content, query, context, event_id):
|
||||
# We don't get a room version, so we have to assume its EITHER v1 or
|
||||
# v2. This is "fine" as the only difference between V1 and V2 is the
|
||||
# state resolution algorithm, and we don't use that for processing
|
||||
# invites
|
||||
content = yield self.handler.on_invite_request(
|
||||
origin, content, room_version=RoomVersions.V1,
|
||||
)
|
||||
|
||||
# V1 federation API is defined to return a content of `[200, {...}]`
|
||||
# due to a historical bug.
|
||||
defer.returnValue((200, (200, content)))
|
||||
|
||||
|
||||
class FederationV2InviteServlet(BaseFederationServlet):
|
||||
PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
|
||||
|
||||
PREFIX = FEDERATION_V2_PREFIX
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, origin, content, query, context, event_id):
|
||||
# TODO(paul): assert that context/event_id parsed from path actually
|
||||
# match those given in content
|
||||
content = yield self.handler.on_invite_request(origin, content)
|
||||
|
||||
room_version = content["room_version"]
|
||||
event = content["event"]
|
||||
invite_room_state = content["invite_room_state"]
|
||||
|
||||
# Synapse expects invite_room_state to be in unsigned, as it is in v1
|
||||
# API
|
||||
|
||||
event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state
|
||||
|
||||
content = yield self.handler.on_invite_request(
|
||||
origin, event, room_version=room_version,
|
||||
)
|
||||
defer.returnValue((200, content))
|
||||
|
||||
|
||||
|
@ -1262,7 +1301,6 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
|
|||
|
||||
FEDERATION_SERVLET_CLASSES = (
|
||||
FederationSendServlet,
|
||||
FederationPullServlet,
|
||||
FederationEventServlet,
|
||||
FederationStateServlet,
|
||||
FederationStateIdsServlet,
|
||||
|
@ -1273,7 +1311,8 @@ FEDERATION_SERVLET_CLASSES = (
|
|||
FederationEventServlet,
|
||||
FederationSendJoinServlet,
|
||||
FederationSendLeaveServlet,
|
||||
FederationInviteServlet,
|
||||
FederationV1InviteServlet,
|
||||
FederationV2InviteServlet,
|
||||
FederationQueryAuthServlet,
|
||||
FederationGetMissingEventsServlet,
|
||||
FederationEventAuthServlet,
|
||||
|
@ -1282,10 +1321,12 @@ FEDERATION_SERVLET_CLASSES = (
|
|||
FederationClientKeysClaimServlet,
|
||||
FederationThirdPartyInviteExchangeServlet,
|
||||
On3pidBindServlet,
|
||||
OpenIdUserInfo,
|
||||
FederationVersionServlet,
|
||||
)
|
||||
|
||||
OPENID_SERVLET_CLASSES = (
|
||||
OpenIdUserInfo,
|
||||
)
|
||||
|
||||
ROOM_LIST_CLASSES = (
|
||||
PublicRoomList,
|
||||
|
@ -1324,44 +1365,83 @@ GROUP_ATTESTATION_SERVLET_CLASSES = (
|
|||
FederationGroupsRenewAttestaionServlet,
|
||||
)
|
||||
|
||||
DEFAULT_SERVLET_GROUPS = (
|
||||
"federation",
|
||||
"room_list",
|
||||
"group_server",
|
||||
"group_local",
|
||||
"group_attestation",
|
||||
"openid",
|
||||
)
|
||||
|
||||
def register_servlets(hs, resource, authenticator, ratelimiter):
|
||||
for servletclass in FEDERATION_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_federation_server(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
for servletclass in ROOM_LIST_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_room_list_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None):
|
||||
"""Initialize and register servlet classes.
|
||||
|
||||
for servletclass in GROUP_SERVER_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_server_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
Will by default register all servlets. For custom behaviour, pass in
|
||||
a list of servlet_groups to register.
|
||||
|
||||
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_local_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
Args:
|
||||
hs (synapse.server.HomeServer): homeserver
|
||||
resource (TransportLayerServer): resource class to register to
|
||||
authenticator (Authenticator): authenticator to use
|
||||
ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use
|
||||
servlet_groups (list[str], optional): List of servlet groups to register.
|
||||
Defaults to ``DEFAULT_SERVLET_GROUPS``.
|
||||
"""
|
||||
if not servlet_groups:
|
||||
servlet_groups = DEFAULT_SERVLET_GROUPS
|
||||
|
||||
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_attestation_renewer(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
if "federation" in servlet_groups:
|
||||
for servletclass in FEDERATION_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_federation_server(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
if "openid" in servlet_groups:
|
||||
for servletclass in OPENID_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_federation_server(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
if "room_list" in servlet_groups:
|
||||
for servletclass in ROOM_LIST_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_room_list_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
if "group_server" in servlet_groups:
|
||||
for servletclass in GROUP_SERVER_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_server_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
if "group_local" in servlet_groups:
|
||||
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_local_handler(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
||||
if "group_attestation" in servlet_groups:
|
||||
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
|
||||
servletclass(
|
||||
handler=hs.get_groups_attestation_renewer(),
|
||||
authenticator=authenticator,
|
||||
ratelimiter=ratelimiter,
|
||||
server_name=hs.hostname,
|
||||
).register(resource)
|
||||
|
|
|
@ -117,9 +117,6 @@ class Transaction(JsonEncodedObject):
|
|||
"Require 'transaction_id' to construct a Transaction"
|
||||
)
|
||||
|
||||
for p in pdus:
|
||||
p.transaction_id = kwargs["transaction_id"]
|
||||
|
||||
kwargs["pdus"] = [p.get_pdu_json() for p in pdus]
|
||||
|
||||
return Transaction(**kwargs)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue