mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-18 17:47:12 -05:00
Move _find_unreferenced_groups
This commit is contained in:
parent
664b192a3b
commit
ad88460e0d
@ -2052,8 +2052,10 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
|
|||||||
|
|
||||||
logger.info("[purge] finding state groups that can be deleted")
|
logger.info("[purge] finding state groups that can be deleted")
|
||||||
|
|
||||||
state_groups_to_delete, remaining_state_groups = self._find_unreferenced_groups(
|
state_groups_to_delete, remaining_state_groups = (
|
||||||
txn, referenced_state_groups,
|
self._find_unreferenced_groups_during_purge(
|
||||||
|
txn, referenced_state_groups,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
@ -2209,6 +2211,85 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
|
|||||||
|
|
||||||
logger.info("[purge] done")
|
logger.info("[purge] done")
|
||||||
|
|
||||||
|
def _find_unreferenced_groups_during_purge(self, txn, state_groups):
|
||||||
|
"""Used when purging history to figure out which state groups can be
|
||||||
|
deleted and which need to be de-delta'ed (due to one of its prev groups
|
||||||
|
being scheduled for deletion).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
txn
|
||||||
|
state_groups (set[int]): Set of state groups referenced by events
|
||||||
|
that are going to be deleted.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
tuple[set[int], set[int]]: The set of state groups that can be
|
||||||
|
deleted and the set of state groups that need to be de-delta'ed
|
||||||
|
"""
|
||||||
|
# Graph of state group -> previous group
|
||||||
|
graph = {}
|
||||||
|
|
||||||
|
# Set of events that we have found to be referenced by events
|
||||||
|
referenced_groups = set()
|
||||||
|
|
||||||
|
# Set of state groups we've already seen
|
||||||
|
state_groups_seen = set(state_groups)
|
||||||
|
|
||||||
|
# Set of state groups to handle next.
|
||||||
|
next_to_search = set(state_groups)
|
||||||
|
while next_to_search:
|
||||||
|
# We bound size of groups we're looking up at once, to stop the
|
||||||
|
# SQL query getting too big
|
||||||
|
if len(next_to_search) < 100:
|
||||||
|
current_search = next_to_search
|
||||||
|
next_to_search = set()
|
||||||
|
else:
|
||||||
|
current_search = set(itertools.islice(next_to_search, 100))
|
||||||
|
next_to_search -= current_search
|
||||||
|
|
||||||
|
# Check if state groups are referenced
|
||||||
|
sql = """
|
||||||
|
SELECT DISTINCT state_group FROM event_to_state_groups
|
||||||
|
LEFT JOIN events_to_purge AS ep USING (event_id)
|
||||||
|
WHERE state_group IN (%s) AND ep.event_id IS NULL
|
||||||
|
""" % (",".join("?" for _ in current_search),)
|
||||||
|
txn.execute(sql, list(current_search))
|
||||||
|
|
||||||
|
referenced = set(sg for sg, in txn)
|
||||||
|
referenced_groups |= referenced
|
||||||
|
|
||||||
|
# We don't continue iterating up the state group graphs for state
|
||||||
|
# groups that are referenced.
|
||||||
|
current_search -= referenced
|
||||||
|
|
||||||
|
rows = self._simple_select_many_txn(
|
||||||
|
txn,
|
||||||
|
table="state_group_edges",
|
||||||
|
column="prev_state_group",
|
||||||
|
iterable=current_search,
|
||||||
|
keyvalues={},
|
||||||
|
retcols=("prev_state_group", "state_group",),
|
||||||
|
)
|
||||||
|
|
||||||
|
prevs = set(row["state_group"] for row in rows)
|
||||||
|
# We don't bother re-handling groups we've already seen
|
||||||
|
prevs -= state_groups_seen
|
||||||
|
next_to_search |= prevs
|
||||||
|
state_groups_seen |= prevs
|
||||||
|
|
||||||
|
for row in rows:
|
||||||
|
# Note: Each state group can have at most one prev group
|
||||||
|
graph[row["state_group"]] = row["prev_state_group"]
|
||||||
|
|
||||||
|
to_delete = state_groups_seen - referenced_groups
|
||||||
|
|
||||||
|
to_dedelta = set()
|
||||||
|
for sg in referenced_groups:
|
||||||
|
prev_sg = graph.get(sg)
|
||||||
|
if prev_sg and prev_sg in to_delete:
|
||||||
|
to_dedelta.add(sg)
|
||||||
|
|
||||||
|
return to_delete, to_dedelta
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def is_event_after(self, event_id1, event_id2):
|
def is_event_after(self, event_id1, event_id2):
|
||||||
"""Returns True if event_id1 is after event_id2 in the stream
|
"""Returns True if event_id1 is after event_id2 in the stream
|
||||||
|
@ -1234,85 +1234,6 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
|
|||||||
|
|
||||||
return count
|
return count
|
||||||
|
|
||||||
def _find_unreferenced_groups(self, txn, state_groups):
|
|
||||||
"""Used when purging history to figure out which state groups can be
|
|
||||||
deleted and which need to be de-delta'ed (due to one of its prev groups
|
|
||||||
being scheduled for deletion).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
txn
|
|
||||||
state_groups (set[int]): Set of state groups referenced by events
|
|
||||||
that are going to be deleted.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
tuple[set[int], set[int]]: The set of state groups that can be
|
|
||||||
deleted and the set of state groups that need to be de-delta'ed
|
|
||||||
"""
|
|
||||||
# Graph of state group -> previous group
|
|
||||||
graph = {}
|
|
||||||
|
|
||||||
# Set of events that we have found to be referenced by events
|
|
||||||
referenced_groups = set()
|
|
||||||
|
|
||||||
# Set of state groups we've already seen
|
|
||||||
state_groups_seen = set(state_groups)
|
|
||||||
|
|
||||||
# Set of state groups to handle next.
|
|
||||||
next_to_search = set(state_groups)
|
|
||||||
while next_to_search:
|
|
||||||
# We bound size of groups we're looking up at once, to stop the
|
|
||||||
# SQL query getting too big
|
|
||||||
if len(next_to_search) < 100:
|
|
||||||
current_search = next_to_search
|
|
||||||
next_to_search = set()
|
|
||||||
else:
|
|
||||||
current_search = set(islice(next_to_search, 100))
|
|
||||||
next_to_search -= current_search
|
|
||||||
|
|
||||||
# Check if state groups are referenced
|
|
||||||
sql = """
|
|
||||||
SELECT DISTINCT state_group FROM event_to_state_groups
|
|
||||||
LEFT JOIN events_to_purge AS ep USING (event_id)
|
|
||||||
WHERE state_group IN (%s) AND ep.event_id IS NULL
|
|
||||||
""" % (",".join("?" for _ in current_search),)
|
|
||||||
txn.execute(sql, list(current_search))
|
|
||||||
|
|
||||||
referenced = set(sg for sg, in txn)
|
|
||||||
referenced_groups |= referenced
|
|
||||||
|
|
||||||
# We don't continue iterating up the state group graphs for state
|
|
||||||
# groups that are referenced.
|
|
||||||
current_search -= referenced
|
|
||||||
|
|
||||||
rows = self._simple_select_many_txn(
|
|
||||||
txn,
|
|
||||||
table="state_group_edges",
|
|
||||||
column="prev_state_group",
|
|
||||||
iterable=current_search,
|
|
||||||
keyvalues={},
|
|
||||||
retcols=("prev_state_group", "state_group",),
|
|
||||||
)
|
|
||||||
|
|
||||||
prevs = set(row["state_group"] for row in rows)
|
|
||||||
# We don't bother re-handling groups we've already seen
|
|
||||||
prevs -= state_groups_seen
|
|
||||||
next_to_search |= prevs
|
|
||||||
state_groups_seen |= prevs
|
|
||||||
|
|
||||||
for row in rows:
|
|
||||||
# Note: Each state group can have at most one prev group
|
|
||||||
graph[row["state_group"]] = row["prev_state_group"]
|
|
||||||
|
|
||||||
to_delete = state_groups_seen - referenced_groups
|
|
||||||
|
|
||||||
to_dedelta = set()
|
|
||||||
for sg in referenced_groups:
|
|
||||||
prev_sg = graph.get(sg)
|
|
||||||
if prev_sg and prev_sg in to_delete:
|
|
||||||
to_dedelta.add(sg)
|
|
||||||
|
|
||||||
return to_delete, to_dedelta
|
|
||||||
|
|
||||||
|
|
||||||
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
|
||||||
""" Keeps track of the state at a given event.
|
""" Keeps track of the state at a given event.
|
||||||
|
Loading…
Reference in New Issue
Block a user