Merge pull request #2858 from matrix-org/rav/purge_updates

delete_local_events for purge_room_history
This commit is contained in:
Richard van der Hoff 2018-02-09 14:11:00 +00:00 committed by GitHub
commit 10b34dbb9a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 41 deletions

View file

@ -51,7 +51,7 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
@defer.inlineCallbacks
def purge_history(self, room_id, event_id):
def purge_history(self, room_id, event_id, delete_local_events=False):
event = yield self.store.get_event(event_id)
if event.room_id != room_id:
@ -60,7 +60,7 @@ class MessageHandler(BaseHandler):
depth = event.depth
with (yield self.pagination_lock.write(room_id)):
yield self.store.delete_old_state(room_id, depth)
yield self.store.purge_history(room_id, depth, delete_local_events)
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,

View file

@ -148,11 +148,13 @@ def parse_string_from_args(args, name, default=None, required=False,
return default
def parse_json_value_from_request(request):
def parse_json_value_from_request(request, allow_empty_body=False):
"""Parse a JSON value from the body of a twisted HTTP request.
Args:
request: the twisted HTTP request.
allow_empty_body (bool): if True, an empty body will be accepted and
turned into None
Returns:
The JSON value.
@ -165,6 +167,9 @@ def parse_json_value_from_request(request):
except Exception:
raise SynapseError(400, "Error reading JSON content.")
if not content_bytes and allow_empty_body:
return None
try:
content = simplejson.loads(content_bytes)
except Exception as e:
@ -174,17 +179,24 @@ def parse_json_value_from_request(request):
return content
def parse_json_object_from_request(request):
def parse_json_object_from_request(request, allow_empty_body=False):
"""Parse a JSON object from the body of a twisted HTTP request.
Args:
request: the twisted HTTP request.
allow_empty_body (bool): if True, an empty body will be accepted and
turned into an empty dict.
Raises:
SynapseError if the request body couldn't be decoded as JSON or
if it wasn't a JSON object.
"""
content = parse_json_value_from_request(request)
content = parse_json_value_from_request(
request, allow_empty_body=allow_empty_body,
)
if allow_empty_body and content is None:
return {}
if type(content) != dict:
message = "Content must be a JSON object."

View file

@ -129,7 +129,16 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
yield self.handlers.message_handler.purge_history(room_id, event_id)
body = parse_json_object_from_request(request, allow_empty_body=True)
delete_local_events = bool(
body.get("delete_local_history", False)
)
yield self.handlers.message_handler.purge_history(
room_id, event_id,
delete_local_events=delete_local_events,
)
defer.returnValue((200, {}))

View file

@ -2063,16 +2063,32 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
def delete_old_state(self, room_id, topological_ordering):
return self.runInteraction(
"delete_old_state",
self._delete_old_state_txn, room_id, topological_ordering
)
def purge_history(
self, room_id, topological_ordering, delete_local_events,
):
"""Deletes room history before a certain point
def _delete_old_state_txn(self, txn, room_id, topological_ordering):
"""Deletes old room state
Args:
room_id (str):
topological_ordering (int):
minimum topo ordering to preserve
delete_local_events (bool):
if True, we will delete local events as well as remote ones
(instead of just marking them as outliers and deleting their
state groups).
"""
return self.runInteraction(
"purge_history",
self._purge_history_txn, room_id, topological_ordering,
delete_local_events,
)
def _purge_history_txn(
self, txn, room_id, topological_ordering, delete_local_events,
):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@ -2113,7 +2129,7 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
logger.debug("[purge] looking for events to delete")
logger.info("[purge] looking for events to delete")
txn.execute(
"SELECT event_id, state_key FROM events"
@ -2125,16 +2141,16 @@ class EventsStore(SQLBaseStore):
to_delete = [
(event_id,) for event_id, state_key in event_rows
if state_key is None and not self.hs.is_mine_id(event_id)
if state_key is None and (
delete_local_events or not self.hs.is_mine_id(event_id)
)
]
logger.info(
"[purge] found %i events before cutoff, of which %i are remote"
" non-state events to delete", len(event_rows), len(to_delete))
"[purge] found %i events before cutoff, of which %i can be deleted",
len(event_rows), len(to_delete),
)
for event_id, state_key in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
logger.debug("[purge] Finding new backward extremities")
logger.info("[purge] Finding new backward extremities")
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
@ -2148,7 +2164,7 @@ class EventsStore(SQLBaseStore):
)
new_backwards_extrems = txn.fetchall()
logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
logger.info("[purge] replacing backward extremities: %r", new_backwards_extrems)
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
@ -2164,7 +2180,7 @@ class EventsStore(SQLBaseStore):
]
)
logger.debug("[purge] finding redundant state groups")
logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
# to be deleted.
@ -2181,15 +2197,15 @@ class EventsStore(SQLBaseStore):
)
state_rows = txn.fetchall()
logger.debug("[purge] found %i redundant state groups", len(state_rows))
logger.info("[purge] found %i redundant state groups", len(state_rows))
# make a set of the redundant state groups, so that we can look them up
# efficiently
state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
logger.debug("[purge] finding state groups which depend on redundant"
" state groups")
logger.info("[purge] finding state groups which depend on redundant"
" state groups")
remaining_state_groups = []
for i in xrange(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
@ -2214,7 +2230,7 @@ class EventsStore(SQLBaseStore):
# Now we turn the state groups that reference to-be-deleted state
# groups to non delta versions.
for sg in remaining_state_groups:
logger.debug("[purge] de-delta-ing remaining state group %s", sg)
logger.info("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
txn, [sg], types=None
)
@ -2251,7 +2267,7 @@ class EventsStore(SQLBaseStore):
],
)
logger.debug("[purge] removing redundant state groups")
logger.info("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@ -2261,18 +2277,15 @@ class EventsStore(SQLBaseStore):
state_rows
)
# Delete all non-state
logger.debug("[purge] removing events from event_to_state_groups")
logger.info("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)
logger.debug("[purge] updating room_depth")
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
)
for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (
event_id,
))
# Delete all remote non-state events
for table in (
@ -2290,7 +2303,8 @@ class EventsStore(SQLBaseStore):
"event_signatures",
"rejections",
):
logger.debug("[purge] removing remote non-state events from %s", table)
logger.info("[purge] removing remote non-state events from %s",
table)
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
@ -2298,16 +2312,30 @@ class EventsStore(SQLBaseStore):
)
# Mark all state and own events as outliers
logger.debug("[purge] marking remaining events as outliers")
logger.info("[purge] marking remaining events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
[
(True, event_id,) for event_id, state_key in event_rows
if state_key is not None or self.hs.is_mine_id(event_id)
if state_key is not None or (
not delete_local_events and self.hs.is_mine_id(event_id)
)
]
)
# synapse tries to take out an exclusive lock on room_depth whenever it
# persists events (because upsert), and once we run this update, we
# will block that for the rest of our transaction.
#
# So, let's stick it at the end so that we don't block event
# persistence.
logger.info("[purge] updating room_depth")
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
)
logger.info("[purge] done")
@defer.inlineCallbacks