mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-16 12:40:32 -04:00
Merge remote-tracking branch 'upstream/release-v1.76'
This commit is contained in:
commit
d7a3d540ed
135 changed files with 4101 additions and 2588 deletions
|
@ -27,6 +27,7 @@ from typing import (
|
|||
Iterable,
|
||||
List,
|
||||
Optional,
|
||||
Set,
|
||||
Tuple,
|
||||
Union,
|
||||
)
|
||||
|
@ -47,7 +48,6 @@ from synapse.api.errors import (
|
|||
FederationError,
|
||||
FederationPullAttemptBackoffError,
|
||||
HttpResponseException,
|
||||
LimitExceededError,
|
||||
NotFoundError,
|
||||
RequestSendFailed,
|
||||
SynapseError,
|
||||
|
@ -171,12 +171,29 @@ class FederationHandler:
|
|||
|
||||
self.third_party_event_rules = hs.get_third_party_event_rules()
|
||||
|
||||
# Tracks running partial state syncs by room ID.
|
||||
# Partial state syncs currently only run on the main process, so it's okay to
|
||||
# track them in-memory for now.
|
||||
self._active_partial_state_syncs: Set[str] = set()
|
||||
# Tracks partial state syncs we may want to restart.
|
||||
# A dictionary mapping room IDs to (initial destination, other destinations)
|
||||
# tuples.
|
||||
self._partial_state_syncs_maybe_needing_restart: Dict[
|
||||
str, Tuple[Optional[str], Collection[str]]
|
||||
] = {}
|
||||
# A lock guarding the partial state flag for rooms.
|
||||
# When the lock is held for a given room, no other concurrent code may
|
||||
# partial state or un-partial state the room.
|
||||
self._is_partial_state_room_linearizer = Linearizer(
|
||||
name="_is_partial_state_room_linearizer"
|
||||
)
|
||||
|
||||
# if this is the main process, fire off a background process to resume
|
||||
# any partial-state-resync operations which were in flight when we
|
||||
# were shut down.
|
||||
if not hs.config.worker.worker_app:
|
||||
run_as_background_process(
|
||||
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
|
||||
"resume_sync_partial_state_room", self._resume_partial_state_room_sync
|
||||
)
|
||||
|
||||
@trace
|
||||
|
@ -587,7 +604,23 @@ class FederationHandler:
|
|||
|
||||
self._federation_event_handler.room_queues[room_id] = []
|
||||
|
||||
await self._clean_room_for_join(room_id)
|
||||
is_host_joined = await self.store.is_host_joined(room_id, self.server_name)
|
||||
|
||||
if not is_host_joined:
|
||||
# We may have old forward extremities lying around if the homeserver left
|
||||
# the room completely in the past. Clear them out.
|
||||
#
|
||||
# Note that this check-then-clear is subject to races where
|
||||
# * the homeserver is in the room and stops being in the room just after
|
||||
# the check. We won't reset the forward extremities, but that's okay,
|
||||
# since they will be almost up to date.
|
||||
# * the homeserver is not in the room and starts being in the room just
|
||||
# after the check. This can't happen, since `RoomMemberHandler` has a
|
||||
# linearizer lock which prevents concurrent remote joins into the same
|
||||
# room.
|
||||
# In short, the races either have an acceptable outcome or should be
|
||||
# impossible.
|
||||
await self._clean_room_for_join(room_id)
|
||||
|
||||
try:
|
||||
# Try the host we successfully got a response to /make_join/
|
||||
|
@ -599,94 +632,116 @@ class FederationHandler:
|
|||
except ValueError:
|
||||
pass
|
||||
|
||||
ret = await self.federation_client.send_join(
|
||||
host_list, event, room_version_obj
|
||||
)
|
||||
async with self._is_partial_state_room_linearizer.queue(room_id):
|
||||
already_partial_state_room = await self.store.is_partial_state_room(
|
||||
room_id
|
||||
)
|
||||
|
||||
event = ret.event
|
||||
origin = ret.origin
|
||||
state = ret.state
|
||||
auth_chain = ret.auth_chain
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
ret = await self.federation_client.send_join(
|
||||
host_list,
|
||||
event,
|
||||
room_version_obj,
|
||||
# Perform a full join when we are already in the room and it is a
|
||||
# full state room, since we are not allowed to persist a partial
|
||||
# state join event in a full state room. In the future, we could
|
||||
# optimize this by always performing a partial state join and
|
||||
# computing the state ourselves or retrieving it from the remote
|
||||
# homeserver if necessary.
|
||||
#
|
||||
# There's a race where we leave the room, then perform a full join
|
||||
# anyway. This should end up being fast anyway, since we would
|
||||
# already have the full room state and auth chain persisted.
|
||||
partial_state=not is_host_joined or already_partial_state_room,
|
||||
)
|
||||
|
||||
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
||||
logger.debug("do_invite_join state: %s", state)
|
||||
event = ret.event
|
||||
origin = ret.origin
|
||||
state = ret.state
|
||||
auth_chain = ret.auth_chain
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
logger.debug("do_invite_join event: %s", event)
|
||||
logger.debug("do_invite_join auth_chain: %s", auth_chain)
|
||||
logger.debug("do_invite_join state: %s", state)
|
||||
|
||||
# if this is the first time we've joined this room, it's time to add
|
||||
# a row to `rooms` with the correct room version. If there's already a
|
||||
# row there, we should override it, since it may have been populated
|
||||
# based on an invite request which lied about the room version.
|
||||
#
|
||||
# federation_client.send_join has already checked that the room
|
||||
# version in the received create event is the same as room_version_obj,
|
||||
# so we can rely on it now.
|
||||
#
|
||||
await self.store.upsert_room_on_join(
|
||||
room_id=room_id,
|
||||
room_version=room_version_obj,
|
||||
state_events=state,
|
||||
)
|
||||
logger.debug("do_invite_join event: %s", event)
|
||||
|
||||
if ret.partial_state:
|
||||
# Mark the room as having partial state.
|
||||
# The background process is responsible for unmarking this flag,
|
||||
# even if the join fails.
|
||||
await self.store.store_partial_state_room(
|
||||
# if this is the first time we've joined this room, it's time to add
|
||||
# a row to `rooms` with the correct room version. If there's already a
|
||||
# row there, we should override it, since it may have been populated
|
||||
# based on an invite request which lied about the room version.
|
||||
#
|
||||
# federation_client.send_join has already checked that the room
|
||||
# version in the received create event is the same as room_version_obj,
|
||||
# so we can rely on it now.
|
||||
#
|
||||
await self.store.upsert_room_on_join(
|
||||
room_id=room_id,
|
||||
servers=ret.servers_in_room,
|
||||
device_lists_stream_id=self.store.get_device_stream_token(),
|
||||
joined_via=origin,
|
||||
room_version=room_version_obj,
|
||||
state_events=state,
|
||||
)
|
||||
|
||||
try:
|
||||
max_stream_id = (
|
||||
await self._federation_event_handler.process_remote_join(
|
||||
origin,
|
||||
room_id,
|
||||
auth_chain,
|
||||
state,
|
||||
event,
|
||||
room_version_obj,
|
||||
partial_state=ret.partial_state,
|
||||
)
|
||||
)
|
||||
except PartialStateConflictError as e:
|
||||
# The homeserver was already in the room and it is no longer partial
|
||||
# stated. We ought to be doing a local join instead. Turn the error into
|
||||
# a 429, as a hint to the client to try again.
|
||||
# TODO(faster_joins): `_should_perform_remote_join` suggests that we may
|
||||
# do a remote join for restricted rooms even if we have full state.
|
||||
logger.error(
|
||||
"Room %s was un-partial stated while processing remote join.",
|
||||
room_id,
|
||||
)
|
||||
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
|
||||
else:
|
||||
# Record the join event id for future use (when we finish the full
|
||||
# join). We have to do this after persisting the event to keep foreign
|
||||
# key constraints intact.
|
||||
if ret.partial_state:
|
||||
await self.store.write_partial_state_rooms_join_event_id(
|
||||
room_id, event.event_id
|
||||
)
|
||||
finally:
|
||||
# Always kick off the background process that asynchronously fetches
|
||||
# state for the room.
|
||||
# If the join failed, the background process is responsible for
|
||||
# cleaning up — including unmarking the room as a partial state room.
|
||||
if ret.partial_state:
|
||||
# Kick off the process of asynchronously fetching the state for this
|
||||
# room.
|
||||
run_as_background_process(
|
||||
desc="sync_partial_state_room",
|
||||
func=self._sync_partial_state_room,
|
||||
initial_destination=origin,
|
||||
other_destinations=ret.servers_in_room,
|
||||
if ret.partial_state and not already_partial_state_room:
|
||||
# Mark the room as having partial state.
|
||||
# The background process is responsible for unmarking this flag,
|
||||
# even if the join fails.
|
||||
# TODO(faster_joins):
|
||||
# We may want to reset the partial state info if it's from an
|
||||
# old, failed partial state join.
|
||||
# https://github.com/matrix-org/synapse/issues/13000
|
||||
await self.store.store_partial_state_room(
|
||||
room_id=room_id,
|
||||
servers=ret.servers_in_room,
|
||||
device_lists_stream_id=self.store.get_device_stream_token(),
|
||||
joined_via=origin,
|
||||
)
|
||||
|
||||
try:
|
||||
max_stream_id = (
|
||||
await self._federation_event_handler.process_remote_join(
|
||||
origin,
|
||||
room_id,
|
||||
auth_chain,
|
||||
state,
|
||||
event,
|
||||
room_version_obj,
|
||||
partial_state=ret.partial_state,
|
||||
)
|
||||
)
|
||||
except PartialStateConflictError:
|
||||
# This should be impossible, since we hold the lock on the room's
|
||||
# partial statedness.
|
||||
logger.error(
|
||||
"Room %s was un-partial stated while processing remote join.",
|
||||
room_id,
|
||||
)
|
||||
raise
|
||||
else:
|
||||
# Record the join event id for future use (when we finish the full
|
||||
# join). We have to do this after persisting the event to keep
|
||||
# foreign key constraints intact.
|
||||
if ret.partial_state and not already_partial_state_room:
|
||||
# TODO(faster_joins):
|
||||
# We may want to reset the partial state info if it's from
|
||||
# an old, failed partial state join.
|
||||
# https://github.com/matrix-org/synapse/issues/13000
|
||||
await self.store.write_partial_state_rooms_join_event_id(
|
||||
room_id, event.event_id
|
||||
)
|
||||
finally:
|
||||
# Always kick off the background process that asynchronously fetches
|
||||
# state for the room.
|
||||
# If the join failed, the background process is responsible for
|
||||
# cleaning up — including unmarking the room as a partial state
|
||||
# room.
|
||||
if ret.partial_state:
|
||||
# Kick off the process of asynchronously fetching the state for
|
||||
# this room.
|
||||
self._start_partial_state_room_sync(
|
||||
initial_destination=origin,
|
||||
other_destinations=ret.servers_in_room,
|
||||
room_id=room_id,
|
||||
)
|
||||
|
||||
# We wait here until this instance has seen the events come down
|
||||
# replication (if we're using replication) as the below uses caches.
|
||||
await self._replication.wait_for_stream_position(
|
||||
|
@ -1660,20 +1715,100 @@ class FederationHandler:
|
|||
# well.
|
||||
return None
|
||||
|
||||
async def _resume_sync_partial_state_room(self) -> None:
|
||||
async def _resume_partial_state_room_sync(self) -> None:
|
||||
"""Resumes resyncing of all partial-state rooms after a restart."""
|
||||
assert not self.config.worker.worker_app
|
||||
|
||||
partial_state_rooms = await self.store.get_partial_state_room_resync_info()
|
||||
for room_id, resync_info in partial_state_rooms.items():
|
||||
run_as_background_process(
|
||||
desc="sync_partial_state_room",
|
||||
func=self._sync_partial_state_room,
|
||||
self._start_partial_state_room_sync(
|
||||
initial_destination=resync_info.joined_via,
|
||||
other_destinations=resync_info.servers_in_room,
|
||||
room_id=room_id,
|
||||
)
|
||||
|
||||
def _start_partial_state_room_sync(
    self,
    initial_destination: Optional[str],
    other_destinations: Collection[str],
    room_id: str,
) -> None:
    """Kicks off a background resync of a partial state room's state, unless a
    resync for that room is already in progress.

    Rooms with a running resync are tracked in `_active_partial_state_syncs`.
    When a resync is requested for a room that is already being resynced, the
    requested destinations are parked in
    `_partial_state_syncs_maybe_needing_restart`; the running resync picks them
    up when it terminates and starts a new resync if the room still carries the
    partial state flag.

    Args:
        initial_destination: the first homeserver to try pulling the state from
        other_destinations: further homeservers to try pulling the state from,
            should `initial_destination` be unavailable
        room_id: the room whose state is to be resynced
    """

    async def _sync_partial_state_room_wrapper() -> None:
        if room_id in self._active_partial_state_syncs:
            # A resync is already running, so this request stems from another
            # local user joining the room in the meantime, i.e. there is a new
            # join event that needs un-partial stating. Possible situations:
            #  1. The running resync un-partial states the new join event
            #     before it completes. Nothing more to do.
            #  2. The homeserver had left the room before this latest join and
            #     the running resync belongs to our previous membership. That
            #     resync may have:
            #      a) succeeded without having terminated yet. The room will
            #         only be un-partial stated again if the resync is
            #         restarted.
            #      b) failed without having terminated yet, because remote
            #         homeservers refused our requests while we were out of
            #         the room. They may answer again now that we have
            #         rejoined, so the resync should be restarted.
            # Whenever a restart is wanted, the room still has its partial
            # state flag at the point the running resync terminates, so we
            # only note the request here and let that resync decide.
            self._partial_state_syncs_maybe_needing_restart[room_id] = (
                initial_destination,
                other_destinations,
            )
            return

        self._active_partial_state_syncs.add(room_id)

        try:
            await self._sync_partial_state_room(
                initial_destination=initial_destination,
                other_destinations=other_destinations,
                room_id=room_id,
            )
        finally:
            # The flag must be read *before* giving up our claim as the active
            # resync, so that no other resync can change it underneath us.
            # Usually the flag has been cleared by now; if it has not, we may
            # be in situation 2a or 2b above and a restart is called for.
            room_still_partial = await self.store.is_partial_state_room(room_id)
            self._active_partial_state_syncs.remove(room_id)

            parked_request = self._partial_state_syncs_maybe_needing_restart.pop(
                room_id, None
            )
            if parked_request is not None:
                next_initial_destination, next_other_destinations = parked_request

                if room_still_partial:
                    self._start_partial_state_room_sync(
                        initial_destination=next_initial_destination,
                        other_destinations=next_other_destinations,
                        room_id=room_id,
                    )

    run_as_background_process(
        desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
    )
|
||||
|
||||
async def _sync_partial_state_room(
|
||||
self,
|
||||
initial_destination: Optional[str],
|
||||
|
@ -1688,6 +1823,12 @@ class FederationHandler:
|
|||
`initial_destination` is unavailable
|
||||
room_id: room to be resynced
|
||||
"""
|
||||
# Assume that we run on the main process for now.
|
||||
# TODO(faster_joins,multiple workers)
|
||||
# When moving the sync to workers, we need to ensure that
|
||||
# * `_start_partial_state_room_sync` still prevents duplicate resyncs
|
||||
# * `_is_partial_state_room_linearizer` correctly guards partial state flags
|
||||
# for rooms between the workers doing remote joins and resync.
|
||||
assert not self.config.worker.worker_app
|
||||
|
||||
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
|
||||
|
@ -1725,20 +1866,19 @@ class FederationHandler:
|
|||
logger.info("Handling any pending device list updates")
|
||||
await self._device_handler.handle_room_un_partial_stated(room_id)
|
||||
|
||||
logger.info("Clearing partial-state flag for %s", room_id)
|
||||
success = await self.store.clear_partial_state_room(room_id)
|
||||
if success:
|
||||
async with self._is_partial_state_room_linearizer.queue(room_id):
|
||||
logger.info("Clearing partial-state flag for %s", room_id)
|
||||
new_stream_id = await self.store.clear_partial_state_room(room_id)
|
||||
|
||||
if new_stream_id is not None:
|
||||
logger.info("State resync complete for %s", room_id)
|
||||
self._storage_controllers.state.notify_room_un_partial_stated(
|
||||
room_id
|
||||
)
|
||||
# Poke the notifier so that other workers see the write to
|
||||
# the un-partial-stated rooms stream.
|
||||
self._notifier.notify_replication()
|
||||
|
||||
# TODO(faster_joins) update room stats and user directory?
|
||||
# https://github.com/matrix-org/synapse/issues/12814
|
||||
# https://github.com/matrix-org/synapse/issues/12815
|
||||
await self._notifier.on_un_partial_stated_room(
|
||||
room_id, new_stream_id
|
||||
)
|
||||
return
|
||||
|
||||
# we raced against more events arriving with partial state. Go round
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue