mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-07 16:37:53 -05:00
0bd968921c
This could cause a minor data leak if someone defined a non-restricted join rule with an allow key or used a restricted join rule in an older room version, but this is unlikely. Additionally this starts adding unit tests to the spaces summary handler.
179 lines
6.5 KiB
Python
179 lines
6.5 KiB
Python
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from typing import Any, Iterable, Optional, Tuple
|
|
from unittest import mock
|
|
|
|
from synapse.api.errors import AuthError
|
|
from synapse.handlers.space_summary import _child_events_comparison_key
|
|
from synapse.rest import admin
|
|
from synapse.rest.client.v1 import login, room
|
|
from synapse.server import HomeServer
|
|
from synapse.types import JsonDict
|
|
|
|
from tests import unittest
|
|
|
|
|
|
def _create_event(room_id: str, order: Optional[Any] = None):
|
|
result = mock.Mock()
|
|
result.room_id = room_id
|
|
result.content = {}
|
|
if order is not None:
|
|
result.content["order"] = order
|
|
return result
|
|
|
|
|
|
def _order(*events):
|
|
return sorted(events, key=_child_events_comparison_key)
|
|
|
|
|
|
class TestSpaceSummarySort(unittest.TestCase):
|
|
def test_no_order_last(self):
|
|
"""An event with no ordering is placed behind those with an ordering."""
|
|
ev1 = _create_event("!abc:test")
|
|
ev2 = _create_event("!xyz:test", "xyz")
|
|
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
def test_order(self):
|
|
"""The ordering should be used."""
|
|
ev1 = _create_event("!abc:test", "xyz")
|
|
ev2 = _create_event("!xyz:test", "abc")
|
|
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
def test_order_room_id(self):
|
|
"""Room ID is a tie-breaker for ordering."""
|
|
ev1 = _create_event("!abc:test", "abc")
|
|
ev2 = _create_event("!xyz:test", "abc")
|
|
|
|
self.assertEqual([ev1, ev2], _order(ev1, ev2))
|
|
|
|
def test_invalid_ordering_type(self):
|
|
"""Invalid orderings are considered the same as missing."""
|
|
ev1 = _create_event("!abc:test", 1)
|
|
ev2 = _create_event("!xyz:test", "xyz")
|
|
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
ev1 = _create_event("!abc:test", {})
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
ev1 = _create_event("!abc:test", [])
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
ev1 = _create_event("!abc:test", True)
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
def test_invalid_ordering_value(self):
|
|
"""Invalid orderings are considered the same as missing."""
|
|
ev1 = _create_event("!abc:test", "foo\n")
|
|
ev2 = _create_event("!xyz:test", "xyz")
|
|
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
ev1 = _create_event("!abc:test", "a" * 51)
|
|
self.assertEqual([ev2, ev1], _order(ev1, ev2))
|
|
|
|
|
|
class SpaceSummaryTestCase(unittest.HomeserverTestCase):
|
|
servlets = [
|
|
admin.register_servlets_for_client_rest_resource,
|
|
room.register_servlets,
|
|
login.register_servlets,
|
|
]
|
|
|
|
def prepare(self, reactor, clock, hs: HomeServer):
|
|
self.hs = hs
|
|
self.handler = self.hs.get_space_summary_handler()
|
|
|
|
self.user = self.register_user("user", "pass")
|
|
self.token = self.login("user", "pass")
|
|
|
|
def _add_child(self, space_id: str, room_id: str, token: str) -> None:
|
|
"""Add a child room to a space."""
|
|
self.helper.send_state(
|
|
space_id,
|
|
event_type="m.space.child",
|
|
body={"via": [self.hs.hostname]},
|
|
tok=token,
|
|
state_key=room_id,
|
|
)
|
|
|
|
def _assert_rooms(self, result: JsonDict, rooms: Iterable[str]) -> None:
|
|
"""Assert that the expected room IDs are in the response."""
|
|
self.assertCountEqual([room.get("room_id") for room in result["rooms"]], rooms)
|
|
|
|
def _assert_events(
|
|
self, result: JsonDict, events: Iterable[Tuple[str, str]]
|
|
) -> None:
|
|
"""Assert that the expected parent / child room IDs are in the response."""
|
|
self.assertCountEqual(
|
|
[
|
|
(event.get("room_id"), event.get("state_key"))
|
|
for event in result["events"]
|
|
],
|
|
events,
|
|
)
|
|
|
|
def test_simple_space(self):
|
|
"""Test a simple space with a single room."""
|
|
space = self.helper.create_room_as(self.user, tok=self.token)
|
|
room = self.helper.create_room_as(self.user, tok=self.token)
|
|
self._add_child(space, room, self.token)
|
|
|
|
result = self.get_success(self.handler.get_space_summary(self.user, space))
|
|
# The result should have the space and the room in it, along with a link
|
|
# from space -> room.
|
|
self._assert_rooms(result, [space, room])
|
|
self._assert_events(result, [(space, room)])
|
|
|
|
def test_visibility(self):
|
|
"""A user not in a space cannot inspect it."""
|
|
space = self.helper.create_room_as(self.user, tok=self.token)
|
|
room = self.helper.create_room_as(self.user, tok=self.token)
|
|
self._add_child(space, room, self.token)
|
|
|
|
user2 = self.register_user("user2", "pass")
|
|
token2 = self.login("user2", "pass")
|
|
|
|
# The user cannot see the space.
|
|
self.get_failure(self.handler.get_space_summary(user2, space), AuthError)
|
|
|
|
# Joining the room causes it to be visible.
|
|
self.helper.join(space, user2, tok=token2)
|
|
result = self.get_success(self.handler.get_space_summary(user2, space))
|
|
|
|
# The result should only have the space, but includes the link to the room.
|
|
self._assert_rooms(result, [space])
|
|
self._assert_events(result, [(space, room)])
|
|
|
|
def test_world_readable(self):
|
|
"""A world-readable room is visible to everyone."""
|
|
space = self.helper.create_room_as(self.user, tok=self.token)
|
|
room = self.helper.create_room_as(self.user, tok=self.token)
|
|
self._add_child(space, room, self.token)
|
|
self.helper.send_state(
|
|
space,
|
|
event_type="m.room.history_visibility",
|
|
body={"history_visibility": "world_readable"},
|
|
tok=self.token,
|
|
)
|
|
|
|
user2 = self.register_user("user2", "pass")
|
|
|
|
# The space should be visible, as well as the link to the room.
|
|
result = self.get_success(self.handler.get_space_summary(user2, space))
|
|
self._assert_rooms(result, [space])
|
|
self._assert_events(result, [(space, room)])
|