Add a background task to purge unused chain IDs. (#9542)

This is a companion change to apply the fix in #9498 /
922788c604 to previously
purged rooms.
This commit is contained in:
Patrick Cloke 2021-03-09 11:22:25 -05:00 committed by GitHub
parent e9df3f496b
commit dc51d8ffaf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 99 additions and 6 deletions

View file

@ -135,6 +135,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._chain_cover_index,
)
self.db_pool.updates.register_background_update_handler(
"purged_chain_cover",
self._purged_chain_cover_index,
)
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@ -932,3 +937,77 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
processed_count=count,
finished_room_map=finished_rooms,
)
async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int:
"""
A background updates that iterates over the chain cover and deletes the
chain cover for events that have been purged.
This may be due to fully purging a room or via setting a retention policy.
"""
current_event_id = progress.get("current_event_id", "")
def purged_chain_cover_txn(txn) -> int:
# The event ID from events will be null if the chain ID / sequence
# number points to a purged event.
sql = """
SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL
FROM event_auth_chains
LEFT JOIN events AS e USING (event_id)
WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ?
"""
txn.execute(sql, (current_event_id, batch_size))
rows = txn.fetchall()
if not rows:
return 0
# The event IDs and chain IDs / sequence numbers where the event has
# been purged.
unreferenced_event_ids = []
unreferenced_chain_id_tuples = []
event_id = ""
for event_id, chain_id, sequence_number, has_event in rows:
if not has_event:
unreferenced_event_ids.append(event_id)
unreferenced_chain_id_tuples.append((chain_id, sequence_number))
# Delete the unreferenced auth chains from event_auth_chain_links and
# event_auth_chains.
txn.executemany(
"""
DELETE FROM event_auth_chains WHERE event_id = ?
""",
unreferenced_event_ids,
)
# We should also delete matching target_*, but there is no index on
# target_chain_id. Hopefully any purged events are due to a room
# being fully purged and they will be removed from the origin_*
# searches.
txn.executemany(
"""
DELETE FROM event_auth_chain_links WHERE
origin_chain_id = ? AND origin_sequence_number = ?
""",
unreferenced_chain_id_tuples,
)
progress = {
"current_event_id": event_id,
}
self.db_pool.updates._background_update_progress_txn(
txn, "purged_chain_cover", progress
)
return len(rows)
result = await self.db_pool.runInteraction(
"_purged_chain_cover_index",
purged_chain_cover_txn,
)
if not result:
await self.db_pool.updates._end_background_update("purged_chain_cover")
return result