mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-12-27 02:19:25 -05:00
Support stable identifiers for MSC3440: Threading (#12151)
The unstable identifiers are still supported if the experimental configuration flag is enabled. The unstable identifiers will be removed in a future release.
This commit is contained in:
parent
52a947dc46
commit
ea27528b5d
1
changelog.d/12151.feature
Normal file
1
changelog.d/12151.feature
Normal file
@ -0,0 +1 @@
|
||||
Support the stable identifiers from [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440): threads.
|
@ -178,7 +178,9 @@ class RelationTypes:
|
||||
ANNOTATION: Final = "m.annotation"
|
||||
REPLACE: Final = "m.replace"
|
||||
REFERENCE: Final = "m.reference"
|
||||
THREAD: Final = "io.element.thread"
|
||||
THREAD: Final = "m.thread"
|
||||
# TODO Remove this in Synapse >= v1.57.0.
|
||||
UNSTABLE_THREAD: Final = "io.element.thread"
|
||||
|
||||
|
||||
class LimitBlockingTypes:
|
||||
|
@ -88,7 +88,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
|
||||
"org.matrix.labels": {"type": "array", "items": {"type": "string"}},
|
||||
"org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
|
||||
# MSC3440, filtering by event relations.
|
||||
"related_by_senders": {"type": "array", "items": {"type": "string"}},
|
||||
"io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
|
||||
"related_by_rel_types": {"type": "array", "items": {"type": "string"}},
|
||||
"io.element.relation_types": {"type": "array", "items": {"type": "string"}},
|
||||
},
|
||||
}
|
||||
@ -318,19 +320,18 @@ class Filter:
|
||||
self.labels = filter_json.get("org.matrix.labels", None)
|
||||
self.not_labels = filter_json.get("org.matrix.not_labels", [])
|
||||
|
||||
# Ideally these would be rejected at the endpoint if they were provided
|
||||
# and not supported, but that would involve modifying the JSON schema
|
||||
# based on the homeserver configuration.
|
||||
self.related_by_senders = self.filter_json.get("related_by_senders", None)
|
||||
self.related_by_rel_types = self.filter_json.get("related_by_rel_types", None)
|
||||
|
||||
# Fallback to the unstable prefix if the stable version is not given.
|
||||
if hs.config.experimental.msc3440_enabled:
|
||||
self.relation_senders = self.filter_json.get(
|
||||
self.related_by_senders = self.related_by_senders or self.filter_json.get(
|
||||
"io.element.relation_senders", None
|
||||
)
|
||||
self.relation_types = self.filter_json.get(
|
||||
"io.element.relation_types", None
|
||||
self.related_by_rel_types = (
|
||||
self.related_by_rel_types
|
||||
or self.filter_json.get("io.element.relation_types", None)
|
||||
)
|
||||
else:
|
||||
self.relation_senders = None
|
||||
self.relation_types = None
|
||||
|
||||
def filters_all_types(self) -> bool:
|
||||
return "*" in self.not_types
|
||||
@ -461,7 +462,7 @@ class Filter:
|
||||
event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined]
|
||||
event_ids_to_keep = set(
|
||||
await self._store.events_have_relations(
|
||||
event_ids, self.relation_senders, self.relation_types
|
||||
event_ids, self.related_by_senders, self.related_by_rel_types
|
||||
)
|
||||
)
|
||||
|
||||
@ -474,7 +475,7 @@ class Filter:
|
||||
async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
|
||||
result = [event for event in events if self._check(event)]
|
||||
|
||||
if self.relation_senders or self.relation_types:
|
||||
if self.related_by_senders or self.related_by_rel_types:
|
||||
return await self._check_event_relations(result)
|
||||
|
||||
return result
|
||||
|
@ -38,6 +38,7 @@ from synapse.util.frozenutils import unfreeze
|
||||
from . import EventBase
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from synapse.server import HomeServer
|
||||
from synapse.storage.databases.main.relations import BundledAggregations
|
||||
|
||||
|
||||
@ -395,6 +396,9 @@ class EventClientSerializer:
|
||||
clients.
|
||||
"""
|
||||
|
||||
def __init__(self, hs: "HomeServer"):
|
||||
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
|
||||
|
||||
def serialize_event(
|
||||
self,
|
||||
event: Union[JsonDict, EventBase],
|
||||
@ -515,11 +519,14 @@ class EventClientSerializer:
|
||||
thread.latest_event, serialized_latest_event, thread.latest_edit
|
||||
)
|
||||
|
||||
serialized_aggregations[RelationTypes.THREAD] = {
|
||||
thread_summary = {
|
||||
"latest_event": serialized_latest_event,
|
||||
"count": thread.count,
|
||||
"current_user_participated": thread.current_user_participated,
|
||||
}
|
||||
serialized_aggregations[RelationTypes.THREAD] = thread_summary
|
||||
if self._msc3440_enabled:
|
||||
serialized_aggregations[RelationTypes.UNSTABLE_THREAD] = thread_summary
|
||||
|
||||
# Include the bundled aggregations in the event.
|
||||
if serialized_aggregations:
|
||||
|
@ -1079,7 +1079,10 @@ class EventCreationHandler:
|
||||
raise SynapseError(400, "Can't send same reaction twice")
|
||||
|
||||
# Don't attempt to start a thread if the parent event is a relation.
|
||||
elif relation_type == RelationTypes.THREAD:
|
||||
elif (
|
||||
relation_type == RelationTypes.THREAD
|
||||
or relation_type == RelationTypes.UNSTABLE_THREAD
|
||||
):
|
||||
if await self.store.event_includes_relation(relates_to):
|
||||
raise SynapseError(
|
||||
400, "Cannot start threads from an event with a relation"
|
||||
|
@ -101,6 +101,7 @@ class VersionsRestServlet(RestServlet):
|
||||
"org.matrix.msc3030": self.config.experimental.msc3030_enabled,
|
||||
# Adds support for thread relations, per MSC3440.
|
||||
"org.matrix.msc3440": self.config.experimental.msc3440_enabled,
|
||||
"org.matrix.msc3440.stable": True, # TODO: remove when "v1.3" is added above
|
||||
},
|
||||
},
|
||||
)
|
||||
|
@ -754,7 +754,7 @@ class HomeServer(metaclass=abc.ABCMeta):
|
||||
|
||||
@cache_in_self
|
||||
def get_event_client_serializer(self) -> EventClientSerializer:
|
||||
return EventClientSerializer()
|
||||
return EventClientSerializer(self)
|
||||
|
||||
@cache_in_self
|
||||
def get_password_policy_handler(self) -> PasswordPolicyHandler:
|
||||
|
@ -1814,7 +1814,10 @@ class PersistEventsStore:
|
||||
if rel_type == RelationTypes.REPLACE:
|
||||
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
|
||||
|
||||
if rel_type == RelationTypes.THREAD:
|
||||
if (
|
||||
rel_type == RelationTypes.THREAD
|
||||
or rel_type == RelationTypes.UNSTABLE_THREAD
|
||||
):
|
||||
txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
|
||||
# It should be safe to only invalidate the cache if the user has not
|
||||
# previously participated in the thread, but that's difficult (and
|
||||
|
@ -508,7 +508,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
AND parent.room_id = child.room_id
|
||||
WHERE
|
||||
%s
|
||||
AND relation_type = ?
|
||||
AND %s
|
||||
ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
|
||||
"""
|
||||
else:
|
||||
@ -523,16 +523,22 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
AND parent.room_id = child.room_id
|
||||
WHERE
|
||||
%s
|
||||
AND relation_type = ?
|
||||
AND %s
|
||||
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
|
||||
"""
|
||||
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "relates_to_id", event_ids
|
||||
)
|
||||
|
||||
if self._msc3440_enabled:
|
||||
relations_clause = "(relation_type = ? OR relation_type = ?)"
|
||||
args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
|
||||
else:
|
||||
relations_clause = "relation_type = ?"
|
||||
args.append(RelationTypes.THREAD)
|
||||
|
||||
txn.execute(sql % (clause,), args)
|
||||
txn.execute(sql % (clause, relations_clause), args)
|
||||
latest_event_ids = {}
|
||||
for parent_event_id, child_event_id in txn:
|
||||
# Only consider the latest threaded reply (by topological ordering).
|
||||
@ -552,7 +558,7 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
AND parent.room_id = child.room_id
|
||||
WHERE
|
||||
%s
|
||||
AND relation_type = ?
|
||||
AND %s
|
||||
GROUP BY parent.event_id
|
||||
"""
|
||||
|
||||
@ -561,9 +567,15 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "relates_to_id", latest_event_ids.keys()
|
||||
)
|
||||
|
||||
if self._msc3440_enabled:
|
||||
relations_clause = "(relation_type = ? OR relation_type = ?)"
|
||||
args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
|
||||
else:
|
||||
relations_clause = "relation_type = ?"
|
||||
args.append(RelationTypes.THREAD)
|
||||
|
||||
txn.execute(sql % (clause,), args)
|
||||
txn.execute(sql % (clause, relations_clause), args)
|
||||
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
|
||||
|
||||
return counts, latest_event_ids
|
||||
@ -626,16 +638,24 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
AND parent.room_id = child.room_id
|
||||
WHERE
|
||||
%s
|
||||
AND relation_type = ?
|
||||
AND %s
|
||||
AND child.sender = ?
|
||||
"""
|
||||
|
||||
clause, args = make_in_list_sql_clause(
|
||||
txn.database_engine, "relates_to_id", event_ids
|
||||
)
|
||||
args.extend((RelationTypes.THREAD, user_id))
|
||||
|
||||
txn.execute(sql % (clause,), args)
|
||||
if self._msc3440_enabled:
|
||||
relations_clause = "(relation_type = ? OR relation_type = ?)"
|
||||
args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
|
||||
else:
|
||||
relations_clause = "relation_type = ?"
|
||||
args.append(RelationTypes.THREAD)
|
||||
|
||||
args.append(user_id)
|
||||
|
||||
txn.execute(sql % (clause, relations_clause), args)
|
||||
return {row[0] for row in txn.fetchall()}
|
||||
|
||||
participated_threads = await self.db_pool.runInteraction(
|
||||
@ -834,13 +854,10 @@ class RelationsWorkerStore(SQLBaseStore):
|
||||
results.setdefault(event_id, BundledAggregations()).replace = edit
|
||||
|
||||
# Fetch thread summaries.
|
||||
if self._msc3440_enabled:
|
||||
summaries = await self._get_thread_summaries(events_by_id.keys())
|
||||
# Only fetch participated for a limited selection based on what had
|
||||
# summaries.
|
||||
participated = await self._get_threads_participated(
|
||||
summaries.keys(), user_id
|
||||
)
|
||||
participated = await self._get_threads_participated(summaries.keys(), user_id)
|
||||
for event_id, summary in summaries.items():
|
||||
if summary:
|
||||
thread_count, latest_thread_event, edit = summary
|
||||
|
@ -325,21 +325,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
|
||||
args.extend(event_filter.labels)
|
||||
|
||||
# Filter on relation_senders / relation types from the joined tables.
|
||||
if event_filter.relation_senders:
|
||||
if event_filter.related_by_senders:
|
||||
clauses.append(
|
||||
"(%s)"
|
||||
% " OR ".join(
|
||||
"related_event.sender = ?" for _ in event_filter.relation_senders
|
||||
"related_event.sender = ?" for _ in event_filter.related_by_senders
|
||||
)
|
||||
)
|
||||
args.extend(event_filter.relation_senders)
|
||||
args.extend(event_filter.related_by_senders)
|
||||
|
||||
if event_filter.relation_types:
|
||||
if event_filter.related_by_rel_types:
|
||||
clauses.append(
|
||||
"(%s)"
|
||||
% " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
|
||||
% " OR ".join(
|
||||
"relation_type = ?" for _ in event_filter.related_by_rel_types
|
||||
)
|
||||
args.extend(event_filter.relation_types)
|
||||
)
|
||||
args.extend(event_filter.related_by_rel_types)
|
||||
|
||||
return " AND ".join(clauses), args
|
||||
|
||||
@ -1203,7 +1205,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
# If there is a filter on relation_senders and relation_types join to the
|
||||
# relations table.
|
||||
if event_filter and (
|
||||
event_filter.relation_senders or event_filter.relation_types
|
||||
event_filter.related_by_senders or event_filter.related_by_rel_types
|
||||
):
|
||||
# Filtering by relations could cause the same event to appear multiple
|
||||
# times (since there's no limit on the number of relations to an event).
|
||||
@ -1211,7 +1213,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||
join_clause += """
|
||||
LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
|
||||
"""
|
||||
if event_filter.relation_senders:
|
||||
if event_filter.related_by_senders:
|
||||
join_clause += """
|
||||
LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
|
||||
"""
|
||||
|
@ -547,9 +547,7 @@ class RelationsTestCase(BaseRelationsTestCase):
|
||||
)
|
||||
self.assertEqual(400, channel.code, channel.json_body)
|
||||
|
||||
@unittest.override_config(
|
||||
{"experimental_features": {"msc3440_enabled": True, "msc3666_enabled": True}}
|
||||
)
|
||||
@unittest.override_config({"experimental_features": {"msc3666_enabled": True}})
|
||||
def test_bundled_aggregations(self) -> None:
|
||||
"""
|
||||
Test that annotations, references, and threads get correctly bundled.
|
||||
@ -758,7 +756,6 @@ class RelationsTestCase(BaseRelationsTestCase):
|
||||
},
|
||||
)
|
||||
|
||||
@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
|
||||
def test_ignore_invalid_room(self) -> None:
|
||||
"""Test that we ignore invalid relations over federation."""
|
||||
# Create another room and send a message in it.
|
||||
@ -1065,7 +1062,6 @@ class RelationsTestCase(BaseRelationsTestCase):
|
||||
{"event_id": edit_event_id, "sender": self.user_id}, m_replace_dict
|
||||
)
|
||||
|
||||
@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
|
||||
def test_edit_thread(self) -> None:
|
||||
"""Test that editing a thread works."""
|
||||
|
||||
@ -1383,7 +1379,6 @@ class RelationRedactionTestCase(BaseRelationsTestCase):
|
||||
chunk = self._get_aggregations()
|
||||
self.assertEqual(chunk, [{"type": "m.reaction", "key": "a", "count": 1}])
|
||||
|
||||
@unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
|
||||
def test_redact_relation_thread(self) -> None:
|
||||
"""
|
||||
Test that thread replies are properly handled after the thread reply redacted.
|
||||
|
@ -2141,21 +2141,19 @@ class RelationsTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
def test_filter_relation_senders(self) -> None:
|
||||
# Messages which second user reacted to.
|
||||
filter = {"io.element.relation_senders": [self.second_user_id]}
|
||||
filter = {"related_by_senders": [self.second_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0]["event_id"], self.event_id_1)
|
||||
|
||||
# Messages which third user reacted to.
|
||||
filter = {"io.element.relation_senders": [self.third_user_id]}
|
||||
filter = {"related_by_senders": [self.third_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0]["event_id"], self.event_id_2)
|
||||
|
||||
# Messages which either user reacted to.
|
||||
filter = {
|
||||
"io.element.relation_senders": [self.second_user_id, self.third_user_id]
|
||||
}
|
||||
filter = {"related_by_senders": [self.second_user_id, self.third_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 2, chunk)
|
||||
self.assertCountEqual(
|
||||
@ -2164,20 +2162,20 @@ class RelationsTestCase(unittest.HomeserverTestCase):
|
||||
|
||||
def test_filter_relation_type(self) -> None:
|
||||
# Messages which have annotations.
|
||||
filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
|
||||
filter = {"related_by_rel_types": [RelationTypes.ANNOTATION]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0]["event_id"], self.event_id_1)
|
||||
|
||||
# Messages which have references.
|
||||
filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
|
||||
filter = {"related_by_rel_types": [RelationTypes.REFERENCE]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0]["event_id"], self.event_id_2)
|
||||
|
||||
# Messages which have either annotations or references.
|
||||
filter = {
|
||||
"io.element.relation_types": [
|
||||
"related_by_rel_types": [
|
||||
RelationTypes.ANNOTATION,
|
||||
RelationTypes.REFERENCE,
|
||||
]
|
||||
@ -2191,8 +2189,8 @@ class RelationsTestCase(unittest.HomeserverTestCase):
|
||||
def test_filter_relation_senders_and_type(self) -> None:
|
||||
# Messages which second user reacted to.
|
||||
filter = {
|
||||
"io.element.relation_senders": [self.second_user_id],
|
||||
"io.element.relation_types": [RelationTypes.ANNOTATION],
|
||||
"related_by_senders": [self.second_user_id],
|
||||
"related_by_rel_types": [RelationTypes.ANNOTATION],
|
||||
}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
|
@ -129,21 +129,19 @@ class PaginationTestCase(HomeserverTestCase):
|
||||
|
||||
def test_filter_relation_senders(self):
|
||||
# Messages which second user reacted to.
|
||||
filter = {"io.element.relation_senders": [self.second_user_id]}
|
||||
filter = {"related_by_senders": [self.second_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0].event_id, self.event_id_1)
|
||||
|
||||
# Messages which third user reacted to.
|
||||
filter = {"io.element.relation_senders": [self.third_user_id]}
|
||||
filter = {"related_by_senders": [self.third_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0].event_id, self.event_id_2)
|
||||
|
||||
# Messages which either user reacted to.
|
||||
filter = {
|
||||
"io.element.relation_senders": [self.second_user_id, self.third_user_id]
|
||||
}
|
||||
filter = {"related_by_senders": [self.second_user_id, self.third_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 2, chunk)
|
||||
self.assertCountEqual(
|
||||
@ -152,20 +150,20 @@ class PaginationTestCase(HomeserverTestCase):
|
||||
|
||||
def test_filter_relation_type(self):
|
||||
# Messages which have annotations.
|
||||
filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
|
||||
filter = {"related_by_rel_types": [RelationTypes.ANNOTATION]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0].event_id, self.event_id_1)
|
||||
|
||||
# Messages which have references.
|
||||
filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
|
||||
filter = {"related_by_rel_types": [RelationTypes.REFERENCE]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0].event_id, self.event_id_2)
|
||||
|
||||
# Messages which have either annotations or references.
|
||||
filter = {
|
||||
"io.element.relation_types": [
|
||||
"related_by_rel_types": [
|
||||
RelationTypes.ANNOTATION,
|
||||
RelationTypes.REFERENCE,
|
||||
]
|
||||
@ -179,8 +177,8 @@ class PaginationTestCase(HomeserverTestCase):
|
||||
def test_filter_relation_senders_and_type(self):
|
||||
# Messages which second user reacted to.
|
||||
filter = {
|
||||
"io.element.relation_senders": [self.second_user_id],
|
||||
"io.element.relation_types": [RelationTypes.ANNOTATION],
|
||||
"related_by_senders": [self.second_user_id],
|
||||
"related_by_rel_types": [RelationTypes.ANNOTATION],
|
||||
}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
@ -201,7 +199,7 @@ class PaginationTestCase(HomeserverTestCase):
|
||||
tok=self.second_tok,
|
||||
)
|
||||
|
||||
filter = {"io.element.relation_senders": [self.second_user_id]}
|
||||
filter = {"related_by_senders": [self.second_user_id]}
|
||||
chunk = self._filter_messages(filter)
|
||||
self.assertEqual(len(chunk), 1, chunk)
|
||||
self.assertEqual(chunk[0].event_id, self.event_id_1)
|
||||
|
Loading…
Reference in New Issue
Block a user