mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-08-09 20:42:10 -04:00
Merge remote-tracking branch 'upstream/release-v1.49'
This commit is contained in:
commit
08a269e2af
165 changed files with 7713 additions and 2703 deletions
|
@ -334,6 +334,19 @@ class SyncHandler:
|
|||
full_state: bool,
|
||||
cache_context: ResponseCacheContext[SyncRequestKey],
|
||||
) -> SyncResult:
|
||||
"""The start of the machinery that produces a /sync response.
|
||||
|
||||
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
|
||||
|
||||
This method does high-level bookkeeping:
|
||||
- tracking the kind of sync in the logging context
|
||||
- deleting any to_device messages whose delivery has been acknowledged.
|
||||
- deciding if we should dispatch an instant or delayed response
|
||||
- marking the sync as being lazily loaded, if appropriate
|
||||
|
||||
Computing the body of the response begins in the next method,
|
||||
`current_sync_for_user`.
|
||||
"""
|
||||
if since_token is None:
|
||||
sync_type = "initial_sync"
|
||||
elif full_state:
|
||||
|
@ -363,7 +376,7 @@ class SyncHandler:
|
|||
sync_config, since_token, full_state=full_state
|
||||
)
|
||||
else:
|
||||
|
||||
# Otherwise, we wait for something to happen and report it to the user.
|
||||
async def current_sync_callback(
|
||||
before_token: StreamToken, after_token: StreamToken
|
||||
) -> SyncResult:
|
||||
|
@ -402,7 +415,12 @@ class SyncHandler:
|
|||
since_token: Optional[StreamToken] = None,
|
||||
full_state: bool = False,
|
||||
) -> SyncResult:
|
||||
"""Get the sync for client needed to match what the server has now."""
|
||||
"""Generates the response body of a sync result, represented as a SyncResult.
|
||||
|
||||
This is a wrapper around `generate_sync_result` which starts an open tracing
|
||||
span to track the sync. See `generate_sync_result` for the next part of your
|
||||
indoctrination.
|
||||
"""
|
||||
with start_active_span("current_sync_for_user"):
|
||||
log_kv({"since_token": since_token})
|
||||
sync_result = await self.generate_sync_result(
|
||||
|
@ -560,7 +578,7 @@ class SyncHandler:
|
|||
# that have happened since `since_key` up to `end_key`, so we
|
||||
# can just use `get_room_events_stream_for_room`.
|
||||
# Otherwise, we want to return the last N events in the room
|
||||
# in toplogical ordering.
|
||||
# in topological ordering.
|
||||
if since_key:
|
||||
events, end_key = await self.store.get_room_events_stream_for_room(
|
||||
room_id,
|
||||
|
@ -1041,7 +1059,18 @@ class SyncHandler:
|
|||
since_token: Optional[StreamToken] = None,
|
||||
full_state: bool = False,
|
||||
) -> SyncResult:
|
||||
"""Generates a sync result."""
|
||||
"""Generates the response body of a sync result.
|
||||
|
||||
This is represented by a `SyncResult` struct, which is built from small pieces
|
||||
using a `SyncResultBuilder`. See also
|
||||
https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
|
||||
the `sync_result_builder` is passed as a mutable ("inout") parameter to various
|
||||
helper functions. These retrieve and process the data which forms the sync body,
|
||||
often writing to the `sync_result_builder` to store their output.
|
||||
|
||||
At the end, we transfer data from the `sync_result_builder` to a new `SyncResult`
|
||||
instance to signify that the sync calculation is complete.
|
||||
"""
|
||||
# NB: The now_token gets changed by some of the generate_sync_* methods,
|
||||
# this is due to some of the underlying streams not supporting the ability
|
||||
# to query up to a given point.
|
||||
|
@ -1343,14 +1372,22 @@ class SyncHandler:
|
|||
async def _generate_sync_entry_for_account_data(
|
||||
self, sync_result_builder: "SyncResultBuilder"
|
||||
) -> Dict[str, Dict[str, JsonDict]]:
|
||||
"""Generates the account data portion of the sync response. Populates
|
||||
`sync_result_builder` with the result.
|
||||
"""Generates the account data portion of the sync response.
|
||||
|
||||
Account data (called "Client Config" in the spec) can be set either globally
|
||||
or for a specific room. Account data consists of a list of events which
|
||||
accumulate state, much like a room.
|
||||
|
||||
This function retrieves global and per-room account data. The former is written
|
||||
to the given `sync_result_builder`. The latter is returned directly, to be
|
||||
later written to the `sync_result_builder` on a room-by-room basis.
|
||||
|
||||
Args:
|
||||
sync_result_builder
|
||||
|
||||
Returns:
|
||||
A dictionary containing the per room account data.
|
||||
A dictionary whose keys (room ids) map to the per room account data for that
|
||||
room.
|
||||
"""
|
||||
sync_config = sync_result_builder.sync_config
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
|
@ -1358,7 +1395,7 @@ class SyncHandler:
|
|||
|
||||
if since_token and not sync_result_builder.full_state:
|
||||
(
|
||||
account_data,
|
||||
global_account_data,
|
||||
account_data_by_room,
|
||||
) = await self.store.get_updated_account_data_for_user(
|
||||
user_id, since_token.account_data_key
|
||||
|
@ -1369,23 +1406,23 @@ class SyncHandler:
|
|||
)
|
||||
|
||||
if push_rules_changed:
|
||||
account_data["m.push_rules"] = await self.push_rules_for_user(
|
||||
global_account_data["m.push_rules"] = await self.push_rules_for_user(
|
||||
sync_config.user
|
||||
)
|
||||
else:
|
||||
(
|
||||
account_data,
|
||||
global_account_data,
|
||||
account_data_by_room,
|
||||
) = await self.store.get_account_data_for_user(sync_config.user.to_string())
|
||||
|
||||
account_data["m.push_rules"] = await self.push_rules_for_user(
|
||||
global_account_data["m.push_rules"] = await self.push_rules_for_user(
|
||||
sync_config.user
|
||||
)
|
||||
|
||||
account_data_for_user = await sync_config.filter_collection.filter_account_data(
|
||||
[
|
||||
{"type": account_data_type, "content": content}
|
||||
for account_data_type, content in account_data.items()
|
||||
for account_data_type, content in global_account_data.items()
|
||||
]
|
||||
)
|
||||
|
||||
|
@ -1459,18 +1496,31 @@ class SyncHandler:
|
|||
"""Generates the rooms portion of the sync response. Populates the
|
||||
`sync_result_builder` with the result.
|
||||
|
||||
In the response that reaches the client, rooms are divided into four categories:
|
||||
`invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of
|
||||
room ids returned by this function.
|
||||
|
||||
Args:
|
||||
sync_result_builder
|
||||
account_data_by_room: Dictionary of per room account data
|
||||
|
||||
Returns:
|
||||
Returns a 4-tuple of
|
||||
`(newly_joined_rooms, newly_joined_or_invited_users,
|
||||
newly_left_rooms, newly_left_users)`
|
||||
Returns a 4-tuple describing rooms the user has joined or left, and users who've
|
||||
joined or left rooms any rooms the user is in. This gets used later in
|
||||
`_generate_sync_entry_for_device_list`.
|
||||
|
||||
Its entries are:
|
||||
- newly_joined_rooms
|
||||
- newly_joined_or_invited_or_knocked_users
|
||||
- newly_left_rooms
|
||||
- newly_left_users
|
||||
"""
|
||||
since_token = sync_result_builder.since_token
|
||||
|
||||
# 1. Start by fetching all ephemeral events in rooms we've joined (if required).
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
block_all_room_ephemeral = (
|
||||
sync_result_builder.since_token is None
|
||||
since_token is None
|
||||
and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
|
||||
)
|
||||
|
||||
|
@ -1484,9 +1534,8 @@ class SyncHandler:
|
|||
)
|
||||
sync_result_builder.now_token = now_token
|
||||
|
||||
# We check up front if anything has changed, if it hasn't then there is
|
||||
# 2. We check up front if anything has changed, if it hasn't then there is
|
||||
# no point in going further.
|
||||
since_token = sync_result_builder.since_token
|
||||
if not sync_result_builder.full_state:
|
||||
if since_token and not ephemeral_by_room and not account_data_by_room:
|
||||
have_changed = await self._have_rooms_changed(sync_result_builder)
|
||||
|
@ -1499,20 +1548,8 @@ class SyncHandler:
|
|||
logger.debug("no-oping sync")
|
||||
return set(), set(), set(), set()
|
||||
|
||||
ignored_account_data = (
|
||||
await self.store.get_global_account_data_by_type_for_user(
|
||||
AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
|
||||
)
|
||||
)
|
||||
|
||||
# If there is ignored users account data and it matches the proper type,
|
||||
# then use it.
|
||||
ignored_users: FrozenSet[str] = frozenset()
|
||||
if ignored_account_data:
|
||||
ignored_users_data = ignored_account_data.get("ignored_users", {})
|
||||
if isinstance(ignored_users_data, dict):
|
||||
ignored_users = frozenset(ignored_users_data.keys())
|
||||
|
||||
# 3. Work out which rooms need reporting in the sync response.
|
||||
ignored_users = await self._get_ignored_users(user_id)
|
||||
if since_token:
|
||||
room_changes = await self._get_rooms_changed(
|
||||
sync_result_builder, ignored_users
|
||||
|
@ -1522,7 +1559,6 @@ class SyncHandler:
|
|||
)
|
||||
else:
|
||||
room_changes = await self._get_all_rooms(sync_result_builder, ignored_users)
|
||||
|
||||
tags_by_room = await self.store.get_tags_for_user(user_id)
|
||||
|
||||
log_kv({"rooms_changed": len(room_changes.room_entries)})
|
||||
|
@ -1533,6 +1569,8 @@ class SyncHandler:
|
|||
newly_joined_rooms = room_changes.newly_joined_rooms
|
||||
newly_left_rooms = room_changes.newly_left_rooms
|
||||
|
||||
# 4. We need to apply further processing to `room_entries` (rooms considered
|
||||
# joined or archived).
|
||||
async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
|
||||
logger.debug("Generating room entry for %s", room_entry.room_id)
|
||||
await self._generate_room_entry(
|
||||
|
@ -1551,31 +1589,13 @@ class SyncHandler:
|
|||
sync_result_builder.invited.extend(invited)
|
||||
sync_result_builder.knocked.extend(knocked)
|
||||
|
||||
# Now we want to get any newly joined, invited or knocking users
|
||||
newly_joined_or_invited_or_knocked_users = set()
|
||||
newly_left_users = set()
|
||||
if since_token:
|
||||
for joined_sync in sync_result_builder.joined:
|
||||
it = itertools.chain(
|
||||
joined_sync.timeline.events, joined_sync.state.values()
|
||||
)
|
||||
for event in it:
|
||||
if event.type == EventTypes.Member:
|
||||
if (
|
||||
event.membership == Membership.JOIN
|
||||
or event.membership == Membership.INVITE
|
||||
or event.membership == Membership.KNOCK
|
||||
):
|
||||
newly_joined_or_invited_or_knocked_users.add(
|
||||
event.state_key
|
||||
)
|
||||
else:
|
||||
prev_content = event.unsigned.get("prev_content", {})
|
||||
prev_membership = prev_content.get("membership", None)
|
||||
if prev_membership == Membership.JOIN:
|
||||
newly_left_users.add(event.state_key)
|
||||
|
||||
newly_left_users -= newly_joined_or_invited_or_knocked_users
|
||||
# 5. Work out which users have joined or left rooms we're in. We use this
|
||||
# to build the device_list part of the sync response in
|
||||
# `_generate_sync_entry_for_device_list`.
|
||||
(
|
||||
newly_joined_or_invited_or_knocked_users,
|
||||
newly_left_users,
|
||||
) = sync_result_builder.calculate_user_changes()
|
||||
|
||||
return (
|
||||
set(newly_joined_rooms),
|
||||
|
@ -1584,11 +1604,36 @@ class SyncHandler:
|
|||
newly_left_users,
|
||||
)
|
||||
|
||||
async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]:
|
||||
"""Retrieve the users ignored by the given user from their global account_data.
|
||||
|
||||
Returns an empty set if
|
||||
- there is no global account_data entry for ignored_users
|
||||
- there is such an entry, but it's not a JSON object.
|
||||
"""
|
||||
# TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
|
||||
ignored_account_data = (
|
||||
await self.store.get_global_account_data_by_type_for_user(
|
||||
AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
|
||||
)
|
||||
)
|
||||
|
||||
# If there is ignored users account data and it matches the proper type,
|
||||
# then use it.
|
||||
ignored_users: FrozenSet[str] = frozenset()
|
||||
if ignored_account_data:
|
||||
ignored_users_data = ignored_account_data.get("ignored_users", {})
|
||||
if isinstance(ignored_users_data, dict):
|
||||
ignored_users = frozenset(ignored_users_data.keys())
|
||||
return ignored_users
|
||||
|
||||
async def _have_rooms_changed(
|
||||
self, sync_result_builder: "SyncResultBuilder"
|
||||
) -> bool:
|
||||
"""Returns whether there may be any new events that should be sent down
|
||||
the sync. Returns True if there are.
|
||||
|
||||
Does not modify the `sync_result_builder`.
|
||||
"""
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
since_token = sync_result_builder.since_token
|
||||
|
@ -1596,12 +1641,13 @@ class SyncHandler:
|
|||
|
||||
assert since_token
|
||||
|
||||
# Get a list of membership change events that have happened.
|
||||
rooms_changed = await self.store.get_membership_changes_for_user(
|
||||
# Get a list of membership change events that have happened to the user
|
||||
# requesting the sync.
|
||||
membership_changes = await self.store.get_membership_changes_for_user(
|
||||
user_id, since_token.room_key, now_token.room_key
|
||||
)
|
||||
|
||||
if rooms_changed:
|
||||
if membership_changes:
|
||||
return True
|
||||
|
||||
stream_id = since_token.room_key.stream
|
||||
|
@ -1613,7 +1659,25 @@ class SyncHandler:
|
|||
async def _get_rooms_changed(
|
||||
self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
|
||||
) -> _RoomChanges:
|
||||
"""Gets the the changes that have happened since the last sync."""
|
||||
"""Determine the changes in rooms to report to the user.
|
||||
|
||||
Ideally, we want to report all events whose stream ordering `s` lies in the
|
||||
range `since_token < s <= now_token`, where the two tokens are read from the
|
||||
sync_result_builder.
|
||||
|
||||
If there are too many events in that range to report, things get complicated.
|
||||
In this situation we return a truncated list of the most recent events, and
|
||||
indicate in the response that there is a "gap" of omitted events. Additionally:
|
||||
|
||||
- we include a "state_delta", to describe the changes in state over the gap,
|
||||
- we include all membership events applying to the user making the request,
|
||||
even those in the gap.
|
||||
|
||||
See the spec for the rationale:
|
||||
https://spec.matrix.org/v1.1/client-server-api/#syncing
|
||||
|
||||
The sync_result_builder is not modified by this function.
|
||||
"""
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
since_token = sync_result_builder.since_token
|
||||
now_token = sync_result_builder.now_token
|
||||
|
@ -1621,21 +1685,36 @@ class SyncHandler:
|
|||
|
||||
assert since_token
|
||||
|
||||
# Get a list of membership change events that have happened.
|
||||
rooms_changed = await self.store.get_membership_changes_for_user(
|
||||
# The spec
|
||||
# https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync
|
||||
# notes that membership events need special consideration:
|
||||
#
|
||||
# > When a sync is limited, the server MUST return membership events for events
|
||||
# > in the gap (between since and the start of the returned timeline), regardless
|
||||
# > as to whether or not they are redundant.
|
||||
#
|
||||
# We fetch such events here, but we only seem to use them for categorising rooms
|
||||
# as newly joined, newly left, invited or knocked.
|
||||
# TODO: we've already called this function and ran this query in
|
||||
# _have_rooms_changed. We could keep the results in memory to avoid a
|
||||
# second query, at the cost of more complicated source code.
|
||||
membership_change_events = await self.store.get_membership_changes_for_user(
|
||||
user_id, since_token.room_key, now_token.room_key
|
||||
)
|
||||
|
||||
mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
|
||||
for event in rooms_changed:
|
||||
for event in membership_change_events:
|
||||
mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
|
||||
|
||||
newly_joined_rooms = []
|
||||
newly_left_rooms = []
|
||||
room_entries = []
|
||||
invited = []
|
||||
knocked = []
|
||||
newly_joined_rooms: List[str] = []
|
||||
newly_left_rooms: List[str] = []
|
||||
room_entries: List[RoomSyncResultBuilder] = []
|
||||
invited: List[InvitedSyncResult] = []
|
||||
knocked: List[KnockedSyncResult] = []
|
||||
for room_id, events in mem_change_events_by_room_id.items():
|
||||
# The body of this loop will add this room to at least one of the five lists
|
||||
# above. Things get messy if you've e.g. joined, left, joined then left the
|
||||
# room all in the same sync period.
|
||||
logger.debug(
|
||||
"Membership changes in %s: [%s]",
|
||||
room_id,
|
||||
|
@ -1690,6 +1769,7 @@ class SyncHandler:
|
|||
|
||||
if not non_joins:
|
||||
continue
|
||||
last_non_join = non_joins[-1]
|
||||
|
||||
# Check if we have left the room. This can either be because we were
|
||||
# joined before *or* that we since joined and then left.
|
||||
|
@ -1711,18 +1791,18 @@ class SyncHandler:
|
|||
newly_left_rooms.append(room_id)
|
||||
|
||||
# Only bother if we're still currently invited
|
||||
should_invite = non_joins[-1].membership == Membership.INVITE
|
||||
should_invite = last_non_join.membership == Membership.INVITE
|
||||
if should_invite:
|
||||
if event.sender not in ignored_users:
|
||||
invite_room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
|
||||
if last_non_join.sender not in ignored_users:
|
||||
invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
|
||||
if invite_room_sync:
|
||||
invited.append(invite_room_sync)
|
||||
|
||||
# Only bother if our latest membership in the room is knock (and we haven't
|
||||
# been accepted/rejected in the meantime).
|
||||
should_knock = non_joins[-1].membership == Membership.KNOCK
|
||||
should_knock = last_non_join.membership == Membership.KNOCK
|
||||
if should_knock:
|
||||
knock_room_sync = KnockedSyncResult(room_id, knock=non_joins[-1])
|
||||
knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join)
|
||||
if knock_room_sync:
|
||||
knocked.append(knock_room_sync)
|
||||
|
||||
|
@ -1780,7 +1860,9 @@ class SyncHandler:
|
|||
|
||||
timeline_limit = sync_config.filter_collection.timeline_limit()
|
||||
|
||||
# Get all events for rooms we're currently joined to.
|
||||
# Get all events since the `from_key` in rooms we're currently joined to.
|
||||
# If there are too many, we get the most recent events only. This leaves
|
||||
# a "gap" in the timeline, as described by the spec for /sync.
|
||||
room_to_events = await self.store.get_room_events_stream_for_rooms(
|
||||
room_ids=sync_result_builder.joined_room_ids,
|
||||
from_key=since_token.room_key,
|
||||
|
@ -1841,6 +1923,10 @@ class SyncHandler:
|
|||
) -> _RoomChanges:
|
||||
"""Returns entries for all rooms for the user.
|
||||
|
||||
Like `_get_rooms_changed`, but assumes the `since_token` is `None`.
|
||||
|
||||
This function does not modify the sync_result_builder.
|
||||
|
||||
Args:
|
||||
sync_result_builder
|
||||
ignored_users: Set of users ignored by user.
|
||||
|
@ -1852,16 +1938,9 @@ class SyncHandler:
|
|||
now_token = sync_result_builder.now_token
|
||||
sync_config = sync_result_builder.sync_config
|
||||
|
||||
membership_list = (
|
||||
Membership.INVITE,
|
||||
Membership.KNOCK,
|
||||
Membership.JOIN,
|
||||
Membership.LEAVE,
|
||||
Membership.BAN,
|
||||
)
|
||||
|
||||
room_list = await self.store.get_rooms_for_local_user_where_membership_is(
|
||||
user_id=user_id, membership_list=membership_list
|
||||
user_id=user_id,
|
||||
membership_list=Membership.LIST,
|
||||
)
|
||||
|
||||
room_entries = []
|
||||
|
@ -2211,8 +2290,7 @@ def _calculate_state(
|
|||
# to only include membership events for the senders in the timeline.
|
||||
# In practice, we can do this by removing them from the p_ids list,
|
||||
# which is the list of relevant state we know we have already sent to the client.
|
||||
# see https://github.com/matrix-org/synapse/pull/2970
|
||||
# /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
|
||||
# see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
|
||||
|
||||
if lazy_load_members:
|
||||
p_ids.difference_update(
|
||||
|
@ -2261,6 +2339,39 @@ class SyncResultBuilder:
|
|||
groups: Optional[GroupsSyncResult] = None
|
||||
to_device: List[JsonDict] = attr.Factory(list)
|
||||
|
||||
def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
|
||||
"""Work out which other users have joined or left rooms we are joined to.
|
||||
|
||||
This data only is only useful for an incremental sync.
|
||||
|
||||
The SyncResultBuilder is not modified by this function.
|
||||
"""
|
||||
newly_joined_or_invited_or_knocked_users = set()
|
||||
newly_left_users = set()
|
||||
if self.since_token:
|
||||
for joined_sync in self.joined:
|
||||
it = itertools.chain(
|
||||
joined_sync.timeline.events, joined_sync.state.values()
|
||||
)
|
||||
for event in it:
|
||||
if event.type == EventTypes.Member:
|
||||
if (
|
||||
event.membership == Membership.JOIN
|
||||
or event.membership == Membership.INVITE
|
||||
or event.membership == Membership.KNOCK
|
||||
):
|
||||
newly_joined_or_invited_or_knocked_users.add(
|
||||
event.state_key
|
||||
)
|
||||
else:
|
||||
prev_content = event.unsigned.get("prev_content", {})
|
||||
prev_membership = prev_content.get("membership", None)
|
||||
if prev_membership == Membership.JOIN:
|
||||
newly_left_users.add(event.state_key)
|
||||
|
||||
newly_left_users -= newly_joined_or_invited_or_knocked_users
|
||||
return newly_joined_or_invited_or_knocked_users, newly_left_users
|
||||
|
||||
|
||||
@attr.s(slots=True, auto_attribs=True)
|
||||
class RoomSyncResultBuilder:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue