Refactor have_seen_events to reduce OOMs (#12886)

My server is currently running out of memory (OOMing) in the middle of
`have_seen_events`, so let's try to fix that.
This commit is contained in:
Richard van der Hoff 2022-05-27 11:27:33 +02:00 committed by GitHub
parent 49f06866e4
commit bc1beebc27
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 25 additions and 18 deletions

1
changelog.d/12886.misc Normal file
View File

@@ -0,0 +1 @@
Refactor `have_seen_events` to reduce memory consumed when processing federation traffic.

View File

@@ -1356,14 +1356,23 @@ class EventsWorkerStore(SQLBaseStore):
Returns: Returns:
The set of events we have already seen. The set of events we have already seen.
""" """
res = await self._have_seen_events_dict(
(room_id, event_id) for event_id in event_ids # @cachedList chomps lots of memory if you call it with a big list, so
# we break it down. However, each batch requires its own index scan, so we make
# the batches as big as possible.
results: Set[str] = set()
for chunk in batch_iter(event_ids, 500):
r = await self._have_seen_events_dict(
[(room_id, event_id) for event_id in chunk]
) )
return {eid for ((_rid, eid), have_event) in res.items() if have_event} results.update(eid for ((_rid, eid), have_event) in r.items() if have_event)
return results
@cachedList(cached_method_name="have_seen_event", list_name="keys") @cachedList(cached_method_name="have_seen_event", list_name="keys")
async def _have_seen_events_dict( async def _have_seen_events_dict(
self, keys: Iterable[Tuple[str, str]] self, keys: Collection[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]: ) -> Dict[Tuple[str, str], bool]:
"""Helper for have_seen_events """Helper for have_seen_events
@@ -1375,11 +1384,12 @@ class EventsWorkerStore(SQLBaseStore):
cache_results = { cache_results = {
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,)) (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
} }
results = {x: True for x in cache_results} results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
if not remaining:
return results
def have_seen_events_txn( def have_seen_events_txn(txn: LoggingTransaction) -> None:
txn: LoggingTransaction, chunk: Tuple[Tuple[str, str], ...]
) -> None:
# we deliberately do *not* query the database for room_id, to make the # we deliberately do *not* query the database for room_id, to make the
# query an index-only lookup on `events_event_id_key`. # query an index-only lookup on `events_event_id_key`.
# #
@@ -1387,21 +1397,17 @@ class EventsWorkerStore(SQLBaseStore):
sql = "SELECT event_id FROM events AS e WHERE " sql = "SELECT event_id FROM events AS e WHERE "
clause, args = make_in_list_sql_clause( clause, args = make_in_list_sql_clause(
txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk] txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining]
) )
txn.execute(sql + clause, args) txn.execute(sql + clause, args)
found_events = {eid for eid, in txn} found_events = {eid for eid, in txn}
# ... and then we can update the results for each row in the batch # ... and then we can update the results for each key
results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk}) results.update(
{(rid, eid): (eid in found_events) for (rid, eid) in remaining}
# each batch requires its own index scan, so we make the batches as big as
# possible.
for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
await self.db_pool.runInteraction(
"have_seen_events", have_seen_events_txn, chunk
) )
await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn)
return results return results
@cached(max_entries=100000, tree=True) @cached(max_entries=100000, tree=True)