Rework event purge to use a temporary table

... which should speed things up by reducing the amount of data being shuffled
across the connection
This commit is contained in:
Richard van der Hoff 2018-02-14 11:02:22 +00:00
parent d627174da2
commit 5fcbf1e07c

View File

@ -2115,23 +2115,44 @@ class EventsStore(SQLBaseStore):
logger.info("[purge] looking for events to delete") logger.info("[purge] looking for events to delete")
# we build a temporary table listing the events so that we don't have
# to keep shovelling the list back and forth across the connection.
txn.execute( txn.execute(
"SELECT event_id, state_key FROM events" "CREATE TEMPORARY TABLE events_to_purge ("
" LEFT JOIN state_events USING (room_id, event_id)" " event_id TEXT NOT NULL,"
" WHERE room_id = ? AND topological_ordering < ?", " should_delete BOOLEAN NOT NULL"
(room_id, topological_ordering,) ")"
)
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute("CREATE INDEX ON events_to_purge(should_delete)")
should_delete_expr = "state_key IS NULL"
should_delete_params = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
should_delete_params += (room_id, topological_ordering)
txn.execute(
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE e.room_id = ? AND topological_ordering < ?" % (
should_delete_expr,
),
should_delete_params,
)
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
) )
event_rows = txn.fetchall() event_rows = txn.fetchall()
to_delete = [
(event_id,) for event_id, state_key in event_rows
if state_key is None and (
delete_local_events or not self.hs.is_mine_id(event_id)
)
]
logger.info( logger.info(
"[purge] found %i events before cutoff, of which %i can be deleted", "[purge] found %i events before cutoff, of which %i can be deleted",
len(event_rows), len(to_delete), len(event_rows), sum(1 for e in event_rows if e[1]),
) )
logger.info("[purge] Finding new backward extremities") logger.info("[purge] Finding new backward extremities")
@ -2139,12 +2160,11 @@ class EventsStore(SQLBaseStore):
# We calculate the new entries for the backward extremities by finding # We calculate the new entries for the backward extremities by finding
# all events that point to events that are to be purged # all events that point to events that are to be purged
txn.execute( txn.execute(
"SELECT DISTINCT e.event_id FROM events as e" "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
" INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
" INNER JOIN events as e2 ON e2.event_id = ed.event_id" " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
" WHERE e.room_id = ? AND e.topological_ordering < ?" " WHERE e2.topological_ordering >= ?",
" AND e2.topological_ordering >= ?", (topological_ordering, )
(room_id, topological_ordering, topological_ordering)
) )
new_backwards_extrems = txn.fetchall() new_backwards_extrems = txn.fetchall()
@ -2172,12 +2192,11 @@ class EventsStore(SQLBaseStore):
"SELECT state_group FROM event_to_state_groups" "SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)" " INNER JOIN events USING (event_id)"
" WHERE state_group IN (" " WHERE state_group IN ("
" SELECT DISTINCT state_group FROM events" " SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)" " INNER JOIN event_to_state_groups USING (event_id)"
" WHERE room_id = ? AND topological_ordering < ?"
" )" " )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?", " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
(room_id, topological_ordering, topological_ordering) (topological_ordering, )
) )
state_rows = txn.fetchall() state_rows = txn.fetchall()
@ -2262,9 +2281,9 @@ class EventsStore(SQLBaseStore):
) )
logger.info("[purge] removing events from event_to_state_groups") logger.info("[purge] removing events from event_to_state_groups")
txn.executemany( txn.execute(
"DELETE FROM event_to_state_groups WHERE event_id = ?", "DELETE FROM event_to_state_groups "
[(event_id,) for event_id, _ in event_rows] "WHERE event_id IN (SELECT event_id from events_to_purge)"
) )
for event_id, _ in event_rows: for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, ( txn.call_after(self._get_state_group_for_event.invalidate, (
@ -2289,22 +2308,26 @@ class EventsStore(SQLBaseStore):
): ):
logger.info("[purge] removing events from %s", table) logger.info("[purge] removing events from %s", table)
txn.executemany( txn.execute(
"DELETE FROM %s WHERE event_id = ?" % (table,), "DELETE FROM %s WHERE event_id IN ("
to_delete " SELECT event_id FROM events_to_purge WHERE should_delete"
")" % (table,),
) )
# Mark all state and own events as outliers # Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers") logger.info("[purge] marking remaining events as outliers")
txn.executemany( txn.execute(
"UPDATE events SET outlier = ?" "UPDATE events SET outlier = ?"
" WHERE event_id = ?", " WHERE event_id IN ("
[ " SELECT event_id FROM events_to_purge "
(True, event_id,) for event_id, state_key in event_rows " WHERE NOT should_delete"
if state_key is not None or ( ")",
not delete_local_events and self.hs.is_mine_id(event_id) (True,),
) )
]
# we're now done with the temporary table
txn.execute(
"DROP TABLE events_to_purge"
) )
# synapse tries to take out an exclusive lock on room_depth whenever it # synapse tries to take out an exclusive lock on room_depth whenever it