Merge remote-tracking branch 'upstream/release-v1.69'
commit 4b94513ae4
191 changed files with 10356 additions and 2903 deletions
@@ -38,7 +38,7 @@ from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
 from synapse import event_auth
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -149,6 +149,8 @@ class FederationHandler:
         self.http_client = hs.get_proxied_blacklisted_http_client()
         self._replication = hs.get_replication_data_handler()
         self._federation_event_handler = hs.get_federation_event_handler()
+        self._device_handler = hs.get_device_handler()
+        self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -209,7 +211,7 @@ class FederationHandler:
         current_depth: int,
         limit: int,
         *,
-        processing_start_time: int,
+        processing_start_time: Optional[int],
     ) -> bool:
         """
         Checks whether the `current_depth` is at or approaching any backfill
@@ -221,13 +223,22 @@ class FederationHandler:
             room_id: The room to backfill in.
             current_depth: The depth to check at for any upcoming backfill points.
             limit: The max number of events to request from the remote federated server.
-            processing_start_time: The time when `maybe_backfill` started
-                processing. Only used for timing.
+            processing_start_time: The time when `maybe_backfill` started processing.
+                Only used for timing. If `None`, no timing observation will be made.
         """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
-            for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
-                room_id
+            for event_id, depth in await self.store.get_backfill_points_in_room(
+                room_id=room_id,
+                current_depth=current_depth,
+                # We only need to end up with 5 extremities combined with the
+                # insertion event extremities to make the `/backfill` request
+                # but fetch an order of magnitude more to make sure there is
+                # enough even after we filter them by whether visible in the
+                # history. This isn't fool-proof as all backfill points within
+                # our limit could be filtered out but seems like a good amount
+                # to try with at least.
+                limit=50,
             )
         ]
 
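The new `get_backfill_points_in_room` call over-fetches candidates: only about 5 backfill points are needed for a `/backfill` request, but up to 50 are requested because later visibility filtering can discard most of them. A minimal sketch of that over-fetch-then-filter idea, with hypothetical `fetch_candidates` and `is_visible` callables standing in for the real storage and visibility checks:

    from typing import Callable, List, Tuple

    # Hypothetical stand-ins for the real storage and visibility checks.
    BackfillPoint = Tuple[str, int]  # (event_id, depth)

    def pick_backfill_points(
        fetch_candidates: Callable[[int], List[BackfillPoint]],
        is_visible: Callable[[str], bool],
        needed: int = 5,
        overfetch_factor: int = 10,
    ) -> List[BackfillPoint]:
        # Ask for an order of magnitude more candidates than we need, since
        # the visibility filtering below may drop most of them.
        candidates = fetch_candidates(needed * overfetch_factor)
        visible = [(eid, depth) for eid, depth in candidates if is_visible(eid)]
        # Keep only as many as the /backfill request actually needs.
        return visible[:needed]

    # Example usage with toy data:
    points = [("$a", 10), ("$b", 9), ("$c", 8), ("$d", 7)]
    print(pick_backfill_points(lambda n: points[:n], lambda eid: eid != "$b"))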
@@ -236,7 +247,12 @@ class FederationHandler:
             insertion_events_to_be_backfilled = [
                 _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
                 for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id
+                    room_id=room_id,
+                    current_depth=current_depth,
+                    # We only need to end up with 5 extremities combined with
+                    # the backfill points to make the `/backfill` request ...
+                    # (see the other comment above for more context).
+                    limit=50,
                 )
             ]
         logger.debug(
@@ -245,10 +261,6 @@ class FederationHandler:
             insertion_events_to_be_backfilled,
         )
 
-        if not backwards_extremities and not insertion_events_to_be_backfilled:
-            logger.debug("Not backfilling as no extremeties found.")
-            return False
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
@@ -269,6 +281,33 @@ class FederationHandler:
             sorted_backfill_points,
         )
 
+        # If we have no backfill points lower than the `current_depth` then
+        # either we can a) bail or b) still attempt to backfill. We opt to try
+        # backfilling anyway just in case we do get relevant events.
+        if not sorted_backfill_points and current_depth != MAX_DEPTH:
+            logger.debug(
+                "_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
+            )
+            return await self._maybe_backfill_inner(
+                room_id=room_id,
+                # We use `MAX_DEPTH` so that we find all backfill points next
+                # time (all events are below the `MAX_DEPTH`)
+                current_depth=MAX_DEPTH,
+                limit=limit,
+                # We don't want to start another timing observation from this
+                # nested recursive call. The top-most call can record the time
+                # overall otherwise the smaller one will throw off the results.
+                processing_start_time=None,
+            )
+
+        # Even after recursing with `MAX_DEPTH`, we didn't find any
+        # backward extremities to backfill from.
+        if not sorted_backfill_points:
+            logger.debug(
+                "_maybe_backfill_inner: Not backfilling as no backward extremeties found."
+            )
+            return False
+
         # If we're approaching an extremity we trigger a backfill, otherwise we
         # no-op.
         #
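When every backfill point lies deeper than `current_depth`, the handler now calls itself once more with `current_depth=MAX_DEPTH` so that all points qualify, passing `processing_start_time=None` so the nested call does not record a second timing sample. A rough sketch of that retry-with-relaxed-bound shape, assuming a hypothetical `find_points` helper and a stand-in `MAX_DEPTH` value:

    from typing import Callable, List, Optional

    MAX_DEPTH = 2**53 - 1  # stand-in for synapse's MAX_DEPTH constant (assumed value)

    def maybe_backfill(
        find_points: Callable[[int], List[int]],
        current_depth: int,
        processing_start_time: Optional[float] = None,
    ) -> bool:
        points = [d for d in find_points(current_depth) if d <= current_depth]
        if not points and current_depth != MAX_DEPTH:
            # Retry once with the depth bound relaxed; don't start a second timer.
            return maybe_backfill(find_points, MAX_DEPTH, processing_start_time=None)
        if not points:
            return False
        # ... backfill from the deepest remaining point here ...
        return True

    # Example: nothing is at or below depth 100, so the relaxed retry finds the points.
    print(maybe_backfill(lambda depth: [d for d in (120, 130) if d <= depth], 100))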
@@ -278,47 +317,16 @@ class FederationHandler:
         # chose more than one times the limit in case of failure, but choosing a
         # much larger factor will result in triggering a backfill request much
         # earlier than necessary.
         #
-        # XXX: shouldn't we do this *after* the filter by depth below? Again, we don't
-        # care about events that have happened after our current position.
-        #
-        max_depth = sorted_backfill_points[0].depth
-        if current_depth - 2 * limit > max_depth:
+        max_depth_of_backfill_points = sorted_backfill_points[0].depth
+        if current_depth - 2 * limit > max_depth_of_backfill_points:
             logger.debug(
                 "Not backfilling as we don't need to. %d < %d - 2 * %d",
-                max_depth,
+                max_depth_of_backfill_points,
                 current_depth,
                 limit,
             )
             return False
 
-        # We ignore extremities that have a greater depth than our current depth
-        # as:
-        #  1. we don't really care about getting events that have happened
-        #     after our current position; and
-        #  2. we have likely previously tried and failed to backfill from that
-        #     extremity, so to avoid getting "stuck" requesting the same
-        #     backfill repeatedly we drop those extremities.
-        #
-        # However, we need to check that the filtered extremities are non-empty.
-        # If they are empty then either we can a) bail or b) still attempt to
-        # backfill. We opt to try backfilling anyway just in case we do get
-        # relevant events.
-        #
-        filtered_sorted_backfill_points = [
-            t for t in sorted_backfill_points if t.depth <= current_depth
-        ]
-        if filtered_sorted_backfill_points:
-            logger.debug(
-                "_maybe_backfill_inner: backfill points before current depth: %s",
-                filtered_sorted_backfill_points,
-            )
-            sorted_backfill_points = filtered_sorted_backfill_points
-        else:
-            logger.debug(
-                "_maybe_backfill_inner: all backfill points are *after* current depth. Backfilling anyway."
-            )
-
         # For performance's sake, we only want to paginate from a particular extremity
         # if we can actually see the events we'll get. Otherwise, we'd just spend a lot
         # of resources to get redacted events. We check each extremity in turn and
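The removed block filtered the sorted backfill points down to those at or below `current_depth`, since deeper extremities are events after our position that we have likely already failed to backfill from; after this change that filtering is pushed into the storage query itself. A simplified sketch of the sort-then-filter behaviour, with a pared-down stand-in for `_BackfillPoint`:

    from dataclasses import dataclass
    from typing import List

    @dataclass
    class BackfillPoint:  # simplified stand-in for synapse's _BackfillPoint
        event_id: str
        depth: int

    def choose_points(points: List[BackfillPoint], current_depth: int) -> List[BackfillPoint]:
        # Prefer the most recent points (largest depth first).
        ordered = sorted(points, key=lambda p: p.depth, reverse=True)
        # Ignore points deeper than where we currently are: those are events after
        # our position, and ones we have probably failed to backfill from before.
        filtered = [p for p in ordered if p.depth <= current_depth]
        # If everything was filtered out, try backfilling anyway.
        return filtered or ordered

    pts = [BackfillPoint("$new", 40), BackfillPoint("$old", 10)]
    print(choose_points(pts, current_depth=20))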
@@ -404,11 +412,22 @@ class FederationHandler:
         # First we try hosts that are already in the room.
         # TODO: HEURISTIC ALERT.
         likely_domains = (
-            await self._storage_controllers.state.get_current_hosts_in_room(room_id)
+            await self._storage_controllers.state.get_current_hosts_in_room_ordered(
+                room_id
+            )
         )
 
         async def try_backfill(domains: Collection[str]) -> bool:
             # TODO: Should we try multiple of these at a time?
+
+            # Number of contacted remote homeservers that have denied our backfill
+            # request with a 4xx code.
+            denied_count = 0
+
+            # Maximum number of contacted remote homeservers that can deny our
+            # backfill request with 4xx codes before we give up.
+            max_denied_count = 5
+
             for dom in domains:
                 # We don't want to ask our own server for information we don't have
                 if dom == self.server_name:
@@ -427,13 +446,33 @@ class FederationHandler:
                     continue
                 except HttpResponseException as e:
                     if 400 <= e.code < 500:
-                        raise e.to_synapse_error()
+                        logger.warning(
+                            "Backfill denied from %s because %s [%d/%d]",
+                            dom,
+                            e,
+                            denied_count,
+                            max_denied_count,
+                        )
+                        denied_count += 1
+                        if denied_count >= max_denied_count:
+                            return False
+                        continue
 
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
                 except CodeMessageException as e:
                     if 400 <= e.code < 500:
-                        raise
+                        logger.warning(
+                            "Backfill denied from %s because %s [%d/%d]",
+                            dom,
+                            e,
+                            denied_count,
+                            max_denied_count,
+                        )
+                        denied_count += 1
+                        if denied_count >= max_denied_count:
+                            return False
+                        continue
 
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
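Rather than aborting on the first 4xx response, the loop now counts denials and only gives up once `max_denied_count` contacted servers have refused the backfill. A stripped-down sketch of that loop, with a hypothetical `request_backfill` callable in place of the real federation client:

    from typing import Callable, Iterable

    class Denied(Exception):
        """Stand-in for an HTTP 4xx response from a remote server."""

    def try_backfill(domains: Iterable[str],
                     request_backfill: Callable[[str], None],
                     max_denied_count: int = 5) -> bool:
        denied_count = 0
        for dom in domains:
            try:
                request_backfill(dom)
                return True                   # first success wins
            except Denied:
                denied_count += 1             # a single 4xx is not fatal...
                if denied_count >= max_denied_count:
                    return False              # ...but give up after enough of them
            except Exception:
                continue                      # other failures: just try the next host
        return False

    # Example: the first two hosts deny the request, the third succeeds.
    def fake_request(dom: str) -> None:
        if dom in ("a.example", "b.example"):
            raise Denied()

    print(try_backfill(["a.example", "b.example", "c.example"], fake_request))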
@@ -452,10 +491,15 @@ class FederationHandler:
 
             return False
 
-        processing_end_time = self.clock.time_msec()
-        backfill_processing_before_timer.observe(
-            (processing_end_time - processing_start_time) / 1000
-        )
+        # If we have the `processing_start_time`, then we can make an
+        # observation. We wouldn't have the `processing_start_time` in the case
+        # where `_maybe_backfill_inner` is recursively called to find any
+        # backfill points regardless of `current_depth`.
+        if processing_start_time is not None:
+            processing_end_time = self.clock.time_msec()
+            backfill_processing_before_timer.observe(
+                (processing_end_time - processing_start_time) / 1000
+            )
 
         success = await try_backfill(likely_domains)
         if success:
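Because the nested recursive call passes `processing_start_time=None`, the metric observation is now guarded so that only the outermost call records how long the pre-backfill processing took. A sketch of that guard, assuming a Prometheus-style histogram (the real `backfill_processing_before_timer` is defined elsewhere in Synapse; the name here is illustrative):

    import time
    from typing import Optional

    from prometheus_client import Histogram

    # Illustrative histogram; the real timer lives in synapse's federation handler.
    backfill_processing_before_timer = Histogram(
        "backfill_processing_before_time_seconds",
        "Time spent deciding whether/where to backfill, before the request is made",
    )

    def record_processing_time(processing_start_time: Optional[float]) -> None:
        # Only the top-level call passes a start time; nested recursive calls pass
        # None so they don't skew the distribution with partial measurements.
        if processing_start_time is not None:
            backfill_processing_before_timer.observe(time.monotonic() - processing_start_time)

    start = time.monotonic()
    # ... do the backfill-point selection work ...
    record_processing_time(start)   # outer call: observed
    record_processing_time(None)    # nested call: skipped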
@@ -583,7 +627,11 @@ class FederationHandler:
                 # Mark the room as having partial state.
                 # The background process is responsible for unmarking this flag,
                 # even if the join fails.
-                await self.store.store_partial_state_room(room_id, ret.servers_in_room)
+                await self.store.store_partial_state_room(
+                    room_id=room_id,
+                    servers=ret.servers_in_room,
+                    device_lists_stream_id=self.store.get_device_stream_token(),
+                )
 
             try:
                 max_stream_id = (
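The partial-state marker now also records the current device-list stream position, presumably so that device list updates arriving while the room is partial-stated can be caught up later (compare the `handle_room_un_partial_stated` hunk near the end). A toy sketch of stashing a stream position alongside the flag, using a hypothetical schema rather than Synapse's real tables:

    import sqlite3

    # Hypothetical schema; the real tables and columns live in synapse's storage layer.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE partial_state_rooms (room_id TEXT PRIMARY KEY, device_lists_stream_id INTEGER)"
    )

    def store_partial_state_room(room_id: str, device_lists_stream_id: int) -> None:
        # Remember where the device-list stream was when we joined with partial
        # state, so we know which updates still need processing once resync finishes.
        conn.execute(
            "INSERT INTO partial_state_rooms (room_id, device_lists_stream_id) VALUES (?, ?)",
            (room_id, device_lists_stream_id),
        )

    store_partial_state_room("!room:example.org", device_lists_stream_id=1234)
    print(conn.execute("SELECT * FROM partial_state_rooms").fetchall())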
@@ -608,6 +656,14 @@ class FederationHandler:
                     room_id,
                 )
                 raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
+            else:
+                # Record the join event id for future use (when we finish the full
+                # join). We have to do this after persisting the event to keep foreign
+                # key constraints intact.
+                if ret.partial_state:
+                    await self.store.write_partial_state_rooms_join_event_id(
+                        room_id, event.event_id
+                    )
             finally:
                 # Always kick off the background process that asynchronously fetches
                 # state for the room.
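The join event id is recorded in an `else:` branch so it only happens once the event has persisted successfully (keeping the foreign-key constraint satisfied), while the `finally:` block still kicks off the background resync either way. A minimal sketch of that try/except/else/finally ordering with stand-in callables:

    def persist_join(persist, record_join_event_id, start_resync):
        """Illustrates the control flow only; the callables are stand-ins."""
        try:
            event_id = persist()
        except Exception:
            raise  # persisting failed: nothing to record
        else:
            # Only runs if persist() succeeded, so the row we reference exists.
            record_join_event_id(event_id)
        finally:
            # Runs on success *and* failure: the background resync must start
            # regardless of whether the join event was recorded.
            start_resync()

    persist_join(lambda: "$join_event", print, lambda: print("resync started"))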
@@ -804,7 +860,7 @@ class FederationHandler:
         )
 
         # now check that we are *still* in the room
-        is_in_room = await self._event_auth_handler.check_host_in_room(
+        is_in_room = await self._event_auth_handler.is_host_in_room(
             room_id, self.server_name
         )
         if not is_in_room:
@@ -946,9 +1002,15 @@ class FederationHandler:
         )
 
         context = EventContext.for_outlier(self._storage_controllers)
-        await self._federation_event_handler.persist_events_and_notify(
-            event.room_id, [(event, context)]
-        )
+
+        await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context)
+        try:
+            await self._federation_event_handler.persist_events_and_notify(
+                event.room_id, [(event, context)]
+            )
+        except Exception:
+            await self.store.remove_push_actions_from_staging(event.event_id)
+            raise
 
         return event
 
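Push actions for the invite are now evaluated (and staged) before the event is persisted, and the staged rows are removed again if persistence fails. A small sketch of that stage-then-commit-or-rollback pattern, with in-memory dicts standing in for the staging area and the event store:

    staged_push_actions = {}   # stand-in for the push-actions staging area
    persisted_events = {}      # stand-in for the event store

    def stage_push_actions(event_id: str) -> None:
        staged_push_actions[event_id] = ["notify"]

    def persist_event(event_id: str, fail: bool = False) -> None:
        if fail:
            raise RuntimeError("persistence failed")
        persisted_events[event_id] = True

    def handle_event(event_id: str, fail: bool = False) -> None:
        stage_push_actions(event_id)
        try:
            persist_event(event_id, fail=fail)
        except Exception:
            # Persisting failed, so the staged push actions would be orphaned:
            # remove them before re-raising.
            staged_push_actions.pop(event_id, None)
            raise

    handle_event("$ok")
    try:
        handle_event("$broken", fail=True)
    except RuntimeError:
        pass
    print(staged_push_actions)   # only "$ok" remains staged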
@@ -1150,9 +1212,7 @@ class FederationHandler:
     async def on_backfill_request(
         self, origin: str, room_id: str, pdu_list: List[str], limit: int
     ) -> List[EventBase]:
-        in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
-        if not in_room:
-            raise AuthError(403, "Host not in room.")
+        await self._event_auth_handler.assert_host_in_room(room_id, origin)
 
         # Synapse asks for 100 events per backfill request. Do not allow more.
         limit = min(limit, 100)
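Several call sites now delegate to a single `assert_host_in_room` helper instead of repeating the check-then-raise pair, while others use `is_host_in_room` where only a boolean is needed. A sketch of what such a pair of helpers could look like (the method names come from the diff; the bodies and data model are assumed):

    import asyncio

    class AuthError(Exception):
        def __init__(self, code: int, msg: str) -> None:
            super().__init__(msg)
            self.code = code

    class EventAuthHandler:
        def __init__(self, hosts_in_room: dict) -> None:
            # room_id -> set of server names currently in the room (toy data model)
            self._hosts_in_room = hosts_in_room

        async def is_host_in_room(self, room_id: str, host: str) -> bool:
            return host in self._hosts_in_room.get(room_id, set())

        async def assert_host_in_room(self, room_id: str, host: str) -> None:
            # Same check as before the refactor, but raised from one place.
            if not await self.is_host_in_room(room_id, host):
                raise AuthError(403, "Host not in room.")

    handler = EventAuthHandler({"!room:example.org": {"matrix.org"}})
    asyncio.run(handler.assert_host_in_room("!room:example.org", "matrix.org"))
    print(asyncio.run(handler.is_host_in_room("!room:example.org", "elsewhere.example")))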
@@ -1198,21 +1258,17 @@ class FederationHandler:
             event_id, allow_none=True, allow_rejected=True
         )
 
-        if event:
-            in_room = await self._event_auth_handler.check_host_in_room(
-                event.room_id, origin
-            )
-            if not in_room:
-                raise AuthError(403, "Host not in room.")
-
-            events = await filter_events_for_server(
-                self._storage_controllers, origin, [event]
-            )
-            event = events[0]
-            return event
-        else:
+        if not event:
             return None
 
+        await self._event_auth_handler.assert_host_in_room(event.room_id, origin)
+
+        events = await filter_events_for_server(
+            self._storage_controllers, origin, [event]
+        )
+        event = events[0]
+        return event
+
     async def on_get_missing_events(
         self,
         origin: str,
@@ -1221,9 +1277,7 @@ class FederationHandler:
         latest_events: List[str],
         limit: int,
     ) -> List[EventBase]:
-        in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
-        if not in_room:
-            raise AuthError(403, "Host not in room.")
+        await self._event_auth_handler.assert_host_in_room(room_id, origin)
 
         # Only allow up to 20 events to be retrieved per request.
         limit = min(limit, 20)
@@ -1257,7 +1311,7 @@ class FederationHandler:
             "state_key": target_user_id,
         }
 
-        if await self._event_auth_handler.check_host_in_room(room_id, self.hs.hostname):
+        if await self._event_auth_handler.is_host_in_room(room_id, self.hs.hostname):
             room_version_obj = await self.store.get_room_version(room_id)
             builder = self.event_builder_factory.for_room_version(
                 room_version_obj, event_dict
@@ -1622,6 +1676,9 @@ class FederationHandler:
             # https://github.com/matrix-org/synapse/issues/12994
             await self.state_handler.update_current_state(room_id)
 
+            logger.info("Handling any pending device list updates")
+            await self._device_handler.handle_room_un_partial_stated(room_id)
+
             logger.info("Clearing partial-state flag for %s", room_id)
             success = await self.store.clear_partial_state_room(room_id)
             if success:
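Once the resync finishes, the handler now drains any device list updates that arrived while the room was partial-stated, before clearing the flag. A toy sketch of replaying updates from a remembered stream position (the names and the exact comparison are assumptions, tied to the stream id stored in the earlier `store_partial_state_room` hunk):

    from typing import List, Tuple

    def replay_pending_device_updates(
        pending: List[Tuple[int, str]],   # (stream_id, user_id) updates seen so far
        resume_from_stream_id: int,       # position stored when the room was marked partial-state
    ) -> List[str]:
        # Process only the updates that arrived at or after the remembered position;
        # earlier ones were already handled before the partial join.
        return [user_id for stream_id, user_id in pending if stream_id >= resume_from_stream_id]

    print(replay_pending_device_updates([(10, "@a:x"), (12, "@b:x"), (15, "@c:x")], 12))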