Correctly filter out extremities with soft failed prevs (#5274)

When we receive a soft failed event we, correctly, *do not* update the
forward extremity table with the event. However, if we later receive an
event that references the soft failed event we then need to remove the
soft failed events prev events from the forward extremities table,
otherwise we just build up forward extremities.

Fixes #5269
This commit is contained in:
Erik Johnston 2019-05-29 11:56:24 +01:00 committed by GitHub
parent f76d407ef3
commit 58c8ed5b0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 80 additions and 3 deletions

1
changelog.d/5274.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where we leaked extremities when we soft failed events, leading to performance degradation.

View File

@ -554,10 +554,18 @@ class EventsStore(
e_id for event in new_events for e_id in event.prev_event_ids() e_id for event in new_events for e_id in event.prev_event_ids()
) )
# Finally, remove any events which are prev_events of any existing events. # Remove any events which are prev_events of any existing events.
existing_prevs = yield self._get_events_which_are_prevs(result) existing_prevs = yield self._get_events_which_are_prevs(result)
result.difference_update(existing_prevs) result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev
# events. If they do we need to remove them and their prev events,
# otherwise we end up with dangling extremities.
existing_prevs = yield self._get_prevs_before_rejected(
e_id for event in new_events for e_id in event.prev_event_ids()
)
result.difference_update(existing_prevs)
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -573,7 +581,7 @@ class EventsStore(
""" """
results = [] results = []
def _get_events(txn, batch): def _get_events_which_are_prevs_txn(txn, batch):
sql = """ sql = """
SELECT prev_event_id, internal_metadata SELECT prev_event_id, internal_metadata
FROM event_edges FROM event_edges
@ -596,10 +604,78 @@ class EventsStore(
) )
for chunk in batch_iter(event_ids, 100): for chunk in batch_iter(event_ids, 100):
yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk) yield self.runInteraction(
"_get_events_which_are_prevs",
_get_events_which_are_prevs_txn,
chunk,
)
defer.returnValue(results) defer.returnValue(results)
@defer.inlineCallbacks
def _get_prevs_before_rejected(self, event_ids):
"""Get soft-failed ancestors to remove from the extremities.
Given a set of events, find all those that have been soft-failed or
rejected. Returns those soft failed/rejected events and their prev
events (whether soft-failed/rejected or not), and recurses up the
prev-event graph until it finds no more soft-failed/rejected events.
This is used to find extremities that are ancestors of new events, but
are separated by soft failed events.
Args:
event_ids (Iterable[str]): Events to find prev events for. Note
that these must have already been persisted.
Returns:
Deferred[set[str]]
"""
# The set of event_ids to return. This includes all soft-failed events
# and their prev events.
existing_prevs = set()
def _get_prevs_before_rejected_txn(txn, batch):
to_recursively_check = batch
while to_recursively_check:
sql = """
SELECT
event_id, prev_event_id, internal_metadata,
rejections.event_id IS NOT NULL
FROM event_edges
INNER JOIN events USING (event_id)
LEFT JOIN rejections USING (event_id)
LEFT JOIN event_json USING (event_id)
WHERE
event_id IN (%s)
AND NOT events.outlier
""" % (
",".join("?" for _ in to_recursively_check),
)
txn.execute(sql, to_recursively_check)
to_recursively_check = []
for event_id, prev_event_id, metadata, rejected in txn:
if prev_event_id in existing_prevs:
continue
soft_failed = json.loads(metadata).get("soft_failed")
if soft_failed or rejected:
to_recursively_check.append(prev_event_id)
existing_prevs.add(prev_event_id)
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
"_get_prevs_before_rejected",
_get_prevs_before_rejected_txn,
chunk,
)
defer.returnValue(existing_prevs)
@defer.inlineCallbacks @defer.inlineCallbacks
def _get_new_state_after_events( def _get_new_state_after_events(
self, room_id, events_context, old_latest_event_ids, new_latest_event_ids self, room_id, events_context, old_latest_event_ids, new_latest_event_ids