Merge branch 'develop' into babolivier/message_retention

This commit is contained in:
Brendan Abolivier 2019-11-26 17:53:57 +00:00
commit 9e937c28ee
138 changed files with 2594 additions and 1268 deletions

View file

@ -713,9 +713,7 @@ class EventsStore(
metadata_json = encode_json(event.internal_metadata.get_dict())
sql = (
"UPDATE event_json SET internal_metadata = ?" " WHERE event_id = ?"
)
sql = "UPDATE event_json SET internal_metadata = ? WHERE event_id = ?"
txn.execute(sql, (metadata_json, event.event_id))
# Add an entry to the ex_outlier_stream table to replicate the
@ -732,7 +730,7 @@ class EventsStore(
},
)
sql = "UPDATE events SET outlier = ?" " WHERE event_id = ?"
sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
txn.execute(sql, (False, event.event_id))
# Update the event_backward_extremities table now that this
@ -1378,6 +1376,10 @@ class EventsStore(
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
Returns:
Deferred[set[int]]: The set of state groups that are referenced by
deleted events.
"""
return self.runInteraction(
@ -1478,7 +1480,7 @@ class EventsStore(
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute("CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)")
txn.execute("CREATE INDEX events_to_purge_id ON events_to_purge(event_id)")
txn.execute("SELECT event_id, should_delete FROM events_to_purge")
event_rows = txn.fetchall()
@ -1514,11 +1516,10 @@ class EventsStore(
[(room_id, event_id) for event_id, in new_backwards_extrems],
)
logger.info("[purge] finding redundant state groups")
logger.info("[purge] finding state groups referenced by deleted events")
# Get all state groups that are referenced by events that are to be
# deleted. We then go and check if they are referenced by other events
# or state groups, and if not we delete them.
# deleted.
txn.execute(
"""
SELECT DISTINCT state_group FROM events_to_purge
@ -1531,60 +1532,6 @@ class EventsStore(
"[purge] found %i referenced state groups", len(referenced_state_groups)
)
logger.info("[purge] finding state groups that can be deleted")
_ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups)
state_groups_to_delete, remaining_state_groups = _
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
logger.info("[purge] removing events from event_to_state_groups")
txn.execute(
"DELETE FROM event_to_state_groups "
@ -1671,138 +1618,35 @@ class EventsStore(
logger.info("[purge] done")
def _find_unreferenced_groups_during_purge(self, txn, state_groups):
"""Used when purging history to figure out which state groups can be
deleted and which need to be de-delta'ed (due to one of its prev groups
being scheduled for deletion).
Args:
txn
state_groups (set[int]): Set of state groups referenced by events
that are going to be deleted.
Returns:
tuple[set[int], set[int]]: The set of state groups that can be
deleted and the set of state groups that need to be de-delta'ed
"""
# Graph of state group -> previous group
graph = {}
# Set of events that we have found to be referenced by events
referenced_groups = set()
# Set of state groups we've already seen
state_groups_seen = set(state_groups)
# Set of state groups to handle next.
next_to_search = set(state_groups)
while next_to_search:
# We bound size of groups we're looking up at once, to stop the
# SQL query getting too big
if len(next_to_search) < 100:
current_search = next_to_search
next_to_search = set()
else:
current_search = set(itertools.islice(next_to_search, 100))
next_to_search -= current_search
# Check if state groups are referenced
sql = """
SELECT DISTINCT state_group FROM event_to_state_groups
LEFT JOIN events_to_purge AS ep USING (event_id)
WHERE ep.event_id IS NULL AND
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "state_group", current_search
)
txn.execute(sql + clause, list(args))
referenced = set(sg for sg, in txn)
referenced_groups |= referenced
# We don't continue iterating up the state group graphs for state
# groups that are referenced.
current_search -= referenced
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=current_search,
keyvalues={},
retcols=("prev_state_group", "state_group"),
)
prevs = set(row["state_group"] for row in rows)
# We don't bother re-handling groups we've already seen
prevs -= state_groups_seen
next_to_search |= prevs
state_groups_seen |= prevs
for row in rows:
# Note: Each state group can have at most one prev group
graph[row["state_group"]] = row["prev_state_group"]
to_delete = state_groups_seen - referenced_groups
to_dedelta = set()
for sg in referenced_groups:
prev_sg = graph.get(sg)
if prev_sg and prev_sg in to_delete:
to_dedelta.add(sg)
return to_delete, to_dedelta
return referenced_state_groups
def purge_room(self, room_id):
"""Deletes all record of a room
Args:
room_id (str):
room_id (str)
Returns:
Deferred[List[int]]: The list of state groups to delete.
"""
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
def _purge_room_txn(self, txn, room_id):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
"""
DELETE FROM state_groups_state WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
SELECT DISTINCT state_group FROM events
INNER JOIN event_to_state_groups USING(event_id)
WHERE events.room_id = ?
""",
(room_id,),
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
state_groups = [row[0] for row in txn]
txn.execute(
"""
DELETE FROM state_group_edges WHERE state_group IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
txn.execute(
"""
DELETE FROM state_groups WHERE id IN (
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
WHERE events.room_id=?
)
""",
(room_id,),
)
# and then tables which lack an index on room_id but have one on event_id
# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
"event_edges",
@ -1890,6 +1734,165 @@ class EventsStore(
logger.info("[purge] done")
return state_groups
def purge_unreferenced_state_groups(
self, room_id: str, state_groups_to_delete
) -> defer.Deferred:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
Args:
room_id: The room the state groups belong to (must all be in the
same room).
state_groups_to_delete (Collection[int]): Set of all state groups
to delete.
"""
return self.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
state_groups_to_delete,
)
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
column="prev_state_group",
iterable=state_groups_to_delete,
keyvalues={},
retcols=("state_group",),
)
remaining_state_groups = set(
row["state_group"]
for row in rows
if row["state_group"] not in state_groups_to_delete
)
logger.info(
"[purge] de-delta-ing %i remaining state groups",
len(remaining_state_groups),
)
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
curr_state = curr_state[sg]
self._simple_delete_txn(
txn, table="state_groups_state", keyvalues={"state_group": sg}
)
self._simple_delete_txn(
txn, table="state_group_edges", keyvalues={"state_group": sg}
)
self._simple_insert_many_txn(
txn,
table="state_groups_state",
values=[
{
"state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
"event_id": state_id,
}
for key, state_id in iteritems(curr_state)
],
)
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
((sg,) for sg in state_groups_to_delete),
)
txn.executemany(
"DELETE FROM state_groups WHERE id = ?",
((sg,) for sg in state_groups_to_delete),
)
@defer.inlineCallbacks
def get_previous_state_groups(self, state_groups):
"""Fetch the previous groups of the given state groups.
Args:
state_groups (Iterable[int])
Returns:
Deferred[dict[int, int]]: mapping from state group to previous
state group.
"""
rows = yield self._simple_select_many_batch(
table="state_group_edges",
column="prev_state_group",
iterable=state_groups,
keyvalues={},
retcols=("prev_state_group", "state_group"),
desc="get_previous_state_groups",
)
return {row["state_group"]: row["prev_state_group"] for row in rows}
def purge_room_state(self, room_id, state_groups_to_delete):
"""Deletes all record of a room from state tables
Args:
room_id (str):
state_groups_to_delete (list[int]): State groups to delete
"""
return self.runInteraction(
"purge_room_state",
self._purge_room_state_txn,
room_id,
state_groups_to_delete,
)
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
# first we have to delete the state groups states
logger.info("[purge] removing %s from state_groups_state", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups_state",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state group edges
logger.info("[purge] removing %s from state_group_edges", room_id)
self._simple_delete_many_txn(
txn,
table="state_group_edges",
column="state_group",
iterable=state_groups_to_delete,
keyvalues={},
)
# ... and the state groups
logger.info("[purge] removing %s from state_groups", room_id)
self._simple_delete_many_txn(
txn,
table="state_groups",
column="id",
iterable=state_groups_to_delete,
keyvalues={},
)
async def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
"""