Clean up schema for event_edges (#12893)

* Remove redundant references to `event_edges.room_id`

We don't need to care about the room_id here, because we are already checking
the event id.
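
As an illustration (a hypothetical query, not one lifted from the diff): an event ID is globally unique, so a lookup keyed on `event_id` gains nothing from an extra `room_id` filter.

```sql
-- before: filtering on both columns
SELECT prev_event_id FROM event_edges
 WHERE event_id = ? AND room_id = ? AND NOT is_state;

-- after: event_id alone identifies the rows
SELECT prev_event_id FROM event_edges
 WHERE event_id = ? AND NOT is_state;
```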

* Clean up the event_edges table

We make a number of changes to `event_edges`:

 * We give the `room_id` and `is_state` columns defaults (null and false
   respectively) so that we can stop populating them.
 * We drop any rows that have `is_state` set true - they should no longer
   exist.
 * We drop any rows that do not exist in `events` - these should not exist
   either.
 * We drop the old unique constraint on all the columns, which wasn't much use.
 * We create a new unique index on `(event_id, prev_event_id)`.
 * We add a foreign key constraint to `events`.

These changes happen rather differently depending on whether we are on Postgres
or SQLite. For SQLite, we just rebuild the whole table, copying only the rows
we want to keep. For Postgres, we try to do as much as possible in the
background; both approaches are sketched below.
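
A sketch of the two approaches (DDL reconstructed from the description above, not the literal delta files):

```sql
-- SQLite: rebuild the table, copying only the rows we want to keep.
CREATE TABLE new_event_edges (
    event_id TEXT NOT NULL,
    prev_event_id TEXT NOT NULL,
    room_id TEXT DEFAULT NULL,
    is_state BOOLEAN NOT NULL DEFAULT FALSE,
    FOREIGN KEY (event_id) REFERENCES events (event_id)
);
INSERT INTO new_event_edges (event_id, prev_event_id, room_id, is_state)
    SELECT DISTINCT ee.event_id, ee.prev_event_id, NULL, FALSE
    FROM event_edges AS ee
    INNER JOIN events USING (event_id)
    WHERE NOT ee.is_state;
DROP TABLE event_edges;
ALTER TABLE new_event_edges RENAME TO event_edges;
CREATE UNIQUE INDEX event_edges_event_id_prev_event_id_idx
    ON event_edges (event_id, prev_event_id);

-- Postgres: make the cheap changes inline, and defer the expensive ones.
-- The foreign key is added NOT VALID, so it is enforced for new rows
-- straight away, but existing rows are only checked later, by the
-- background update below (which ends with VALIDATE CONSTRAINT).
ALTER TABLE event_edges ALTER COLUMN room_id SET DEFAULT NULL;
ALTER TABLE event_edges ALTER COLUMN is_state SET DEFAULT FALSE;
ALTER TABLE event_edges
    ADD CONSTRAINT event_edges_event_id_fkey
    FOREIGN KEY (event_id) REFERENCES events (event_id)
    NOT VALID;
```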

* Stop populating `event_edges.room_id` and `is_state`

We can just rely on the defaults.
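
Concretely, the persistence code can now insert just the two meaningful columns (illustrative statements, not the exact ones from the diff):

```sql
-- before: every insert spelled out all four columns
INSERT INTO event_edges (event_id, prev_event_id, room_id, is_state)
    VALUES (?, ?, ?, FALSE);

-- after: room_id and is_state fall back to their defaults (NULL / FALSE)
INSERT INTO event_edges (event_id, prev_event_id)
    VALUES (?, ?);
```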
Richard van der Hoff, 2022-06-15 12:29:42 +01:00 (committed by GitHub)
parent a4ae1406d1, commit 75fb10ee45
7 changed files with 216 additions and 11 deletions


```diff
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
```
```diff
@@ -64,6 +64,9 @@ class _BackgroundUpdates:
     INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+
+    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
```
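
These names only take effect once a schema delta queues them in the `background_updates` table. A sketch of the usual pattern (the `ordering` value and the `depends_on` wiring here are assumptions, not copied from the real delta files):

```sql
INSERT INTO background_updates (ordering, update_name, progress_json)
    VALUES (7107, 'event_edges_drop_invalid_rows', '{}');

-- make the index rebuild wait for the row cleanup to finish first
INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
    VALUES (7107, 'event_edges_replace_index', '{}',
            'event_edges_drop_invalid_rows');
```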
```diff
@@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         ################################################################################
 
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+            self._background_drop_invalid_event_edges_rows,
+        )
+
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX,
+            index_name="event_edges_event_id_prev_event_id_idx",
+            table="event_edges",
+            columns=["event_id", "prev_event_id"],
+            unique=True,
+            # the old index which just covered event_id is now redundant.
+            replaces_index="ev_edges_id",
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
```
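
On Postgres, the `EVENT_EDGES_REPLACE_INDEX` registration above amounts to something like the following (a sketch: Synapse builds background indexes with `CREATE INDEX CONCURRENTLY` to avoid locking the table, and `replaces_index` drops the old index once the new one is ready):

```sql
CREATE UNIQUE INDEX CONCURRENTLY event_edges_event_id_prev_event_id_idx
    ON event_edges (event_id, prev_event_id);
DROP INDEX ev_edges_id;
```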
```diff
@@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             )
 
         return 0
+
+    async def _background_drop_invalid_event_edges_rows(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop invalid rows from event_edges
+
+        This only runs for postgres. For SQLite, it all happens synchronously.
+
+        Firstly, drop any rows with is_state=True. These may have been added a long time
+        ago, but they are no longer used.
+
+        We also drop rows that do not correspond to entries in `events`, and add a
+        foreign key.
+        """
+
+        last_event_id = progress.get("last_event_id", "")
+
+        def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            txn.execute(
+                """
+                SELECT event_id FROM event_edges
+                WHERE event_id > ?
+                ORDER BY event_id
+                LIMIT 1 OFFSET ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+
+            if row:
+                endpoint = row[0]
+
+            where_clause = "ee.event_id > ?"
+            args = [last_event_id]
+            if endpoint:
+                where_clause += " AND ee.event_id <= ?"
+                args.append(endpoint)
+
+            # now delete any that:
+            #   - have is_state=TRUE, or
+            #   - do not correspond to a row in `events`
+            txn.execute(
+                f"""
+                DELETE FROM event_edges
+                WHERE event_id IN (
+                    SELECT ee.event_id
+                    FROM event_edges ee
+                        LEFT JOIN events ev USING (event_id)
+                    WHERE ({where_clause}) AND
+                        (is_state OR ev.event_id IS NULL)
+                )""",
+                args,
+            )
+
+            logger.info(
+                "cleaned up event_edges up to %s: removed %i/%i rows",
+                endpoint,
+                txn.rowcount,
+                batch_size,
+            )
+
+            if endpoint is not None:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+                    {"last_event_id": endpoint},
+                )
+                return False
+
+            # if that was the final batch, we validate the foreign key.
+            #
+            # The constraint should have been in place and enforced for new rows since
+            # before we started deleting invalid rows, so there's no chance for any
+            # invalid rows to have snuck in the meantime. In other words, this really
+            # ought to succeed.
+            logger.info("cleaned up event_edges; enabling foreign key")
+            txn.execute(
+                "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey"
+            )
+            return True
+
+        done = await self.db_pool.runInteraction(
+            desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS
+            )
+
+        return batch_size
```
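
To make the batching concrete, here is one iteration of `drop_invalid_event_edges_txn`, with assumed values `last_event_id = '$abc'` and `batch_size = 100`:

```sql
-- find the endpoint: the event_id 100 rows past our last position
SELECT event_id FROM event_edges
 WHERE event_id > '$abc'
 ORDER BY event_id
 LIMIT 1 OFFSET 100;

-- suppose that returns '$def'; the delete is then bounded to that window
DELETE FROM event_edges
 WHERE event_id IN (
    SELECT ee.event_id
    FROM event_edges ee
    LEFT JOIN events ev USING (event_id)
    WHERE (ee.event_id > '$abc' AND ee.event_id <= '$def')
      AND (is_state OR ev.event_id IS NULL)
 );
```

When the endpoint query returns no row we are on the final batch: the delete runs unbounded above `last_event_id`, and the transaction finishes with `ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey`, upgrading the NOT VALID foreign key into a fully-checked one.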