mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-10-01 08:25:44 -04:00
Merge pull request #795 from matrix-org/dbkr/delete_push_actions_after_a_month
Only delete push actions after 30 days
This commit is contained in:
commit
209ba4d024
@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
|
||||
def __init__(self, db_conn, hs):
|
||||
self.hs = hs
|
||||
self._clock = hs.get_clock()
|
||||
self.database_engine = hs.database_engine
|
||||
|
||||
self.client_ip_last_seen = Cache(
|
||||
@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore,
|
||||
prefilled_cache=push_rules_prefill,
|
||||
)
|
||||
|
||||
cur = db_conn.cursor()
|
||||
self._find_stream_orderings_for_times_txn(cur)
|
||||
cur.close()
|
||||
|
||||
self.find_stream_orderings_looping_call = self._clock.looping_call(
|
||||
self._find_stream_orderings_for_times, 60 * 60 * 1000
|
||||
)
|
||||
|
||||
super(DataStore, self).__init__(hs)
|
||||
|
||||
def take_presence_startup_info(self):
|
||||
|
@ -153,7 +153,6 @@ class SQLBaseStore(object):
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self._db_pool = hs.get_db_pool()
|
||||
self._clock = hs.get_clock()
|
||||
|
||||
self._previous_txn_total_time = 0
|
||||
self._current_txn_total_time = 0
|
||||
|
@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EventPushActionsStore(SQLBaseStore):
|
||||
def __init__(self, hs):
|
||||
self.stream_ordering_month_ago = None
|
||||
super(EventPushActionsStore, self).__init__(hs)
|
||||
|
||||
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
|
||||
"""
|
||||
Args:
|
||||
@ -224,18 +228,93 @@ class EventPushActionsStore(SQLBaseStore):
|
||||
(room_id, event_id)
|
||||
)
|
||||
|
||||
def _remove_push_actions_before_txn(self, txn, room_id, user_id,
|
||||
topological_ordering):
|
||||
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
|
||||
topological_ordering):
|
||||
"""
|
||||
Purges old, stale push actions for a user and room before a given
|
||||
topological_ordering
|
||||
Args:
|
||||
txn: The transcation
|
||||
room_id: Room ID to delete from
|
||||
user_id: user ID to delete for
|
||||
topological_ordering: The lowest topological ordering which will
|
||||
not be deleted.
|
||||
"""
|
||||
txn.call_after(
|
||||
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
|
||||
(room_id, user_id, )
|
||||
)
|
||||
|
||||
# We need to join on the events table to get the received_ts for
|
||||
# event_push_actions and sqlite won't let us use a join in a delete so
|
||||
# we can't just delete where received_ts < x. Furthermore we can
|
||||
# only identify event_push_actions by a tuple of room_id, event_id
|
||||
# we we can't use a subquery.
|
||||
# Instead, we look up the stream ordering for the last event in that
|
||||
# room received before the threshold time and delete event_push_actions
|
||||
# in the room with a stream_odering before that.
|
||||
txn.execute(
|
||||
"DELETE FROM event_push_actions"
|
||||
" WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
|
||||
(room_id, user_id, topological_ordering,)
|
||||
"DELETE FROM event_push_actions "
|
||||
" WHERE user_id = ? AND room_id = ? AND "
|
||||
" topological_ordering < ? AND stream_ordering < ?",
|
||||
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _find_stream_orderings_for_times(self):
|
||||
yield self.runInteraction(
|
||||
"_find_stream_orderings_for_times",
|
||||
self._find_stream_orderings_for_times_txn
|
||||
)
|
||||
|
||||
def _find_stream_orderings_for_times_txn(self, txn):
|
||||
logger.info("Searching for stream ordering 1 month ago")
|
||||
self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
|
||||
txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
|
||||
)
|
||||
logger.info(
|
||||
"Found stream ordering 1 month ago: it's %d",
|
||||
self.stream_ordering_month_ago
|
||||
)
|
||||
|
||||
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
|
||||
"""
|
||||
Find the stream_ordering of the first event that was received after
|
||||
a given timestamp. This is relatively slow as there is no index on
|
||||
received_ts but we can then use this to delete push actions before
|
||||
this.
|
||||
|
||||
received_ts must necessarily be in the same order as stream_ordering
|
||||
and stream_ordering is indexed, so we manually binary search using
|
||||
stream_ordering
|
||||
"""
|
||||
txn.execute("SELECT MAX(stream_ordering) FROM events")
|
||||
max_stream_ordering = txn.fetchone()[0]
|
||||
|
||||
if max_stream_ordering is None:
|
||||
return 0
|
||||
|
||||
range_start = 0
|
||||
range_end = max_stream_ordering
|
||||
|
||||
sql = (
|
||||
"SELECT received_ts FROM events"
|
||||
" WHERE stream_ordering > ?"
|
||||
" ORDER BY stream_ordering"
|
||||
" LIMIT 1"
|
||||
)
|
||||
|
||||
while range_end - range_start > 1:
|
||||
middle = int((range_end + range_start) / 2)
|
||||
txn.execute(sql, (middle,))
|
||||
middle_ts = txn.fetchone()[0]
|
||||
if ts > middle_ts:
|
||||
range_start = middle
|
||||
else:
|
||||
range_end = middle
|
||||
|
||||
return range_end
|
||||
|
||||
|
||||
def _action_has_highlight(actions):
|
||||
for action in actions:
|
||||
|
@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
|
||||
)
|
||||
|
||||
if receipt_type == "m.read" and topological_ordering:
|
||||
self._remove_push_actions_before_txn(
|
||||
self._remove_old_push_actions_before_txn(
|
||||
txn,
|
||||
room_id=room_id,
|
||||
user_id=user_id,
|
||||
|
Loading…
Reference in New Issue
Block a user