From 6bf81a7a61d8d5248be5def955104c44fcb78dae Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 7 Jan 2022 09:10:46 -0500 Subject: [PATCH] Bundle aggregations outside of the serialization method. (#11612) This makes the serialization of events synchronous (and it no longer access the database), but we must manually calculate and provide the bundled aggregations. Overall this should cause no change in behavior, but is prep work for other improvements. --- changelog.d/11612.misc | 1 + synapse/events/utils.py | 124 ++++++------------- synapse/handlers/events.py | 2 +- synapse/handlers/initial_sync.py | 16 ++- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 8 +- synapse/handlers/room.py | 10 ++ synapse/handlers/search.py | 10 +- synapse/rest/admin/rooms.py | 16 +-- synapse/rest/client/events.py | 2 +- synapse/rest/client/notifications.py | 2 +- synapse/rest/client/relations.py | 11 +- synapse/rest/client/room.py | 28 +++-- synapse/rest/client/sync.py | 39 +++--- synapse/server.py | 2 +- synapse/storage/databases/main/relations.py | 128 +++++++++++++++++++- tests/rest/client/test_retention.py | 2 +- 17 files changed, 248 insertions(+), 155 deletions(-) create mode 100644 changelog.d/11612.misc diff --git a/changelog.d/11612.misc b/changelog.d/11612.misc new file mode 100644 index 000000000..2d886169c --- /dev/null +++ b/changelog.d/11612.misc @@ -0,0 +1 @@ +Avoid database access in the JSON serialization process. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2038e7292..de0e0c173 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,17 +14,7 @@ # limitations under the License. import collections.abc import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Union, -) +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union from frozendict import frozendict @@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion from synapse.types import JsonDict -from synapse.util.async_helpers import yieldable_gather_results from synapse.util.frozenutils import unfreeze from . import EventBase -if TYPE_CHECKING: - from synapse.server import HomeServer - # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' # (? JsonDict: """Serializes a single event. @@ -418,66 +399,41 @@ class EventClientSerializer: serialized_event = serialize_event(event, time_now, **kwargs) # Check if there are any bundled aggregations to include with the event. - # - # Do not bundle aggregations if any of the following at true: - # - # * Support is disabled via the configuration or the caller. - # * The event is a state event. - # * The event has been redacted. - if ( - self._msc1849_enabled - and bundle_aggregations - and not event.is_state() - and not event.internal_metadata.is_redacted() - ): - await self._injected_bundled_aggregations(event, time_now, serialized_event) + if bundle_aggregations: + event_aggregations = bundle_aggregations.get(event.event_id) + if event_aggregations: + self._injected_bundled_aggregations( + event, + time_now, + bundle_aggregations[event.event_id], + serialized_event, + ) return serialized_event - async def _injected_bundled_aggregations( - self, event: EventBase, time_now: int, serialized_event: JsonDict + def _injected_bundled_aggregations( + self, + event: EventBase, + time_now: int, + aggregations: JsonDict, + serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. Args: event: The event being serialized. time_now: The current time in milliseconds + aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. """ - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return + # Make a copy in-case the object is cached. + aggregations = aggregations.copy() - event_id = event.event_id - room_id = event.room_id - - # The bundled aggregations to include. - aggregations = {} - - annotations = await self.store.get_aggregation_groups_for_event( - event_id, room_id - ) - if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() - - references = await self.store.get_relations_for_event( - event_id, room_id, RelationTypes.REFERENCE, direction="f" - ) - if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() - - edit = None - if event.type == EventTypes.Message: - edit = await self.store.get_applicable_edit(event_id, room_id) - - if edit: + if RelationTypes.REPLACE in aggregations: # If there is an edit replace the content, preserving existing # relations. + edit = aggregations[RelationTypes.REPLACE] # Ensure we take copies of the edit content, otherwise we risk modifying # the original event. @@ -502,27 +458,19 @@ class EventClientSerializer: } # If this event is the start of a thread, include a summary of the replies. - if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id, room_id) - if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "count": thread_count, - } + if RelationTypes.THREAD in aggregations: + # Serialize the latest thread event. + latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] - # If any bundled aggregations were found, include them. - if aggregations: - serialized_event["unsigned"].setdefault("m.relations", {}).update( - aggregations + # Don't bundle aggregations as this could recurse forever. + aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=None ) - async def serialize_events( + # Include the bundled aggregations in the event. + serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) + + def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: """Serializes multiple events. @@ -535,9 +483,9 @@ class EventClientSerializer: Returns: The list of serialized events """ - return await yieldable_gather_results( - self.serialize_event, events, time_now=time_now, **kwargs - ) + return [ + self.serialize_event(event, time_now=time_now, **kwargs) for event in events + ] def copy_power_levels_contents( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b996c420..a3add8a58 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -119,7 +119,7 @@ class EventStreamHandler: events.extend(to_add) - chunks = await self._event_serializer.serialize_events( + chunks = self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 601bab67f..346a06ff4 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -170,7 +170,7 @@ class InitialSyncHandler: d["inviter"] = event.sender invite_event = await self.store.get_event(event.event_id) - d["invite"] = await self._event_serializer.serialize_event( + d["invite"] = self._event_serializer.serialize_event( invite_event, time_now, as_client_event=as_client_event, @@ -222,7 +222,7 @@ class InitialSyncHandler: d["messages"] = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( messages, time_now=time_now, as_client_event=as_client_event, @@ -232,7 +232,7 @@ class InitialSyncHandler: "end": await end_token.to_string(self.store), } - d["state"] = await self._event_serializer.serialize_events( + d["state"] = self._event_serializer.serialize_events( current_state.values(), time_now=time_now, as_client_event=as_client_event, @@ -376,16 +376,14 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), }, "state": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - room_state.values(), time_now - ) + self._event_serializer.serialize_events(room_state.values(), time_now) ), "presence": [], "receipts": [], @@ -404,7 +402,7 @@ class InitialSyncHandler: # TODO: These concurrently time_now = self.clock.time_msec() # Don't bundle aggregations as this is a deprecated API. - state = await self._event_serializer.serialize_events( + state = self._event_serializer.serialize_events( current_state.values(), time_now ) @@ -480,7 +478,7 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5e3d3886e..b37250aa3 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -246,7 +246,7 @@ class MessageHandler: room_state = room_state_events[membership_event_id] now = self.clock.time_msec() - events = await self._event_serializer.serialize_events(room_state.values(), now) + events = self._event_serializer.serialize_events(room_state.values(), now) return events async def get_joined_members(self, requester: Requester, room_id: str) -> dict: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7469cc55a..472688f04 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -537,14 +537,16 @@ class PaginationHandler: state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() + aggregations = await self.store.get_bundled_aggregations(events) + time_now = self.clock.time_msec() chunk = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( events, time_now, - bundle_aggregations=True, + bundle_aggregations=aggregations, as_client_event=as_client_event, ) ), @@ -553,7 +555,7 @@ class PaginationHandler: } if state: - chunk["state"] = await self._event_serializer.serialize_events( + chunk["state"] = self._event_serializer.serialize_events( state, time_now, as_client_event=as_client_event ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 3d3a0f6ac..3d47163f2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1181,6 +1181,16 @@ class RoomContextHandler: # `filtered` rather than the event we retrieved from the datastore. results["event"] = filtered[0] + # Fetch the aggregations. + aggregations = await self.store.get_bundled_aggregations([results["event"]]) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_before"]) + ) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_after"]) + ) + results["aggregations"] = aggregations + if results["events_after"]: last_event_id = results["events_after"][-1].event_id else: diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ab7eaab2f..0b153a682 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -420,10 +420,10 @@ class SearchHandler: time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = await self._event_serializer.serialize_events( + context["events_before"] = self._event_serializer.serialize_events( context["events_before"], time_now ) - context["events_after"] = await self._event_serializer.serialize_events( + context["events_after"] = self._event_serializer.serialize_events( context["events_after"], time_now ) @@ -441,9 +441,7 @@ class SearchHandler: results.append( { "rank": rank_map[e.event_id], - "result": ( - await self._event_serializer.serialize_event(e, time_now) - ), + "result": self._event_serializer.serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } ) @@ -457,7 +455,7 @@ class SearchHandler: if state_results: s = {} for room_id, state_events in state_results.items(): - s[room_id] = await self._event_serializer.serialize_events( + s[room_id] = self._event_serializer.serialize_events( state_events, time_now ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6030373eb..2e714ac87 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -424,7 +424,7 @@ class RoomStateRestServlet(RestServlet): event_ids = await self.store.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() - room_state = await self._event_serializer.serialize_events(events.values(), now) + room_state = self._event_serializer.serialize_events(events.values(), now) ret = {"state": room_state} return HTTPStatus.OK, ret @@ -744,22 +744,22 @@ class RoomEventContextServlet(RestServlet): ) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( + results["events_before"] = self._event_serializer.serialize_events( results["events_before"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( + results["event"] = self._event_serializer.serialize_event( results["event"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["events_after"] = await self._event_serializer.serialize_events( + results["events_after"] = self._event_serializer.serialize_events( results["events_after"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py index 13b72a045..672c82106 100644 --- a/synapse/rest/client/events.py +++ b/synapse/rest/client/events.py @@ -91,7 +91,7 @@ class EventRestServlet(RestServlet): time_now = self.clock.time_msec() if event: - result = await self._event_serializer.serialize_event(event, time_now) + result = self._event_serializer.serialize_event(event, time_now) return 200, result else: return 404, "Event not found." diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index acd0c9e13..8e427a96a 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -72,7 +72,7 @@ class NotificationsServlet(RestServlet): "actions": pa.actions, "ts": pa.received_ts, "event": ( - await self._event_serializer.serialize_event( + self._event_serializer.serialize_event( notif_events[pa.event_id], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 382349801..37d949a71 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -113,13 +113,14 @@ class RelationPaginationServlet(RestServlet): now = self.clock.time_msec() # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. - original_event = await self._event_serializer.serialize_event( - event, now, bundle_aggregations=False + original_event = self._event_serializer.serialize_event( + event, now, bundle_aggregations=None ) # The relations returned for the requested event do include their # bundled aggregations. - serialized_events = await self._event_serializer.serialize_events( - events, now, bundle_aggregations=True + aggregations = await self.store.get_bundled_aggregations(events) + serialized_events = self._event_serializer.serialize_events( + events, now, bundle_aggregations=aggregations ) return_value = pagination_chunk.to_dict() @@ -308,7 +309,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet): ) now = self.clock.time_msec() - serialized_events = await self._event_serializer.serialize_events(events, now) + serialized_events = self._event_serializer.serialize_events(events, now) return_value = result.to_dict() return_value["chunk"] = serialized_events diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 40330749e..da6014900 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.clock = hs.get_clock() + self._store = hs.get_datastore() self.event_handler = hs.get_event_handler() self._event_serializer = hs.get_event_client_serializer() self.auth = hs.get_auth() @@ -660,10 +661,13 @@ class RoomEventServlet(RestServlet): # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) - time_now = self.clock.time_msec() if event: - event_dict = await self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=True + # Ensure there are bundled aggregations available. + aggregations = await self._store.get_bundled_aggregations([event]) + + time_now = self.clock.time_msec() + event_dict = self._event_serializer.serialize_event( + event, time_now, bundle_aggregations=aggregations ) return 200, event_dict @@ -708,16 +712,20 @@ class RoomEventContextServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( - results["events_before"], time_now, bundle_aggregations=True + results["events_before"] = self._event_serializer.serialize_events( + results["events_before"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( - results["event"], time_now, bundle_aggregations=True + results["event"] = self._event_serializer.serialize_event( + results["event"], time_now, bundle_aggregations=results["aggregations"] ) - results["events_after"] = await self._event_serializer.serialize_events( - results["events_after"], time_now, bundle_aggregations=True + results["events_after"] = self._event_serializer.serialize_events( + results["events_after"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index e99a943d0..a3e57e4b2 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -17,7 +17,6 @@ from collections import defaultdict from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, Dict, Iterable, @@ -395,7 +394,7 @@ class SyncRestServlet(RestServlet): """ invited = {} for room in rooms: - invite = await self._event_serializer.serialize_event( + invite = self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, @@ -432,7 +431,7 @@ class SyncRestServlet(RestServlet): """ knocked = {} for room in rooms: - knock = await self._event_serializer.serialize_event( + knock = self._event_serializer.serialize_event( room.knock, time_now, token_id=token_id, @@ -525,21 +524,14 @@ class SyncRestServlet(RestServlet): The room, encoded in our response format """ - def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]: + def serialize( + events: Iterable[EventBase], + aggregations: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> List[JsonDict]: return self._event_serializer.serialize_events( events, time_now=time_now, - # Don't bother to bundle aggregations if the timeline is unlimited, - # as clients will have all the necessary information. - # bundle_aggregations=room.timeline.limited, - # - # richvdh 2021-12-15: disable this temporarily as it has too high an - # overhead for initialsyncs. We need to figure out a way that the - # bundling can be done *before* the events are stored in the - # SyncResponseCache so that this part can be synchronous. - # - # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. - bundle_aggregations=False, + bundle_aggregations=aggregations, token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, @@ -561,8 +553,21 @@ class SyncRestServlet(RestServlet): event.room_id, ) - serialized_state = await serialize(state_events) - serialized_timeline = await serialize(timeline_events) + serialized_state = serialize(state_events) + # Don't bother to bundle aggregations if the timeline is unlimited, + # as clients will have all the necessary information. + # bundle_aggregations=room.timeline.limited, + # + # richvdh 2021-12-15: disable this temporarily as it has too high an + # overhead for initialsyncs. We need to figure out a way that the + # bundling can be done *before* the events are stored in the + # SyncResponseCache so that this part can be synchronous. + # + # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. + # if room.timeline.limited: + # aggregations = await self.store.get_bundled_aggregations(timeline_events) + aggregations = None + serialized_timeline = serialize(timeline_events, aggregations) account_data = room.account_data diff --git a/synapse/server.py b/synapse/server.py index 185e40e4d..3032f0b73 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -759,7 +759,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 4ff6aed25..c6c4bd18d 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,14 +13,30 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) import attr +from frozendict import frozendict -from synapse.api.constants import RelationTypes +from synapse.api.constants import EventTypes, RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -29,10 +45,24 @@ from synapse.storage.relations import ( ) from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class RelationsWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._msc1849_enabled = hs.config.experimental.msc1849_enabled + self._msc3440_enabled = hs.config.experimental.msc3440_enabled + @cached(tree=True) async def get_relations_for_event( self, @@ -515,6 +545,98 @@ class RelationsWorkerStore(SQLBaseStore): "get_if_user_has_annotated_event", _get_if_user_has_annotated_event ) + async def _get_bundled_aggregation_for_event( + self, event: EventBase + ) -> Optional[Dict[str, Any]]: + """Generate bundled aggregations for an event. + + Note that this does not use a cache, but depends on cached methods. + + Args: + event: The event to calculate bundled aggregations for. + + Returns: + The bundled aggregations for an event, if bundled aggregations are + enabled and the event can have bundled aggregations. + """ + # State events and redacted events do not get bundled aggregations. + if event.is_state() or event.internal_metadata.is_redacted(): + return None + + # Do not bundle aggregations for an event which represents an edit or an + # annotation. It does not make sense for them to have related events. + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + return None + + event_id = event.event_id + room_id = event.room_id + + # The bundled aggregations to include, a mapping of relation type to a + # type-specific value. Some types include the direct return type here + # while others need more processing during serialization. + aggregations: Dict[str, Any] = {} + + annotations = await self.get_aggregation_groups_for_event(event_id, room_id) + if annotations.chunk: + aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + + references = await self.get_relations_for_event( + event_id, room_id, RelationTypes.REFERENCE, direction="f" + ) + if references.chunk: + aggregations[RelationTypes.REFERENCE] = references.to_dict() + + edit = None + if event.type == EventTypes.Message: + edit = await self.get_applicable_edit(event_id, room_id) + + if edit: + aggregations[RelationTypes.REPLACE] = edit + + # If this event is the start of a thread, include a summary of the replies. + if self._msc3440_enabled: + ( + thread_count, + latest_thread_event, + ) = await self.get_thread_summary(event_id, room_id) + if latest_thread_event: + aggregations[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": latest_thread_event, + "count": thread_count, + } + + # Store the bundled aggregations in the event metadata for later use. + return aggregations + + async def get_bundled_aggregations( + self, events: Iterable[EventBase] + ) -> Dict[str, Dict[str, Any]]: + """Generate bundled aggregations for events. + + Args: + events: The iterable of events to calculate bundled aggregations for. + + Returns: + A map of event ID to the bundled aggregation for the event. Not all + events may have bundled aggregations in the results. + """ + # If bundled aggregations are disabled, nothing to do. + if not self._msc1849_enabled: + return {} + + # TODO Parallelize. + results = {} + for event in events: + event_result = await self._get_bundled_aggregation_for_event(event) + if event_result is not None: + results[event.event_id] = event_result + + return results + class RelationsStore(RelationsWorkerStore): pass diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index b58452195..fe5b536d9 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -228,7 +228,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.assertIsNotNone(event) time_now = self.clock.time_msec() - serialized = self.get_success(self.serializer.serialize_event(event, time_now)) + serialized = self.serializer.serialize_event(event, time_now) return serialized