Make purge_history operate on tokens

As we're soon going to change how topological_ordering works
This commit is contained in:
Erik Johnston 2018-05-15 16:06:30 +01:00
parent 37dbee6490
commit 5f27ed75ad
3 changed files with 25 additions and 21 deletions

View File

@ -86,14 +86,14 @@ class MessageHandler(BaseHandler):
# map from purge id to PurgeStatus # map from purge id to PurgeStatus
self._purges_by_id = {} self._purges_by_id = {}
def start_purge_history(self, room_id, topological_ordering, def start_purge_history(self, room_id, token,
delete_local_events=False): delete_local_events=False):
"""Start off a history purge on a room. """Start off a history purge on a room.
Args: Args:
room_id (str): The room to purge from room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as delete_local_events (bool): True to delete local events as well as
remote ones remote ones
@ -115,19 +115,19 @@ class MessageHandler(BaseHandler):
self._purges_by_id[purge_id] = PurgeStatus() self._purges_by_id[purge_id] = PurgeStatus()
run_in_background( run_in_background(
self._purge_history, self._purge_history,
purge_id, room_id, topological_ordering, delete_local_events, purge_id, room_id, token, delete_local_events,
) )
return purge_id return purge_id
@defer.inlineCallbacks @defer.inlineCallbacks
def _purge_history(self, purge_id, room_id, topological_ordering, def _purge_history(self, purge_id, room_id, token,
delete_local_events): delete_local_events):
"""Carry out a history purge on a room. """Carry out a history purge on a room.
Args: Args:
purge_id (str): The id for this purge purge_id (str): The id for this purge
room_id (str): The room to purge from room_id (str): The room to purge from
topological_ordering (int): minimum topo ordering to preserve token (str): topological token to delete events before
delete_local_events (bool): True to delete local events as well as delete_local_events (bool): True to delete local events as well as
remote ones remote ones
@ -138,7 +138,7 @@ class MessageHandler(BaseHandler):
try: try:
with (yield self.pagination_lock.write(room_id)): with (yield self.pagination_lock.write(room_id)):
yield self.store.purge_history( yield self.store.purge_history(
room_id, topological_ordering, delete_local_events, room_id, token, delete_local_events,
) )
logger.info("[purge] complete") logger.info("[purge] complete")
self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE

View File

@ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
if event.room_id != room_id: if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.") raise SynapseError(400, "Event is for wrong room.")
depth = event.depth token = yield self.store.get_topological_token_for_event(event_id)
logger.info( logger.info(
"[purge] purging up to depth %i (event_id %s)", "[purge] purging up to token %s (event_id %s)",
depth, event_id, token, event_id,
) )
elif 'purge_up_to_ts' in body: elif 'purge_up_to_ts' in body:
ts = body['purge_up_to_ts'] ts = body['purge_up_to_ts']
@ -174,7 +175,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
) )
) )
if room_event_after_stream_ordering: if room_event_after_stream_ordering:
(_, depth, _) = room_event_after_stream_ordering token = yield self.store.get_topological_token_for_event(
room_event_after_stream_ordering,
)
else: else:
logger.warn( logger.warn(
"[purge] purging events not possible: No event found " "[purge] purging events not possible: No event found "
@ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.NOT_FOUND, errcode=Codes.NOT_FOUND,
) )
logger.info( logger.info(
"[purge] purging up to depth %i (received_ts %i => " "[purge] purging up to token %d (received_ts %i => "
"stream_ordering %i)", "stream_ordering %i)",
depth, ts, stream_ordering, token, ts, stream_ordering,
) )
else: else:
raise SynapseError( raise SynapseError(
@ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
) )
purge_id = yield self.handlers.message_handler.start_purge_history( purge_id = yield self.handlers.message_handler.start_purge_history(
room_id, depth, room_id, token,
delete_local_events=delete_local_events, delete_local_events=delete_local_events,
) )

View File

@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError from synapse.api.errors import SynapseError
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id, RoomStreamToken
import synapse.metrics import synapse.metrics
# these are only included to make the type annotations work # these are only included to make the type annotations work
@ -1803,15 +1803,14 @@ class EventsStore(EventsWorkerStore):
return self.runInteraction("get_all_new_events", get_all_new_events_txn) return self.runInteraction("get_all_new_events", get_all_new_events_txn)
def purge_history( def purge_history(
self, room_id, topological_ordering, delete_local_events, self, room_id, token, delete_local_events,
): ):
"""Deletes room history before a certain point """Deletes room history before a certain point
Args: Args:
room_id (str): room_id (str):
topological_ordering (int): token (str): A topological token to delete events before
minimum topo ordering to preserve
delete_local_events (bool): delete_local_events (bool):
if True, we will delete local events as well as remote ones if True, we will delete local events as well as remote ones
@ -1821,12 +1820,12 @@ class EventsStore(EventsWorkerStore):
return self.runInteraction( return self.runInteraction(
"purge_history", "purge_history",
self._purge_history_txn, room_id, topological_ordering, self._purge_history_txn, room_id, token,
delete_local_events, delete_local_events,
) )
def _purge_history_txn( def _purge_history_txn(
self, txn, room_id, topological_ordering, delete_local_events, self, txn, room_id, token, delete_local_events,
): ):
# Tables that should be pruned: # Tables that should be pruned:
# event_auth # event_auth
@ -1856,6 +1855,8 @@ class EventsStore(EventsWorkerStore):
# furthermore, we might already have the table from a previous (failed) # furthermore, we might already have the table from a previous (failed)
# purge attempt, so let's drop the table first. # purge attempt, so let's drop the table first.
token = RoomStreamToken.parse(token)
txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute("DROP TABLE IF EXISTS events_to_purge")
txn.execute( txn.execute(
@ -1888,7 +1889,7 @@ class EventsStore(EventsWorkerStore):
rows = txn.fetchall() rows = txn.fetchall()
max_depth = max(row[0] for row in rows) max_depth = max(row[0] for row in rows)
if max_depth <= topological_ordering: if max_depth <= token.topological:
# We need to ensure we don't delete all the events from the datanase # We need to ensure we don't delete all the events from the datanase
# otherwise we wouldn't be able to send any events (due to not # otherwise we wouldn't be able to send any events (due to not
# having any backwards extremeties) # having any backwards extremeties)
@ -1904,7 +1905,7 @@ class EventsStore(EventsWorkerStore):
should_delete_expr += " AND event_id NOT LIKE ?" should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, ) should_delete_params += ("%:" + self.hs.hostname, )
should_delete_params += (room_id, topological_ordering) should_delete_params += (room_id, token.topological)
txn.execute( txn.execute(
"INSERT INTO events_to_purge" "INSERT INTO events_to_purge"