Implementation of MSC2314 (#6176)

This commit is contained in:
Amber Brown 2019-11-28 08:54:07 +11:00 committed by GitHub
parent 0d27aba900
commit 0f87b912ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 174 additions and 65 deletions

1
changelog.d/6176.feature Normal file
View File

@ -0,0 +1 @@
Implement the `/_matrix/federation/unstable/net.atleastfornow/state/<context>` API as drafted in MSC2314.

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -73,6 +74,7 @@ class FederationServer(FederationBase):
self.auth = hs.get_auth()
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@ -264,9 +266,6 @@ class FederationServer(FederationBase):
await self.registry.on_edu(edu_type, origin, content)
async def on_context_state_request(self, origin, room_id, event_id):
if not event_id:
raise NotImplementedError("Specify an event")
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
@ -280,13 +279,18 @@ class FederationServer(FederationBase):
# - but that's non-trivial to get right, and anyway somewhat defeats
# the point of the linearizer.
with (await self._server_linearizer.queue((origin, room_id))):
resp = await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
resp = dict(
await self._state_resp_cache.wrap(
(room_id, event_id),
self._on_context_state_request_compute,
room_id,
event_id,
)
)
room_version = await self.store.get_room_version(room_id)
resp["room_version"] = room_version
return 200, resp
async def on_state_ids_request(self, origin, room_id, event_id):
@ -306,7 +310,11 @@ class FederationServer(FederationBase):
return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids}
async def _on_context_state_request_compute(self, room_id, event_id):
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
else:
pdus = (await self.state.get_current_state(room_id)).values()
auth_chain = await self.store.get_auth_chain([pdu.event_id for pdu in pdus])
return {

View File

@ -421,7 +421,7 @@ class FederationEventServlet(BaseFederationServlet):
return await self.handler.on_pdu_request(origin, event_id)
class FederationStateServlet(BaseFederationServlet):
class FederationStateV1Servlet(BaseFederationServlet):
PATH = "/state/(?P<context>[^/]*)/?"
# This is when someone asks for all data for a given context.
@ -429,7 +429,7 @@ class FederationStateServlet(BaseFederationServlet):
return await self.handler.on_context_state_request(
origin,
context,
parse_string_from_args(query, "event_id", None, required=True),
parse_string_from_args(query, "event_id", None, required=False),
)
@ -1360,7 +1360,7 @@ class RoomComplexityServlet(BaseFederationServlet):
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationEventServlet,
FederationStateServlet,
FederationStateV1Servlet,
FederationStateIdsServlet,
FederationBackfillServlet,
FederationQueryServlet,

View File

@ -29,3 +29,7 @@ Enabling an unknown default rule fails with 404
# Blacklisted due to https://github.com/matrix-org/synapse/issues/1663
New federated private chats get full presence information (SYN-115)
# Blacklisted due to https://github.com/matrix-org/matrix-doc/pull/2314 removing
# this requirement from the spec
Inbound federation of state requires event_id as a mandatory paramater

View File

@ -18,17 +18,14 @@ from mock import Mock
from twisted.internet import defer
from synapse.api.errors import Codes, SynapseError
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.types import UserID
from synapse.util.ratelimitutils import FederationRateLimiter
from tests import unittest
class RoomComplexityTests(unittest.HomeserverTestCase):
class RoomComplexityTests(unittest.FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
@ -41,25 +38,6 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
config["limit_remote_rooms"] = {"enabled": True, "complexity": 0.05}
return config
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return defer.succeed("otherserver.nottld")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
def test_complexity_simple(self):
u1 = self.register_user("u1", "pass")
@ -105,7 +83,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
"roomid",
UserID.from_string(u1),
{"membership": "join"},
@ -146,7 +124,7 @@ class RoomComplexityTests(unittest.HomeserverTestCase):
d = handler._remote_join(
None,
["otherserver.example"],
["other.example.com"],
room_1,
UserID.from_string(u1),
{"membership": "join"},

View File

@ -19,7 +19,7 @@ from twisted.internet import defer
from synapse.types import ReadReceipt
from tests.unittest import HomeserverTestCase
from tests.unittest import HomeserverTestCase, override_config
class FederationSenderTestCases(HomeserverTestCase):
@ -29,6 +29,7 @@ class FederationSenderTestCases(HomeserverTestCase):
federation_transport_client=Mock(spec=["send_transaction"]),
)
@override_config({"send_federation": True})
def test_send_receipts(self):
mock_state_handler = self.hs.get_state_handler()
mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
@ -69,6 +70,7 @@ class FederationSenderTestCases(HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_send_receipts_with_backoff(self):
"""Send two receipts in quick succession; the second should be flushed, but
only after 20ms"""

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,6 +17,8 @@ import logging
from synapse.events import FrozenEvent
from synapse.federation.federation_server import server_matches_acl_event
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
@ -41,6 +44,66 @@ class ServerACLsTestCase(unittest.TestCase):
self.assertTrue(server_matches_acl_event("1:2:3:4", e))
class StateQueryTests(unittest.FederatingHomeserverTestCase):
servlets = [
admin.register_servlets,
room.register_servlets,
login.register_servlets,
]
def test_without_event_id(self):
"""
Querying v1/state/<room_id> without an event ID will return the current
known state.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
self.inject_room_member(room_1, "@user:other.example.com", "join")
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(200, channel.code, channel.result)
self.assertEqual(
channel.json_body["room_version"],
self.hs.config.default_room_version.identifier,
)
members = set(
map(
lambda x: x["state_key"],
filter(
lambda x: x["type"] == "m.room.member", channel.json_body["pdus"]
),
)
)
self.assertEqual(members, set(["@user:other.example.com", u1]))
self.assertEqual(len(channel.json_body["pdus"]), 6)
def test_needs_to_be_in_room(self):
"""
Querying v1/state/<room_id> requires the server
be in the room to provide data.
"""
u1 = self.register_user("u1", "pass")
u1_token = self.login("u1", "pass")
room_1 = self.helper.create_room_as(u1, tok=u1_token)
request, channel = self.make_request(
"GET", "/_matrix/federation/v1/state/%s" % (room_1,)
)
self.render(request)
self.assertEquals(403, channel.code, channel.result)
self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
def _create_acl_event(content):
return FrozenEvent(
{

View File

@ -24,6 +24,7 @@ from synapse.api.errors import AuthError
from synapse.types import UserID
from tests import unittest
from tests.unittest import override_config
from tests.utils import register_federation_servlets
# Some local users to test with
@ -174,6 +175,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_started_typing_remote_send(self):
self.room_members = [U_APPLE, U_ONION]
@ -237,6 +239,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
],
)
@override_config({"send_federation": True})
def test_stopped_typing(self):
self.room_members = [U_APPLE, U_BANANA, U_ONION]

View File

@ -48,7 +48,10 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
server_factory = ReplicationStreamProtocolFactory(self.hs)
self.streamer = server_factory.streamer
handler_factory = Mock()
self.replication_handler = ReplicationClientHandler(self.slaved_store)
self.replication_handler.factory = handler_factory
client_factory = ReplicationClientFactory(
self.hs, "client_name", self.replication_handler
)

View File

@ -12,6 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mock import Mock
from synapse.replication.tcp.commands import ReplicateCommand
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@ -30,7 +32,9 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
server = server_factory.buildProtocol(None)
# build a replication client, with a dummy handler
handler_factory = Mock()
self.test_handler = TestReplicationClientHandler()
self.test_handler.factory = handler_factory
self.client = ClientReplicationStreamProtocol(
"client", "test", clock, self.test_handler
)

View File

@ -16,8 +16,7 @@
from unittest.mock import Mock
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.api.constants import Membership
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client.v1 import login, room
from synapse.types import Requester, UserID
@ -44,9 +43,6 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# We can't test the RoomMemberStore on its own without the other event
# storage logic
self.store = hs.get_datastore()
self.storage = hs.get_storage()
self.event_builder_factory = hs.get_event_builder_factory()
self.event_creation_handler = hs.get_event_creation_handler()
self.u_alice = self.register_user("alice", "pass")
self.t_alice = self.login("alice", "pass")
@ -55,26 +51,6 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
# User elsewhere on another host
self.u_charlie = UserID.from_string("@charlie:elsewhere")
def inject_room_member(self, room, user, membership, replaces_state=None):
builder = self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)
event, context = self.get_success(
self.event_creation_handler.create_new_client_event(builder)
)
self.get_success(self.storage.persistence.persist_event(event, context))
return event
def test_one_member(self):
# Alice creates the room, and is automatically joined

View File

@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector
# Copyright 2019 Matrix.org Federation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,6 +14,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gc
import hashlib
import hmac
@ -27,13 +29,17 @@ from twisted.internet.defer import Deferred, succeed
from twisted.python.threadpool import ThreadPool
from twisted.trial import unittest
from synapse.api.constants import EventTypes
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.config.homeserver import HomeServerConfig
from synapse.config.ratelimiting import FederationRateLimitConfig
from synapse.federation.transport import server as federation_server
from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest
from synapse.logging.context import LoggingContext
from synapse.server import HomeServer
from synapse.types import Requester, UserID, create_requester
from synapse.util.ratelimitutils import FederationRateLimiter
from tests.server import get_clock, make_request, render, setup_test_homeserver
from tests.test_utils.logging_setup import setup_logging
@ -559,6 +565,66 @@ class HomeserverTestCase(TestCase):
self.render(request)
self.assertEqual(channel.code, 403, channel.result)
def inject_room_member(self, room: str, user: str, membership: Membership) -> None:
"""
Inject a membership event into a room.
Args:
room: Room ID to inject the event into.
user: MXID of the user to inject the membership for.
membership: The membership type.
"""
event_builder_factory = self.hs.get_event_builder_factory()
event_creation_handler = self.hs.get_event_creation_handler()
room_version = self.get_success(self.hs.get_datastore().get_room_version(room))
builder = event_builder_factory.for_room_version(
KNOWN_ROOM_VERSIONS[room_version],
{
"type": EventTypes.Member,
"sender": user,
"state_key": user,
"room_id": room,
"content": {"membership": membership},
},
)
event, context = self.get_success(
event_creation_handler.create_new_client_event(builder)
)
self.get_success(
self.hs.get_storage().persistence.persist_event(event, context)
)
class FederatingHomeserverTestCase(HomeserverTestCase):
"""
A federating homeserver that authenticates incoming requests as `other.example.com`.
"""
def prepare(self, reactor, clock, homeserver):
class Authenticator(object):
def authenticate_request(self, request, content):
return succeed("other.example.com")
ratelimiter = FederationRateLimiter(
clock,
FederationRateLimitConfig(
window_size=1,
sleep_limit=1,
sleep_msec=1,
reject_limit=1000,
concurrent_requests=1000,
),
)
federation_server.register_servlets(
homeserver, self.resource, Authenticator(), ratelimiter
)
return super().prepare(reactor, clock, homeserver)
def override_config(extra_config):
"""A decorator which can be applied to test functions to give additional HS config

View File

@ -109,6 +109,7 @@ def default_config(name, parse=False):
"""
config_dict = {
"server_name": name,
"send_federation": False,
"media_store_path": "media",
"uploads_path": "uploads",
# the test signing key is just an arbitrary ed25519 key to keep the config