Merge remote-tracking branch 'upstream/release-v1.58'
commit 6669e3b670
112 changed files with 6165 additions and 1516 deletions
synapse/handlers/federation.py

@@ -1,4 +1,4 @@
-# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
 # Copyright 2020 Sorunome
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,10 +15,14 @@
 """Contains handlers for federation events."""
 
+import enum
+import itertools
 import logging
+from enum import Enum
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
 
+import attr
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
@@ -92,6 +96,24 @@ def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
     return sorted(joined_domains.items(), key=lambda d: d[1])
 
 
+class _BackfillPointType(Enum):
+    # a regular backwards extremity (ie, an event which we don't yet have, but which
+    # is referred to by other events in the DAG)
+    BACKWARDS_EXTREMITY = enum.auto()
+
+    # an MSC2716 "insertion event"
+    INSERTION_POINT = enum.auto()
+
+
+@attr.s(slots=True, auto_attribs=True, frozen=True)
+class _BackfillPoint:
+    """A potential point we might backfill from"""
+
+    event_id: str
+    depth: int
+    type: _BackfillPointType
+
+
 class FederationHandler:
     """Handles general incoming federation requests
@@ -157,89 +179,51 @@ class FederationHandler:
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        oldest_events_with_depth = (
-            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
-        )
+        backwards_extremities = [
+            _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
+            for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
+                room_id
+            )
+        ]
 
-        insertion_events_to_be_backfilled: Dict[str, int] = {}
+        insertion_events_to_be_backfilled: List[_BackfillPoint] = []
         if self.hs.config.experimental.msc2716_enabled:
-            insertion_events_to_be_backfilled = (
-                await self.store.get_insertion_event_backward_extremities_in_room(
+            insertion_events_to_be_backfilled = [
+                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_POINT)
+                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
                     room_id
                 )
-            )
+            ]
         logger.debug(
-            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
-            oldest_events_with_depth,
+            "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
+            backwards_extremities,
             insertion_events_to_be_backfilled,
         )
 
-        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
+        if not backwards_extremities and not insertion_events_to_be_backfilled:
             logger.debug("Not backfilling as no extremities found.")
             return False
 
-        # We only want to paginate if we can actually see the events we'll get,
-        # as otherwise we'll just spend a lot of resources to get redacted
-        # events.
-        #
-        # We do this by filtering all the backwards extremities and seeing if
-        # any remain. Given we don't have the extremity events themselves, we
-        # need to actually check the events that reference them.
-        #
-        # *Note*: the spec wants us to keep backfilling until we reach the start
-        # of the room in case we are allowed to see some of the history. However,
-        # in practice that causes more issues than it's worth, as (a) it's
-        # relatively rare for there to be any visible history and (b) even when
-        # there is, it's often sufficiently long ago that clients would stop
-        # attempting to paginate before backfill reached the visible history.
-        #
-        # TODO: If we do a backfill then we should filter the backwards
-        # extremities to only include those that point to visible portions of
-        # history.
-        #
-        # TODO: Correctly handle the case where we are allowed to see the
-        # forward event but not the backward extremity, e.g. in the case of
-        # initial join of the server where we are allowed to see the join
-        # event but not anything before it. This would require looking at the
-        # state *before* the event, ignoring the special casing certain event
-        # types have.
-
-        forward_event_ids = await self.store.get_successor_events(
-            list(oldest_events_with_depth)
-        )
-
-        extremities_events = await self.store.get_events(
-            forward_event_ids,
-            redact_behaviour=EventRedactBehaviour.AS_IS,
-            get_prev_content=False,
-        )
-
-        # We set `check_history_visibility_only` as we might otherwise get false
-        # positives from users having been erased.
-        filtered_extremities = await filter_events_for_server(
-            self.storage,
-            self.server_name,
-            list(extremities_events.values()),
-            redact=False,
-            check_history_visibility_only=True,
-        )
-        logger.debug(
-            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
-        )
-
-        if not filtered_extremities and not insertion_events_to_be_backfilled:
-            return False
-
-        extremities = {
-            **oldest_events_with_depth,
-            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
-            **insertion_events_to_be_backfilled,
-        }
-
-        # Check if we reached a point where we should start backfilling.
-        sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
-        max_depth = sorted_extremeties_tuple[0][1]
+        # we now have a list of potential places to backpaginate from. We prefer to
+        # start with the most recent (ie, max depth), so let's sort the list.
+        sorted_backfill_points: List[_BackfillPoint] = sorted(
+            itertools.chain(
+                backwards_extremities,
+                insertion_events_to_be_backfilled,
+            ),
+            key=lambda e: -int(e.depth),
+        )
+
+        logger.debug(
+            "_maybe_backfill_inner: room_id: %s: current_depth: %s, limit: %s, "
+            "backfill points (%d): %s",
+            room_id,
+            current_depth,
+            limit,
+            len(sorted_backfill_points),
+            sorted_backfill_points,
+        )
 
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
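
To see what the new sort produces, a small self-contained example (reusing the sketch classes above; the event IDs and depths are invented):

    import itertools

    backwards_extremities = [
        _BackfillPoint("$ev_a", 10, _BackfillPointType.BACKWARDS_EXTREMITY),
        _BackfillPoint("$ev_b", 30, _BackfillPointType.BACKWARDS_EXTREMITY),
    ]
    insertion_events_to_be_backfilled = [
        _BackfillPoint("$ins_c", 20, _BackfillPointType.INSERTION_POINT),
    ]

    # Merge both kinds of backfill point and sort deepest-first, exactly as
    # the new code does.
    sorted_backfill_points = sorted(
        itertools.chain(backwards_extremities, insertion_events_to_be_backfilled),
        key=lambda e: -int(e.depth),
    )
    print([p.depth for p in sorted_backfill_points])  # [30, 20, 10]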
@@ -249,6 +233,11 @@ class FederationHandler:
         # chose more than one multiple of the limit in case of failure, but choosing a
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
+        #
+        # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
+        # care about events that have happened after our current position.
+        #
+        max_depth = sorted_backfill_points[0].depth
         if current_depth - 2 * limit > max_depth:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
@@ -265,31 +254,98 @@ class FederationHandler:
         #    2. we have likely previously tried and failed to backfill from that
         #       extremity, so to avoid getting "stuck" requesting the same
         #       backfill repeatedly we drop those extremities.
-        filtered_sorted_extremeties_tuple = [
-            t for t in sorted_extremeties_tuple if int(t[1]) <= current_depth
-        ]
-
-        logger.debug(
-            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
-            room_id,
-            current_depth,
-            limit,
-            max_depth,
-            len(sorted_extremeties_tuple),
-            sorted_extremeties_tuple,
-            filtered_sorted_extremeties_tuple,
-        )
-
+        #
         # However, we need to check that the filtered extremities are non-empty.
         # If they are empty then either we can a) bail or b) still attempt to
         # backfill. We opt to try backfilling anyway just in case we do get
         # relevant events.
-        if filtered_sorted_extremeties_tuple:
-            sorted_extremeties_tuple = filtered_sorted_extremeties_tuple
-
-        # We don't want to specify too many extremities as it causes the backfill
-        # request URI to be too long.
-        extremities = dict(sorted_extremeties_tuple[:5])
+        #
+        filtered_sorted_backfill_points = [
+            t for t in sorted_backfill_points if t.depth <= current_depth
+        ]
+        if filtered_sorted_backfill_points:
+            logger.debug(
+                "_maybe_backfill_inner: backfill points before current depth: %s",
+                filtered_sorted_backfill_points,
+            )
+            sorted_backfill_points = filtered_sorted_backfill_points
+        else:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
+            )
+
+        # For performance's sake, we only want to paginate from a particular extremity
+        # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
+        # of resources to get redacted events. We check each extremity in turn and
+        # ignore those which users on our server wouldn't be able to see.
+        #
+        # Additionally, we limit ourselves to backfilling from at most 5 extremities,
+        # for two reasons:
+        #
+        # - The check which determines if we can see an extremity's events can be
+        #   expensive (we load the full state for the room at each of the backfill
+        #   points, or (worse) their successors)
+        # - We want to avoid the server-server API request URI becoming too long.
+        #
+        # *Note*: the spec wants us to keep backfilling until we reach the start
+        # of the room in case we are allowed to see some of the history. However,
+        # in practice that causes more issues than it's worth, as (a) it's
+        # relatively rare for there to be any visible history and (b) even when
+        # there is, it's often sufficiently long ago that clients would stop
+        # attempting to paginate before backfill reached the visible history.
+
+        extremities_to_request: List[str] = []
+        for bp in sorted_backfill_points:
+            if len(extremities_to_request) >= 5:
+                break
+
+            # For regular backwards extremities, we don't have the extremity events
+            # themselves, so we need to actually check the events that reference them -
+            # their "successor" events.
+            #
+            # TODO: Correctly handle the case where we are allowed to see the
+            # successor event but not the backward extremity, e.g. in the case of
+            # initial join of the server where we are allowed to see the join
+            # event but not anything before it. This would require looking at the
+            # state *before* the event, ignoring the special casing certain event
+            # types have.
+            if bp.type == _BackfillPointType.INSERTION_POINT:
+                event_ids_to_check = [bp.event_id]
+            else:
+                event_ids_to_check = await self.store.get_successor_events(bp.event_id)
+
+            events_to_check = await self.store.get_events_as_list(
+                event_ids_to_check,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                get_prev_content=False,
+            )
+
+            # We set `check_history_visibility_only` as we might otherwise get false
+            # positives from users having been erased.
+            filtered_extremities = await filter_events_for_server(
+                self.storage,
+                self.server_name,
+                events_to_check,
+                redact=False,
+                check_history_visibility_only=True,
+            )
+            if filtered_extremities:
+                extremities_to_request.append(bp.event_id)
+            else:
+                logger.debug(
+                    "_maybe_backfill_inner: skipping extremity %s as it would not be visible",
+                    bp,
+                )
+
+        if not extremities_to_request:
+            logger.debug(
+                "_maybe_backfill_inner: found no extremities which would be visible"
+            )
+            return False
+
+        logger.debug(
+            "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
+        )
 
         # Now we need to decide which hosts to hit first.
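
Stripped of storage details, the selection loop above reduces to the pattern below (`is_visible_to_local_users` is a hypothetical stand-in for the `filter_events_for_server(..., check_history_visibility_only=True)` call):

    from typing import List

    async def is_visible_to_local_users(bp: "_BackfillPoint") -> bool:
        # Hypothetical stub; the real check loads room state and can be
        # expensive, which is one reason for the cap of 5 below.
        return True

    async def pick_extremities_to_request(
        sorted_backfill_points: "List[_BackfillPoint]",
    ) -> List[str]:
        extremities_to_request: List[str] = []
        for bp in sorted_backfill_points:
            if len(extremities_to_request) >= 5:
                break  # keep the federation request URI short
            if await is_visible_to_local_users(bp):
                extremities_to_request.append(bp.event_id)
            # an invisible point is skipped, not fatal: later points may
            # still be usable
        return extremities_to_request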
@@ -309,7 +365,7 @@ class FederationHandler:
         for dom in domains:
             try:
                 await self._federation_event_handler.backfill(
-                    dom, room_id, limit=100, extremities=extremities
+                    dom, room_id, limit=100, extremities=extremities_to_request
                 )
                 # If this succeeded then we probably already have the
                 # appropriate stuff.
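
The loop above tries each candidate server until one backfill succeeds. Reduced to its shape (with `backfill_from` as a hypothetical stand-in for `self._federation_event_handler.backfill`, and a broad `except` where the real code catches specific federation errors):

    from typing import Iterable, List

    async def backfill_from(
        dom: str, room_id: str, limit: int, extremities: List[str]
    ) -> None:
        # Hypothetical stub standing in for the real federation request.
        pass

    async def try_backfill(
        domains: Iterable[str], room_id: str, extremities_to_request: List[str]
    ) -> bool:
        for dom in domains:
            try:
                await backfill_from(
                    dom, room_id, limit=100, extremities=extremities_to_request
                )
                return True  # one successful host is enough
            except Exception:
                continue  # try the next candidate host
        return False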
@@ -466,6 +522,8 @@ class FederationHandler:
         )
 
         if ret.partial_state:
+            # TODO(faster_joins): roll this back if we don't manage to start the
+            #    background resync (eg process_remote_join fails)
             await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
         max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +536,18 @@ class FederationHandler:
             partial_state=ret.partial_state,
         )
 
+        if ret.partial_state:
+            # Kick off the process of asynchronously fetching the state for this
+            # room.
+            #
+            # TODO(faster_joins): pick this up again on restart
+            run_as_background_process(
+                desc="sync_partial_state_room",
+                func=self._sync_partial_state_room,
+                destination=origin,
+                room_id=room_id,
+            )
+
         # We wait here until this instance has seen the events come down
         # replication (if we're using replication) as the below uses caches.
         await self._replication.wait_for_stream_position(
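
The resync must not block the join from completing, so it is launched as a fire-and-forget background task. A rough `asyncio` analogue of what the `run_as_background_process` call does here (minus Synapse's metrics and logcontext handling; the names below are invented):

    import asyncio

    async def sync_partial_state_room_stub(destination: str, room_id: str) -> None:
        # Hypothetical stand-in for self._sync_partial_state_room.
        print(f"resyncing {room_id} via {destination}")

    def kick_off_resync(destination: str, room_id: str) -> "asyncio.Task[None]":
        # The caller does not await the task; it runs in the background.
        return asyncio.create_task(sync_partial_state_room_stub(destination, room_id))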
@@ -1370,3 +1440,64 @@ class FederationHandler:
         # We fell off the bottom, couldn't get the complexity from anyone. Oh
         # well.
         return None
+
+    async def _sync_partial_state_room(
+        self,
+        destination: str,
+        room_id: str,
+    ) -> None:
+        """Background process to resync the state of a partial-state room
+
+        Args:
+            destination: homeserver to pull the state from
+            room_id: room to be resynced
+        """
+
+        # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+        #   worker processes kick off a resync in parallel? Perhaps we should just elect
+        #   a single worker to do the resync.
+        #
+        # TODO(faster_joins): what happens if we leave the room during a resync? if we
+        #   really leave, that might mean we have difficulty getting the room state over
+        #   federation.
+        #
+        # TODO(faster_joins): try other destinations if the one we have fails
+
+        logger.info("Syncing state for room %s via %s", room_id, destination)
+
+        # we work through the queue in order of increasing stream ordering.
+        while True:
+            batch = await self.store.get_partial_state_events_batch(room_id)
+            if not batch:
+                # all the events are updated, so we can update current state and
+                # clear the lazy-loading flag.
+                logger.info("Updating current state for %s", room_id)
+                assert (
+                    self.storage.persistence is not None
+                ), "TODO(faster_joins): support for workers"
+                await self.storage.persistence.update_current_state(room_id)
+
+                logger.info("Clearing partial-state flag for %s", room_id)
+                success = await self.store.clear_partial_state_room(room_id)
+                if success:
+                    logger.info("State resync complete for %s", room_id)
+
+                    # TODO(faster_joins) update room stats and user directory?
+                    return
+
+                # we raced against more events arriving with partial state. Go round
+                # the loop again. We've already logged a warning, so no need for more.
+                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                #   having partial state.
+                continue
+
+            events = await self.store.get_events_as_list(
+                batch,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                allow_rejected=True,
+            )
+            for event in events:
+                await self._federation_event_handler.update_state_for_partial_state_event(
+                    destination, event
+                )
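
The new method follows a "drain the queue, then finalise, and retry if we raced" loop. A self-contained sketch of that shape with the storage calls stubbed out (all names below are invented):

    import asyncio
    from typing import List

    # pretend batches of partial-state event IDs, oldest first
    _queue: List[List[str]] = [["$e1", "$e2"], ["$e3"]]

    async def get_partial_state_events_batch() -> List[str]:
        return _queue.pop(0) if _queue else []

    async def update_state_for_event(event_id: str) -> None:
        print("resynced state for", event_id)

    async def sync_until_drained() -> None:
        while True:
            batch = await get_partial_state_events_batch()
            if not batch:
                # Nothing left: finalise. The real code re-checks for a race
                # here (clear_partial_state_room can fail) and loops again.
                print("resync complete")
                return
            for event_id in batch:
                await update_state_for_event(event_id)

    asyncio.run(sync_until_drained())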