Merge remote-tracking branch 'upstream/release-v1.61'

Tulir Asokan 2022-06-14 13:54:26 +03:00
commit 1543a3643e
235 changed files with 5079 additions and 14896 deletions

synapse/handlers/federation.py

@@ -20,7 +20,16 @@ import itertools
import logging
from enum import Enum
from http import HTTPStatus
from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
from typing import (
TYPE_CHECKING,
Collection,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
)
import attr
from signedjson.key import decode_verify_key_bytes
@@ -34,6 +43,7 @@ from synapse.api.errors import (
CodeMessageException,
Codes,
FederationDeniedError,
FederationError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
@@ -125,8 +135,8 @@ class FederationHandler:
self.hs = hs
self.store = hs.get_datastores().main
self.storage = hs.get_storage()
self.state_store = self.storage.state
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -159,6 +169,14 @@ class FederationHandler:
self.third_party_event_rules = hs.get_third_party_event_rules()
# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
# were shut down.
if not hs.config.worker.worker_app:
run_as_background_process(
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
@@ -324,7 +342,7 @@ class FederationHandler:
# We set `check_history_visibility_only` as we might otherwise get false
# positives from users having been erased.
filtered_extremities = await filter_events_for_server(
self.storage,
self._storage_controllers,
self.server_name,
events_to_check,
redact=False,
@@ -353,7 +371,7 @@ class FederationHandler:
# First we try hosts that are already in the room
# TODO: HEURISTIC ALERT.
curr_state = await self.state_handler.get_current_state(room_id)
curr_state = await self._storage_controllers.state.get_current_state(room_id)
curr_domains = get_domains_from_state(curr_state)
@@ -460,6 +478,8 @@ class FederationHandler:
"""
# TODO: We should be able to call this on workers, but the upgrading of
# room stuff after join currently doesn't work on workers.
# TODO: Before we relax this condition, we need to allow re-syncing of
# partial room state to happen on workers.
assert self.config.worker.worker_app is None
logger.debug("Joining %s to %s", joinee, room_id)
@@ -540,12 +560,11 @@ class FederationHandler:
if ret.partial_state:
# Kick off the process of asynchronously fetching the state for this
# room.
#
# TODO(faster_joins): pick this up again on restart
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
destination=origin,
initial_destination=origin,
other_destinations=ret.servers_in_room,
room_id=room_id,
)
@@ -660,7 +679,7 @@ class FederationHandler:
# in the invitee's sync stream. It is stripped out for all other local users.
event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
context = EventContext.for_outlier(self.storage)
context = EventContext.for_outlier(self._storage_controllers)
stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -731,7 +750,9 @@ class FederationHandler:
# Note that this requires the /send_join request to come back to the
# same server.
if room_version.msc3083_join_rules:
state_ids = await self.store.get_current_state_ids(room_id)
state_ids = await self._state_storage_controller.get_current_state_ids(
room_id
)
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
@@ -849,7 +870,7 @@ class FederationHandler:
)
)
context = EventContext.for_outlier(self.storage)
context = EventContext.for_outlier(self._storage_controllers)
await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -878,7 +899,7 @@ class FederationHandler:
await self.federation_client.send_leave(host_list, event)
context = EventContext.for_outlier(self.storage)
context = EventContext.for_outlier(self._storage_controllers)
stream_id = await self._federation_event_handler.persist_events_and_notify(
event.room_id, [(event, context)]
)
@@ -1027,7 +1048,9 @@ class FederationHandler:
if event.internal_metadata.outlier:
raise NotFoundError("State not known at event %s" % (event_id,))
state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
state_groups = await self._state_storage_controller.get_state_groups_ids(
room_id, [event_id]
)
# get_state_groups_ids should return exactly one result
assert len(state_groups) == 1
@@ -1076,7 +1099,9 @@ class FederationHandler:
],
)
events = await filter_events_for_server(self.storage, origin, events)
events = await filter_events_for_server(
self._storage_controllers, origin, events
)
return events
@@ -1107,7 +1132,9 @@ class FederationHandler:
if not in_room:
raise AuthError(403, "Host not in room.")
events = await filter_events_for_server(self.storage, origin, [event])
events = await filter_events_for_server(
self._storage_controllers, origin, [event]
)
event = events[0]
return event
else:
@@ -1136,7 +1163,7 @@ class FederationHandler:
)
missing_events = await filter_events_for_server(
self.storage, origin, missing_events
self._storage_controllers, origin, missing_events
)
return missing_events
@@ -1446,17 +1473,35 @@ class FederationHandler:
# well.
return None
async def _resume_sync_partial_state_room(self) -> None:
"""Resumes resyncing of all partial-state rooms after a restart."""
assert not self.config.worker.worker_app
partial_state_rooms = await self.store.get_partial_state_rooms_and_servers()
for room_id, servers_in_room in partial_state_rooms.items():
run_as_background_process(
desc="sync_partial_state_room",
func=self._sync_partial_state_room,
initial_destination=None,
other_destinations=servers_in_room,
room_id=room_id,
)
async def _sync_partial_state_room(
self,
destination: str,
initial_destination: Optional[str],
other_destinations: Collection[str],
room_id: str,
) -> None:
"""Background process to resync the state of a partial-state room
Args:
destination: homeserver to pull the state from
initial_destination: the initial homeserver to pull the state from
other_destinations: other homeservers to try to pull the state from, if
`initial_destination` is unavailable
room_id: room to be resynced
"""
assert not self.config.worker.worker_app
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
# worker processes kick off a resync in parallel? Perhaps we should just elect
@@ -1466,8 +1511,29 @@ class FederationHandler:
# really leave, that might mean we have difficulty getting the room state over
# federation.
#
# TODO(faster_joins): try other destinations if the one we have fails
# TODO(faster_joins): we need some way of prioritising which homeservers in
# `other_destinations` to try first, otherwise we'll spend ages trying dead
# homeservers for large rooms.
if initial_destination is None and len(other_destinations) == 0:
raise ValueError(
f"Cannot resync state of {room_id}: no destinations provided"
)
# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
if initial_destination is not None:
# Move `initial_destination` to the front of the list.
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
destination_iter = itertools.cycle(destinations)
else:
destination_iter = itertools.cycle(other_destinations)
# `destination` is the current remote homeserver we're pulling from.
destination = next(destination_iter)
logger.info("Syncing state for room %s via %s", room_id, destination)
# we work through the queue in order of increasing stream ordering.
@@ -1478,14 +1544,19 @@ class FederationHandler:
# clear the lazy-loading flag.
logger.info("Updating current state for %s", room_id)
assert (
self.storage.persistence is not None
self._storage_controllers.persistence is not None
), "TODO(faster_joins): support for workers"
await self.storage.persistence.update_current_state(room_id)
await self._storage_controllers.persistence.update_current_state(
room_id
)
logger.info("Clearing partial-state flag for %s", room_id)
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(
room_id
)
# TODO(faster_joins) update room stats and user directory?
return
@@ -1503,6 +1574,41 @@ class FederationHandler:
allow_rejected=True,
)
for event in events:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
for attempt in itertools.count():
try:
await self._federation_event_handler.update_state_for_partial_state_event(
destination, event
)
break
except FederationError as e:
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
# if there's a temporary network outage. retrying
# indefinitely is also not the right thing to do if we can
# reach all homeservers and they all claim they don't have
# the state we want.
logger.error(
"Failed to get state for %s at %s from %s because %s, "
"giving up!",
room_id,
event,
destination,
e,
)
raise
# Try the next remote server.
logger.info(
"Failed to get state for %s at %s from %s because %s",
room_id,
event,
destination,
e,
)
destination = next(destination_iter)
logger.info(
"Syncing state for room %s via %s instead",
room_id,
destination,
)
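
The new retry logic in the last hunks boils down to: order the candidate homeservers with the join origin first, cycle through them with itertools.cycle, keep using whichever server is currently working, and only give up on an event once every server has failed for it. The following standalone sketch shows that pattern under stated assumptions: the helper names (order_destinations, resync_events, fetch_partial_state, fake_fetch) and the example hostnames are illustrative stand-ins, not Synapse APIs; the real code lives in _sync_partial_state_room and calls update_state_for_partial_state_event with synapse.api.errors.FederationError.

# Sketch of the destination-failover pattern introduced above (illustrative only).
import itertools
import logging
from typing import Collection, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class FederationError(RuntimeError):
    """Stand-in for synapse.api.errors.FederationError."""


def order_destinations(
    initial_destination: Optional[str], other_destinations: Collection[str]
) -> List[str]:
    """Put the server we joined through first, then the other candidates."""
    if initial_destination is None and not other_destinations:
        raise ValueError("no destinations provided")
    destinations = [d for d in other_destinations if d != initial_destination]
    if initial_destination is not None:
        destinations.insert(0, initial_destination)
    return destinations


def resync_events(
    events: Collection[str],
    initial_destination: Optional[str],
    other_destinations: Collection[str],
    fetch_partial_state,  # callable(destination, event) that may raise FederationError
) -> None:
    """Pull state for each event, cycling to the next server whenever one fails."""
    destinations = order_destinations(initial_destination, other_destinations)
    destination_iter = itertools.cycle(destinations)
    destination = next(destination_iter)

    for event in events:
        for attempt in itertools.count():
            try:
                fetch_partial_state(destination, event)
                break  # this event is done; stick with the working destination
            except FederationError as exc:
                if attempt == len(destinations) - 1:
                    # Every server has now failed for this event, so give up,
                    # mirroring the TODO(faster_joins) caveat in the real code.
                    logger.error("giving up on %s: %s", event, exc)
                    raise
                destination = next(destination_iter)
                logger.info("retrying %s via %s", event, destination)


if __name__ == "__main__":
    flaky = {"hs1.example": True}  # hs1 always fails in this toy run

    def fake_fetch(destination: str, event: str) -> None:
        if flaky.get(destination):
            raise FederationError(f"{destination} unreachable")

    resync_events(
        ["$event_a", "$event_b"],
        initial_destination="hs1.example",
        other_destinations=["hs2.example"],
        fetch_partial_state=fake_fetch,
    )

In the toy run, $event_a fails once against hs1.example, the loop moves to hs2.example, and $event_b is then fetched from hs2.example directly, which is the "stick with it until it flakes" behaviour the diff describes.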