Prune inbound federation queues if they get too long (#10390)

Commit 01d45fe964 (parent ba5287f5e8)
Erik Johnston, 2021-08-02 14:37:25 +01:00, committed by GitHub
4 changed files with 177 additions and 2 deletions

changelog.d/10390.misc (new file, 1 addition)

@@ -0,0 +1 @@
+Prune inbound federation queues for a room if they get too large.

synapse/federation/federation_server.py

@@ -1024,6 +1024,23 @@ class FederationServer(FederationBase):
 
             origin, event = next
 
+            # Prune the event queue if it's getting large.
+            #
+            # We do this *after* handling the first event, as the common case
+            # is that the queue is empty (or has only a single event in it),
+            # and so there's usually no need to do this check.
+            pruned = await self.store.prune_staged_events_in_room(
+                room_id, room_version
+            )
+            if pruned:
+                # If we have pruned the queue, we need to refetch the next
+                # event to handle.
+                next = await self.store.get_next_staged_event_for_room(
+                    room_id, room_version
+                )
+                if not next:
+                    break
+
+                origin, event = next
+
             lock = await self.store.try_acquire_lock(
                 _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
             )
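For orientation, here is a condensed sketch of the staging loop that this hunk sits in. The helper names are taken from the diff context above, but the surrounding control flow is abridged and approximate rather than the verbatim source:

    while True:
        async with lock:
            ...  # handle `event` from `origin` (elided)

        # Fetch the oldest remaining staged event for the room, if any.
        next = await self.store.get_next_staged_event_for_room(
            room_id, room_version
        )
        if not next:
            break

        origin, event = next

        # New in this commit: prune an oversized queue, then refetch,
        # since the event just fetched may itself have been pruned away.
        pruned = await self.store.prune_staged_events_in_room(
            room_id, room_version
        )
        if pruned:
            ...  # refetch `next` as shown in the hunk above

        lock = await self.store.try_acquire_lock(
            _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id
        )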

synapse/storage/databases/main/event_federation.py

@@ -16,11 +16,11 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
 
-from prometheus_client import Gauge
+from prometheus_client import Counter, Gauge
 
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
-from synapse.api.room_versions import RoomVersion
+from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -44,6 +44,12 @@ number_pdus_in_federation_queue = Gauge(
     "The total number of events in the inbound federation staging",
 )
 
+pdus_pruned_from_federation_queue = Counter(
+    "synapse_federation_server_number_inbound_pdu_pruned",
+    "The number of events in the inbound federation staging that have been "
+    "pruned due to the queue getting too long",
+)
+
 logger = logging.getLogger(__name__)
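A note on the metric type: the existing queue-size metric above is a Gauge, which can go up and down, whereas the new pruned-events metric is a Counter, a monotonically increasing total. A minimal illustrative snippet, using hypothetical metric names rather than ones from the codebase:

    from prometheus_client import Counter, Gauge

    # Hypothetical stand-ins mirroring the pattern above.
    queue_size = Gauge("demo_queue_size", "Current queue depth")
    pruned_total = Counter("demo_pruned_total", "Events pruned since startup")

    queue_size.set(150)    # a Gauge is set to the current value
    pruned_total.inc(120)  # a Counter only ever accumulates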
@@ -1277,6 +1283,100 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return origin, event
 
+    async def prune_staged_events_in_room(
+        self,
+        room_id: str,
+        room_version: RoomVersion,
+    ) -> bool:
+        """Checks whether there are lots of staged events for the room and,
+        if so, prunes them down.
+
+        Returns:
+            Whether any events were pruned
+        """
+
+        # First check the size of the queue.
+        count = await self.db_pool.simple_select_one_onecol(
+            table="federation_inbound_events_staging",
+            keyvalues={"room_id": room_id},
+            retcol="COALESCE(COUNT(*), 0)",
+            desc="prune_staged_events_in_room_count",
+        )
+
+        if count < 100:
+            return False
+
+        # If the queue is too large, then we want to clear the entire queue,
+        # keeping only the forward extremities (i.e. the events not referenced
+        # by other events in the queue). We do this so that we can always
+        # backpaginate to fetch the events we have dropped.
+        rows = await self.db_pool.simple_select_list(
+            table="federation_inbound_events_staging",
+            keyvalues={"room_id": room_id},
+            retcols=("event_id", "event_json"),
+            desc="prune_staged_events_in_room_fetch",
+        )
+
+        # Find the set of events referenced by those in the queue, as well as
+        # collecting all the event IDs in the queue.
+        referenced_events: Set[str] = set()
+        seen_events: Set[str] = set()
+        for row in rows:
+            event_id = row["event_id"]
+            seen_events.add(event_id)
+            event_d = db_to_json(row["event_json"])
+
+            # We don't bother parsing the dicts into full-blown event objects,
+            # as that is needlessly expensive.
+
+            # We haven't checked that the `prev_events` have the right format
+            # yet, so we check as we go.
+            prev_events = event_d.get("prev_events", [])
+            if not isinstance(prev_events, list):
+                logger.info("Invalid prev_events for %s", event_id)
+                continue
+
+            if room_version.event_format == EventFormatVersions.V1:
+                for prev_event_tuple in prev_events:
+                    if (
+                        not isinstance(prev_event_tuple, list)
+                        or len(prev_event_tuple) != 2
+                    ):
+                        logger.info("Invalid prev_events for %s", event_id)
+                        break
+
+                    prev_event_id = prev_event_tuple[0]
+                    if not isinstance(prev_event_id, str):
+                        logger.info("Invalid prev_events for %s", event_id)
+                        break
+
+                    referenced_events.add(prev_event_id)
+            else:
+                for prev_event_id in prev_events:
+                    if not isinstance(prev_event_id, str):
+                        logger.info("Invalid prev_events for %s", event_id)
+                        break
+
+                    referenced_events.add(prev_event_id)
+
+        to_delete = referenced_events & seen_events
+        if not to_delete:
+            return False
+
+        pdus_pruned_from_federation_queue.inc(len(to_delete))
+        logger.info(
+            "Pruning %d events in room %s from federation queue",
+            len(to_delete),
+            room_id,
+        )
+
+        await self.db_pool.simple_delete_many(
+            table="federation_inbound_events_staging",
+            keyvalues={"room_id": room_id},
+            iterable=to_delete,
+            column="event_id",
+            desc="prune_staged_events_in_room_delete",
+        )
+
+        return True
+
     async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
         """Get the room IDs of all events currently staged."""
         return await self.db_pool.simple_select_onecol(
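To make the pruning rule concrete, here is a small self-contained sketch of the same set logic on toy data (hypothetical event IDs, not Synapse code). Events referenced by other queued events are deleted; the unreferenced ones (the forward extremities) survive, so the dropped events can still be fetched later by back-pagination:

    # Map of staged event ID -> the prev_events it references. Room v3+
    # events reference predecessors by bare event ID strings; room v1/v2
    # events use [event_id, hash-dict] pairs instead, e.g.
    # ["$abc:example.com", {"sha256": "..."}], which is why the method
    # above branches on room_version.event_format.
    queue = {
        "$e1": [],         # references nothing staged
        "$e2": ["$e1"],    # references $e1
        "$e3": ["$e2"],    # references $e2
        "$e4": ["$zz"],    # references an event that was never staged
    }

    seen_events = set(queue)
    referenced_events = {p for prevs in queue.values() for p in prevs}

    # Delete exactly the staged events that something else in the queue
    # points at. "$zz" is referenced but not staged, so it is ignored;
    # "$e3" and "$e4" are forward extremities and are kept.
    to_delete = referenced_events & seen_events
    assert to_delete == {"$e1", "$e2"}
    assert set(queue) - to_delete == {"$e3", "$e4"}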

tests/storage/test_event_federation.py

@@ -15,7 +15,9 @@
 import attr
 from parameterized import parameterized
 
+from synapse.api.room_versions import RoomVersions
 from synapse.events import _EventInternalMetadata
+from synapse.util import json_encoder
 
 import tests.unittest
 import tests.utils
@@ -504,6 +506,61 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
 
         self.assertSetEqual(difference, set())
 
+    def test_prune_inbound_federation_queue(self):
+        "Test that pruning of inbound federation queues works"
+        room_id = "some_room_id"
+
+        # Insert a bunch of events that all reference the previous one.
+        self.get_success(
+            self.store.db_pool.simple_insert_many(
+                table="federation_inbound_events_staging",
+                values=[
+                    {
+                        "origin": "some_origin",
+                        "room_id": room_id,
+                        "received_ts": 0,
+                        "event_id": f"$fake_event_id_{i + 1}",
+                        "event_json": json_encoder.encode(
+                            {"prev_events": [f"$fake_event_id_{i}"]}
+                        ),
+                        "internal_metadata": "{}",
+                    }
+                    for i in range(500)
+                ],
+                desc="test_prune_inbound_federation_queue",
+            )
+        )
+
+        # Calling prune once should return True, i.e. a prune happened. The
+        # second time it shouldn't.
+        pruned = self.get_success(
+            self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
+        )
+        self.assertTrue(pruned)
+
+        pruned = self.get_success(
+            self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
+        )
+        self.assertFalse(pruned)
+
+        # Assert that we only have a single event left in the queue, and that
+        # it is the last one.
+        count = self.get_success(
+            self.store.db_pool.simple_select_one_onecol(
+                table="federation_inbound_events_staging",
+                keyvalues={"room_id": room_id},
+                retcol="COALESCE(COUNT(*), 0)",
+                desc="test_prune_inbound_federation_queue",
+            )
+        )
+        self.assertEqual(count, 1)
+
+        _, event_id = self.get_success(
+            self.store.get_next_staged_event_id_for_room(room_id)
+        )
+        self.assertEqual(event_id, "$fake_event_id_500")
+
 
 @attr.s
 class FakeEvent:
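To run just the new test against a checkout, note that Synapse's test suite uses Twisted's trial runner, so something along these lines should work from the repository root (module path assumed from the test class above):

    trial tests.storage.test_event_federation.EventFederationWorkerStoreTestCase.test_prune_inbound_federation_queue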