Handle inbound events from federation asynchronously (#10272)

Fixes #9490

This will break a couple of SyTest that are expecting failures to be added to the response of a federation /send, which obviously doesn't happen now that things are asynchronous.

Two drawbacks:

    Currently there is no logic to handle any events left in the staging area after restart, and so they'll only be handled on the next incoming event in that room. That can be fixed separately.
    We now only process one event per room at a time. This can be fixed up further down the line.
This commit is contained in:
Erik Johnston 2021-06-29 19:55:22 +01:00 committed by GitHub
parent 85d237eba7
commit c54db67d0e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 5 deletions

View file

@ -14,18 +14,20 @@
import itertools
import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Set, Tuple
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.events import EventBase
from synapse.api.room_versions import RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
@ -1044,6 +1046,107 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
_delete_old_forward_extrem_cache_txn,
)
async def insert_received_event_to_staging(
self, origin: str, event: EventBase
) -> None:
"""Insert a newly received event from federation into the staging area."""
# We use an upsert here to handle the case where we see the same event
# from the same server multiple times.
await self.db_pool.simple_upsert(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event.event_id,
},
values={},
insertion_values={
"room_id": event.room_id,
"received_ts": self._clock.time_msec(),
"event_json": json_encoder.encode(event.get_dict()),
"internal_metadata": json_encoder.encode(
event.internal_metadata.get_dict()
),
},
desc="insert_received_event_to_staging",
)
async def remove_received_event_from_staging(
self,
origin: str,
event_id: str,
) -> None:
"""Remove the given event from the staging area"""
await self.db_pool.simple_delete(
table="federation_inbound_events_staging",
keyvalues={
"origin": origin,
"event_id": event_id,
},
desc="remove_received_event_from_staging",
)
async def get_next_staged_event_id_for_room(
self,
room_id: str,
) -> Optional[Tuple[str, str]]:
"""Get the next event ID in the staging area for the given room."""
def _get_next_staged_event_id_for_room_txn(txn):
sql = """
SELECT origin, event_id
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""
txn.execute(sql, (room_id,))
return txn.fetchone()
return await self.db_pool.runInteraction(
"get_next_staged_event_id_for_room", _get_next_staged_event_id_for_room_txn
)
async def get_next_staged_event_for_room(
self,
room_id: str,
room_version: RoomVersion,
) -> Optional[Tuple[str, EventBase]]:
"""Get the next event in the staging area for the given room."""
def _get_next_staged_event_for_room_txn(txn):
sql = """
SELECT event_json, internal_metadata, origin
FROM federation_inbound_events_staging
WHERE room_id = ?
ORDER BY received_ts ASC
LIMIT 1
"""
txn.execute(sql, (room_id,))
return txn.fetchone()
row = await self.db_pool.runInteraction(
"get_next_staged_event_for_room", _get_next_staged_event_for_room_txn
)
if not row:
return None
event_d = db_to_json(row[0])
internal_metadata_d = db_to_json(row[1])
origin = row[2]
event = make_event_from_dict(
event_dict=event_d,
room_version=room_version,
internal_metadata_dict=internal_metadata_d,
)
return origin, event
class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated

View file

@ -0,0 +1,32 @@
/* Copyright 2021 The Matrix.org Foundation C.I.C
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- A staging area for newly received events over federation.
--
-- Note we may store the same event multiple times if it comes from different
-- servers; this is to handle the case if we get a redacted and non-redacted
-- versions of the event.
CREATE TABLE federation_inbound_events_staging (
origin TEXT NOT NULL,
room_id TEXT NOT NULL,
event_id TEXT NOT NULL,
received_ts BIGINT NOT NULL,
event_json TEXT NOT NULL,
internal_metadata TEXT NOT NULL
);
CREATE INDEX federation_inbound_events_staging_room ON federation_inbound_events_staging(room_id, received_ts);
CREATE UNIQUE INDEX federation_inbound_events_staging_instance_event ON federation_inbound_events_staging(origin, event_id);