Split up SyncHandler.compute_state_delta (#16929)

This is a huge method, which melts my brain.

This is a non-functional change which lays some groundwork for future
work in this area.
This commit is contained in:
Richard van der Hoff 2024-03-14 17:18:48 +00:00 committed by GitHub
parent 1198f649ea
commit 6d5bafb2c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 238 additions and 145 deletions

2
changelog.d/16929.misc Normal file
View File

@ -0,0 +1,2 @@
Refactor state delta calculation in `/sync` handler.

View File

@ -1014,30 +1014,6 @@ class SyncHandler:
if event.is_state(): if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id timeline_state[(event.type, event.state_key)] = event.event_id
if full_state:
# always make sure we LL ourselves so we know we're in the room
# (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
# We don't insert ourselves into `members_to_fetch`, because in some
# rare cases (an empty event batch with a now_token after the user's
# leave in a partial state room which another local user has
# joined), the room state will be missing our membership and there
# is no guarantee that our membership will be in the auth events of
# timeline events when the room is partial stated.
state_filter = StateFilter.from_lazy_load_member_list(
members_to_fetch.union((sync_config.user.to_string(),))
)
else:
state_filter = StateFilter.from_lazy_load_member_list(
members_to_fetch
)
# We are happy to use partial state to compute the `/sync` response.
# Since partial state may not include the lazy-loaded memberships we
# require, we fix up the state response afterwards with memberships from
# auth events.
await_full_state = False
else: else:
timeline_state = { timeline_state = {
(event.type, event.state_key): event.event_id (event.type, event.state_key): event.event_id
@ -1045,9 +1021,6 @@ class SyncHandler:
if event.is_state() if event.is_state()
} }
state_filter = StateFilter.all()
await_full_state = True
# Now calculate the state to return in the sync response for the room. # Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous # This is more or less the change in state between the end of the previous
# sync's timeline and the start of the current sync's timeline. # sync's timeline and the start of the current sync's timeline.
@ -1057,132 +1030,29 @@ class SyncHandler:
# whether the room is partial stated *before* fetching it. # whether the room is partial stated *before* fetching it.
is_partial_state_room = await self.store.is_partial_state_room(room_id) is_partial_state_room = await self.store.is_partial_state_room(room_id)
if full_state: if full_state:
if batch: state_ids = await self._compute_state_delta_for_full_sync(
state_at_timeline_end = ( room_id,
await self._state_storage_controller.get_state_ids_for_event( sync_config.user,
batch.events[-1].event_id, batch,
state_filter=state_filter, now_token,
await_full_state=await_full_state, members_to_fetch,
) timeline_state,
)
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_start = state_at_timeline_end
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end={},
lazy_load_members=lazy_load_members,
) )
elif batch.limited: else:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
# more state in the server than if we were LLing.
#
# We still have to filter timeline_start to LL entries (above) in order
# for _calculate_state's LL logic to work, as we have to include LL
# members for timeline senders in case they weren't loaded in the initial
# sync. We do this by (counterintuitively) by filtering timeline_start
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
state_filter = StateFilter.all()
# If this is an initial sync then full_state should be set, and # If this is an initial sync then full_state should be set, and
# that case is handled above. We assert here to ensure that this # that case is handled above. We assert here to ensure that this
# is indeed the case. # is indeed the case.
assert since_token is not None assert since_token is not None
state_at_previous_sync = await self.get_state_at(
state_ids = await self._compute_state_delta_for_incremental_sync(
room_id, room_id,
stream_position=since_token, batch,
state_filter=state_filter, since_token,
await_full_state=await_full_state, now_token,
members_to_fetch,
timeline_state,
) )
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
# we have to include LL members in case LL initial sync missed them
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here, and then dedupe any redundant ones below.
state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
# If we only have partial state for the room, `state_ids` may be missing the # If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth # memberships we wanted. We attempt to find some by digging through the auth
# events of timeline events. # events of timeline events.
@ -1245,6 +1115,227 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
} }
async def _compute_state_delta_for_full_sync(
self,
room_id: str,
syncing_user: UserID,
batch: TimelineBatch,
now_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
"""Calculate the state events to be included in a full sync response.
As with `_compute_state_delta_for_incremental_sync`, the result will include
the membership events for the senders of each event in `members_to_fetch`.
Args:
room_id: The room we are calculating for.
syncing_user: The user that is calling `/sync`.
batch: The timeline batch for the room that will be sent to the user.
now_token: Token of the end of the current batch.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline.
timeline_state: The contribution to the room state from state events in
`batch`. Only contains the last event for any given state key.
Returns:
A map from (type, state_key) to event_id, for each event that we believe
should be included in the `state` part of the sync response.
"""
if members_to_fetch is not None:
# Lazy-loading of membership events is enabled.
#
# Always make sure we load our own membership event so we know if
# we're in the room, to fix https://github.com/vector-im/riot-web/issues/7209.
#
# We only need apply this on full state syncs given we disabled
# LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
#
# We don't insert ourselves into `members_to_fetch`, because in some
# rare cases (an empty event batch with a now_token after the user's
# leave in a partial state room which another local user has
# joined), the room state will be missing our membership and there
# is no guarantee that our membership will be in the auth events of
# timeline events when the room is partial stated.
state_filter = StateFilter.from_lazy_load_member_list(
members_to_fetch.union((syncing_user.to_string(),))
)
# We are happy to use partial state to compute the `/sync` response.
# Since partial state may not include the lazy-loaded memberships we
# require, we fix up the state response afterwards with memberships from
# auth events.
await_full_state = False
lazy_load_members = True
else:
state_filter = StateFilter.all()
await_full_state = True
lazy_load_members = False
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_at_timeline_start = state_at_timeline_end
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end={},
lazy_load_members=lazy_load_members,
)
return state_ids
async def _compute_state_delta_for_incremental_sync(
self,
room_id: str,
batch: TimelineBatch,
since_token: StreamToken,
now_token: StreamToken,
members_to_fetch: Optional[Set[str]],
timeline_state: StateMap[str],
) -> StateMap[str]:
"""Calculate the state events to be included in an incremental sync response.
If lazy-loading of membership events is enabled (as indicated by
`members_to_fetch` being not-`None`), the result will include the membership
events for each member in `members_to_fetch`. The caller
(`compute_state_delta`) is responsible for keeping track of which membership
events we have already sent to the client, and hence ripping them out.
Args:
room_id: The room we are calculating for.
batch: The timeline batch for the room that will be sent to the user.
since_token: Token of the end of the previous batch.
now_token: Token of the end of the current batch.
members_to_fetch: If lazy-loading is enabled, the memberships needed for
events in the timeline. Otherwise, `None`.
timeline_state: The contribution to the room state from state events in
`batch`. Only contains the last event for any given state key.
Returns:
A map from (type, state_key) to event_id, for each event that we believe
should be included in the `state` part of the sync response.
"""
if members_to_fetch is not None:
# Lazy-loading is enabled. Only return the state that is needed.
state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
await_full_state = False
lazy_load_members = True
else:
state_filter = StateFilter.all()
await_full_state = True
lazy_load_members = False
if batch.limited:
if batch:
state_at_timeline_start = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_start = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
# more state in the server than if we were LLing.
#
# We still have to filter timeline_start to LL entries (above) in order
# for _calculate_state's LL logic to work, as we have to include LL
# members for timeline senders in case they weren't loaded in the initial
# sync. We do this by (counterintuitively) by filtering timeline_start
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
state_filter = StateFilter.all()
state_at_previous_sync = await self.get_state_at(
room_id,
stream_position=since_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
if batch:
state_at_timeline_end = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[-1].event_id,
state_filter=state_filter,
await_full_state=await_full_state,
)
)
else:
# We can get here if the user has ignored the senders of all
# the recent events.
state_at_timeline_end = await self.get_state_at(
room_id,
stream_position=now_token,
state_filter=state_filter,
await_full_state=await_full_state,
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
timeline_end=state_at_timeline_end,
previous_timeline_end=state_at_previous_sync,
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if members_to_fetch and batch.events:
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
# timeline here. The caller will then dedupe any redundant ones.
state_ids = await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
),
await_full_state=False,
)
return state_ids
async def _find_missing_partial_state_memberships( async def _find_missing_partial_state_memberships(
self, self,
room_id: str, room_id: str,