Include bundled aggregations for the latest event in a thread. (#12273)

The `latest_event` field of the bundled aggregations for `m.thread` relations
did not include bundled aggregations itself. This resulted in clients needing to
immediately request the event from the server (and thus making it useless that
the latest event itself was serialized instead of just including an event ID).
This commit is contained in:
Patrick Cloke 2022-05-04 08:38:18 -04:00 committed by GitHub
parent 01e625513a
commit 75dff3dc98
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 197 additions and 50 deletions

1
changelog.d/12273.bugfix Normal file
View File

@ -0,0 +1 @@
Fix a bug introduced in Synapse v1.48.0 where latest thread reply provided failed to include the proper bundled aggregations.

View File

@ -426,13 +426,12 @@ class EventClientSerializer:
# Check if there are any bundled aggregations to include with the event. # Check if there are any bundled aggregations to include with the event.
if bundle_aggregations: if bundle_aggregations:
event_aggregations = bundle_aggregations.get(event.event_id) if event.event_id in bundle_aggregations:
if event_aggregations:
self._inject_bundled_aggregations( self._inject_bundled_aggregations(
event, event,
time_now, time_now,
config, config,
event_aggregations, bundle_aggregations,
serialized_event, serialized_event,
apply_edits=apply_edits, apply_edits=apply_edits,
) )
@ -471,7 +470,7 @@ class EventClientSerializer:
event: EventBase, event: EventBase,
time_now: int, time_now: int,
config: SerializeEventConfig, config: SerializeEventConfig,
aggregations: "BundledAggregations", bundled_aggregations: Dict[str, "BundledAggregations"],
serialized_event: JsonDict, serialized_event: JsonDict,
apply_edits: bool, apply_edits: bool,
) -> None: ) -> None:
@ -481,22 +480,37 @@ class EventClientSerializer:
event: The event being serialized. event: The event being serialized.
time_now: The current time in milliseconds time_now: The current time in milliseconds
config: Event serialization config config: Event serialization config
aggregations: The bundled aggregation to serialize. bundled_aggregations: Bundled aggregations to be injected.
A map from event_id to aggregation data. Must contain at least an
entry for `event`.
While serializing the bundled aggregations this map may be searched
again for additional events in a recursive manner.
serialized_event: The serialized event which may be modified. serialized_event: The serialized event which may be modified.
apply_edits: Whether the content of the event should be modified to reflect apply_edits: Whether the content of the event should be modified to reflect
any replacement in `aggregations.replace`. any replacement in `aggregations.replace`.
""" """
# We have already checked that aggregations exist for this event.
event_aggregations = bundled_aggregations[event.event_id]
# The JSON dictionary to be added under the unsigned property of the event
# being serialized.
serialized_aggregations = {} serialized_aggregations = {}
if aggregations.annotations: if event_aggregations.annotations:
serialized_aggregations[RelationTypes.ANNOTATION] = aggregations.annotations serialized_aggregations[
RelationTypes.ANNOTATION
] = event_aggregations.annotations
if aggregations.references: if event_aggregations.references:
serialized_aggregations[RelationTypes.REFERENCE] = aggregations.references serialized_aggregations[
RelationTypes.REFERENCE
] = event_aggregations.references
if aggregations.replace: if event_aggregations.replace:
# If there is an edit, optionally apply it to the event. # If there is an edit, optionally apply it to the event.
edit = aggregations.replace edit = event_aggregations.replace
if apply_edits: if apply_edits:
self._apply_edit(event, serialized_event, edit) self._apply_edit(event, serialized_event, edit)
@ -507,18 +521,15 @@ class EventClientSerializer:
"sender": edit.sender, "sender": edit.sender,
} }
# If this event is the start of a thread, include a summary of the replies. # Include any threaded replies to this event.
if aggregations.thread: if event_aggregations.thread:
thread = aggregations.thread thread = event_aggregations.thread
# Don't bundle aggregations as this could recurse forever. serialized_latest_event = self.serialize_event(
serialized_latest_event = serialize_event( thread.latest_event,
thread.latest_event, time_now, config=config time_now,
) config=config,
# Manually apply an edit, if one exists. bundle_aggregations=bundled_aggregations,
if thread.latest_edit:
self._apply_edit(
thread.latest_event, serialized_latest_event, thread.latest_edit
) )
thread_summary = { thread_summary = {

View File

@ -44,8 +44,6 @@ logger = logging.getLogger(__name__)
class _ThreadAggregation: class _ThreadAggregation:
# The latest event in the thread. # The latest event in the thread.
latest_event: EventBase latest_event: EventBase
# The latest edit to the latest event in the thread.
latest_edit: Optional[EventBase]
# The total number of events in the thread. # The total number of events in the thread.
count: int count: int
# True if the current user has sent an event to the thread. # True if the current user has sent an event to the thread.
@ -295,7 +293,7 @@ class RelationsHandler:
for event_id, summary in summaries.items(): for event_id, summary in summaries.items():
if summary: if summary:
thread_count, latest_thread_event, edit = summary thread_count, latest_thread_event = summary
# Subtract off the count of any ignored users. # Subtract off the count of any ignored users.
for ignored_user in ignored_users: for ignored_user in ignored_users:
@ -340,7 +338,6 @@ class RelationsHandler:
results[event_id] = _ThreadAggregation( results[event_id] = _ThreadAggregation(
latest_event=latest_thread_event, latest_event=latest_thread_event,
latest_edit=edit,
count=thread_count, count=thread_count,
# If there's a thread summary it must also exist in the # If there's a thread summary it must also exist in the
# participated dictionary. # participated dictionary.
@ -359,8 +356,13 @@ class RelationsHandler:
user_id: The user requesting the bundled aggregations. user_id: The user requesting the bundled aggregations.
Returns: Returns:
A map of event ID to the bundled aggregation for the event. Not all A map of event ID to the bundled aggregations for the event.
events may have bundled aggregations in the results.
Not all requested events may exist in the results (if they don't have
bundled aggregations).
The results may include additional events which are related to the
requested events.
""" """
# De-duplicate events by ID to handle the same event requested multiple times. # De-duplicate events by ID to handle the same event requested multiple times.
# #
@ -369,22 +371,59 @@ class RelationsHandler:
event.event_id: event for event in events if not event.is_state() event.event_id: event for event in events if not event.is_state()
} }
# A map of event ID to the relation in that event, if there is one.
relations_by_id: Dict[str, str] = {}
for event_id, event in events_by_id.items():
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, collections.abc.Mapping):
relation_type = relates_to.get("rel_type")
if isinstance(relation_type, str):
relations_by_id[event_id] = relation_type
# event ID -> bundled aggregation in non-serialized form. # event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {} results: Dict[str, BundledAggregations] = {}
# Fetch any ignored users of the requesting user. # Fetch any ignored users of the requesting user.
ignored_users = await self._main_store.ignored_users(user_id) ignored_users = await self._main_store.ignored_users(user_id)
# Threads are special as the latest event of a thread might cause additional
# events to be fetched. Thus, we check those first!
# Fetch thread summaries (but only for the directly requested events).
threads = await self.get_threads_for_events(
# It is not valid to start a thread on an event which itself relates to another event.
[eid for eid in events_by_id.keys() if eid not in relations_by_id],
user_id,
ignored_users,
)
for event_id, thread in threads.items():
results.setdefault(event_id, BundledAggregations()).thread = thread
# If the latest event in a thread is not already being fetched,
# add it. This ensures that the bundled aggregations for the
# latest thread event is correct.
latest_thread_event = thread.latest_event
if latest_thread_event and latest_thread_event.event_id not in events_by_id:
events_by_id[latest_thread_event.event_id] = latest_thread_event
# Keep relations_by_id in sync with events_by_id:
#
# We know that the latest event in a thread has a thread relation
# (as that is what makes it part of the thread).
relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD
# Fetch other relations per event. # Fetch other relations per event.
for event in events_by_id.values(): for event in events_by_id.values():
# Do not bundle aggregations for an event which represents an edit or an # An event which is a replacement (ie edit) or annotation (ie, reaction)
# annotation. It does not make sense for them to have related events. # may not have any other event related to it.
relates_to = event.content.get("m.relates_to") #
if isinstance(relates_to, collections.abc.Mapping): # XXX This is buggy, see https://github.com/matrix-org/synapse/issues/12566
relation_type = relates_to.get("rel_type") if relations_by_id.get(event.event_id) in (
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): RelationTypes.ANNOTATION,
RelationTypes.REPLACE,
):
continue continue
# Fetch any annotations (ie, reactions) to bundle with this event.
annotations = await self.get_annotations_for_event( annotations = await self.get_annotations_for_event(
event.event_id, event.room_id, ignored_users=ignored_users event.event_id, event.room_id, ignored_users=ignored_users
) )
@ -393,6 +432,7 @@ class RelationsHandler:
event.event_id, BundledAggregations() event.event_id, BundledAggregations()
).annotations = {"chunk": annotations} ).annotations = {"chunk": annotations}
# Fetch any references to bundle with this event.
references, next_token = await self.get_relations_for_event( references, next_token = await self.get_relations_for_event(
event.event_id, event.event_id,
event, event,
@ -425,10 +465,4 @@ class RelationsHandler:
for event_id, edit in edits.items(): for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit results.setdefault(event_id, BundledAggregations()).replace = edit
threads = await self.get_threads_for_events(
events_by_id.keys(), user_id, ignored_users
)
for event_id, thread in threads.items():
results.setdefault(event_id, BundledAggregations()).thread = thread
return results return results

View File

@ -445,8 +445,8 @@ class RelationsWorkerStore(SQLBaseStore):
@cachedList(cached_method_name="get_thread_summary", list_name="event_ids") @cachedList(cached_method_name="get_thread_summary", list_name="event_ids")
async def get_thread_summaries( async def get_thread_summaries(
self, event_ids: Collection[str] self, event_ids: Collection[str]
) -> Dict[str, Optional[Tuple[int, EventBase, Optional[EventBase]]]]: ) -> Dict[str, Optional[Tuple[int, EventBase]]]:
"""Get the number of threaded replies, the latest reply (if any), and the latest edit for that reply for the given event. """Get the number of threaded replies and the latest reply (if any) for the given events.
Args: Args:
event_ids: Summarize the thread related to this event ID. event_ids: Summarize the thread related to this event ID.
@ -458,7 +458,6 @@ class RelationsWorkerStore(SQLBaseStore):
Each summary is a tuple of: Each summary is a tuple of:
The number of events in the thread. The number of events in the thread.
The most recent event in the thread. The most recent event in the thread.
The most recent edit to the most recent event in the thread, if applicable.
""" """
def _get_thread_summaries_txn( def _get_thread_summaries_txn(
@ -544,9 +543,6 @@ class RelationsWorkerStore(SQLBaseStore):
latest_events = await self.get_events(latest_event_ids.values()) # type: ignore[attr-defined] latest_events = await self.get_events(latest_event_ids.values()) # type: ignore[attr-defined]
# Check to see if any of those events are edited.
latest_edits = await self.get_applicable_edits(latest_event_ids.values())
# Map to the event IDs to the thread summary. # Map to the event IDs to the thread summary.
# #
# There might not be a summary due to there not being a thread or # There might not be a summary due to there not being a thread or
@ -557,8 +553,7 @@ class RelationsWorkerStore(SQLBaseStore):
summary = None summary = None
if latest_event: if latest_event:
latest_edit = latest_edits.get(latest_event_id) summary = (counts[parent_event_id], latest_event)
summary = (counts[parent_event_id], latest_event, latest_edit)
summaries[parent_event_id] = summary summaries[parent_event_id] = summary
return summaries return summaries

View File

@ -1029,7 +1029,106 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
bundled_aggregations.get("latest_event"), bundled_aggregations.get("latest_event"),
) )
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9) self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10)
def test_thread_with_bundled_aggregations_for_latest(self) -> None:
"""
Bundled aggregations should get applied to the latest thread event.
"""
self._send_relation(RelationTypes.THREAD, "m.room.test")
channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
thread_2 = channel.json_body["event_id"]
self._send_relation(
RelationTypes.ANNOTATION, "m.reaction", "a", parent_id=thread_2
)
def assert_thread(bundled_aggregations: JsonDict) -> None:
self.assertEqual(2, bundled_aggregations.get("count"))
self.assertTrue(bundled_aggregations.get("current_user_participated"))
# The latest thread event has some fields that don't matter.
self.assert_dict(
{
"content": {
"m.relates_to": {
"event_id": self.parent_id,
"rel_type": RelationTypes.THREAD,
}
},
"event_id": thread_2,
"sender": self.user_id,
"type": "m.room.test",
},
bundled_aggregations.get("latest_event"),
)
# Check the unsigned field on the latest event.
self.assert_dict(
{
"m.relations": {
RelationTypes.ANNOTATION: {
"chunk": [
{"type": "m.reaction", "key": "a", "count": 1},
]
},
}
},
bundled_aggregations["latest_event"].get("unsigned"),
)
self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 10)
def test_nested_thread(self) -> None:
"""
Ensure that a nested thread gets ignored by bundled aggregations, as
those are forbidden.
"""
# Start a thread.
channel = self._send_relation(RelationTypes.THREAD, "m.room.test")
reply_event_id = channel.json_body["event_id"]
# Disable the validation to pretend this came over federation, since it is
# not an event the Client-Server API will allow..
with patch(
"synapse.handlers.message.EventCreationHandler._validate_event_relation",
new=lambda self, event: make_awaitable(None),
):
# Create a sub-thread off the thread, which is not allowed.
self._send_relation(
RelationTypes.THREAD, "m.room.test", parent_id=reply_event_id
)
# Fetch the thread root, to get the bundled aggregation for the thread.
relations_from_event = self._get_bundled_aggregations()
# Ensure that requesting the room messages also does not return the sub-thread.
channel = self.make_request(
"GET",
f"/rooms/{self.room}/messages?dir=b",
access_token=self.user_token,
)
self.assertEqual(200, channel.code, channel.json_body)
event = self._find_event_in_chunk(channel.json_body["chunk"])
relations_from_messages = event["unsigned"]["m.relations"]
# Check the bundled aggregations from each point.
for aggregations, desc in (
(relations_from_event, "/event"),
(relations_from_messages, "/messages"),
):
# The latest event should have bundled aggregations.
self.assertIn(RelationTypes.THREAD, aggregations, desc)
thread_summary = aggregations[RelationTypes.THREAD]
self.assertIn("latest_event", thread_summary, desc)
self.assertEqual(
thread_summary["latest_event"]["event_id"], reply_event_id, desc
)
# The latest event should not have any bundled aggregations (since the
# only relation to it is another thread, which is invalid).
self.assertNotIn(
"m.relations", thread_summary["latest_event"]["unsigned"], desc
)
def test_thread_edit_latest_event(self) -> None: def test_thread_edit_latest_event(self) -> None:
"""Test that editing the latest event in a thread works.""" """Test that editing the latest event in a thread works."""
@ -1049,6 +1148,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body}, content={"msgtype": "m.text", "body": "foo", "m.new_content": new_body},
parent_id=threaded_event_id, parent_id=threaded_event_id,
) )
edit_event_id = channel.json_body["event_id"]
# Fetch the thread root, to get the bundled aggregation for the thread. # Fetch the thread root, to get the bundled aggregation for the thread.
relations_dict = self._get_bundled_aggregations() relations_dict = self._get_bundled_aggregations()
@ -1061,6 +1161,12 @@ class BundledAggregationsTestCase(BaseRelationsTestCase):
self.assertIn("latest_event", thread_summary) self.assertIn("latest_event", thread_summary)
latest_event_in_thread = thread_summary["latest_event"] latest_event_in_thread = thread_summary["latest_event"]
self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!") self.assertEqual(latest_event_in_thread["content"]["body"], "I've been edited!")
# The latest event in the thread should have the edit appear under the
# bundled aggregations.
self.assertDictContainsSubset(
{"event_id": edit_event_id, "sender": "@alice:test"},
latest_event_in_thread["unsigned"]["m.relations"][RelationTypes.REPLACE],
)
def test_aggregation_get_event_for_annotation(self) -> None: def test_aggregation_get_event_for_annotation(self) -> None:
"""Test that annotations do not get bundled aggregations included """Test that annotations do not get bundled aggregations included