mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-18 02:30:29 -04:00
Merge remote-tracking branch 'upstream/release-v1.37'
This commit is contained in:
commit
9e81bc6b08
140 changed files with 4734 additions and 2016 deletions
|
@ -14,13 +14,12 @@
|
|||
# limitations under the License.
|
||||
|
||||
""" This module contains REST servlets to do with rooms: /rooms/<paths> """
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import TYPE_CHECKING, List, Optional, Tuple
|
||||
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
|
||||
from urllib import parse as urlparse
|
||||
|
||||
from synapse.api.constants import EventTypes, Membership
|
||||
from synapse.api.constants import EventContentFields, EventTypes, Membership
|
||||
from synapse.api.errors import (
|
||||
AuthError,
|
||||
Codes,
|
||||
|
@ -38,6 +37,7 @@ from synapse.http.servlet import (
|
|||
parse_integer,
|
||||
parse_json_object_from_request,
|
||||
parse_string,
|
||||
parse_strings_from_args,
|
||||
)
|
||||
from synapse.http.site import SynapseRequest
|
||||
from synapse.logging.opentracing import set_tag
|
||||
|
@ -269,6 +269,288 @@ class RoomSendEventRestServlet(TransactionRestServlet):
|
|||
)
|
||||
|
||||
|
||||
class RoomBatchSendEventRestServlet(TransactionRestServlet):
|
||||
"""
|
||||
API endpoint which can insert a chunk of events historically back in time
|
||||
next to the given `prev_event`.
|
||||
|
||||
`chunk_id` comes from `next_chunk_id `in the response of the batch send
|
||||
endpoint and is derived from the "insertion" events added to each chunk.
|
||||
It's not required for the first batch send.
|
||||
|
||||
`state_events_at_start` is used to define the historical state events
|
||||
needed to auth the events like join events. These events will float
|
||||
outside of the normal DAG as outlier's and won't be visible in the chat
|
||||
history which also allows us to insert multiple chunks without having a bunch
|
||||
of `@mxid joined the room` noise between each chunk.
|
||||
|
||||
`events` is chronological chunk/list of events you want to insert.
|
||||
There is a reverse-chronological constraint on chunks so once you insert
|
||||
some messages, you can only insert older ones after that.
|
||||
tldr; Insert chunks from your most recent history -> oldest history.
|
||||
|
||||
POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
|
||||
{
|
||||
"events": [ ... ],
|
||||
"state_events_at_start": [ ... ]
|
||||
}
|
||||
"""
|
||||
|
||||
PATTERNS = (
|
||||
re.compile(
|
||||
"^/_matrix/client/unstable/org.matrix.msc2716"
|
||||
"/rooms/(?P<room_id>[^/]*)/batch_send$"
|
||||
),
|
||||
)
|
||||
|
||||
def __init__(self, hs):
|
||||
super().__init__(hs)
|
||||
self.hs = hs
|
||||
self.store = hs.get_datastore()
|
||||
self.state_store = hs.get_storage().state
|
||||
self.event_creation_handler = hs.get_event_creation_handler()
|
||||
self.room_member_handler = hs.get_room_member_handler()
|
||||
self.auth = hs.get_auth()
|
||||
|
||||
async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int:
|
||||
(
|
||||
most_recent_prev_event_id,
|
||||
most_recent_prev_event_depth,
|
||||
) = await self.store.get_max_depth_of(prev_event_ids)
|
||||
|
||||
# We want to insert the historical event after the `prev_event` but before the successor event
|
||||
#
|
||||
# We inherit depth from the successor event instead of the `prev_event`
|
||||
# because events returned from `/messages` are first sorted by `topological_ordering`
|
||||
# which is just the `depth` and then tie-break with `stream_ordering`.
|
||||
#
|
||||
# We mark these inserted historical events as "backfilled" which gives them a
|
||||
# negative `stream_ordering`. If we use the same depth as the `prev_event`,
|
||||
# then our historical event will tie-break and be sorted before the `prev_event`
|
||||
# when it should come after.
|
||||
#
|
||||
# We want to use the successor event depth so they appear after `prev_event` because
|
||||
# it has a larger `depth` but before the successor event because the `stream_ordering`
|
||||
# is negative before the successor event.
|
||||
successor_event_ids = await self.store.get_successor_events(
|
||||
[most_recent_prev_event_id]
|
||||
)
|
||||
|
||||
# If we can't find any successor events, then it's a forward extremity of
|
||||
# historical messages and we can just inherit from the previous historical
|
||||
# event which we can already assume has the correct depth where we want
|
||||
# to insert into.
|
||||
if not successor_event_ids:
|
||||
depth = most_recent_prev_event_depth
|
||||
else:
|
||||
(
|
||||
_,
|
||||
oldest_successor_depth,
|
||||
) = await self.store.get_min_depth_of(successor_event_ids)
|
||||
|
||||
depth = oldest_successor_depth
|
||||
|
||||
return depth
|
||||
|
||||
async def on_POST(self, request, room_id):
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=False)
|
||||
|
||||
if not requester.app_service:
|
||||
raise AuthError(
|
||||
403,
|
||||
"Only application services can use the /batchsend endpoint",
|
||||
)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
assert_params_in_dict(body, ["state_events_at_start", "events"])
|
||||
|
||||
prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
|
||||
chunk_id_from_query = parse_string(request, "chunk_id", default=None)
|
||||
|
||||
if prev_events_from_query is None:
|
||||
raise SynapseError(
|
||||
400,
|
||||
"prev_event query parameter is required when inserting historical messages back in time",
|
||||
errcode=Codes.MISSING_PARAM,
|
||||
)
|
||||
|
||||
# For the event we are inserting next to (`prev_events_from_query`),
|
||||
# find the most recent auth events (derived from state events) that
|
||||
# allowed that message to be sent. We will use that as a base
|
||||
# to auth our historical messages against.
|
||||
(
|
||||
most_recent_prev_event_id,
|
||||
_,
|
||||
) = await self.store.get_max_depth_of(prev_events_from_query)
|
||||
# mapping from (type, state_key) -> state_event_id
|
||||
prev_state_map = await self.state_store.get_state_ids_for_event(
|
||||
most_recent_prev_event_id
|
||||
)
|
||||
# List of state event ID's
|
||||
prev_state_ids = list(prev_state_map.values())
|
||||
auth_event_ids = prev_state_ids
|
||||
|
||||
for state_event in body["state_events_at_start"]:
|
||||
assert_params_in_dict(
|
||||
state_event, ["type", "origin_server_ts", "content", "sender"]
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
|
||||
state_event,
|
||||
auth_event_ids,
|
||||
)
|
||||
|
||||
event_dict = {
|
||||
"type": state_event["type"],
|
||||
"origin_server_ts": state_event["origin_server_ts"],
|
||||
"content": state_event["content"],
|
||||
"room_id": room_id,
|
||||
"sender": state_event["sender"],
|
||||
"state_key": state_event["state_key"],
|
||||
}
|
||||
|
||||
# Make the state events float off on their own
|
||||
fake_prev_event_id = "$" + random_string(43)
|
||||
|
||||
# TODO: This is pretty much the same as some other code to handle inserting state in this file
|
||||
if event_dict["type"] == EventTypes.Member:
|
||||
membership = event_dict["content"].get("membership", None)
|
||||
event_id, _ = await self.room_member_handler.update_membership(
|
||||
requester,
|
||||
target=UserID.from_string(event_dict["state_key"]),
|
||||
room_id=room_id,
|
||||
action=membership,
|
||||
content=event_dict["content"],
|
||||
outlier=True,
|
||||
prev_event_ids=[fake_prev_event_id],
|
||||
# Make sure to use a copy of this list because we modify it
|
||||
# later in the loop here. Otherwise it will be the same
|
||||
# reference and also update in the event when we append later.
|
||||
auth_event_ids=auth_event_ids.copy(),
|
||||
)
|
||||
else:
|
||||
# TODO: Add some complement tests that adds state that is not member joins
|
||||
# and will use this code path. Maybe we only want to support join state events
|
||||
# and can get rid of this `else`?
|
||||
(
|
||||
event,
|
||||
_,
|
||||
) = await self.event_creation_handler.create_and_send_nonmember_event(
|
||||
requester,
|
||||
event_dict,
|
||||
outlier=True,
|
||||
prev_event_ids=[fake_prev_event_id],
|
||||
# Make sure to use a copy of this list because we modify it
|
||||
# later in the loop here. Otherwise it will be the same
|
||||
# reference and also update in the event when we append later.
|
||||
auth_event_ids=auth_event_ids.copy(),
|
||||
)
|
||||
event_id = event.event_id
|
||||
|
||||
auth_event_ids.append(event_id)
|
||||
|
||||
events_to_create = body["events"]
|
||||
|
||||
# If provided, connect the chunk to the last insertion point
|
||||
# The chunk ID passed in comes from the chunk_id in the
|
||||
# "insertion" event from the previous chunk.
|
||||
if chunk_id_from_query:
|
||||
last_event_in_chunk = events_to_create[-1]
|
||||
last_event_in_chunk["content"][
|
||||
EventContentFields.MSC2716_CHUNK_ID
|
||||
] = chunk_id_from_query
|
||||
|
||||
# Add an "insertion" event to the start of each chunk (next to the oldest
|
||||
# event in the chunk) so the next chunk can be connected to this one.
|
||||
next_chunk_id = random_string(64)
|
||||
insertion_event = {
|
||||
"type": EventTypes.MSC2716_INSERTION,
|
||||
"sender": requester.user.to_string(),
|
||||
"content": {
|
||||
EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
|
||||
EventContentFields.MSC2716_HISTORICAL: True,
|
||||
},
|
||||
# Since the insertion event is put at the start of the chunk,
|
||||
# where the oldest event is, copy the origin_server_ts from
|
||||
# the first event we're inserting
|
||||
"origin_server_ts": events_to_create[0]["origin_server_ts"],
|
||||
}
|
||||
# Prepend the insertion event to the start of the chunk
|
||||
events_to_create = [insertion_event] + events_to_create
|
||||
|
||||
inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
|
||||
|
||||
event_ids = []
|
||||
prev_event_ids = prev_events_from_query
|
||||
events_to_persist = []
|
||||
for ev in events_to_create:
|
||||
assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
|
||||
|
||||
# Mark all events as historical
|
||||
# This has important semantics within the Synapse internals to backfill properly
|
||||
ev["content"][EventContentFields.MSC2716_HISTORICAL] = True
|
||||
|
||||
event_dict = {
|
||||
"type": ev["type"],
|
||||
"origin_server_ts": ev["origin_server_ts"],
|
||||
"content": ev["content"],
|
||||
"room_id": room_id,
|
||||
"sender": ev["sender"], # requester.user.to_string(),
|
||||
"prev_events": prev_event_ids.copy(),
|
||||
}
|
||||
|
||||
event, context = await self.event_creation_handler.create_event(
|
||||
requester,
|
||||
event_dict,
|
||||
prev_event_ids=event_dict.get("prev_events"),
|
||||
auth_event_ids=auth_event_ids,
|
||||
historical=True,
|
||||
depth=inherited_depth,
|
||||
)
|
||||
logger.debug(
|
||||
"RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
|
||||
event,
|
||||
prev_event_ids,
|
||||
auth_event_ids,
|
||||
)
|
||||
|
||||
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
|
||||
event.sender,
|
||||
)
|
||||
|
||||
events_to_persist.append((event, context))
|
||||
event_id = event.event_id
|
||||
|
||||
event_ids.append(event_id)
|
||||
prev_event_ids = [event_id]
|
||||
|
||||
# Persist events in reverse-chronological order so they have the
|
||||
# correct stream_ordering as they are backfilled (which decrements).
|
||||
# Events are sorted by (topological_ordering, stream_ordering)
|
||||
# where topological_ordering is just depth.
|
||||
for (event, context) in reversed(events_to_persist):
|
||||
ev = await self.event_creation_handler.handle_new_client_event(
|
||||
requester=requester,
|
||||
event=event,
|
||||
context=context,
|
||||
)
|
||||
|
||||
return 200, {
|
||||
"state_events": auth_event_ids,
|
||||
"events": event_ids,
|
||||
"next_chunk_id": next_chunk_id,
|
||||
}
|
||||
|
||||
def on_GET(self, request, room_id):
|
||||
return 501, "Not implemented"
|
||||
|
||||
def on_PUT(self, request, room_id):
|
||||
return self.txns.fetch_or_execute_request(
|
||||
request, self.on_POST, request, room_id
|
||||
)
|
||||
|
||||
|
||||
# TODO: Needs unit testing for room ID + alias joins
|
||||
class JoinRoomAliasServlet(TransactionRestServlet):
|
||||
def __init__(self, hs):
|
||||
|
@ -281,7 +563,12 @@ class JoinRoomAliasServlet(TransactionRestServlet):
|
|||
PATTERNS = "/join/(?P<room_identifier>[^/]*)"
|
||||
register_txn_path(self, PATTERNS, http_server)
|
||||
|
||||
async def on_POST(self, request, room_identifier, txn_id=None):
|
||||
async def on_POST(
|
||||
self,
|
||||
request: SynapseRequest,
|
||||
room_identifier: str,
|
||||
txn_id: Optional[str] = None,
|
||||
):
|
||||
requester = await self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
try:
|
||||
|
@ -293,17 +580,18 @@ class JoinRoomAliasServlet(TransactionRestServlet):
|
|||
|
||||
if RoomID.is_valid(room_identifier):
|
||||
room_id = room_identifier
|
||||
try:
|
||||
remote_room_hosts = [
|
||||
x.decode("ascii") for x in request.args[b"server_name"]
|
||||
] # type: Optional[List[str]]
|
||||
except Exception:
|
||||
remote_room_hosts = None
|
||||
|
||||
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
|
||||
args: Dict[bytes, List[bytes]] = request.args # type: ignore
|
||||
|
||||
remote_room_hosts = parse_strings_from_args(
|
||||
args, "server_name", required=False
|
||||
)
|
||||
elif RoomAlias.is_valid(room_identifier):
|
||||
handler = self.room_member_handler
|
||||
room_alias = RoomAlias.from_string(room_identifier)
|
||||
room_id, remote_room_hosts = await handler.lookup_room_alias(room_alias)
|
||||
room_id = room_id.to_string()
|
||||
room_id_obj, remote_room_hosts = await handler.lookup_room_alias(room_alias)
|
||||
room_id = room_id_obj.to_string()
|
||||
else:
|
||||
raise SynapseError(
|
||||
400, "%s was not legal room ID or room alias" % (room_identifier,)
|
||||
|
@ -1051,6 +1339,8 @@ class RoomSpaceSummaryRestServlet(RestServlet):
|
|||
|
||||
|
||||
def register_servlets(hs: "HomeServer", http_server, is_worker=False):
|
||||
msc2716_enabled = hs.config.experimental.msc2716_enabled
|
||||
|
||||
RoomStateEventRestServlet(hs).register(http_server)
|
||||
RoomMemberListRestServlet(hs).register(http_server)
|
||||
JoinedRoomMemberListRestServlet(hs).register(http_server)
|
||||
|
@ -1058,6 +1348,8 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
|
|||
JoinRoomAliasServlet(hs).register(http_server)
|
||||
RoomMembershipRestServlet(hs).register(http_server)
|
||||
RoomSendEventRestServlet(hs).register(http_server)
|
||||
if msc2716_enabled:
|
||||
RoomBatchSendEventRestServlet(hs).register(http_server)
|
||||
PublicRoomListRestServlet(hs).register(http_server)
|
||||
RoomStateRestServlet(hs).register(http_server)
|
||||
RoomRedactEventRestServlet(hs).register(http_server)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue