Support federation in the new spaces summary API (MSC2946). (#10569)

This commit is contained in:
Patrick Cloke 2021-08-16 08:06:17 -04:00 committed by GitHub
parent a3a7514570
commit 7de445161f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 518 additions and 165 deletions

View file

@ -16,17 +16,7 @@ import itertools
import logging
import re
from collections import deque
from typing import (
TYPE_CHECKING,
Deque,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
Tuple,
)
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Set, Tuple
import attr
@ -80,7 +70,7 @@ class _PaginationSession:
# The time the pagination session was created, in milliseconds.
creation_time_ms: int
# The queue of rooms which are still to process.
room_queue: Deque["_RoomQueueEntry"]
room_queue: List["_RoomQueueEntry"]
# A set of rooms which have been processed.
processed_rooms: Set[str]
@ -197,7 +187,7 @@ class SpaceSummaryHandler:
events: Sequence[JsonDict] = []
if room_entry:
rooms_result.append(room_entry.room)
events = room_entry.children
events = room_entry.children_state_events
logger.debug(
"Query of local room %s returned events %s",
@ -232,7 +222,7 @@ class SpaceSummaryHandler:
room.pop("allowed_spaces", None)
rooms_result.append(room)
events.extend(room_entry.children)
events.extend(room_entry.children_state_events)
# All rooms returned don't need visiting again (even if the user
# didn't have access to them).
@ -350,8 +340,8 @@ class SpaceSummaryHandler:
room_queue = pagination_session.room_queue
processed_rooms = pagination_session.processed_rooms
else:
# the queue of rooms to process
room_queue = deque((_RoomQueueEntry(requested_room_id, ()),))
# The queue of rooms to process, the next room is last on the stack.
room_queue = [_RoomQueueEntry(requested_room_id, ())]
# Rooms we have already processed.
processed_rooms = set()
@ -367,7 +357,7 @@ class SpaceSummaryHandler:
# Iterate through the queue until we reach the limit or run out of
# rooms to include.
while room_queue and len(rooms_result) < limit:
queue_entry = room_queue.popleft()
queue_entry = room_queue.pop()
room_id = queue_entry.room_id
current_depth = queue_entry.depth
if room_id in processed_rooms:
@ -376,6 +366,18 @@ class SpaceSummaryHandler:
logger.debug("Processing room %s", room_id)
# A map of summaries for children rooms that might be returned over
# federation. The rationale for caching these and *maybe* using them
# is to prefer any information local to the homeserver before trusting
# data received over federation.
children_room_entries: Dict[str, JsonDict] = {}
# A set of room IDs which are children that did not have information
# returned over federation and are known to be inaccessible to the
# current server. We should not reach out over federation to try to
# summarise these rooms.
inaccessible_children: Set[str] = set()
# If the room is known locally, summarise it!
is_in_room = await self._store.is_host_joined(room_id, self._server_name)
if is_in_room:
room_entry = await self._summarize_local_room(
@ -387,26 +389,68 @@ class SpaceSummaryHandler:
max_children=None,
)
if room_entry:
rooms_result.append(room_entry.as_json())
# Add the child to the queue. We have already validated
# that the vias are a list of server names.
#
# If the current depth is the maximum depth, do not queue
# more entries.
if max_depth is None or current_depth < max_depth:
room_queue.extendleft(
_RoomQueueEntry(
ev["state_key"], ev["content"]["via"], current_depth + 1
)
for ev in reversed(room_entry.children)
)
processed_rooms.add(room_id)
# Otherwise, attempt to use information for federation.
else:
# TODO Federation.
pass
# A previous call might have included information for this room.
# It can be used if either:
#
# 1. The room is not a space.
# 2. The maximum depth has been achieved (since no children
# information is needed).
if queue_entry.remote_room and (
queue_entry.remote_room.get("room_type") != RoomTypes.SPACE
or (max_depth is not None and current_depth >= max_depth)
):
room_entry = _RoomEntry(
queue_entry.room_id, queue_entry.remote_room
)
# If the above isn't true, attempt to fetch the room
# information over federation.
else:
(
room_entry,
children_room_entries,
inaccessible_children,
) = await self._summarize_remote_room_hiearchy(
queue_entry,
suggested_only,
)
# Ensure this room is accessible to the requester (and not just
# the homeserver).
if room_entry and not await self._is_remote_room_accessible(
requester, queue_entry.room_id, room_entry.room
):
room_entry = None
# This room has been processed and should be ignored if it appears
# elsewhere in the hierarchy.
processed_rooms.add(room_id)
# There may or may not be a room entry based on whether it is
# inaccessible to the requesting user.
if room_entry:
# Add the room (including the stripped m.space.child events).
rooms_result.append(room_entry.as_json())
# If this room is not at the max-depth, check if there are any
# children to process.
if max_depth is None or current_depth < max_depth:
# The children get added in reverse order so that the next
# room to process, according to the ordering, is the last
# item in the list.
room_queue.extend(
_RoomQueueEntry(
ev["state_key"],
ev["content"]["via"],
current_depth + 1,
children_room_entries.get(ev["state_key"]),
)
for ev in reversed(room_entry.children_state_events)
if ev["type"] == EventTypes.SpaceChild
and ev["state_key"] not in inaccessible_children
)
result: JsonDict = {"rooms": rooms_result}
@ -477,15 +521,78 @@ class SpaceSummaryHandler:
if room_entry:
rooms_result.append(room_entry.room)
events_result.extend(room_entry.children)
events_result.extend(room_entry.children_state_events)
# add any children to the queue
room_queue.extend(
edge_event["state_key"] for edge_event in room_entry.children
edge_event["state_key"]
for edge_event in room_entry.children_state_events
)
return {"rooms": rooms_result, "events": events_result}
async def get_federation_hierarchy(
self,
origin: str,
requested_room_id: str,
suggested_only: bool,
):
"""
Implementation of the room hierarchy Federation API.
This is similar to get_room_hierarchy, but does not recurse into the space.
It also considers whether anyone on the server may be able to access the
room, as opposed to whether a specific user can.
Args:
origin: The server requesting the spaces summary.
requested_room_id: The room ID to start the hierarchy at (the "root" room).
suggested_only: whether we should only return children with the "suggested"
flag set.
Returns:
The JSON hierarchy dictionary.
"""
root_room_entry = await self._summarize_local_room(
None, origin, requested_room_id, suggested_only, max_children=None
)
if root_room_entry is None:
# Room is inaccessible to the requesting server.
raise SynapseError(404, "Unknown room: %s" % (requested_room_id,))
children_rooms_result: List[JsonDict] = []
inaccessible_children: List[str] = []
# Iterate through each child and potentially add it, but not its children,
# to the response.
for child_room in root_room_entry.children_state_events:
room_id = child_room.get("state_key")
assert isinstance(room_id, str)
# If the room is unknown, skip it.
if not await self._store.is_host_joined(room_id, self._server_name):
continue
room_entry = await self._summarize_local_room(
None, origin, room_id, suggested_only, max_children=0
)
# If the room is accessible, include it in the results.
#
# Note that only the room summary (without information on children)
# is included in the summary.
if room_entry:
children_rooms_result.append(room_entry.room)
# Otherwise, note that the requesting server shouldn't bother
# trying to summarize this room - they do not have access to it.
else:
inaccessible_children.append(room_id)
return {
# Include the requested room (including the stripped children events).
"room": root_room_entry.as_json(),
"children": children_rooms_result,
"inaccessible_children": inaccessible_children,
}
async def _summarize_local_room(
self,
requester: Optional[str],
@ -519,8 +626,9 @@ class SpaceSummaryHandler:
room_entry = await self._build_room_entry(room_id, for_federation=bool(origin))
# If the room is not a space, return just the room information.
if room_entry.get("room_type") != RoomTypes.SPACE:
# If the room is not a space or the children don't matter, return just
# the room information.
if room_entry.get("room_type") != RoomTypes.SPACE or max_children == 0:
return _RoomEntry(room_id, room_entry)
# Otherwise, look for child rooms/spaces.
@ -616,6 +724,59 @@ class SpaceSummaryHandler:
return results
async def _summarize_remote_room_hiearchy(
self, room: "_RoomQueueEntry", suggested_only: bool
) -> Tuple[Optional["_RoomEntry"], Dict[str, JsonDict], Set[str]]:
"""
Request room entries and a list of event entries for a given room by querying a remote server.
Args:
room: The room to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.
Returns:
A tuple of:
The room entry.
Partial room data return over federation.
A set of inaccessible children room IDs.
"""
room_id = room.room_id
logger.info("Requesting summary for %s via %s", room_id, room.via)
via = itertools.islice(room.via, MAX_SERVERS_PER_SPACE)
try:
(
room_response,
children,
inaccessible_children,
) = await self._federation_client.get_room_hierarchy(
via,
room_id,
suggested_only=suggested_only,
)
except Exception as e:
logger.warning(
"Unable to get hierarchy of %s via federation: %s",
room_id,
e,
exc_info=logger.isEnabledFor(logging.DEBUG),
)
return None, {}, set()
# Map the children to their room ID.
children_by_room_id = {
c["room_id"]: c
for c in children
if "room_id" in c and isinstance(c["room_id"], str)
}
return (
_RoomEntry(room_id, room_response, room_response.pop("children_state", ())),
children_by_room_id,
set(inaccessible_children),
)
async def _is_local_room_accessible(
self, room_id: str, requester: Optional[str], origin: Optional[str] = None
) -> bool:
@ -866,9 +1027,16 @@ class SpaceSummaryHandler:
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _RoomQueueEntry:
# The room ID of this entry.
room_id: str
# The server to query if the room is not known locally.
via: Sequence[str]
# The minimum number of hops necessary to get to this room (compared to the
# originally requested room).
depth: int = 0
# The room summary for this room returned via federation. This will only be
# used if the room is not known locally (and is not a space).
remote_room: Optional[JsonDict] = None
@attr.s(frozen=True, slots=True, auto_attribs=True)
@ -879,11 +1047,17 @@ class _RoomEntry:
# An iterable of the sorted, stripped children events for children of this room.
#
# This may not include all children.
children: Sequence[JsonDict] = ()
children_state_events: Sequence[JsonDict] = ()
def as_json(self) -> JsonDict:
"""
Returns a JSON dictionary suitable for the room hierarchy endpoint.
It returns the room summary including the stripped m.space.child events
as a sub-key.
"""
result = dict(self.room)
result["children_state"] = self.children
result["children_state"] = self.children_state_events
return result