From 5fcbf1e07c5f7c2ce0ec44c2569116507caa0183 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 11:02:22 +0000 Subject: [PATCH 1/4] Rework event purge to use a temporary table ... which should speed things up by reducing the amount of data being shuffled across the connection --- synapse/storage/events.py | 93 ++++++++++++++++++++++++--------------- 1 file changed, 58 insertions(+), 35 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index bbb6aa992..5a2e6a03d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2115,23 +2115,44 @@ class EventsStore(SQLBaseStore): logger.info("[purge] looking for events to delete") + # we build a temporary table listing the events so that we don't have + # to keep shovelling the list back and forth across the connection. + txn.execute( - "SELECT event_id, state_key FROM events" - " LEFT JOIN state_events USING (room_id, event_id)" - " WHERE room_id = ? AND topological_ordering < ?", - (room_id, topological_ordering,) + "CREATE TEMPORARY TABLE events_to_purge (" + " event_id TEXT NOT NULL," + " should_delete BOOLEAN NOT NULL" + ")" + ) + + # create an index on should_delete because later we'll be looking for + # the should_delete / shouldn't_delete subsets + txn.execute("CREATE INDEX ON events_to_purge(should_delete)") + + should_delete_expr = "state_key IS NULL" + should_delete_params = () + if not delete_local_events: + should_delete_expr += " AND event_id NOT LIKE ?" + should_delete_params += ("%:" + self.hs.hostname, ) + + should_delete_params += (room_id, topological_ordering) + + txn.execute( + "INSERT INTO events_to_purge" + " SELECT event_id, %s" + " FROM events AS e LEFT JOIN state_events USING (event_id)" + " WHERE e.room_id = ? AND topological_ordering < ?" % ( + should_delete_expr, + ), + should_delete_params, + ) + txn.execute( + "SELECT event_id, should_delete FROM events_to_purge" ) event_rows = txn.fetchall() - - to_delete = [ - (event_id,) for event_id, state_key in event_rows - if state_key is None and ( - delete_local_events or not self.hs.is_mine_id(event_id) - ) - ] logger.info( "[purge] found %i events before cutoff, of which %i can be deleted", - len(event_rows), len(to_delete), + len(event_rows), sum(1 for e in event_rows if e[1]), ) logger.info("[purge] Finding new backward extremities") @@ -2139,12 +2160,11 @@ class EventsStore(SQLBaseStore): # We calculate the new entries for the backward extremeties by finding # all events that point to events that are to be purged txn.execute( - "SELECT DISTINCT e.event_id FROM events as e" - " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id" - " INNER JOIN events as e2 ON e2.event_id = ed.event_id" - " WHERE e.room_id = ? AND e.topological_ordering < ?" - " AND e2.topological_ordering >= ?", - (room_id, topological_ordering, topological_ordering) + "SELECT DISTINCT e.event_id FROM events_to_purge AS e" + " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" + " INNER JOIN events AS e2 ON e2.event_id = ed.event_id" + " WHERE e2.topological_ordering >= ?", + (topological_ordering, ) ) new_backwards_extrems = txn.fetchall() @@ -2172,12 +2192,11 @@ class EventsStore(SQLBaseStore): "SELECT state_group FROM event_to_state_groups" " INNER JOIN events USING (event_id)" " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events" + " SELECT DISTINCT state_group FROM events_to_purge" " INNER JOIN event_to_state_groups USING (event_id)" - " WHERE room_id = ? AND topological_ordering < ?" " )" " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (room_id, topological_ordering, topological_ordering) + (topological_ordering, ) ) state_rows = txn.fetchall() @@ -2262,9 +2281,9 @@ class EventsStore(SQLBaseStore): ) logger.info("[purge] removing events from event_to_state_groups") - txn.executemany( - "DELETE FROM event_to_state_groups WHERE event_id = ?", - [(event_id,) for event_id, _ in event_rows] + txn.execute( + "DELETE FROM event_to_state_groups " + "WHERE event_id IN (SELECT event_id from events_to_purge)" ) for event_id, _ in event_rows: txn.call_after(self._get_state_group_for_event.invalidate, ( @@ -2289,22 +2308,26 @@ class EventsStore(SQLBaseStore): ): logger.info("[purge] removing events from %s", table) - txn.executemany( - "DELETE FROM %s WHERE event_id = ?" % (table,), - to_delete + txn.execute( + "DELETE FROM %s WHERE event_id IN (" + " SELECT event_id FROM events_to_purge WHERE should_delete" + ")" % (table,), ) # Mark all state and own events as outliers logger.info("[purge] marking remaining events as outliers") - txn.executemany( + txn.execute( "UPDATE events SET outlier = ?" - " WHERE event_id = ?", - [ - (True, event_id,) for event_id, state_key in event_rows - if state_key is not None or ( - not delete_local_events and self.hs.is_mine_id(event_id) - ) - ] + " WHERE event_id IN (" + " SELECT event_id FROM events_to_purge " + " WHERE NOT should_delete" + ")", + (True,), + ) + + # we're now done with the temporary table + txn.execute( + "DROP TABLE events_to_purge" ) # synapse tries to take out an exclusive lock on room_depth whenever it From 278d21b5e498e301a43a217595ce75ed7729256f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 15:44:51 +0000 Subject: [PATCH 2/4] purge_history: fix index use event_push_actions doesn't have an index on event_id, so we need to specify room_id. --- synapse/storage/events.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a2e6a03d..c81bc75ea 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2314,6 +2314,20 @@ class EventsStore(SQLBaseStore): ")" % (table,), ) + # event_push_actions lacks an index on event_id, and has one on + # (room_id, event_id) instead. + for table in ( + "event_push_actions", + ): + logger.info("[purge] removing events from %s", table) + + txn.execute( + "DELETE FROM %s WHERE room_id = ? AND event_id IN (" + " SELECT event_id FROM events_to_purge WHERE should_delete" + ")" % (table,), + (room_id, ) + ) + # Mark all state and own events as outliers logger.info("[purge] marking remaining events as outliers") txn.execute( From ac27f6a35ebb63d502769edec642fbf70a178a60 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 16:41:12 +0000 Subject: [PATCH 3/4] purge_history: handle sqlite asshattery apparently creating a temporary table commits the transaction. because that's a useful thing. --- synapse/storage/events.py | 46 +++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 19 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c81bc75ea..90e910f61 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2093,6 +2093,27 @@ class EventsStore(SQLBaseStore): # state_groups # state_groups_state + # we will build a temporary table listing the events so that we don't + # have to keep shovelling the list back and forth across the + # connection. Annoyingly the python sqlite driver commits the + # transaction on CREATE, so let's do this first. + # + # furthermore, we might already have the table from a previous (failed) + # purge attempt, so let's drop the table first. + + txn.execute("DROP TABLE IF EXISTS events_to_purge") + + txn.execute( + "CREATE TEMPORARY TABLE events_to_purge (" + " event_id TEXT NOT NULL," + " should_delete BOOLEAN NOT NULL" + ")" + ) + + # create an index on should_delete because later we'll be looking for + # the should_delete / shouldn't_delete subsets + txn.execute("CREATE INDEX ON events_to_purge(should_delete)") + # First ensure that we're not about to delete all the forward extremeties txn.execute( "SELECT e.event_id, e.depth FROM events as e " @@ -2115,20 +2136,6 @@ class EventsStore(SQLBaseStore): logger.info("[purge] looking for events to delete") - # we build a temporary table listing the events so that we don't have - # to keep shovelling the list back and forth across the connection. - - txn.execute( - "CREATE TEMPORARY TABLE events_to_purge (" - " event_id TEXT NOT NULL," - " should_delete BOOLEAN NOT NULL" - ")" - ) - - # create an index on should_delete because later we'll be looking for - # the should_delete / shouldn't_delete subsets - txn.execute("CREATE INDEX ON events_to_purge(should_delete)") - should_delete_expr = "state_key IS NULL" should_delete_params = () if not delete_local_events: @@ -2339,11 +2346,6 @@ class EventsStore(SQLBaseStore): (True,), ) - # we're now done with the temporary table - txn.execute( - "DROP TABLE events_to_purge" - ) - # synapse tries to take out an exclusive lock on room_depth whenever it # persists events (because upsert), and once we run this update, we # will block that for the rest of our transaction. @@ -2356,6 +2358,12 @@ class EventsStore(SQLBaseStore): (topological_ordering, room_id,) ) + # finally, drop the temp table. this will commit the txn in sqlite, + # so make sure to keep this actually last. + txn.execute( + "DROP TABLE events_to_purge" + ) + logger.info("[purge] done") @defer.inlineCallbacks From 39bf47319f002614d8de11948d09db7648b26315 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 14 Feb 2018 16:42:19 +0000 Subject: [PATCH 4/4] purge_history: fix sqlite syntax error apparently sqlite insists on indexes being named --- synapse/storage/events.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 90e910f61..28cce2979 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2112,7 +2112,10 @@ class EventsStore(SQLBaseStore): # create an index on should_delete because later we'll be looking for # the should_delete / shouldn't_delete subsets - txn.execute("CREATE INDEX ON events_to_purge(should_delete)") + txn.execute( + "CREATE INDEX events_to_purge_should_delete" + " ON events_to_purge(should_delete)", + ) # First ensure that we're not about to delete all the forward extremeties txn.execute(