mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-07-27 03:05:25 -04:00
Split state groups into a separate data store (#6296)
This commit is contained in:
parent
fa780e9721
commit
75d8f26ac8
28 changed files with 1159 additions and 1168 deletions
|
@ -1757,163 +1757,6 @@ class EventsStore(
|
|||
|
||||
return state_groups
|
||||
|
||||
def purge_unreferenced_state_groups(
|
||||
self, room_id: str, state_groups_to_delete
|
||||
) -> defer.Deferred:
|
||||
"""Deletes no longer referenced state groups and de-deltas any state
|
||||
groups that reference them.
|
||||
|
||||
Args:
|
||||
room_id: The room the state groups belong to (must all be in the
|
||||
same room).
|
||||
state_groups_to_delete (Collection[int]): Set of all state groups
|
||||
to delete.
|
||||
"""
|
||||
|
||||
return self.db.runInteraction(
|
||||
"purge_unreferenced_state_groups",
|
||||
self._purge_unreferenced_state_groups,
|
||||
room_id,
|
||||
state_groups_to_delete,
|
||||
)
|
||||
|
||||
def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete):
|
||||
logger.info(
|
||||
"[purge] found %i state groups to delete", len(state_groups_to_delete)
|
||||
)
|
||||
|
||||
rows = self.db.simple_select_many_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
column="prev_state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
retcols=("state_group",),
|
||||
)
|
||||
|
||||
remaining_state_groups = set(
|
||||
row["state_group"]
|
||||
for row in rows
|
||||
if row["state_group"] not in state_groups_to_delete
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"[purge] de-delta-ing %i remaining state groups",
|
||||
len(remaining_state_groups),
|
||||
)
|
||||
|
||||
# Now we turn the state groups that reference to-be-deleted state
|
||||
# groups to non delta versions.
|
||||
for sg in remaining_state_groups:
|
||||
logger.info("[purge] de-delta-ing remaining state group %s", sg)
|
||||
curr_state = self._get_state_groups_from_groups_txn(txn, [sg])
|
||||
curr_state = curr_state[sg]
|
||||
|
||||
self.db.simple_delete_txn(
|
||||
txn, table="state_groups_state", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self.db.simple_delete_txn(
|
||||
txn, table="state_group_edges", keyvalues={"state_group": sg}
|
||||
)
|
||||
|
||||
self.db.simple_insert_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
values=[
|
||||
{
|
||||
"state_group": sg,
|
||||
"room_id": room_id,
|
||||
"type": key[0],
|
||||
"state_key": key[1],
|
||||
"event_id": state_id,
|
||||
}
|
||||
for key, state_id in iteritems(curr_state)
|
||||
],
|
||||
)
|
||||
|
||||
logger.info("[purge] removing redundant state groups")
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups_state WHERE state_group = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
txn.executemany(
|
||||
"DELETE FROM state_groups WHERE id = ?",
|
||||
((sg,) for sg in state_groups_to_delete),
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_previous_state_groups(self, state_groups):
|
||||
"""Fetch the previous groups of the given state groups.
|
||||
|
||||
Args:
|
||||
state_groups (Iterable[int])
|
||||
|
||||
Returns:
|
||||
Deferred[dict[int, int]]: mapping from state group to previous
|
||||
state group.
|
||||
"""
|
||||
|
||||
rows = yield self.db.simple_select_many_batch(
|
||||
table="state_group_edges",
|
||||
column="prev_state_group",
|
||||
iterable=state_groups,
|
||||
keyvalues={},
|
||||
retcols=("prev_state_group", "state_group"),
|
||||
desc="get_previous_state_groups",
|
||||
)
|
||||
|
||||
return {row["state_group"]: row["prev_state_group"] for row in rows}
|
||||
|
||||
def purge_room_state(self, room_id, state_groups_to_delete):
|
||||
"""Deletes all record of a room from state tables
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
state_groups_to_delete (list[int]): State groups to delete
|
||||
"""
|
||||
|
||||
return self.db.runInteraction(
|
||||
"purge_room_state",
|
||||
self._purge_room_state_txn,
|
||||
room_id,
|
||||
state_groups_to_delete,
|
||||
)
|
||||
|
||||
def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete):
|
||||
# first we have to delete the state groups states
|
||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||
|
||||
self.db.simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_groups_state",
|
||||
column="state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
# ... and the state group edges
|
||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||
|
||||
self.db.simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_group_edges",
|
||||
column="state_group",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
# ... and the state groups
|
||||
logger.info("[purge] removing %s from state_groups", room_id)
|
||||
|
||||
self.db.simple_delete_many_txn(
|
||||
txn,
|
||||
table="state_groups",
|
||||
column="id",
|
||||
iterable=state_groups_to_delete,
|
||||
keyvalues={},
|
||||
)
|
||||
|
||||
async def is_event_after(self, event_id1, event_id2):
|
||||
"""Returns True if event_id1 is after event_id2 in the stream
|
||||
"""
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue