From d0b294ad974c05621426369a00be6bf05c4af997 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 28 Jul 2021 10:46:37 -0500 Subject: [PATCH] Make historical events discoverable from backfill for servers without any scrollback history (MSC2716) (#10245) * Make historical messages available to federated servers Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 Follow-up to https://github.com/matrix-org/synapse/pull/9247 * Debug message not available on federation * Add base starting insertion point when no chunk ID is provided * Fix messages from multiple senders in historical chunk Follow-up to https://github.com/matrix-org/synapse/pull/9247 Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 --- Previously, Synapse would throw a 403, `Cannot force another user to join.`, because we were trying to use `?user_id` from a single virtual user which did not match with messages from other users in the chunk. * Remove debug lines * Messing with selecting insertion event extremeties * Move db schema change to new version * Add more better comments * Make a fake requester with just what we need See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080 * Store insertion events in table * Make base insertion event float off on its own See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889 Conflicts: synapse/rest/client/v1/room.py * Validate that the app service can actually control the given user See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455 Conflicts: synapse/rest/client/v1/room.py * Add some better comments on what we're trying to check for * Continue debugging * Share validation logic * Add inserted historical messages to /backfill response * Remove debug sql queries * Some marker event implemntation trials * Clean up PR * Rename insertion_event_id to just event_id * Add some better sql comments * More accurate description * Add changelog * Make it clear what MSC the change is part of * Add more detail on which insertion event came through * Address review and improve sql queries * Only use event_id as unique constraint * Fix test case where insertion event is already in the normal DAG * Remove debug changes * Switch to chunk events so we can auth via power_levels Previously, we were using `content.chunk_id` to connect one chunk to another. But these events can be from any `sender` and we can't tell who should be able to send historical events. We know we only want the application service to do it but these events have the sender of a real historical message, not the application service user ID as the sender. Other federated homeservers also have no indicator which senders are an application service on the originating homeserver. So we want to auth all of the MSC2716 events via power_levels and have them be sent by the application service with proper PL levels in the room. * Switch to chunk events for federation * Add unstable room version to support new historical PL * Fix federated events being rejected for no state_groups Add fix from https://github.com/matrix-org/synapse/pull/10439 until it merges. * Only connect base insertion event to prev_event_ids Per discussion with @erikjohnston, https://matrix.to/#/!UytJQHLQYfvYWsGrGY:jki.re/$12bTUiObDFdHLAYtT7E-BvYRp3k_xv8w0dUQHibasJk?via=jki.re&via=matrix.org * Make it possible to get the room_version with txn * Allow but ignore historical events in unsupported room version See https://github.com/matrix-org/synapse/pull/10245#discussion_r675592489 We can't reject historical events on unsupported room versions because homeservers without knowledge of MSC2716 or the new room version don't reject historical events either. Since we can't rely on the auth check here to stop historical events on unsupported room versions, I've added some additional checks in the processing/persisting code (`synapse/storage/databases/main/events.py` -> `_handle_insertion_event` and `_handle_chunk_event`). I've had to do some refactoring so there is method to fetch the room version by `txn`. * Move to unique index syntax See https://github.com/matrix-org/synapse/pull/10245#discussion_r675638509 * High-level document how the insertion->chunk lookup works * Remove create_event fallback for room_versions See https://github.com/matrix-org/synapse/pull/10245/files#r677641879 * Use updated method name --- changelog.d/10245.feature | 1 + synapse/api/constants.py | 3 - synapse/api/room_versions.py | 27 ++++++ synapse/event_auth.py | 38 ++++++++ synapse/events/utils.py | 3 + synapse/handlers/federation.py | 6 +- synapse/handlers/room.py | 1 + synapse/rest/client/v1/room.py | 7 +- .../databases/main/event_federation.py | 90 ++++++++++++++++-- synapse/storage/databases/main/events.py | 91 +++++++++++++++++++ synapse/storage/databases/main/state.py | 52 ++++++++--- .../delta/61/01insertion_event_lookups.sql | 49 ++++++++++ 12 files changed, 340 insertions(+), 28 deletions(-) create mode 100644 changelog.d/10245.feature create mode 100644 synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql diff --git a/changelog.d/10245.feature b/changelog.d/10245.feature new file mode 100644 index 000000000..b3c48cc2c --- /dev/null +++ b/changelog.d/10245.feature @@ -0,0 +1 @@ +Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716). diff --git a/synapse/api/constants.py b/synapse/api/constants.py index a40d6d796..a986fdb47 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -206,9 +206,6 @@ class EventContentFields: MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id" # For "marker" events MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion" - MSC2716_MARKER_INSERTION_PREV_EVENTS = ( - "org.matrix.msc2716.marker.insertion_prev_events" - ) class RoomTypes: diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 697319e52..bc678efe4 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -73,6 +73,9 @@ class RoomVersion: # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending # m.room.membership event with membership 'knock'. msc2403_knocking = attr.ib(type=bool) + # MSC2716: Adds m.room.power_levels -> content.historical field to control + # whether "insertion", "chunk", "marker" events can be sent + msc2716_historical = attr.ib(type=bool) class RoomVersions: @@ -88,6 +91,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) V2 = RoomVersion( "2", @@ -101,6 +105,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) V3 = RoomVersion( "3", @@ -114,6 +119,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) V4 = RoomVersion( "4", @@ -127,6 +133,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) V5 = RoomVersion( "5", @@ -140,6 +147,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) V6 = RoomVersion( "6", @@ -153,6 +161,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) MSC2176 = RoomVersion( "org.matrix.msc2176", @@ -166,6 +175,7 @@ class RoomVersions: msc2176_redaction_rules=True, msc3083_join_rules=False, msc2403_knocking=False, + msc2716_historical=False, ) MSC3083 = RoomVersion( "org.matrix.msc3083.v2", @@ -179,6 +189,7 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=True, msc2403_knocking=False, + msc2716_historical=False, ) V7 = RoomVersion( "7", @@ -192,6 +203,21 @@ class RoomVersions: msc2176_redaction_rules=False, msc3083_join_rules=False, msc2403_knocking=True, + msc2716_historical=False, + ) + MSC2716 = RoomVersion( + "org.matrix.msc2716", + RoomDisposition.STABLE, + EventFormatVersions.V3, + StateResolutionVersions.V2, + enforce_key_validity=True, + special_case_aliases_auth=False, + strict_canonicaljson=True, + limit_notifications_power_levels=True, + msc2176_redaction_rules=False, + msc3083_join_rules=False, + msc2403_knocking=True, + msc2716_historical=True, ) @@ -207,6 +233,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = { RoomVersions.MSC2176, RoomVersions.MSC3083, RoomVersions.V7, + RoomVersions.MSC2716, ) } diff --git a/synapse/event_auth.py b/synapse/event_auth.py index cc92d3547..0fa7ffc99 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -205,6 +205,13 @@ def check( if event.type == EventTypes.Redaction: check_redaction(room_version_obj, event, auth_events) + if ( + event.type == EventTypes.MSC2716_INSERTION + or event.type == EventTypes.MSC2716_CHUNK + or event.type == EventTypes.MSC2716_MARKER + ): + check_historical(room_version_obj, event, auth_events) + logger.debug("Allowing! %s", event) @@ -539,6 +546,37 @@ def check_redaction( raise AuthError(403, "You don't have permission to redact events") +def check_historical( + room_version_obj: RoomVersion, + event: EventBase, + auth_events: StateMap[EventBase], +) -> None: + """Check whether the event sender is allowed to send historical related + events like "insertion", "chunk", and "marker". + + Returns: + None + + Raises: + AuthError if the event sender is not allowed to send historical related events + ("insertion", "chunk", and "marker"). + """ + # Ignore the auth checks in room versions that do not support historical + # events + if not room_version_obj.msc2716_historical: + return + + user_level = get_user_power_level(event.user_id, auth_events) + + historical_level = get_named_level(auth_events, "historical", 100) + + if user_level < historical_level: + raise AuthError( + 403, + 'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")', + ) + + def _check_power_levels( room_version_obj: RoomVersion, event: EventBase, diff --git a/synapse/events/utils.py b/synapse/events/utils.py index f4da9e092..a0c07f62f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -126,6 +126,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict: if room_version.msc2176_redaction_rules: add_fields("invite") + if room_version.msc2716_historical: + add_fields("historical") + elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth: add_fields("aliases") elif event_type == EventTypes.RoomHistoryVisibility: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index aba095d2e..8197b60b7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2748,9 +2748,11 @@ class FederationHandler(BaseHandler): event.event_id, e.event_id, ) - context = await self.state_handler.compute_event_context(e) + missing_auth_event_context = ( + await self.state_handler.compute_event_context(e) + ) await self._auth_and_persist_event( - origin, e, context, auth_events=auth + origin, e, missing_auth_event_context, auth_events=auth ) if e.event_id in event_auth_events: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 370561e54..b33fe09f7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -951,6 +951,7 @@ class RoomCreationHandler(BaseHandler): "kick": 50, "redact": 50, "invite": 50, + "historical": 100, } if config["original_invitees_have_ops"]: diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 25ba52c62..502a91758 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -504,7 +504,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): events_to_create = body["events"] - prev_event_ids = prev_events_from_query inherited_depth = await self._inherit_depth_from_prev_ids( prev_events_from_query ) @@ -516,6 +515,10 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): chunk_id_to_connect_to = chunk_id_from_query base_insertion_event = None if chunk_id_from_query: + # All but the first base insertion event should point at a fake + # event, which causes the HS to ask for the state at the start of + # the chunk later. + prev_event_ids = [fake_prev_event_id] # TODO: Verify the chunk_id_from_query corresponds to an insertion event pass # Otherwise, create an insertion event to act as a starting point. @@ -526,6 +529,8 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): # an insertion event), in which case we just create a new insertion event # that can then get pointed to by a "marker" event later. else: + prev_event_ids = prev_events_from_query + base_insertion_event_dict = self._create_insertion_event_dict( sender=requester.user.to_string(), room_id=room_id, diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index f4a00b073..547e43ab9 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # We want to make sure that we do a breadth-first, "depth" ordered # search. - query = ( - "SELECT depth, prev_event_id FROM event_edges" - " INNER JOIN events" - " ON prev_event_id = events.event_id" - " WHERE event_edges.event_id = ?" - " AND event_edges.is_state = ?" - " LIMIT ?" - ) + # Look for the prev_event_id connected to the given event_id + query = """ + SELECT depth, prev_event_id FROM event_edges + /* Get the depth of the prev_event_id from the events table */ + INNER JOIN events + ON prev_event_id = events.event_id + /* Find an event which matches the given event_id */ + WHERE event_edges.event_id = ? + AND event_edges.is_state = ? + LIMIT ? + """ + # Look for the "insertion" events connected to the given event_id + connected_insertion_event_query = """ + SELECT e.depth, i.event_id FROM insertion_event_edges AS i + /* Get the depth of the insertion event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which points via prev_events to the given event_id */ + WHERE i.insertion_prev_event_id = ? + LIMIT ? + """ + + # Find any chunk connections of a given insertion event + chunk_connection_query = """ + SELECT e.depth, c.event_id FROM insertion_events AS i + /* Find the chunk that connects to the given insertion event */ + INNER JOIN chunk_events AS c + ON i.next_chunk_id = c.chunk_id + /* Get the depth of the chunk start event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which matches the given event_id */ + WHERE i.event_id = ? + LIMIT ? + """ + + # In a PriorityQueue, the lowest valued entries are retrieved first. + # We're using depth as the priority in the queue. + # Depth is lowest at the oldest-in-time message and highest and + # newest-in-time message. We add events to the queue with a negative depth so that + # we process the newest-in-time messages first going backwards in time. queue = PriorityQueue() for event_id in event_list: @@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) - txn.execute(query, (event_id, False, limit - len(event_results))) + # Try and find any potential historical chunks of message history. + # + # First we look for an insertion event connected to the current + # event (by prev_event). If we find any, we need to go and try to + # find any chunk events connected to the insertion event (by + # chunk_id). If we find any, we'll add them to the queue and + # navigate up the DAG like normal in the next iteration of the loop. + txn.execute( + connected_insertion_event_query, (event_id, limit - len(event_results)) + ) + connected_insertion_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: connected_insertion_event_query %s", + connected_insertion_event_id_results, + ) + for row in connected_insertion_event_id_results: + connected_insertion_event_depth = row[0] + connected_insertion_event = row[1] + queue.put((-connected_insertion_event_depth, connected_insertion_event)) - for row in txn: + # Find any chunk connections for the given insertion event + txn.execute( + chunk_connection_query, + (connected_insertion_event, limit - len(event_results)), + ) + chunk_start_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: chunk_start_event_id_results %s", + chunk_start_event_id_results, + ) + for row in chunk_start_event_id_results: + if row[1] not in event_results: + queue.put((-row[0], row[1])) + + # Navigate up the DAG by prev_event + txn.execute(query, (event_id, False, limit - len(event_results))) + prev_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: prev_event_ids %s", prev_event_id_results + ) + + for row in prev_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a396a201d..86baf397f 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1502,6 +1502,9 @@ class PersistEventsStore: self._handle_event_relations(txn, event) + self._handle_insertion_event(txn, event) + self._handle_chunk_event(txn, event) + # Store the labels for this event. labels = event.content.get(EventContentFields.LABELS) if labels: @@ -1754,6 +1757,94 @@ class PersistEventsStore: if rel_type == RelationTypes.REPLACE: txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,)) + def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase): + """Handles keeping track of insertion events and edges/connections. + Part of MSC2716. + + Args: + txn: The database transaction object + event: The event to process + """ + + if event.type != EventTypes.MSC2716_INSERTION: + # Not a insertion event + return + + # Skip processing a insertion event if the room version doesn't + # support it. + room_version = self.store.get_room_version_txn(txn, event.room_id) + if not room_version.msc2716_historical: + return + + next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID) + if next_chunk_id is None: + # Invalid insertion event without next chunk ID + return + + logger.debug( + "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event + ) + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="insertion_events", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "next_chunk_id": next_chunk_id, + }, + ) + + # Insert an edge for every prev_event connection + for prev_event_id in event.prev_events: + self.db_pool.simple_insert_txn( + txn, + table="insertion_event_edges", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "insertion_prev_event_id": prev_event_id, + }, + ) + + def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase): + """Handles inserting the chunk edges/connections between the chunk event + and an insertion event. Part of MSC2716. + + Args: + txn: The database transaction object + event: The event to process + """ + + if event.type != EventTypes.MSC2716_CHUNK: + # Not a chunk event + return + + # Skip processing a chunk event if the room version doesn't + # support it. + room_version = self.store.get_room_version_txn(txn, event.room_id) + if not room_version.msc2716_historical: + return + + chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID) + if chunk_id is None: + # Invalid chunk event without a chunk ID + return + + logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event) + + # Keep track of the insertion event and the chunk ID + self.db_pool.simple_insert_txn( + txn, + table="chunk_events", + values={ + "event_id": event.event_id, + "room_id": event.room_id, + "chunk_id": chunk_id, + }, + ) + def _handle_redaction(self, txn, redacted_event_id): """Handles receiving a redaction and checking whether we need to remove any redacted relations from the database. diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 1757064a6..8e22da99a 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool +from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.state import StateFilter @@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_room_version(self, room_id: str) -> RoomVersion: """Get the room_version of a given room - Raises: NotFoundError: if the room is unknown - UnsupportedRoomVersionError: if the room uses an unknown room version. Typically this happens if support for the room's version has been removed from Synapse. """ - room_version_id = await self.get_room_version_id(room_id) + return await self.db_pool.runInteraction( + "get_room_version_txn", + self.get_room_version_txn, + room_id, + ) + + def get_room_version_txn( + self, txn: LoggingTransaction, room_id: str + ) -> RoomVersion: + """Get the room_version of a given room + Args: + txn: Transaction object + room_id: The room_id of the room you are trying to get the version for + Raises: + NotFoundError: if the room is unknown + UnsupportedRoomVersionError: if the room uses an unknown room version. + Typically this happens if support for the room's version has been + removed from Synapse. + """ + room_version_id = self.get_room_version_id_txn(txn, room_id) v = KNOWN_ROOM_VERSIONS.get(room_version_id) if not v: @@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): @cached(max_entries=10000) async def get_room_version_id(self, room_id: str) -> str: """Get the room_version of a given room + Raises: + NotFoundError: if the room is unknown + """ + return await self.db_pool.runInteraction( + "get_room_version_id_txn", + self.get_room_version_id_txn, + room_id, + ) + def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str: + """Get the room_version of a given room + Args: + txn: Transaction object + room_id: The room_id of the room you are trying to get the version for Raises: NotFoundError: if the room is unknown """ @@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): # First we try looking up room version from the database, but for old # rooms we might not have added the room version to it yet so we fall # back to previous behaviour and look in current state events. - + # # We really should have an entry in the rooms table for every room we # care about, but let's be a bit paranoid (at least while the background # update is happening) to avoid breaking existing rooms. - version = await self.db_pool.simple_select_one_onecol( + room_version = self.db_pool.simple_select_one_onecol_txn( + txn, table="rooms", keyvalues={"room_id": room_id}, retcol="room_version", - desc="get_room_version", allow_none=True, ) - if version is not None: - return version + if room_version is None: + raise NotFoundError("Could not room_version for %s" % (room_id,)) - # Retrieve the room's create event - create_event = await self.get_create_event_for_room(room_id) - return create_event.content.get("room_version", "1") + return room_version async def get_room_predecessor(self, room_id: str) -> Optional[dict]: """Get the predecessor of an upgraded room if it exists. diff --git a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql new file mode 100644 index 000000000..7d7bafc63 --- /dev/null +++ b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql @@ -0,0 +1,49 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a table that keeps track of "insertion" events and +-- their next_chunk_id's so we can navigate to the next chunk of history. +CREATE TABLE IF NOT EXISTS insertion_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + next_chunk_id TEXT NOT NULL +); +CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id); +CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id); + +-- Add a table that keeps track of all of the events we are inserting between. +-- We use this when navigating the DAG and when we hit an event which matches +-- `insertion_prev_event_id`, it should backfill from the "insertion" event and +-- navigate the historical messages from there. +CREATE TABLE IF NOT EXISTS insertion_event_edges( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + insertion_prev_event_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id); +CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id); +CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id); + +-- Add a table that keeps track of how each chunk is labeled. The chunks are +-- connected together based on an insertion events `next_chunk_id`. +CREATE TABLE IF NOT EXISTS chunk_events( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + chunk_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id); +CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);