Refactor on_receive_pdu code (#10615)

* drop room pdu linearizer sooner

No point holding onto it while we recheck the database.

* move out `missing_prevs` calculation

We're going to need `missing_prevs` whatever we do, so we may as well calculate
it eagerly and just update it if it gets outdated.

* Add another `if missing_prevs` condition

This should be a no-op, since all the code inside the block already checks `if
missing_prevs`.

* reorder if conditions

This shouldn't change the logic at all.

* Push down `min_depth` read

No point reading it from the database unless we're going to use it.

* Collect the sent_to_us_directly code together

Move the remaining `sent_to_us_directly` code inside the `if
sent_to_us_directly` block.

* Properly separate the `not sent_to_us_directly` branch

Since the only way this second block is now reachable is if we
*didn't* go into the `sent_to_us_directly` branch, we can replace it with a
simple `else`.

* changelog
This commit is contained in:
Richard van der Hoff 2021-08-18 12:36:22 +01:00 committed by GitHub
parent 6a5f8fbcda
commit 964f29cb6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 134 additions and 130 deletions

1
changelog.d/10615.misc Normal file
View File

@@ -0,0 +1 @@
Clean up some of the federation event authentication code for clarity.

View File

@@ -285,169 +285,172 @@ class FederationHandler(BaseHandler):
# - Fetching any missing prev events to fill in gaps in the graph # - Fetching any missing prev events to fill in gaps in the graph
# - Fetching state if we have a hole in the graph # - Fetching state if we have a hole in the graph
if not pdu.internal_metadata.is_outlier(): if not pdu.internal_metadata.is_outlier():
# We only backfill backwards to the min depth.
min_depth = await self.get_min_depth_for_context(pdu.room_id)
logger.debug("min_depth: %d", min_depth)
prevs = set(pdu.prev_event_ids()) prevs = set(pdu.prev_event_ids())
seen = await self.store.have_events_in_timeline(prevs) seen = await self.store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen
if min_depth is not None and pdu.depth > min_depth: if missing_prevs:
missing_prevs = prevs - seen if sent_to_us_directly:
if sent_to_us_directly and missing_prevs: # We only backfill backwards to the min depth.
# If we're missing stuff, ensure we only fetch stuff one min_depth = await self.get_min_depth_for_context(pdu.room_id)
# at a time. logger.debug("min_depth: %d", min_depth)
logger.info(
"Acquiring room lock to fetch %d missing prev_events: %s", if min_depth is not None and pdu.depth > min_depth:
len(missing_prevs), # If we're missing stuff, ensure we only fetch stuff one
shortstr(missing_prevs), # at a time.
)
with (await self._room_pdu_linearizer.queue(pdu.room_id)):
logger.info( logger.info(
"Acquired room lock to fetch %d missing prev_events", "Acquiring room lock to fetch %d missing prev_events: %s",
len(missing_prevs), len(missing_prevs),
shortstr(missing_prevs),
) )
with (await self._room_pdu_linearizer.queue(pdu.room_id)):
try: logger.info(
await self._get_missing_events_for_pdu( "Acquired room lock to fetch %d missing prev_events",
origin, pdu, prevs, min_depth len(missing_prevs),
) )
except Exception as e:
raise Exception( try:
"Error fetching missing prev_events for %s: %s" await self._get_missing_events_for_pdu(
% (event_id, e) origin, pdu, prevs, min_depth
) from e )
except Exception as e:
raise Exception(
"Error fetching missing prev_events for %s: %s"
% (event_id, e)
) from e
# Update the set of things we've seen after trying to # Update the set of things we've seen after trying to
# fetch the missing stuff # fetch the missing stuff
seen = await self.store.have_events_in_timeline(prevs) seen = await self.store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen
if not prevs - seen: if not missing_prevs:
logger.info( logger.info("Found all missing prev_events")
"Found all missing prev_events",
)
missing_prevs = prevs - seen if missing_prevs:
if missing_prevs: # since this event was pushed to us, it is possible for it to
# We've still not been able to get all of the prev_events for this event. # become the only forward-extremity in the room, and we would then
# # trust its state to be the state for the whole room. This is very
# In this case, we need to fall back to asking another server in the # bad. Further, if the event was pushed to us, there is no excuse
# federation for the state at this event. That's ok provided we then # for us not to have all the prev_events. (XXX: apart from
# resolve the state against other bits of the DAG before using it (which # min_depth?)
# will ensure that you can't just take over a room by sending an event, #
# withholding its prev_events, and declaring yourself to be an admin in # We therefore reject any such events.
# the subsequent state request). logger.warning(
# "Rejecting: failed to fetch %d prev events: %s",
# Now, if we're pulling this event as a missing prev_event, then clearly len(missing_prevs),
# this event is not going to become the only forward-extremity and we are shortstr(missing_prevs),
# guaranteed to resolve its state against our existing forward )
# extremities, so that should be fine. raise FederationError(
# "ERROR",
# On the other hand, if this event was pushed to us, it is possible for 403,
# it to become the only forward-extremity in the room, and we would then (
# trust its state to be the state for the whole room. This is very bad. "Your server isn't divulging details about prev_events "
# Further, if the event was pushed to us, there is no excuse for us not to "referenced in this event."
# have all the prev_events. We therefore reject any such events. ),
# affected=pdu.event_id,
# XXX this really feels like it could/should be merged with the above, )
# but there is an interaction with min_depth that I'm not really
# following.
if sent_to_us_directly: else:
logger.warning( # We don't have all of the prev_events for this event.
"Rejecting: failed to fetch %d prev events: %s", #
len(missing_prevs), # In this case, we need to fall back to asking another server in the
# federation for the state at this event. That's ok provided we then
# resolve the state against other bits of the DAG before using it (which
# will ensure that you can't just take over a room by sending an event,
# withholding its prev_events, and declaring yourself to be an admin in
# the subsequent state request).
#
# Since we're pulling this event as a missing prev_event, then clearly
# this event is not going to become the only forward-extremity and we are
# guaranteed to resolve its state against our existing forward
# extremities, so that should be fine.
#
# XXX this really feels like it could/should be merged with the above,
# but there is an interaction with min_depth that I'm not really
# following.
logger.info(
"Event %s is missing prev_events %s: calculating state for a "
"backwards extremity",
event_id,
shortstr(missing_prevs), shortstr(missing_prevs),
) )
raise FederationError(
"ERROR",
403,
(
"Your server isn't divulging details about prev_events "
"referenced in this event."
),
affected=pdu.event_id,
)
logger.info( # Calculate the state after each of the previous events, and
"Event %s is missing prev_events %s: calculating state for a " # resolve them to find the correct state at the current event.
"backwards extremity", event_map = {event_id: pdu}
event_id, try:
shortstr(missing_prevs), # Get the state of the events we know about
) ours = await self.state_store.get_state_groups_ids(
room_id, seen
)
# Calculate the state after each of the previous events, and # state_maps is a list of mappings from (type, state_key) to event_id
# resolve them to find the correct state at the current event. state_maps: List[StateMap[str]] = list(ours.values())
event_map = {event_id: pdu}
try:
# Get the state of the events we know about
ours = await self.state_store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id # we don't need this any more, let's delete it.
state_maps: List[StateMap[str]] = list(ours.values()) del ours
# we don't need this any more, let's delete it. # Ask the remote server for the states we don't
del ours # know about
for p in missing_prevs:
# Ask the remote server for the states we don't logger.info(
# know about "Requesting state after missing prev_event %s", p
for p in missing_prevs:
logger.info("Requesting state after missing prev_event %s", p)
with nested_logging_context(p):
# note that if any of the missing prevs share missing state or
# auth events, the requests to fetch those events are deduped
# by the get_pdu_cache in federation_client.
remote_state = (
await self._get_state_after_missing_prev_event(
origin, room_id, p
)
) )
remote_state_map = { with nested_logging_context(p):
(x.type, x.state_key): x.event_id for x in remote_state # note that if any of the missing prevs share missing state or
} # auth events, the requests to fetch those events are deduped
state_maps.append(remote_state_map) # by the get_pdu_cache in federation_client.
remote_state = (
await self._get_state_after_missing_prev_event(
origin, room_id, p
)
)
for x in remote_state: remote_state_map = {
event_map[x.event_id] = x (x.type, x.state_key): x.event_id
for x in remote_state
}
state_maps.append(remote_state_map)
room_version = await self.store.get_room_version_id(room_id) for x in remote_state:
state_map = ( event_map[x.event_id] = x
await self._state_resolution_handler.resolve_events_with_store(
room_version = await self.store.get_room_version_id(room_id)
state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id, room_id,
room_version, room_version,
state_maps, state_maps,
event_map, event_map,
state_res_store=StateResolutionStore(self.store), state_res_store=StateResolutionStore(self.store),
) )
)
# We need to give _process_received_pdu the actual state events # We need to give _process_received_pdu the actual state events
# rather than event ids, so generate that now. # rather than event ids, so generate that now.
# First though we need to fetch all the events that are in # First though we need to fetch all the events that are in
# state_map, so we can build up the state below. # state_map, so we can build up the state below.
evs = await self.store.get_events( evs = await self.store.get_events(
list(state_map.values()), list(state_map.values()),
get_prev_content=False, get_prev_content=False,
redact_behaviour=EventRedactBehaviour.AS_IS, redact_behaviour=EventRedactBehaviour.AS_IS,
) )
event_map.update(evs) event_map.update(evs)
state = [event_map[e] for e in state_map.values()] state = [event_map[e] for e in state_map.values()]
except Exception: except Exception:
logger.warning( logger.warning(
"Error attempting to resolve state at missing " "prev_events", "Error attempting to resolve state at missing "
exc_info=True, "prev_events",
) exc_info=True,
raise FederationError( )
"ERROR", raise FederationError(
403, "ERROR",
"We can't get valid state history.", 403,
affected=event_id, "We can't get valid state history.",
) affected=event_id,
)
# A second round of checks for all events. Check that the event passes auth # A second round of checks for all events. Check that the event passes auth
# based on `auth_events`, this allows us to assert that the event would # based on `auth_events`, this allows us to assert that the event would