diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d970fde9e..49feb7777 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = db_conn.cursor() + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d709869..56a0dd80f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -153,7 +153,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self._clock = hs.get_clock() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9705db5c4..4dae51a17 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,6 +24,10 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -224,18 +228,93 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) - def _remove_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (room_id, user_id, ) ) + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. txn.execute( - "DELETE FROM event_push_actions" - " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", - (room_id, user_id, topological_ordering,) + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < ?", + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago + ) + + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + if max_stream_ordering is None: + return 0 + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + + return range_end + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fdcf28f3e..f1774f0e4 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore): ) if receipt_type == "m.read" and topological_ordering: - self._remove_push_actions_before_txn( + self._remove_old_push_actions_before_txn( txn, room_id=room_id, user_id=user_id,