Handling expiring stream extrems correctly.

This commit is contained in:
Erik Johnston 2016-09-15 17:34:59 +01:00
parent ea6dc356b0
commit de4f798f01
2 changed files with 22 additions and 3 deletions

View File

@ -222,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore,
self._find_stream_orderings_for_times, 60 * 60 * 1000 self._find_stream_orderings_for_times, 60 * 60 * 1000
) )
self._stream_order_on_start = self.get_room_max_stream_ordering()
super(DataStore, self).__init__(hs) super(DataStore, self).__init__(hs)
def take_presence_startup_info(self): def take_presence_startup_info(self):

View File

@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore):
# We want to make the cache more effective, so we clamp to the last # We want to make the cache more effective, so we clamp to the last
# change before the given ordering. # change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id) last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
stream_ordering = min(last_change, stream_ordering)
# We don't always have a full stream_to_exterm_id table, e.g. after
# the upgrade that introduced it, so we make sure we never ask for a
# try and pin to a stream_ordering from before a restart
last_change = max(self._stream_order_on_start, last_change)
if last_change > self.stream_ordering_month_ago:
stream_ordering = min(last_change, stream_ordering)
return self._get_forward_extremeties_for_room(room_id, stream_ordering) return self._get_forward_extremeties_for_room(room_id, stream_ordering)
@ -386,9 +393,19 @@ class EventFederationStore(SQLBaseStore):
def _delete_old_forward_extrem_cache(self): def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn): def _delete_old_forward_extrem_cache_txn(txn):
sql = ("""
DELETE FROM stream_ordering_to_exterm
WHERE
(
SELECT max(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
WHERE room_id = stream_ordering_to_exterm.room_id
) > ?
AND stream_ordering < ?
""")
txn.execute( txn.execute(
"DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?", sql,
(self.stream_ordering_month_ago,) (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
) )
return self.runInteraction( return self.runInteraction(
"_delete_old_forward_extrem_cache", "_delete_old_forward_extrem_cache",