diff --git a/changelog.d/16762.misc b/changelog.d/16762.misc new file mode 100644 index 000000000..c49dc2085 --- /dev/null +++ b/changelog.d/16762.misc @@ -0,0 +1 @@ +Simplify event internal metadata class. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 80e17f0fb..c52e72666 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -42,7 +42,7 @@ from unpaddedbase64 import encode_base64 from synapse.api.constants import RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions -from synapse.types import JsonDict, RoomStreamToken, StrCollection +from synapse.types import JsonDict, StrCollection from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze from synapse.util.stringutils import strtobool @@ -211,13 +211,6 @@ class _EventInternalMetadata: device_id: DictProperty[str] = DictProperty("device_id") """The device ID of the user who sent this event, if any.""" - # XXX: These are set by StreamWorkerStore._set_before_and_after. - # I'm pretty sure that these are never persisted to the database, so shouldn't - # be here - before: DictProperty[RoomStreamToken] = DictProperty("before") - after: DictProperty[RoomStreamToken] = DictProperty("after") - order: DictProperty[Tuple[int, int]] = DictProperty("order") - def get_dict(self) -> JsonDict: return dict(self._dict) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 9a4af3c45..db80345b9 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -208,7 +208,12 @@ class AdminHandler: if not events: break - from_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + from_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + topological=last_event.depth, + ) events = await filter_events_for_client( self._storage_controllers, user_id, events diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e78e598d5..41b00a5cf 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1742,13 +1742,19 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): events = list(room_events) events.extend(e for evs, _ in room_to_events.values() for e in evs) - events.sort(key=lambda e: e.internal_metadata.order) + # We know stream_ordering must be not None here, as its been + # persisted, but mypy doesn't know that + events.sort(key=lambda e: cast(int, e.internal_metadata.stream_ordering)) if limit: events[:] = events[:limit] if events: - end_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + end_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + ) else: end_key = to_key diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1152c0158..0385c04bc 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -601,7 +601,10 @@ class SyncHandler: if not limited or block_all_timeline: prev_batch_token = upto_token if recents: - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace( StreamKeyType.ROOM, room_key ) @@ -689,7 +692,10 @@ class SyncHandler: if len(recents) > timeline_limit: limited = True recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1b2a65bed..aeeb74b46 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -705,8 +705,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - if order.lower() == "desc": ret.reverse() @@ -793,8 +791,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - return ret async def get_recent_events_for_room( @@ -820,8 +816,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token async def get_recent_event_ids_for_room( @@ -1094,31 +1088,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # `[(None,)]` return rows[0][0] if rows[0][0] is not None else 0 - @staticmethod - def _set_before_and_after( - events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True - ) -> None: - """Inserts ordering information to events' internal metadata from - the DB rows. - - Args: - events - rows - topo_order: Whether the events were ordered topologically or by stream - ordering. If true then all rows should have a non null - topological_ordering. - """ - for event, row in zip(events, rows): - stream = row.stream_ordering - if topo_order and row.topological_ordering: - topo: Optional[int] = row.topological_ordering - else: - topo = None - internal = event.internal_metadata - internal.before = RoomStreamToken(topological=topo, stream=stream - 1) - internal.after = RoomStreamToken(topological=topo, stream=stream) - internal.order = (int(topo) if topo else 0, int(stream)) - async def get_events_around( self, room_id: str, @@ -1559,8 +1528,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token @cached()