diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d970fde9e..49feb7777 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = db_conn.cursor() + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d709869..56a0dd80f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -153,7 +153,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self._clock = hs.get_clock() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 336c03c68..4425d4bce 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,10 +22,12 @@ import ujson as json logger = logging.getLogger(__name__) -KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000 - class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -237,9 +239,6 @@ class EventPushActionsStore(SQLBaseStore): user_id: user ID to delete for topological_ordering: The lowest topological ordering which will not be deleted. - - Returns: - """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, @@ -259,15 +258,63 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ? AND stream_ordering < (" - " SELECT stream_ordering FROM events" - " WHERE room_id = ? AND received_ts < ?" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ")", - (user_id, room_id, topological_ordering, room_id, threshold) + " topological_ordering < ? AND stream_ordering < ?" + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago + ) + + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + logger.info("done: picking %d from %d and %d", range_end, range_start, range_end) + + return range_end + def _action_has_highlight(actions): for action in actions: