mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-12-25 09:59:27 -05:00
Keep track when we try and fail to process a pulled event (#13589)
We can follow-up this PR with: 1. Only try to backfill from an event if we haven't tried recently -> https://github.com/matrix-org/synapse/issues/13622 1. When we decide to backfill that event again, process it in the background so it doesn't block and make `/messages` slow when we know it will probably fail again -> https://github.com/matrix-org/synapse/issues/13623 1. Generally track failures everywhere we try and fail to pull an event over federation -> https://github.com/matrix-org/synapse/issues/13700 Fix https://github.com/matrix-org/synapse/issues/13621 Part of https://github.com/matrix-org/synapse/issues/13356 Mentioned in [internal doc](https://docs.google.com/document/d/1lvUoVfYUiy6UaHB6Rb4HicjaJAU40-APue9Q4vzuW3c/edit#bookmark=id.qv7cj51sv9i5)
This commit is contained in:
parent
666ae87729
commit
957e3d74fc
1
changelog.d/13589.feature
Normal file
1
changelog.d/13589.feature
Normal file
@ -0,0 +1 @@
|
|||||||
|
Keep track when we attempt to backfill an event but fail so we can intelligently back-off in the future.
|
@ -862,6 +862,9 @@ class FederationEventHandler:
|
|||||||
self._sanity_check_event(event)
|
self._sanity_check_event(event)
|
||||||
except SynapseError as err:
|
except SynapseError as err:
|
||||||
logger.warning("Event %s failed sanity check: %s", event_id, err)
|
logger.warning("Event %s failed sanity check: %s", event_id, err)
|
||||||
|
await self._store.record_event_failed_pull_attempt(
|
||||||
|
event.room_id, event_id, str(err)
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@ -897,6 +900,10 @@ class FederationEventHandler:
|
|||||||
backfilled=backfilled,
|
backfilled=backfilled,
|
||||||
)
|
)
|
||||||
except FederationError as e:
|
except FederationError as e:
|
||||||
|
await self._store.record_event_failed_pull_attempt(
|
||||||
|
event.room_id, event_id, str(e)
|
||||||
|
)
|
||||||
|
|
||||||
if e.code == 403:
|
if e.code == 403:
|
||||||
logger.warning("Pulled event %s failed history check.", event_id)
|
logger.warning("Pulled event %s failed history check.", event_id)
|
||||||
else:
|
else:
|
||||||
|
@ -1294,6 +1294,51 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
|
|||||||
|
|
||||||
return event_id_results
|
return event_id_results
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def record_event_failed_pull_attempt(
|
||||||
|
self, room_id: str, event_id: str, cause: str
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Record when we fail to pull an event over federation.
|
||||||
|
|
||||||
|
This information allows us to be more intelligent when we decide to
|
||||||
|
retry (we don't need to fail over and over) and we can process that
|
||||||
|
event in the background so we don't block on it each time.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id: The room where the event failed to pull from
|
||||||
|
event_id: The event that failed to be fetched or processed
|
||||||
|
cause: The error message or reason that we failed to pull the event
|
||||||
|
"""
|
||||||
|
await self.db_pool.runInteraction(
|
||||||
|
"record_event_failed_pull_attempt",
|
||||||
|
self._record_event_failed_pull_attempt_upsert_txn,
|
||||||
|
room_id,
|
||||||
|
event_id,
|
||||||
|
cause,
|
||||||
|
db_autocommit=True, # Safe as it's a single upsert
|
||||||
|
)
|
||||||
|
|
||||||
|
def _record_event_failed_pull_attempt_upsert_txn(
|
||||||
|
self,
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
room_id: str,
|
||||||
|
event_id: str,
|
||||||
|
cause: str,
|
||||||
|
) -> None:
|
||||||
|
sql = """
|
||||||
|
INSERT INTO event_failed_pull_attempts (
|
||||||
|
room_id, event_id, num_attempts, last_attempt_ts, last_cause
|
||||||
|
)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
ON CONFLICT (room_id, event_id) DO UPDATE SET
|
||||||
|
num_attempts=event_failed_pull_attempts.num_attempts + 1,
|
||||||
|
last_attempt_ts=EXCLUDED.last_attempt_ts,
|
||||||
|
last_cause=EXCLUDED.last_cause;
|
||||||
|
"""
|
||||||
|
|
||||||
|
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
|
||||||
|
|
||||||
async def get_missing_events(
|
async def get_missing_events(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
@ -2435,17 +2435,31 @@ class PersistEventsStore:
|
|||||||
"DELETE FROM event_backward_extremities"
|
"DELETE FROM event_backward_extremities"
|
||||||
" WHERE event_id = ? AND room_id = ?"
|
" WHERE event_id = ? AND room_id = ?"
|
||||||
)
|
)
|
||||||
|
backward_extremity_tuples_to_remove = [
|
||||||
|
(ev.event_id, ev.room_id)
|
||||||
|
for ev in events
|
||||||
|
if not ev.internal_metadata.is_outlier()
|
||||||
|
# If we encountered an event with no prev_events, then we might
|
||||||
|
# as well remove it now because it won't ever have anything else
|
||||||
|
# to backfill from.
|
||||||
|
or len(ev.prev_event_ids()) == 0
|
||||||
|
]
|
||||||
txn.execute_batch(
|
txn.execute_batch(
|
||||||
query,
|
query,
|
||||||
[
|
backward_extremity_tuples_to_remove,
|
||||||
(ev.event_id, ev.room_id)
|
)
|
||||||
for ev in events
|
|
||||||
if not ev.internal_metadata.is_outlier()
|
# Clear out the failed backfill attempts after we successfully pulled
|
||||||
# If we encountered an event with no prev_events, then we might
|
# the event. Since we no longer need these events as backward
|
||||||
# as well remove it now because it won't ever have anything else
|
# extremities, it also means that they won't be backfilled from again so
|
||||||
# to backfill from.
|
# we no longer need to store the backfill attempts around it.
|
||||||
or len(ev.prev_event_ids()) == 0
|
query = """
|
||||||
],
|
DELETE FROM event_failed_pull_attempts
|
||||||
|
WHERE event_id = ? and room_id = ?
|
||||||
|
"""
|
||||||
|
txn.execute_batch(
|
||||||
|
query,
|
||||||
|
backward_extremity_tuples_to_remove,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -81,6 +81,8 @@ Changes in SCHEMA_VERSION = 72:
|
|||||||
Changes in SCHEMA_VERSION = 73;
|
Changes in SCHEMA_VERSION = 73;
|
||||||
- thread_id column is added to event_push_actions, event_push_actions_staging
|
- thread_id column is added to event_push_actions, event_push_actions_staging
|
||||||
event_push_summary, receipts_linearized, and receipts_graph.
|
event_push_summary, receipts_linearized, and receipts_graph.
|
||||||
|
- Add table `event_failed_pull_attempts` to keep track when we fail to pull
|
||||||
|
events over federation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,29 @@
|
|||||||
|
/* Copyright 2022 The Matrix.org Foundation C.I.C
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
|
||||||
|
-- Add a table that keeps track of when we failed to pull an event over
|
||||||
|
-- federation (via /backfill, `/event`, `/get_missing_events`, etc). This allows
|
||||||
|
-- us to be more intelligent when we decide to retry (we don't need to fail over
|
||||||
|
-- and over) and we can process that event in the background so we don't block
|
||||||
|
-- on it each time.
|
||||||
|
CREATE TABLE IF NOT EXISTS event_failed_pull_attempts(
|
||||||
|
room_id TEXT NOT NULL REFERENCES rooms (room_id),
|
||||||
|
event_id TEXT NOT NULL,
|
||||||
|
num_attempts INT NOT NULL,
|
||||||
|
last_attempt_ts BIGINT NOT NULL,
|
||||||
|
last_cause TEXT NOT NULL,
|
||||||
|
PRIMARY KEY (room_id, event_id)
|
||||||
|
);
|
@ -227,3 +227,225 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
|
|||||||
|
|
||||||
if prev_exists_as_outlier:
|
if prev_exists_as_outlier:
|
||||||
self.mock_federation_transport_client.get_event.assert_not_called()
|
self.mock_federation_transport_client.get_event.assert_not_called()
|
||||||
|
|
||||||
|
def test_process_pulled_event_records_failed_backfill_attempts(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure that failed backfill attempts for an event are
|
||||||
|
recorded in the `event_failed_pull_attempts` table.
|
||||||
|
|
||||||
|
In this test, we pretend we are processing a "pulled" event via
|
||||||
|
backfill. The pulled event has a fake `prev_event` which our server has
|
||||||
|
obviously never seen before so it attempts to request the state at that
|
||||||
|
`prev_event` which expectedly fails because it's a fake event. Because
|
||||||
|
the server can't fetch the state at the missing `prev_event`, the
|
||||||
|
"pulled" event fails the history check and is fails to process.
|
||||||
|
|
||||||
|
We check that we correctly record the number of failed pull attempts
|
||||||
|
of the pulled event and as a sanity check, that the "pulled" event isn't
|
||||||
|
persisted.
|
||||||
|
"""
|
||||||
|
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
|
||||||
|
main_store = self.hs.get_datastores().main
|
||||||
|
|
||||||
|
# Create the room
|
||||||
|
user_id = self.register_user("kermit", "test")
|
||||||
|
tok = self.login("kermit", "test")
|
||||||
|
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||||
|
room_version = self.get_success(main_store.get_room_version(room_id))
|
||||||
|
|
||||||
|
# We expect an outbound request to /state_ids, so stub that out
|
||||||
|
self.mock_federation_transport_client.get_room_state_ids.return_value = make_awaitable(
|
||||||
|
{
|
||||||
|
# Mimic the other server not knowing about the state at all.
|
||||||
|
# We want to cause Synapse to throw an error (`Unable to get
|
||||||
|
# missing prev_event $fake_prev_event`) and fail to backfill
|
||||||
|
# the pulled event.
|
||||||
|
"pdu_ids": [],
|
||||||
|
"auth_chain_ids": [],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
# We also expect an outbound request to /state
|
||||||
|
self.mock_federation_transport_client.get_room_state.return_value = make_awaitable(
|
||||||
|
StateRequestResponse(
|
||||||
|
# Mimic the other server not knowing about the state at all.
|
||||||
|
# We want to cause Synapse to throw an error (`Unable to get
|
||||||
|
# missing prev_event $fake_prev_event`) and fail to backfill
|
||||||
|
# the pulled event.
|
||||||
|
auth_events=[],
|
||||||
|
state=[],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
pulled_event = make_event_from_dict(
|
||||||
|
self.add_hashes_and_signatures_from_other_server(
|
||||||
|
{
|
||||||
|
"type": "test_regular_type",
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": OTHER_USER,
|
||||||
|
"prev_events": [
|
||||||
|
# The fake prev event will make the pulled event fail
|
||||||
|
# the history check (`Unable to get missing prev_event
|
||||||
|
# $fake_prev_event`)
|
||||||
|
"$fake_prev_event"
|
||||||
|
],
|
||||||
|
"auth_events": [],
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"depth": 12,
|
||||||
|
"content": {"body": "pulled"},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
room_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
# The function under test: try to process the pulled event
|
||||||
|
with LoggingContext("test"):
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||||
|
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure our failed pull attempt was recorded
|
||||||
|
backfill_num_attempts = self.get_success(
|
||||||
|
main_store.db_pool.simple_select_one_onecol(
|
||||||
|
table="event_failed_pull_attempts",
|
||||||
|
keyvalues={"event_id": pulled_event.event_id},
|
||||||
|
retcol="num_attempts",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(backfill_num_attempts, 1)
|
||||||
|
|
||||||
|
# The function under test: try to process the pulled event again
|
||||||
|
with LoggingContext("test"):
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||||
|
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure our second failed pull attempt was recorded (`num_attempts` was incremented)
|
||||||
|
backfill_num_attempts = self.get_success(
|
||||||
|
main_store.db_pool.simple_select_one_onecol(
|
||||||
|
table="event_failed_pull_attempts",
|
||||||
|
keyvalues={"event_id": pulled_event.event_id},
|
||||||
|
retcol="num_attempts",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(backfill_num_attempts, 2)
|
||||||
|
|
||||||
|
# And as a sanity check, make sure the event was not persisted through all of this.
|
||||||
|
persisted = self.get_success(
|
||||||
|
main_store.get_event(pulled_event.event_id, allow_none=True)
|
||||||
|
)
|
||||||
|
self.assertIsNone(
|
||||||
|
persisted,
|
||||||
|
"pulled event that fails the history check should not be persisted at all",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_persisted(
|
||||||
|
self,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Test to make sure that failed pull attempts
|
||||||
|
(`event_failed_pull_attempts` table) for an event are cleared after the
|
||||||
|
event is successfully persisted.
|
||||||
|
|
||||||
|
In this test, we pretend we are processing a "pulled" event via
|
||||||
|
backfill. The pulled event succesfully processes and the backward
|
||||||
|
extremeties are updated along with clearing out any failed pull attempts
|
||||||
|
for those old extremities.
|
||||||
|
|
||||||
|
We check that we correctly cleared failed pull attempts of the
|
||||||
|
pulled event.
|
||||||
|
"""
|
||||||
|
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
|
||||||
|
main_store = self.hs.get_datastores().main
|
||||||
|
|
||||||
|
# Create the room
|
||||||
|
user_id = self.register_user("kermit", "test")
|
||||||
|
tok = self.login("kermit", "test")
|
||||||
|
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||||
|
room_version = self.get_success(main_store.get_room_version(room_id))
|
||||||
|
|
||||||
|
# allow the remote user to send state events
|
||||||
|
self.helper.send_state(
|
||||||
|
room_id,
|
||||||
|
"m.room.power_levels",
|
||||||
|
{"events_default": 0, "state_default": 0},
|
||||||
|
tok=tok,
|
||||||
|
)
|
||||||
|
|
||||||
|
# add the remote user to the room
|
||||||
|
member_event = self.get_success(
|
||||||
|
event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
|
||||||
|
)
|
||||||
|
|
||||||
|
initial_state_map = self.get_success(
|
||||||
|
main_store.get_partial_current_state_ids(room_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
auth_event_ids = [
|
||||||
|
initial_state_map[("m.room.create", "")],
|
||||||
|
initial_state_map[("m.room.power_levels", "")],
|
||||||
|
member_event.event_id,
|
||||||
|
]
|
||||||
|
|
||||||
|
pulled_event = make_event_from_dict(
|
||||||
|
self.add_hashes_and_signatures_from_other_server(
|
||||||
|
{
|
||||||
|
"type": "test_regular_type",
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": OTHER_USER,
|
||||||
|
"prev_events": [member_event.event_id],
|
||||||
|
"auth_events": auth_event_ids,
|
||||||
|
"origin_server_ts": 1,
|
||||||
|
"depth": 12,
|
||||||
|
"content": {"body": "pulled"},
|
||||||
|
}
|
||||||
|
),
|
||||||
|
room_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fake the "pulled" event failing to backfill once so we can test
|
||||||
|
# if it's cleared out later on.
|
||||||
|
self.get_success(
|
||||||
|
main_store.record_event_failed_pull_attempt(
|
||||||
|
pulled_event.room_id, pulled_event.event_id, "fake cause"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
# Make sure we have a failed pull attempt recorded for the pulled event
|
||||||
|
backfill_num_attempts = self.get_success(
|
||||||
|
main_store.db_pool.simple_select_one_onecol(
|
||||||
|
table="event_failed_pull_attempts",
|
||||||
|
keyvalues={"event_id": pulled_event.event_id},
|
||||||
|
retcol="num_attempts",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertEqual(backfill_num_attempts, 1)
|
||||||
|
|
||||||
|
# The function under test: try to process the pulled event
|
||||||
|
with LoggingContext("test"):
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler()._process_pulled_event(
|
||||||
|
self.OTHER_SERVER_NAME, pulled_event, backfilled=True
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Make sure the failed pull attempts for the pulled event are cleared
|
||||||
|
backfill_num_attempts = self.get_success(
|
||||||
|
main_store.db_pool.simple_select_one_onecol(
|
||||||
|
table="event_failed_pull_attempts",
|
||||||
|
keyvalues={"event_id": pulled_event.event_id},
|
||||||
|
retcol="num_attempts",
|
||||||
|
allow_none=True,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.assertIsNone(backfill_num_attempts)
|
||||||
|
|
||||||
|
# And as a sanity check, make sure the "pulled" event was persisted.
|
||||||
|
persisted = self.get_success(
|
||||||
|
main_store.get_event(pulled_event.event_id, allow_none=True)
|
||||||
|
)
|
||||||
|
self.assertIsNotNone(persisted, "pulled event was not persisted at all")
|
||||||
|
Loading…
Reference in New Issue
Block a user