Federation API for Space summary (#9652)

Builds on the work done in #9643 to add a federation API for space summaries.

There's a bit of refactoring of the existing client-server code first, to avoid too much duplication.
This commit is contained in:
Richard van der Hoff 2021-03-23 11:51:12 +00:00 committed by GitHub
parent b7748d3c00
commit 4ecba9bd5c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 200 additions and 57 deletions

1
changelog.d/9652.feature Normal file
View File

@ -0,0 +1 @@
Add initial experimental support for a "space summary" API.

View File

@ -18,7 +18,7 @@
import functools import functools
import logging import logging
import re import re
from typing import Optional, Tuple, Type from typing import Container, Mapping, Optional, Sequence, Tuple, Type
import synapse import synapse
from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH from synapse.api.constants import MAX_GROUP_CATEGORYID_LENGTH, MAX_GROUP_ROLEID_LENGTH
@ -29,7 +29,7 @@ from synapse.api.urls import (
FEDERATION_V1_PREFIX, FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX, FEDERATION_V2_PREFIX,
) )
from synapse.http.server import JsonResource from synapse.http.server import HttpServer, JsonResource
from synapse.http.servlet import ( from synapse.http.servlet import (
parse_boolean_from_args, parse_boolean_from_args,
parse_integer_from_args, parse_integer_from_args,
@ -44,7 +44,8 @@ from synapse.logging.opentracing import (
whitelisted_homeserver, whitelisted_homeserver,
) )
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.types import JsonDict, ThirdPartyInstanceID, get_domain_from_id
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name from synapse.util.stringutils import parse_and_validate_server_name
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -1376,6 +1377,40 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
return 200, new_content return 200, new_content
class FederationSpaceSummaryServlet(BaseFederationServlet):
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/spaces/(?P<room_id>[^/]*)"
async def on_POST(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = content.get("suggested_only", False)
if not isinstance(suggested_only, bool):
raise SynapseError(
400, "'suggested_only' must be a boolean", Codes.BAD_JSON
)
exclude_rooms = content.get("exclude_rooms", [])
if not isinstance(exclude_rooms, list) or any(
not isinstance(x, str) for x in exclude_rooms
):
raise SynapseError(400, "bad value for 'exclude_rooms'", Codes.BAD_JSON)
max_rooms_per_space = content.get("max_rooms_per_space")
if max_rooms_per_space is not None and not isinstance(max_rooms_per_space, int):
raise SynapseError(
400, "bad value for 'max_rooms_per_space'", Codes.BAD_JSON
)
return 200, await self.handler.federation_space_summary(
room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
class RoomComplexityServlet(BaseFederationServlet): class RoomComplexityServlet(BaseFederationServlet):
""" """
Indicates to other servers how complex (and therefore likely Indicates to other servers how complex (and therefore likely
@ -1474,18 +1509,24 @@ DEFAULT_SERVLET_GROUPS = (
) )
def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=None): def register_servlets(
hs: HomeServer,
resource: HttpServer,
authenticator: Authenticator,
ratelimiter: FederationRateLimiter,
servlet_groups: Optional[Container[str]] = None,
):
"""Initialize and register servlet classes. """Initialize and register servlet classes.
Will by default register all servlets. For custom behaviour, pass in Will by default register all servlets. For custom behaviour, pass in
a list of servlet_groups to register. a list of servlet_groups to register.
Args: Args:
hs (synapse.server.HomeServer): homeserver hs: homeserver
resource (JsonResource): resource class to register to resource: resource class to register to
authenticator (Authenticator): authenticator to use authenticator: authenticator to use
ratelimiter (util.ratelimitutils.FederationRateLimiter): ratelimiter to use ratelimiter: ratelimiter to use
servlet_groups (list[str], optional): List of servlet groups to register. servlet_groups: List of servlet groups to register.
Defaults to ``DEFAULT_SERVLET_GROUPS``. Defaults to ``DEFAULT_SERVLET_GROUPS``.
""" """
if not servlet_groups: if not servlet_groups:
@ -1500,6 +1541,14 @@ def register_servlets(hs, resource, authenticator, ratelimiter, servlet_groups=N
server_name=hs.hostname, server_name=hs.hostname,
).register(resource) ).register(resource)
if hs.config.experimental.spaces_enabled:
FederationSpaceSummaryServlet(
handler=hs.get_space_summary_handler(),
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
).register(resource)
if "openid" in servlet_groups: if "openid" in servlet_groups:
for servletclass in OPENID_SERVLET_CLASSES: for servletclass in OPENID_SERVLET_CLASSES:
servletclass( servletclass(

View File

@ -16,7 +16,9 @@
import itertools import itertools
import logging import logging
from collections import deque from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Set from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple
import attr
from synapse.api.constants import EventContentFields, EventTypes, HistoryVisibility from synapse.api.constants import EventContentFields, EventTypes, HistoryVisibility
from synapse.api.errors import AuthError from synapse.api.errors import AuthError
@ -54,7 +56,7 @@ class SpaceSummaryHandler:
max_rooms_per_space: Optional[int] = None, max_rooms_per_space: Optional[int] = None,
) -> JsonDict: ) -> JsonDict:
""" """
Implementation of the space summary API Implementation of the space summary C-S API
Args: Args:
requester: user id of the user making this request requester: user id of the user making this request
@ -66,7 +68,7 @@ class SpaceSummaryHandler:
max_rooms_per_space: an optional limit on the number of child rooms we will max_rooms_per_space: an optional limit on the number of child rooms we will
return. This does not apply to the root room (ie, room_id), and return. This does not apply to the root room (ie, room_id), and
is overridden by ROOMS_PER_SPACE_LIMIT. is overridden by MAX_ROOMS_PER_SPACE.
Returns: Returns:
summary dict to return summary dict to return
@ -76,68 +78,154 @@ class SpaceSummaryHandler:
await self._auth.check_user_in_room_or_world_readable(room_id, requester) await self._auth.check_user_in_room_or_world_readable(room_id, requester)
# the queue of rooms to process # the queue of rooms to process
room_queue = deque((room_id,)) room_queue = deque((_RoomQueueEntry(room_id),))
processed_rooms = set() # type: Set[str] processed_rooms = set() # type: Set[str]
rooms_result = [] # type: List[JsonDict] rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict] events_result = [] # type: List[JsonDict]
now = self._clock.time_msec() while room_queue and len(rooms_result) < MAX_ROOMS:
queue_entry = room_queue.popleft()
room_id = queue_entry.room_id
logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id)
# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing.
max_children = max_rooms_per_space if processed_rooms else None
rooms, events = await self._summarize_local_room(
requester, room_id, suggested_only, max_children
)
rooms_result.extend(rooms)
events_result.extend(events)
# add any children that we haven't already processed to the queue
for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
room_queue.append(_RoomQueueEntry(edge_event["state_key"]))
return {"rooms": rooms_result, "events": events_result}
async def federation_space_summary(
self,
room_id: str,
suggested_only: bool,
max_rooms_per_space: Optional[int],
exclude_rooms: Iterable[str],
) -> JsonDict:
"""
Implementation of the space summary Federation API
Args:
room_id: room id to start the summary at
suggested_only: whether we should only return children with the "suggested"
flag set.
max_rooms_per_space: an optional limit on the number of child rooms we will
return. Unlike the C-S API, this applies to the root room (room_id).
It is clipped to MAX_ROOMS_PER_SPACE.
exclude_rooms: a list of rooms to skip over (presumably because the
calling server has already seen them).
Returns:
summary dict to return
"""
# the queue of rooms to process
room_queue = deque((room_id,))
# the set of rooms that we should not walk further. Initialise it with the
# excluded-rooms list; we will add other rooms as we process them so that
# we do not loop.
processed_rooms = set(exclude_rooms) # type: Set[str]
rooms_result = [] # type: List[JsonDict]
events_result = [] # type: List[JsonDict]
while room_queue and len(rooms_result) < MAX_ROOMS: while room_queue and len(rooms_result) < MAX_ROOMS:
room_id = room_queue.popleft() room_id = room_queue.popleft()
logger.debug("Processing room %s", room_id) logger.debug("Processing room %s", room_id)
processed_rooms.add(room_id) processed_rooms.add(room_id)
try: rooms, events = await self._summarize_local_room(
await self._auth.check_user_in_room_or_world_readable( None, room_id, suggested_only, max_rooms_per_space
room_id, requester )
)
except AuthError:
logger.info(
"user %s cannot view room %s, omitting from summary",
requester,
room_id,
)
continue
room_entry = await self._build_room_entry(room_id) rooms_result.extend(rooms)
rooms_result.append(room_entry) events_result.extend(events)
# look for child rooms/spaces. # add any children that we haven't already processed to the queue
child_events = await self._get_child_events(room_id) for edge_event in events:
if edge_event["state_key"] not in processed_rooms:
if suggested_only: room_queue.append(edge_event["state_key"])
# we only care about suggested children
child_events = filter(_is_suggested_child_event, child_events)
# The client-specified max_rooms_per_space limit doesn't apply to the
# room_id specified in the request, so we ignore it if this is the
# first room we are processing. Otherwise, apply any client-specified
# limit, capping to our built-in limit.
if max_rooms_per_space is not None and len(processed_rooms) > 1:
max_rooms = min(MAX_ROOMS_PER_SPACE, max_rooms_per_space)
else:
max_rooms = MAX_ROOMS_PER_SPACE
for edge_event in itertools.islice(child_events, max_rooms):
edge_room_id = edge_event.state_key
events_result.append(
await self._event_serializer.serialize_event(
edge_event,
time_now=now,
event_format=format_event_for_client_v2,
)
)
# if we haven't yet visited the target of this link, add it to the queue
if edge_room_id not in processed_rooms:
room_queue.append(edge_room_id)
return {"rooms": rooms_result, "events": events_result} return {"rooms": rooms_result, "events": events_result}
async def _summarize_local_room(
self,
requester: Optional[str],
room_id: str,
suggested_only: bool,
max_children: Optional[int],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
if not await self._is_room_accessible(room_id, requester):
return (), ()
room_entry = await self._build_room_entry(room_id)
# look for child rooms/spaces.
child_events = await self._get_child_events(room_id)
if suggested_only:
# we only care about suggested children
child_events = filter(_is_suggested_child_event, child_events)
if max_children is None or max_children > MAX_ROOMS_PER_SPACE:
max_children = MAX_ROOMS_PER_SPACE
now = self._clock.time_msec()
events_result = [] # type: List[JsonDict]
for edge_event in itertools.islice(child_events, max_children):
events_result.append(
await self._event_serializer.serialize_event(
edge_event,
time_now=now,
event_format=format_event_for_client_v2,
)
)
return (room_entry,), events_result
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
# if we have an authenticated requesting user, first check if they are in the
# room
if requester:
try:
await self._auth.check_user_in_room(room_id, requester)
return True
except AuthError:
pass
# otherwise, check if the room is peekable
hist_vis_ev = await self._state_handler.get_current_state(
room_id, EventTypes.RoomHistoryVisibility, ""
)
if hist_vis_ev:
hist_vis = hist_vis_ev.content.get("history_visibility")
if hist_vis == HistoryVisibility.WORLD_READABLE:
return True
logger.info(
"room %s is unpeekable and user %s is not a member, omitting from summary",
room_id,
requester,
)
return False
async def _build_room_entry(self, room_id: str) -> JsonDict: async def _build_room_entry(self, room_id: str) -> JsonDict:
"""Generate en entry suitable for the 'rooms' list in the summary response""" """Generate en entry suitable for the 'rooms' list in the summary response"""
stats = await self._store.get_room_with_stats(room_id) stats = await self._store.get_room_with_stats(room_id)
@ -191,6 +279,11 @@ class SpaceSummaryHandler:
return (e for e in events if e.content.get("via")) return (e for e in events if e.content.get("via"))
@attr.s(frozen=True, slots=True)
class _RoomQueueEntry:
room_id = attr.ib(type=str)
def _is_suggested_child_event(edge_event: EventBase) -> bool: def _is_suggested_child_event(edge_event: EventBase) -> bool:
suggested = edge_event.content.get("suggested") suggested = edge_event.content.get("suggested")
if isinstance(suggested, bool) and suggested: if isinstance(suggested, bool) and suggested: