Merge pull request #6295 from matrix-org/erikj/split_purge_history

Split purge API into events vs state and add PurgeEventsStorage
This commit is contained in:
Erik Johnston 2019-11-08 10:19:15 +00:00 committed by GitHub
commit f713c01e2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 335 additions and 182 deletions

1
changelog.d/6295.misc Normal file
View File

@ -0,0 +1 @@
Split out state storage into separate data store.

View File

@ -127,7 +127,9 @@ class PaginationHandler(object):
self._purges_in_progress_by_room.add(room_id) self._purges_in_progress_by_room.add(room_id)
try: try:
with (yield self.pagination_lock.write(room_id)): with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history(room_id, token, delete_local_events) yield self.storage.purge_events.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] complete") logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
except Exception: except Exception:
@ -170,7 +172,7 @@ class PaginationHandler(object):
if joined: if joined:
raise SynapseError(400, "Users are still joined to this room") raise SynapseError(400, "Users are still joined to this room")
await self.store.purge_room(room_id) await self.storage.purge_events.purge_room(room_id)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_messages( def get_messages(

View File

@ -30,6 +30,7 @@ stored in `synapse.storage.schema`.
from synapse.storage.data_stores import DataStores from synapse.storage.data_stores import DataStores
from synapse.storage.data_stores.main import DataStore from synapse.storage.data_stores.main import DataStore
from synapse.storage.persist_events import EventsPersistenceStorage from synapse.storage.persist_events import EventsPersistenceStorage
from synapse.storage.purge_events import PurgeEventsStorage
from synapse.storage.state import StateGroupStorage from synapse.storage.state import StateGroupStorage
__all__ = ["DataStores", "DataStore"] __all__ = ["DataStores", "DataStore"]
@ -46,6 +47,7 @@ class Storage(object):
self.main = stores.main self.main = stores.main
self.persistence = EventsPersistenceStorage(hs, stores) self.persistence = EventsPersistenceStorage(hs, stores)
self.purge_events = PurgeEventsStorage(hs, stores)
self.state = StateGroupStorage(hs, stores) self.state = StateGroupStorage(hs, stores)

View File

@ -1375,6 +1375,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their (instead of just marking them as outliers and deleting their
state groups). state groups).
Returns:
Deferred[set[int]]: The set of state groups that are referenced by
deleted events.
""" """
return self.runInteraction( return self.runInteraction(
@ -1511,11 +1515,10 @@ class EventsStore(
[(room_id, event_id) for event_id, in new_backwards_extrems], [(room_id, event_id) for event_id, in new_backwards_extrems],
) )
logger.info("[purge] finding redundant state groups") logger.info("[purge] finding state groups referenced by deleted events")
# Get all state groups that are referenced by events that are to be # Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events # deleted.
# or state groups, and if not we delete them.
txn.execute( txn.execute(
""" """
SELECT DISTINCT state_group FROM events_to_purge SELECT DISTINCT state_group FROM events_to_purge
@ -1528,60 +1531,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups) "[purge] found %i referenced state groups", len(referenced_state_groups)
) )
logger.info("[purge] finding state groups that can be deleted")
_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
state_groups_to_delete, remaining_state_groups = _
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups") logger.info("[purge] removing events from event_to_state_groups")
txn.execute( txn.execute(
"DELETE FROM event_to_state_groups " "DELETE FROM event_to_state_groups "
@ -1668,138 +1617,35 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
def _find_unreferenced_groups_during_purge(self, txn, state_groups): return referenced_state_groups
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE ep.event_id IS NULL AND
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "state_group", current_search
)
txn.execute(sql + clause, list(args))
referenced = set(sg for sg, in txn)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group"),
)
prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]
to_delete = state_groups_seen - referenced_groups
to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)
return to_delete, to_dedelta
def purge_room(self, room_id): def purge_room(self, room_id):
"""Deletes all record of a room """Deletes all record of a room
Args: Args:
room_id (str): room_id (str)
Returns:
Deferred[List[int]]: The list of state groups to delete.
""" """
return self.runInteraction("purge_room", self._purge_room_txn, room_id) return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id): def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states # First we fetch all the state groups that should be deleted, before
logger.info("[purge] removing %s from state_groups_state", room_id) # we delete that information.
txn.execute( txn.execute(
""" """
DELETE FROM state_groups_state WHERE state_group IN ( SELECT DISTINCT state_group FROM events
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) INNER JOIN event_to_state_groups USING(event_id)
WHERE events.room_id = ? WHERE events.room_id = ?
)
""", """,
(room_id,), (room_id,),
) )
# ... and the state group edges state_groups = [row[0] for row in txn]
logger.info("[purge] removing %s from state_group_edges", room_id)
txn.execute( # Now we delete tables which lack an index on room_id but have one on event_id
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
for table in ( for table in (
"event_auth", "event_auth",
"event_edges", "event_edges",
@ -1887,6 +1733,165 @@ class EventsStore(
logger.info("[purge] done") logger.info("[purge] done")
return state_groups
def purge_unreferenced_state_groups(
self, room_id: str, state_groups_to_delete
) -> defer.Deferred:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
Args:
room_id: The room the state groups belong to (must all be in the
same room).
state_groups_to_delete (Collection[int]): Set of all state groups
to delete.
"""
return self.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
state_groups_to_delete,
)
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=state_groups_to_delete,
keyvalues={},
retcols=("state_group",),
)
remaining_state_groups = set(
row["state_group"]
for row in rows
if row["state_group"] not in state_groups_to_delete
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
@defer.inlineCallbacks
def get_previous_state_groups(self, state_groups):
"""Fetch the previous groups of the given state groups.
Args:
state_groups (Iterable[int])
Returns:
Deferred[dict[int, int]]: mapping from state group to previous
state group.
"""
rows = yield self._simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
keyvalues={},
retcols=("prev_state_group", "state_group"),
desc="get_previous_state_groups",
)
return {row["state_group"]: row["prev_state_group"] for row in rows}
def purge_room_state(self, room_id, state_groups_to_delete):
"""Deletes all record of a room from state tables
Args:
room_id (str):
state_groups_to_delete (list[int]): State groups to delete
"""
return self.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
state_groups_to_delete,
)
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
self._simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups",
column="id",
iterable=state_groups_to_delete,
keyvalues={},
)
async def is_event_after(self, event_id1, event_id2): async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream """Returns True if event_id1 is after event_id2 in the stream
""" """

View File

@ -995,6 +995,29 @@ class StateGroupWorkerStore(
return self.runInteraction("store_state_group", _store_state_group_txn) return self.runInteraction("store_state_group", _store_state_group_txn)
@defer.inlineCallbacks
def get_referenced_state_groups(self, state_groups):
"""Check if the state groups are referenced by events.
Args:
state_groups (Iterable[int])
Returns:
Deferred[set[int]]: The subset of state groups that are
referenced.
"""
rows = yield self._simple_select_many_batch(
table="event_to_state_groups",
column="state_group",
iterable=state_groups,
keyvalues={},
retcols=("DISTINCT state_group",),
desc="get_referenced_state_groups",
)
return set(row["state_group"] for row in rows)
class StateBackgroundUpdateStore( class StateBackgroundUpdateStore(
StateGroupBackgroundUpdateStore, BackgroundUpdateStore StateGroupBackgroundUpdateStore, BackgroundUpdateStore

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
from twisted.internet import defer
logger = logging.getLogger(__name__)
class PurgeEventsStorage(object):
"""High level interface for purging rooms and event history.
"""
def __init__(self, hs, stores):
self.stores = stores
@defer.inlineCallbacks
def purge_room(self, room_id: str):
"""Deletes all record of a room
"""
state_groups_to_delete = yield self.stores.main.purge_room(room_id)
yield self.stores.main.purge_room_state(room_id, state_groups_to_delete)
@defer.inlineCallbacks
def purge_history(self, room_id, token, delete_local_events):
"""Deletes room history before a certain point
Args:
room_id (str):
token (str): A topological token to delete events before
delete_local_events (bool):
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
"""
state_groups = yield self.stores.main.purge_history(
room_id, token, delete_local_events
)
logger.info("[purge] finding state groups that can be deleted")
sg_to_delete = yield self._find_unreferenced_groups(state_groups)
yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete)
@defer.inlineCallbacks
def _find_unreferenced_groups(self, state_groups):
"""Used when purging history to figure out which state groups can be
deleted.
Args:
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
Deferred[set[int]] The set of state groups that can be deleted.
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
referenced = yield self.stores.main.get_referenced_state_groups(
current_search
)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
edges = yield self.stores.main.get_previous_state_groups(current_search)
prevs = set(edges.values())
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
graph.update(edges)
to_delete = state_groups_seen - referenced_groups
return to_delete

View File

@ -628,10 +628,12 @@ class PurgeRoomTestCase(unittest.HomeserverTestCase):
"local_invites", "local_invites",
"room_account_data", "room_account_data",
"room_tags", "room_tags",
"state_groups",
"state_groups_state",
): ):
count = self.get_success( count = self.get_success(
self.store._simple_select_one_onecol( self.store._simple_select_one_onecol(
table="events", table=table,
keyvalues={"room_id": room_id}, keyvalues={"room_id": room_id},
retcol="COUNT(*)", retcol="COUNT(*)",
desc="test_purge_room", desc="test_purge_room",

View File

@ -40,23 +40,24 @@ class PurgeTests(HomeserverTestCase):
third = self.helper.send(self.room_id, body="test3") third = self.helper.send(self.room_id, body="test3")
last = self.helper.send(self.room_id, body="test4") last = self.helper.send(self.room_id, body="test4")
storage = self.hs.get_datastore() store = self.hs.get_datastore()
storage = self.hs.get_storage()
# Get the topological token # Get the topological token
event = storage.get_topological_token_for_event(last["event_id"]) event = store.get_topological_token_for_event(last["event_id"])
self.pump() self.pump()
event = self.successResultOf(event) event = self.successResultOf(event)
# Purge everything before this topological token # Purge everything before this topological token
purge = storage.purge_history(self.room_id, event, True) purge = storage.purge_events.purge_history(self.room_id, event, True)
self.pump() self.pump()
self.assertEqual(self.successResultOf(purge), None) self.assertEqual(self.successResultOf(purge), None)
# Try and get the events # Try and get the events
get_first = storage.get_event(first["event_id"]) get_first = store.get_event(first["event_id"])
get_second = storage.get_event(second["event_id"]) get_second = store.get_event(second["event_id"])
get_third = storage.get_event(third["event_id"]) get_third = store.get_event(third["event_id"])
get_last = storage.get_event(last["event_id"]) get_last = store.get_event(last["event_id"])
self.pump() self.pump()
# 1-3 should fail and last will succeed, meaning that 1-3 are deleted # 1-3 should fail and last will succeed, meaning that 1-3 are deleted