mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-16 12:30:35 -04:00
Merge remote-tracking branch 'upstream/release-v1.30.0'
This commit is contained in:
commit
8b753230af
102 changed files with 2693 additions and 933 deletions
|
@ -201,7 +201,7 @@ class FederationHandler(BaseHandler):
|
|||
or pdu.internal_metadata.is_outlier()
|
||||
)
|
||||
if already_seen:
|
||||
logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
|
||||
logger.debug("Already seen pdu")
|
||||
return
|
||||
|
||||
# do some initial sanity-checking of the event. In particular, make
|
||||
|
@ -210,18 +210,14 @@ class FederationHandler(BaseHandler):
|
|||
try:
|
||||
self._sanity_check_event(pdu)
|
||||
except SynapseError as err:
|
||||
logger.warning(
|
||||
"[%s %s] Received event failed sanity checks", room_id, event_id
|
||||
)
|
||||
logger.warning("Received event failed sanity checks")
|
||||
raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
|
||||
|
||||
# If we are currently in the process of joining this room, then we
|
||||
# queue up events for later processing.
|
||||
if room_id in self.room_queues:
|
||||
logger.info(
|
||||
"[%s %s] Queuing PDU from %s for now: join in progress",
|
||||
room_id,
|
||||
event_id,
|
||||
"Queuing PDU from %s for now: join in progress",
|
||||
origin,
|
||||
)
|
||||
self.room_queues[room_id].append((pdu, origin))
|
||||
|
@ -236,9 +232,7 @@ class FederationHandler(BaseHandler):
|
|||
is_in_room = await self.auth.check_host_in_room(room_id, self.server_name)
|
||||
if not is_in_room:
|
||||
logger.info(
|
||||
"[%s %s] Ignoring PDU from %s as we're not in the room",
|
||||
room_id,
|
||||
event_id,
|
||||
"Ignoring PDU from %s as we're not in the room",
|
||||
origin,
|
||||
)
|
||||
return None
|
||||
|
@ -250,7 +244,7 @@ class FederationHandler(BaseHandler):
|
|||
# We only backfill backwards to the min depth.
|
||||
min_depth = await self.get_min_depth_for_context(pdu.room_id)
|
||||
|
||||
logger.debug("[%s %s] min_depth: %d", room_id, event_id, min_depth)
|
||||
logger.debug("min_depth: %d", min_depth)
|
||||
|
||||
prevs = set(pdu.prev_event_ids())
|
||||
seen = await self.store.have_events_in_timeline(prevs)
|
||||
|
@ -267,17 +261,13 @@ class FederationHandler(BaseHandler):
|
|||
# If we're missing stuff, ensure we only fetch stuff one
|
||||
# at a time.
|
||||
logger.info(
|
||||
"[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
|
||||
room_id,
|
||||
event_id,
|
||||
"Acquiring room lock to fetch %d missing prev_events: %s",
|
||||
len(missing_prevs),
|
||||
shortstr(missing_prevs),
|
||||
)
|
||||
with (await self._room_pdu_linearizer.queue(pdu.room_id)):
|
||||
logger.info(
|
||||
"[%s %s] Acquired room lock to fetch %d missing prev_events",
|
||||
room_id,
|
||||
event_id,
|
||||
"Acquired room lock to fetch %d missing prev_events",
|
||||
len(missing_prevs),
|
||||
)
|
||||
|
||||
|
@ -297,9 +287,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if not prevs - seen:
|
||||
logger.info(
|
||||
"[%s %s] Found all missing prev_events",
|
||||
room_id,
|
||||
event_id,
|
||||
"Found all missing prev_events",
|
||||
)
|
||||
|
||||
if prevs - seen:
|
||||
|
@ -329,9 +317,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
if sent_to_us_directly:
|
||||
logger.warning(
|
||||
"[%s %s] Rejecting: failed to fetch %d prev events: %s",
|
||||
room_id,
|
||||
event_id,
|
||||
"Rejecting: failed to fetch %d prev events: %s",
|
||||
len(prevs - seen),
|
||||
shortstr(prevs - seen),
|
||||
)
|
||||
|
@ -367,17 +353,16 @@ class FederationHandler(BaseHandler):
|
|||
# Ask the remote server for the states we don't
|
||||
# know about
|
||||
for p in prevs - seen:
|
||||
logger.info(
|
||||
"Requesting state at missing prev_event %s",
|
||||
event_id,
|
||||
)
|
||||
logger.info("Requesting state after missing prev_event %s", p)
|
||||
|
||||
with nested_logging_context(p):
|
||||
# note that if any of the missing prevs share missing state or
|
||||
# auth events, the requests to fetch those events are deduped
|
||||
# by the get_pdu_cache in federation_client.
|
||||
(remote_state, _,) = await self._get_state_for_room(
|
||||
origin, room_id, p, include_event_in_state=True
|
||||
remote_state = (
|
||||
await self._get_state_after_missing_prev_event(
|
||||
origin, room_id, p
|
||||
)
|
||||
)
|
||||
|
||||
remote_state_map = {
|
||||
|
@ -414,10 +399,7 @@ class FederationHandler(BaseHandler):
|
|||
state = [event_map[e] for e in state_map.values()]
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"[%s %s] Error attempting to resolve state at missing "
|
||||
"prev_events",
|
||||
room_id,
|
||||
event_id,
|
||||
"Error attempting to resolve state at missing " "prev_events",
|
||||
exc_info=True,
|
||||
)
|
||||
raise FederationError(
|
||||
|
@ -454,9 +436,7 @@ class FederationHandler(BaseHandler):
|
|||
latest |= seen
|
||||
|
||||
logger.info(
|
||||
"[%s %s]: Requesting missing events between %s and %s",
|
||||
room_id,
|
||||
event_id,
|
||||
"Requesting missing events between %s and %s",
|
||||
shortstr(latest),
|
||||
event_id,
|
||||
)
|
||||
|
@ -523,15 +503,11 @@ class FederationHandler(BaseHandler):
|
|||
# We failed to get the missing events, but since we need to handle
|
||||
# the case of `get_missing_events` not returning the necessary
|
||||
# events anyway, it is safe to simply log the error and continue.
|
||||
logger.warning(
|
||||
"[%s %s]: Failed to get prev_events: %s", room_id, event_id, e
|
||||
)
|
||||
logger.warning("Failed to get prev_events: %s", e)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"[%s %s]: Got %d prev_events: %s",
|
||||
room_id,
|
||||
event_id,
|
||||
"Got %d prev_events: %s",
|
||||
len(missing_events),
|
||||
shortstr(missing_events),
|
||||
)
|
||||
|
@ -542,9 +518,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
for ev in missing_events:
|
||||
logger.info(
|
||||
"[%s %s] Handling received prev_event %s",
|
||||
room_id,
|
||||
event_id,
|
||||
"Handling received prev_event %s",
|
||||
ev.event_id,
|
||||
)
|
||||
with nested_logging_context(ev.event_id):
|
||||
|
@ -553,9 +527,7 @@ class FederationHandler(BaseHandler):
|
|||
except FederationError as e:
|
||||
if e.code == 403:
|
||||
logger.warning(
|
||||
"[%s %s] Received prev_event %s failed history check.",
|
||||
room_id,
|
||||
event_id,
|
||||
"Received prev_event %s failed history check.",
|
||||
ev.event_id,
|
||||
)
|
||||
else:
|
||||
|
@ -566,7 +538,6 @@ class FederationHandler(BaseHandler):
|
|||
destination: str,
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
include_event_in_state: bool = False,
|
||||
) -> Tuple[List[EventBase], List[EventBase]]:
|
||||
"""Requests all of the room state at a given event from a remote homeserver.
|
||||
|
||||
|
@ -574,11 +545,9 @@ class FederationHandler(BaseHandler):
|
|||
destination: The remote homeserver to query for the state.
|
||||
room_id: The id of the room we're interested in.
|
||||
event_id: The id of the event we want the state at.
|
||||
include_event_in_state: if true, the event itself will be included in the
|
||||
returned state event list.
|
||||
|
||||
Returns:
|
||||
A list of events in the state, possibly including the event itself, and
|
||||
A list of events in the state, not including the event itself, and
|
||||
a list of events in the auth chain for the given event.
|
||||
"""
|
||||
(
|
||||
|
@ -590,9 +559,6 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
desired_events = set(state_event_ids + auth_event_ids)
|
||||
|
||||
if include_event_in_state:
|
||||
desired_events.add(event_id)
|
||||
|
||||
event_map = await self._get_events_from_store_or_dest(
|
||||
destination, room_id, desired_events
|
||||
)
|
||||
|
@ -609,13 +575,6 @@ class FederationHandler(BaseHandler):
|
|||
event_map[e_id] for e_id in state_event_ids if e_id in event_map
|
||||
]
|
||||
|
||||
if include_event_in_state:
|
||||
remote_event = event_map.get(event_id)
|
||||
if not remote_event:
|
||||
raise Exception("Unable to get missing prev_event %s" % (event_id,))
|
||||
if remote_event.is_state() and remote_event.rejected_reason is None:
|
||||
remote_state.append(remote_event)
|
||||
|
||||
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
|
||||
auth_chain.sort(key=lambda e: e.depth)
|
||||
|
||||
|
@ -689,6 +648,131 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
return fetched_events
|
||||
|
||||
async def _get_state_after_missing_prev_event(
|
||||
self,
|
||||
destination: str,
|
||||
room_id: str,
|
||||
event_id: str,
|
||||
) -> List[EventBase]:
|
||||
"""Requests all of the room state at a given event from a remote homeserver.
|
||||
|
||||
Args:
|
||||
destination: The remote homeserver to query for the state.
|
||||
room_id: The id of the room we're interested in.
|
||||
event_id: The id of the event we want the state at.
|
||||
|
||||
Returns:
|
||||
A list of events in the state, including the event itself
|
||||
"""
|
||||
# TODO: This function is basically the same as _get_state_for_room. Can
|
||||
# we make backfill() use it, rather than having two code paths? I think the
|
||||
# only difference is that backfill() persists the prev events separately.
|
||||
|
||||
(
|
||||
state_event_ids,
|
||||
auth_event_ids,
|
||||
) = await self.federation_client.get_room_state_ids(
|
||||
destination, room_id, event_id=event_id
|
||||
)
|
||||
|
||||
logger.debug(
|
||||
"state_ids returned %i state events, %i auth events",
|
||||
len(state_event_ids),
|
||||
len(auth_event_ids),
|
||||
)
|
||||
|
||||
# start by just trying to fetch the events from the store
|
||||
desired_events = set(state_event_ids)
|
||||
desired_events.add(event_id)
|
||||
logger.debug("Fetching %i events from cache/store", len(desired_events))
|
||||
fetched_events = await self.store.get_events(
|
||||
desired_events, allow_rejected=True
|
||||
)
|
||||
|
||||
missing_desired_events = desired_events - fetched_events.keys()
|
||||
logger.debug(
|
||||
"We are missing %i events (got %i)",
|
||||
len(missing_desired_events),
|
||||
len(fetched_events),
|
||||
)
|
||||
|
||||
# We probably won't need most of the auth events, so let's just check which
|
||||
# we have for now, rather than thrashing the event cache with them all
|
||||
# unnecessarily.
|
||||
|
||||
# TODO: we probably won't actually need all of the auth events, since we
|
||||
# already have a bunch of the state events. It would be nice if the
|
||||
# federation api gave us a way of finding out which we actually need.
|
||||
|
||||
missing_auth_events = set(auth_event_ids) - fetched_events.keys()
|
||||
missing_auth_events.difference_update(
|
||||
await self.store.have_seen_events(missing_auth_events)
|
||||
)
|
||||
logger.debug("We are also missing %i auth events", len(missing_auth_events))
|
||||
|
||||
missing_events = missing_desired_events | missing_auth_events
|
||||
logger.debug("Fetching %i events from remote", len(missing_events))
|
||||
await self._get_events_and_persist(
|
||||
destination=destination, room_id=room_id, events=missing_events
|
||||
)
|
||||
|
||||
# we need to make sure we re-load from the database to get the rejected
|
||||
# state correct.
|
||||
fetched_events.update(
|
||||
(await self.store.get_events(missing_desired_events, allow_rejected=True))
|
||||
)
|
||||
|
||||
# check for events which were in the wrong room.
|
||||
#
|
||||
# this can happen if a remote server claims that the state or
|
||||
# auth_events at an event in room A are actually events in room B
|
||||
|
||||
bad_events = [
|
||||
(event_id, event.room_id)
|
||||
for event_id, event in fetched_events.items()
|
||||
if event.room_id != room_id
|
||||
]
|
||||
|
||||
for bad_event_id, bad_room_id in bad_events:
|
||||
# This is a bogus situation, but since we may only discover it a long time
|
||||
# after it happened, we try our best to carry on, by just omitting the
|
||||
# bad events from the returned state set.
|
||||
logger.warning(
|
||||
"Remote server %s claims event %s in room %s is an auth/state "
|
||||
"event in room %s",
|
||||
destination,
|
||||
bad_event_id,
|
||||
bad_room_id,
|
||||
room_id,
|
||||
)
|
||||
|
||||
del fetched_events[bad_event_id]
|
||||
|
||||
# if we couldn't get the prev event in question, that's a problem.
|
||||
remote_event = fetched_events.get(event_id)
|
||||
if not remote_event:
|
||||
raise Exception("Unable to get missing prev_event %s" % (event_id,))
|
||||
|
||||
# missing state at that event is a warning, not a blocker
|
||||
# XXX: this doesn't sound right? it means that we'll end up with incomplete
|
||||
# state.
|
||||
failed_to_fetch = desired_events - fetched_events.keys()
|
||||
if failed_to_fetch:
|
||||
logger.warning(
|
||||
"Failed to fetch missing state events for %s %s",
|
||||
event_id,
|
||||
failed_to_fetch,
|
||||
)
|
||||
|
||||
remote_state = [
|
||||
fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
|
||||
]
|
||||
|
||||
if remote_event.is_state() and remote_event.rejected_reason is None:
|
||||
remote_state.append(remote_event)
|
||||
|
||||
return remote_state
|
||||
|
||||
async def _process_received_pdu(
|
||||
self,
|
||||
origin: str,
|
||||
|
@ -707,10 +791,7 @@ class FederationHandler(BaseHandler):
|
|||
(ie, we are missing one or more prev_events), the resolved state at the
|
||||
event
|
||||
"""
|
||||
room_id = event.room_id
|
||||
event_id = event.event_id
|
||||
|
||||
logger.debug("[%s %s] Processing event: %s", room_id, event_id, event)
|
||||
logger.debug("Processing event: %s", event)
|
||||
|
||||
try:
|
||||
await self._handle_new_event(origin, event, state=state)
|
||||
|
@ -871,7 +952,6 @@ class FederationHandler(BaseHandler):
|
|||
destination=dest,
|
||||
room_id=room_id,
|
||||
event_id=e_id,
|
||||
include_event_in_state=False,
|
||||
)
|
||||
auth_events.update({a.event_id: a for a in auth})
|
||||
auth_events.update({s.event_id: s for s in state})
|
||||
|
@ -1317,7 +1397,7 @@ class FederationHandler(BaseHandler):
|
|||
async def on_event_auth(self, event_id: str) -> List[EventBase]:
|
||||
event = await self.store.get_event(event_id)
|
||||
auth = await self.store.get_auth_chain(
|
||||
list(event.auth_event_ids()), include_given=True
|
||||
event.room_id, list(event.auth_event_ids()), include_given=True
|
||||
)
|
||||
return list(auth)
|
||||
|
||||
|
@ -1580,7 +1660,7 @@ class FederationHandler(BaseHandler):
|
|||
prev_state_ids = await context.get_prev_state_ids()
|
||||
|
||||
state_ids = list(prev_state_ids.values())
|
||||
auth_chain = await self.store.get_auth_chain(state_ids)
|
||||
auth_chain = await self.store.get_auth_chain(event.room_id, state_ids)
|
||||
|
||||
state = await self.store.get_events(list(prev_state_ids.values()))
|
||||
|
||||
|
@ -2219,7 +2299,7 @@ class FederationHandler(BaseHandler):
|
|||
|
||||
# Now get the current auth_chain for the event.
|
||||
local_auth_chain = await self.store.get_auth_chain(
|
||||
list(event.auth_event_ids()), include_given=True
|
||||
room_id, list(event.auth_event_ids()), include_given=True
|
||||
)
|
||||
|
||||
# TODO: Check if we would now reject event_id. If so we need to tell
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue