mirror of
https://mau.dev/maunium/synapse.git
synced 2024-10-01 01:36:05 -04:00
Sliding Sync: Add receipts extension (MSC3960) (#17489)
[MSC3960](https://github.com/matrix-org/matrix-spec-proposals/pull/3960): Receipts extension Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
This commit is contained in:
parent
b2c55bd049
commit
b221f0b84b
1
changelog.d/17489.feature
Normal file
1
changelog.d/17489.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
|
@ -286,7 +286,9 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
|
|||||||
room_ids: Iterable[str],
|
room_ids: Iterable[str],
|
||||||
is_guest: bool,
|
is_guest: bool,
|
||||||
explicit_room_id: Optional[str] = None,
|
explicit_room_id: Optional[str] = None,
|
||||||
|
to_key: Optional[MultiWriterStreamToken] = None,
|
||||||
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
|
) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
|
||||||
|
if to_key is None:
|
||||||
to_key = self.get_current_key()
|
to_key = self.get_current_key()
|
||||||
|
|
||||||
if from_key == to_key:
|
if from_key == to_key:
|
||||||
|
@ -49,6 +49,7 @@ from synapse.types import (
|
|||||||
DeviceListUpdates,
|
DeviceListUpdates,
|
||||||
JsonDict,
|
JsonDict,
|
||||||
JsonMapping,
|
JsonMapping,
|
||||||
|
MultiWriterStreamToken,
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
Requester,
|
Requester,
|
||||||
RoomStreamToken,
|
RoomStreamToken,
|
||||||
@ -493,8 +494,7 @@ class SlidingSyncHandler:
|
|||||||
|
|
||||||
# Assemble sliding window lists
|
# Assemble sliding window lists
|
||||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
|
||||||
# Keep track of the rooms that we're going to display and need to fetch more
|
# Keep track of the rooms that we can display and need to fetch more info about
|
||||||
# info about
|
|
||||||
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
relevant_room_map: Dict[str, RoomSyncConfig] = {}
|
||||||
if has_lists and sync_config.lists is not None:
|
if has_lists and sync_config.lists is not None:
|
||||||
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
sync_room_map = await self.filter_rooms_relevant_for_sync(
|
||||||
@ -622,6 +622,8 @@ class SlidingSyncHandler:
|
|||||||
|
|
||||||
# Filter out rooms that haven't received updates and we've sent down
|
# Filter out rooms that haven't received updates and we've sent down
|
||||||
# previously.
|
# previously.
|
||||||
|
# Keep track of the rooms that we're going to display and need to fetch more info about
|
||||||
|
relevant_rooms_to_send_map = relevant_room_map
|
||||||
if from_token:
|
if from_token:
|
||||||
rooms_should_send = set()
|
rooms_should_send = set()
|
||||||
|
|
||||||
@ -659,7 +661,7 @@ class SlidingSyncHandler:
|
|||||||
relevant_room_map.keys(), from_token.stream_token.room_key
|
relevant_room_map.keys(), from_token.stream_token.room_key
|
||||||
)
|
)
|
||||||
rooms_should_send.update(rooms_that_have_updates)
|
rooms_should_send.update(rooms_that_have_updates)
|
||||||
relevant_room_map = {
|
relevant_rooms_to_send_map = {
|
||||||
room_id: room_sync_config
|
room_id: room_sync_config
|
||||||
for room_id, room_sync_config in relevant_room_map.items()
|
for room_id, room_sync_config in relevant_room_map.items()
|
||||||
if room_id in rooms_should_send
|
if room_id in rooms_should_send
|
||||||
@ -671,7 +673,7 @@ class SlidingSyncHandler:
|
|||||||
room_sync_result = await self.get_room_sync_data(
|
room_sync_result = await self.get_room_sync_data(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
room_sync_config=relevant_room_map[room_id],
|
room_sync_config=relevant_rooms_to_send_map[room_id],
|
||||||
room_membership_for_user_at_to_token=room_membership_for_user_map[
|
room_membership_for_user_at_to_token=room_membership_for_user_map[
|
||||||
room_id
|
room_id
|
||||||
],
|
],
|
||||||
@ -683,13 +685,20 @@ class SlidingSyncHandler:
|
|||||||
if room_sync_result or not from_token:
|
if room_sync_result or not from_token:
|
||||||
rooms[room_id] = room_sync_result
|
rooms[room_id] = room_sync_result
|
||||||
|
|
||||||
if relevant_room_map:
|
if relevant_rooms_to_send_map:
|
||||||
with start_active_span("sliding_sync.generate_room_entries"):
|
with start_active_span("sliding_sync.generate_room_entries"):
|
||||||
await concurrently_execute(handle_room, relevant_room_map, 10)
|
await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
|
||||||
|
|
||||||
extensions = await self.get_extensions_response(
|
extensions = await self.get_extensions_response(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
lists=lists,
|
actual_lists=lists,
|
||||||
|
# We're purposely using `relevant_room_map` instead of
|
||||||
|
# `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
|
||||||
|
# send regardless of whether they have an event update or not. The
|
||||||
|
# extensions care about more than just normal events in the rooms (like
|
||||||
|
# account data, read receipts, typing indicators, to-device messages, etc).
|
||||||
|
actual_room_ids=set(relevant_room_map.keys()),
|
||||||
|
actual_room_response_map=rooms,
|
||||||
from_token=from_token,
|
from_token=from_token,
|
||||||
to_token=to_token,
|
to_token=to_token,
|
||||||
)
|
)
|
||||||
@ -698,7 +707,7 @@ class SlidingSyncHandler:
|
|||||||
connection_position = await self.connection_store.record_rooms(
|
connection_position = await self.connection_store.record_rooms(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
from_token=from_token,
|
from_token=from_token,
|
||||||
sent_room_ids=relevant_room_map.keys(),
|
sent_room_ids=relevant_rooms_to_send_map.keys(),
|
||||||
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
|
# TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
|
||||||
unsent_room_ids=[],
|
unsent_room_ids=[],
|
||||||
)
|
)
|
||||||
@ -1902,7 +1911,9 @@ class SlidingSyncHandler:
|
|||||||
async def get_extensions_response(
|
async def get_extensions_response(
|
||||||
self,
|
self,
|
||||||
sync_config: SlidingSyncConfig,
|
sync_config: SlidingSyncConfig,
|
||||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
||||||
|
actual_room_ids: Set[str],
|
||||||
|
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
|
||||||
to_token: StreamToken,
|
to_token: StreamToken,
|
||||||
from_token: Optional[SlidingSyncStreamToken],
|
from_token: Optional[SlidingSyncStreamToken],
|
||||||
) -> SlidingSyncResult.Extensions:
|
) -> SlidingSyncResult.Extensions:
|
||||||
@ -1910,7 +1921,11 @@ class SlidingSyncHandler:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
sync_config: Sync configuration
|
sync_config: Sync configuration
|
||||||
lists: Sliding window API. A map of list key to list results.
|
actual_lists: Sliding window API. A map of list key to list results in the
|
||||||
|
Sliding Sync response.
|
||||||
|
actual_room_ids: The actual room IDs in the the Sliding Sync response.
|
||||||
|
actual_room_response_map: A map of room ID to room results in the the
|
||||||
|
Sliding Sync response.
|
||||||
to_token: The point in the stream to sync up to.
|
to_token: The point in the stream to sync up to.
|
||||||
from_token: The point in the stream to sync from.
|
from_token: The point in the stream to sync from.
|
||||||
"""
|
"""
|
||||||
@ -1939,18 +1954,103 @@ class SlidingSyncHandler:
|
|||||||
if sync_config.extensions.account_data is not None:
|
if sync_config.extensions.account_data is not None:
|
||||||
account_data_response = await self.get_account_data_extension_response(
|
account_data_response = await self.get_account_data_extension_response(
|
||||||
sync_config=sync_config,
|
sync_config=sync_config,
|
||||||
lists=lists,
|
actual_lists=actual_lists,
|
||||||
|
actual_room_ids=actual_room_ids,
|
||||||
account_data_request=sync_config.extensions.account_data,
|
account_data_request=sync_config.extensions.account_data,
|
||||||
to_token=to_token,
|
to_token=to_token,
|
||||||
from_token=from_token,
|
from_token=from_token,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
receipts_response = None
|
||||||
|
if sync_config.extensions.receipts is not None:
|
||||||
|
receipts_response = await self.get_receipts_extension_response(
|
||||||
|
sync_config=sync_config,
|
||||||
|
actual_lists=actual_lists,
|
||||||
|
actual_room_ids=actual_room_ids,
|
||||||
|
actual_room_response_map=actual_room_response_map,
|
||||||
|
receipts_request=sync_config.extensions.receipts,
|
||||||
|
to_token=to_token,
|
||||||
|
from_token=from_token,
|
||||||
|
)
|
||||||
|
|
||||||
return SlidingSyncResult.Extensions(
|
return SlidingSyncResult.Extensions(
|
||||||
to_device=to_device_response,
|
to_device=to_device_response,
|
||||||
e2ee=e2ee_response,
|
e2ee=e2ee_response,
|
||||||
account_data=account_data_response,
|
account_data=account_data_response,
|
||||||
|
receipts=receipts_response,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def find_relevant_room_ids_for_extension(
|
||||||
|
self,
|
||||||
|
requested_lists: Optional[List[str]],
|
||||||
|
requested_room_ids: Optional[List[str]],
|
||||||
|
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
||||||
|
actual_room_ids: Set[str],
|
||||||
|
) -> Set[str]:
|
||||||
|
"""
|
||||||
|
Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
|
||||||
|
return results for rooms in the Sliding Sync response. This matches up the
|
||||||
|
requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
|
||||||
|
|
||||||
|
{"lists": []} // Do not process any lists.
|
||||||
|
{"lists": ["rooms", "dms"]} // Process only a subset of lists.
|
||||||
|
{"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.)
|
||||||
|
|
||||||
|
{"rooms": []} // Do not process any specific rooms.
|
||||||
|
{"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions.
|
||||||
|
{"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
requested_lists: The `lists` from the extension request.
|
||||||
|
requested_room_ids: The `rooms` from the extension request.
|
||||||
|
actual_lists: The actual lists from the Sliding Sync response.
|
||||||
|
actual_room_ids: The actual room subscriptions from the Sliding Sync request.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# We only want to include account data for rooms that are already in the sliding
|
||||||
|
# sync response AND that were requested in the account data request.
|
||||||
|
relevant_room_ids: Set[str] = set()
|
||||||
|
|
||||||
|
# See what rooms from the room subscriptions we should get account data for
|
||||||
|
if requested_room_ids is not None:
|
||||||
|
for room_id in requested_room_ids:
|
||||||
|
# A wildcard means we process all rooms from the room subscriptions
|
||||||
|
if room_id == "*":
|
||||||
|
relevant_room_ids.update(actual_room_ids)
|
||||||
|
break
|
||||||
|
|
||||||
|
if room_id in actual_room_ids:
|
||||||
|
relevant_room_ids.add(room_id)
|
||||||
|
|
||||||
|
# See what rooms from the sliding window lists we should get account data for
|
||||||
|
if requested_lists is not None:
|
||||||
|
for list_key in requested_lists:
|
||||||
|
# Just some typing because we share the variable name in multiple places
|
||||||
|
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
|
||||||
|
|
||||||
|
# A wildcard means we process rooms from all lists
|
||||||
|
if list_key == "*":
|
||||||
|
for actual_list in actual_lists.values():
|
||||||
|
# We only expect a single SYNC operation for any list
|
||||||
|
assert len(actual_list.ops) == 1
|
||||||
|
sync_op = actual_list.ops[0]
|
||||||
|
assert sync_op.op == OperationType.SYNC
|
||||||
|
|
||||||
|
relevant_room_ids.update(sync_op.room_ids)
|
||||||
|
|
||||||
|
break
|
||||||
|
|
||||||
|
actual_list = actual_lists.get(list_key)
|
||||||
|
if actual_list is not None:
|
||||||
|
# We only expect a single SYNC operation for any list
|
||||||
|
assert len(actual_list.ops) == 1
|
||||||
|
sync_op = actual_list.ops[0]
|
||||||
|
assert sync_op.op == OperationType.SYNC
|
||||||
|
|
||||||
|
relevant_room_ids.update(sync_op.room_ids)
|
||||||
|
|
||||||
|
return relevant_room_ids
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def get_to_device_extension_response(
|
async def get_to_device_extension_response(
|
||||||
self,
|
self,
|
||||||
@ -2081,7 +2181,8 @@ class SlidingSyncHandler:
|
|||||||
async def get_account_data_extension_response(
|
async def get_account_data_extension_response(
|
||||||
self,
|
self,
|
||||||
sync_config: SlidingSyncConfig,
|
sync_config: SlidingSyncConfig,
|
||||||
lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
||||||
|
actual_room_ids: Set[str],
|
||||||
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
|
account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
|
||||||
to_token: StreamToken,
|
to_token: StreamToken,
|
||||||
from_token: Optional[SlidingSyncStreamToken],
|
from_token: Optional[SlidingSyncStreamToken],
|
||||||
@ -2090,7 +2191,9 @@ class SlidingSyncHandler:
|
|||||||
|
|
||||||
Args:
|
Args:
|
||||||
sync_config: Sync configuration
|
sync_config: Sync configuration
|
||||||
lists: Sliding window API. A map of list key to list results.
|
actual_lists: Sliding window API. A map of list key to list results in the
|
||||||
|
Sliding Sync response.
|
||||||
|
actual_room_ids: The actual room IDs in the the Sliding Sync response.
|
||||||
account_data_request: The account_data extension from the request
|
account_data_request: The account_data extension from the request
|
||||||
to_token: The point in the stream to sync up to.
|
to_token: The point in the stream to sync up to.
|
||||||
from_token: The point in the stream to sync from.
|
from_token: The point in the stream to sync from.
|
||||||
@ -2103,6 +2206,7 @@ class SlidingSyncHandler:
|
|||||||
|
|
||||||
global_account_data_map: Mapping[str, JsonMapping] = {}
|
global_account_data_map: Mapping[str, JsonMapping] = {}
|
||||||
if from_token is not None:
|
if from_token is not None:
|
||||||
|
# TODO: This should take into account the `from_token` and `to_token`
|
||||||
global_account_data_map = (
|
global_account_data_map = (
|
||||||
await self.store.get_updated_global_account_data_for_user(
|
await self.store.get_updated_global_account_data_for_user(
|
||||||
user_id, from_token.stream_token.account_data_key
|
user_id, from_token.stream_token.account_data_key
|
||||||
@ -2114,76 +2218,40 @@ class SlidingSyncHandler:
|
|||||||
)
|
)
|
||||||
if have_push_rules_changed:
|
if have_push_rules_changed:
|
||||||
global_account_data_map = dict(global_account_data_map)
|
global_account_data_map = dict(global_account_data_map)
|
||||||
|
# TODO: This should take into account the `from_token` and `to_token`
|
||||||
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
|
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
|
||||||
await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# TODO: This should take into account the `to_token`
|
||||||
all_global_account_data = await self.store.get_global_account_data_for_user(
|
all_global_account_data = await self.store.get_global_account_data_for_user(
|
||||||
user_id
|
user_id
|
||||||
)
|
)
|
||||||
|
|
||||||
global_account_data_map = dict(all_global_account_data)
|
global_account_data_map = dict(all_global_account_data)
|
||||||
|
# TODO: This should take into account the `to_token`
|
||||||
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
|
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
|
||||||
await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
await self.push_rules_handler.push_rules_for_user(sync_config.user)
|
||||||
)
|
)
|
||||||
|
|
||||||
# We only want to include account data for rooms that are already in the sliding
|
|
||||||
# sync response AND that were requested in the account data request.
|
|
||||||
relevant_room_ids: Set[str] = set()
|
|
||||||
|
|
||||||
# See what rooms from the room subscriptions we should get account data for
|
|
||||||
if (
|
|
||||||
account_data_request.rooms is not None
|
|
||||||
and sync_config.room_subscriptions is not None
|
|
||||||
):
|
|
||||||
actual_room_ids = sync_config.room_subscriptions.keys()
|
|
||||||
|
|
||||||
for room_id in account_data_request.rooms:
|
|
||||||
# A wildcard means we process all rooms from the room subscriptions
|
|
||||||
if room_id == "*":
|
|
||||||
relevant_room_ids.update(sync_config.room_subscriptions.keys())
|
|
||||||
break
|
|
||||||
|
|
||||||
if room_id in actual_room_ids:
|
|
||||||
relevant_room_ids.add(room_id)
|
|
||||||
|
|
||||||
# See what rooms from the sliding window lists we should get account data for
|
|
||||||
if account_data_request.lists is not None:
|
|
||||||
for list_key in account_data_request.lists:
|
|
||||||
# Just some typing because we share the variable name in multiple places
|
|
||||||
actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
|
|
||||||
|
|
||||||
# A wildcard means we process rooms from all lists
|
|
||||||
if list_key == "*":
|
|
||||||
for actual_list in lists.values():
|
|
||||||
# We only expect a single SYNC operation for any list
|
|
||||||
assert len(actual_list.ops) == 1
|
|
||||||
sync_op = actual_list.ops[0]
|
|
||||||
assert sync_op.op == OperationType.SYNC
|
|
||||||
|
|
||||||
relevant_room_ids.update(sync_op.room_ids)
|
|
||||||
|
|
||||||
break
|
|
||||||
|
|
||||||
actual_list = lists.get(list_key)
|
|
||||||
if actual_list is not None:
|
|
||||||
# We only expect a single SYNC operation for any list
|
|
||||||
assert len(actual_list.ops) == 1
|
|
||||||
sync_op = actual_list.ops[0]
|
|
||||||
assert sync_op.op == OperationType.SYNC
|
|
||||||
|
|
||||||
relevant_room_ids.update(sync_op.room_ids)
|
|
||||||
|
|
||||||
# Fetch room account data
|
# Fetch room account data
|
||||||
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
|
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
|
||||||
|
relevant_room_ids = self.find_relevant_room_ids_for_extension(
|
||||||
|
requested_lists=account_data_request.lists,
|
||||||
|
requested_room_ids=account_data_request.rooms,
|
||||||
|
actual_lists=actual_lists,
|
||||||
|
actual_room_ids=actual_room_ids,
|
||||||
|
)
|
||||||
if len(relevant_room_ids) > 0:
|
if len(relevant_room_ids) > 0:
|
||||||
if from_token is not None:
|
if from_token is not None:
|
||||||
|
# TODO: This should take into account the `from_token` and `to_token`
|
||||||
account_data_by_room_map = (
|
account_data_by_room_map = (
|
||||||
await self.store.get_updated_room_account_data_for_user(
|
await self.store.get_updated_room_account_data_for_user(
|
||||||
user_id, from_token.stream_token.account_data_key
|
user_id, from_token.stream_token.account_data_key
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
# TODO: This should take into account the `to_token`
|
||||||
account_data_by_room_map = (
|
account_data_by_room_map = (
|
||||||
await self.store.get_room_account_data_for_user(user_id)
|
await self.store.get_room_account_data_for_user(user_id)
|
||||||
)
|
)
|
||||||
@ -2200,6 +2268,86 @@ class SlidingSyncHandler:
|
|||||||
account_data_by_room_map=account_data_by_room_map,
|
account_data_by_room_map=account_data_by_room_map,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
async def get_receipts_extension_response(
|
||||||
|
self,
|
||||||
|
sync_config: SlidingSyncConfig,
|
||||||
|
actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
|
||||||
|
actual_room_ids: Set[str],
|
||||||
|
actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
|
||||||
|
receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
|
||||||
|
to_token: StreamToken,
|
||||||
|
from_token: Optional[SlidingSyncStreamToken],
|
||||||
|
) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
|
||||||
|
"""Handle Receipts extension (MSC3960)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
sync_config: Sync configuration
|
||||||
|
actual_lists: Sliding window API. A map of list key to list results in the
|
||||||
|
Sliding Sync response.
|
||||||
|
actual_room_ids: The actual room IDs in the the Sliding Sync response.
|
||||||
|
actual_room_response_map: A map of room ID to room results in the the
|
||||||
|
Sliding Sync response.
|
||||||
|
account_data_request: The account_data extension from the request
|
||||||
|
to_token: The point in the stream to sync up to.
|
||||||
|
from_token: The point in the stream to sync from.
|
||||||
|
"""
|
||||||
|
# Skip if the extension is not enabled
|
||||||
|
if not receipts_request.enabled:
|
||||||
|
return None
|
||||||
|
|
||||||
|
relevant_room_ids = self.find_relevant_room_ids_for_extension(
|
||||||
|
requested_lists=receipts_request.lists,
|
||||||
|
requested_room_ids=receipts_request.rooms,
|
||||||
|
actual_lists=actual_lists,
|
||||||
|
actual_room_ids=actual_room_ids,
|
||||||
|
)
|
||||||
|
|
||||||
|
room_id_to_receipt_map: Dict[str, JsonMapping] = {}
|
||||||
|
if len(relevant_room_ids) > 0:
|
||||||
|
receipt_source = self.event_sources.sources.receipt
|
||||||
|
receipts, _ = await receipt_source.get_new_events(
|
||||||
|
user=sync_config.user,
|
||||||
|
from_key=(
|
||||||
|
from_token.stream_token.receipt_key
|
||||||
|
if from_token
|
||||||
|
else MultiWriterStreamToken(stream=0)
|
||||||
|
),
|
||||||
|
to_key=to_token.receipt_key,
|
||||||
|
# This is a dummy value and isn't used in the function
|
||||||
|
limit=0,
|
||||||
|
room_ids=relevant_room_ids,
|
||||||
|
is_guest=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
for receipt in receipts:
|
||||||
|
# These fields should exist for every receipt
|
||||||
|
room_id = receipt["room_id"]
|
||||||
|
type = receipt["type"]
|
||||||
|
content = receipt["content"]
|
||||||
|
|
||||||
|
room_result = actual_room_response_map.get(room_id)
|
||||||
|
if room_result is not None:
|
||||||
|
if room_result.initial:
|
||||||
|
# TODO: In the future, it would be good to fetch less receipts
|
||||||
|
# out of the database in the first place but we would need to
|
||||||
|
# add a new `event_id` index to `receipts_linearized`.
|
||||||
|
relevant_event_ids = [
|
||||||
|
event.event_id for event in room_result.timeline_events
|
||||||
|
]
|
||||||
|
|
||||||
|
assert isinstance(content, dict)
|
||||||
|
content = {
|
||||||
|
event_id: content_value
|
||||||
|
for event_id, content_value in content.items()
|
||||||
|
if event_id in relevant_event_ids
|
||||||
|
}
|
||||||
|
|
||||||
|
room_id_to_receipt_map[room_id] = {"type": type, "content": content}
|
||||||
|
|
||||||
|
return SlidingSyncResult.Extensions.ReceiptsExtension(
|
||||||
|
room_id_to_receipt_map=room_id_to_receipt_map,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class HaveSentRoomFlag(Enum):
|
class HaveSentRoomFlag(Enum):
|
||||||
"""Flag for whether we have sent the room down a sliding sync connection.
|
"""Flag for whether we have sent the room down a sliding sync connection.
|
||||||
|
@ -1150,6 +1150,12 @@ class SlidingSyncRestServlet(RestServlet):
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if extensions.receipts is not None:
|
||||||
|
serialized_extensions["receipts"] = {
|
||||||
|
# Same as the the top-level `account_data.events` field in Sync v2.
|
||||||
|
"rooms": extensions.receipts.room_id_to_receipt_map,
|
||||||
|
}
|
||||||
|
|
||||||
return serialized_extensions
|
return serialized_extensions
|
||||||
|
|
||||||
|
|
||||||
|
@ -152,7 +152,7 @@ class SlidingSyncResult:
|
|||||||
Attributes:
|
Attributes:
|
||||||
next_pos: The next position token in the sliding window to request (next_batch).
|
next_pos: The next position token in the sliding window to request (next_batch).
|
||||||
lists: Sliding window API. A map of list key to list results.
|
lists: Sliding window API. A map of list key to list results.
|
||||||
rooms: Room subscription API. A map of room ID to room subscription to room results.
|
rooms: Room subscription API. A map of room ID to room results.
|
||||||
extensions: Extensions API. A map of extension key to extension results.
|
extensions: Extensions API. A map of extension key to extension results.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@ -361,12 +361,28 @@ class SlidingSyncResult:
|
|||||||
self.global_account_data_map or self.account_data_by_room_map
|
self.global_account_data_map or self.account_data_by_room_map
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@attr.s(slots=True, frozen=True, auto_attribs=True)
|
||||||
|
class ReceiptsExtension:
|
||||||
|
"""The Receipts extension (MSC3960)
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
|
||||||
|
"""
|
||||||
|
|
||||||
|
room_id_to_receipt_map: Mapping[str, JsonMapping]
|
||||||
|
|
||||||
|
def __bool__(self) -> bool:
|
||||||
|
return bool(self.room_id_to_receipt_map)
|
||||||
|
|
||||||
to_device: Optional[ToDeviceExtension] = None
|
to_device: Optional[ToDeviceExtension] = None
|
||||||
e2ee: Optional[E2eeExtension] = None
|
e2ee: Optional[E2eeExtension] = None
|
||||||
account_data: Optional[AccountDataExtension] = None
|
account_data: Optional[AccountDataExtension] = None
|
||||||
|
receipts: Optional[ReceiptsExtension] = None
|
||||||
|
|
||||||
def __bool__(self) -> bool:
|
def __bool__(self) -> bool:
|
||||||
return bool(self.to_device or self.e2ee or self.account_data)
|
return bool(
|
||||||
|
self.to_device or self.e2ee or self.account_data or self.receipts
|
||||||
|
)
|
||||||
|
|
||||||
next_pos: SlidingSyncStreamToken
|
next_pos: SlidingSyncStreamToken
|
||||||
lists: Dict[str, SlidingWindowList]
|
lists: Dict[str, SlidingWindowList]
|
||||||
|
@ -342,9 +342,27 @@ class SlidingSyncBody(RequestBodyModel):
|
|||||||
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
|
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
|
||||||
rooms: Optional[List[StrictStr]] = ["*"]
|
rooms: Optional[List[StrictStr]] = ["*"]
|
||||||
|
|
||||||
|
class ReceiptsExtension(RequestBodyModel):
|
||||||
|
"""The Receipts extension (MSC3960)
|
||||||
|
|
||||||
|
Attributes:
|
||||||
|
enabled
|
||||||
|
lists: List of list keys (from the Sliding Window API) to apply this
|
||||||
|
extension to.
|
||||||
|
rooms: List of room IDs (from the Room Subscription API) to apply this
|
||||||
|
extension to.
|
||||||
|
"""
|
||||||
|
|
||||||
|
enabled: Optional[StrictBool] = False
|
||||||
|
# Process all lists defined in the Sliding Window API. (This is the default.)
|
||||||
|
lists: Optional[List[StrictStr]] = ["*"]
|
||||||
|
# Process all room subscriptions defined in the Room Subscription API. (This is the default.)
|
||||||
|
rooms: Optional[List[StrictStr]] = ["*"]
|
||||||
|
|
||||||
to_device: Optional[ToDeviceExtension] = None
|
to_device: Optional[ToDeviceExtension] = None
|
||||||
e2ee: Optional[E2eeExtension] = None
|
e2ee: Optional[E2eeExtension] = None
|
||||||
account_data: Optional[AccountDataExtension] = None
|
account_data: Optional[AccountDataExtension] = None
|
||||||
|
receipts: Optional[ReceiptsExtension] = None
|
||||||
|
|
||||||
conn_id: Optional[str]
|
conn_id: Optional[str]
|
||||||
|
|
||||||
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user