mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2024-10-01 08:25:44 -04:00
Avoid checking the event cache when backfilling events (#14164)
This commit is contained in:
parent
828b5502cf
commit
dc02d9f8c5
1
changelog.d/14164.bugfix
Normal file
1
changelog.d/14164.bugfix
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fix a bug introduced in 1.30.0 where purging and rejoining a room without restarting in-between would result in a broken room.
|
@ -798,9 +798,42 @@ class FederationEventHandler:
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Check if we already any of these have these events.
|
||||||
|
# Note: we currently make a lookup in the database directly here rather than
|
||||||
|
# checking the event cache, due to:
|
||||||
|
# https://github.com/matrix-org/synapse/issues/13476
|
||||||
|
existing_events_map = await self._store._get_events_from_db(
|
||||||
|
[event.event_id for event in events]
|
||||||
|
)
|
||||||
|
|
||||||
|
new_events = []
|
||||||
|
for event in events:
|
||||||
|
event_id = event.event_id
|
||||||
|
|
||||||
|
# If we've already seen this event ID...
|
||||||
|
if event_id in existing_events_map:
|
||||||
|
existing_event = existing_events_map[event_id]
|
||||||
|
|
||||||
|
# ...and the event itself was not previously stored as an outlier...
|
||||||
|
if not existing_event.event.internal_metadata.is_outlier():
|
||||||
|
# ...then there's no need to persist it. We have it already.
|
||||||
|
logger.info(
|
||||||
|
"_process_pulled_event: Ignoring received event %s which we "
|
||||||
|
"have already seen",
|
||||||
|
event.event_id,
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
# While we have seen this event before, it was stored as an outlier.
|
||||||
|
# We'll now persist it as a non-outlier.
|
||||||
|
logger.info("De-outliering event %s", event_id)
|
||||||
|
|
||||||
|
# Continue on with the events that are new to us.
|
||||||
|
new_events.append(event)
|
||||||
|
|
||||||
# We want to sort these by depth so we process them and
|
# We want to sort these by depth so we process them and
|
||||||
# tell clients about them in order.
|
# tell clients about them in order.
|
||||||
sorted_events = sorted(events, key=lambda x: x.depth)
|
sorted_events = sorted(new_events, key=lambda x: x.depth)
|
||||||
for ev in sorted_events:
|
for ev in sorted_events:
|
||||||
with nested_logging_context(ev.event_id):
|
with nested_logging_context(ev.event_id):
|
||||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||||
@ -852,18 +885,6 @@ class FederationEventHandler:
|
|||||||
|
|
||||||
event_id = event.event_id
|
event_id = event.event_id
|
||||||
|
|
||||||
existing = await self._store.get_event(
|
|
||||||
event_id, allow_none=True, allow_rejected=True
|
|
||||||
)
|
|
||||||
if existing:
|
|
||||||
if not existing.internal_metadata.is_outlier():
|
|
||||||
logger.info(
|
|
||||||
"_process_pulled_event: Ignoring received event %s which we have already seen",
|
|
||||||
event_id,
|
|
||||||
)
|
|
||||||
return
|
|
||||||
logger.info("De-outliering event %s", event_id)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._sanity_check_event(event)
|
self._sanity_check_event(event)
|
||||||
except SynapseError as err:
|
except SynapseError as err:
|
||||||
|
@ -374,7 +374,7 @@ class EventsWorkerStore(SQLBaseStore):
|
|||||||
If there is a mismatch, behave as per allow_none.
|
If there is a mismatch, behave as per allow_none.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The event, or None if the event was not found.
|
The event, or None if the event was not found and allow_none is `True`.
|
||||||
"""
|
"""
|
||||||
if not isinstance(event_id, str):
|
if not isinstance(event_id, str):
|
||||||
raise TypeError("Invalid event event_id %r" % (event_id,))
|
raise TypeError("Invalid event event_id %r" % (event_id,))
|
||||||
|
@ -19,7 +19,13 @@ from unittest.mock import Mock, patch
|
|||||||
from twisted.test.proto_helpers import MemoryReactor
|
from twisted.test.proto_helpers import MemoryReactor
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseError
|
from synapse.api.errors import (
|
||||||
|
AuthError,
|
||||||
|
Codes,
|
||||||
|
LimitExceededError,
|
||||||
|
NotFoundError,
|
||||||
|
SynapseError,
|
||||||
|
)
|
||||||
from synapse.api.room_versions import RoomVersions
|
from synapse.api.room_versions import RoomVersions
|
||||||
from synapse.events import EventBase, make_event_from_dict
|
from synapse.events import EventBase, make_event_from_dict
|
||||||
from synapse.federation.federation_base import event_from_pdu_json
|
from synapse.federation.federation_base import event_from_pdu_json
|
||||||
@ -28,6 +34,7 @@ from synapse.logging.context import LoggingContext, run_in_background
|
|||||||
from synapse.rest import admin
|
from synapse.rest import admin
|
||||||
from synapse.rest.client import login, room
|
from synapse.rest.client import login, room
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
|
from synapse.storage.databases.main.events_worker import EventCacheEntry
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
from synapse.util.stringutils import random_string
|
from synapse.util.stringutils import random_string
|
||||||
|
|
||||||
@ -322,6 +329,102 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
|
|||||||
)
|
)
|
||||||
self.get_success(d)
|
self.get_success(d)
|
||||||
|
|
||||||
|
def test_backfill_ignores_known_events(self) -> None:
|
||||||
|
"""
|
||||||
|
Tests that events that we already know about are ignored when backfilling.
|
||||||
|
"""
|
||||||
|
# Set up users
|
||||||
|
user_id = self.register_user("kermit", "test")
|
||||||
|
tok = self.login("kermit", "test")
|
||||||
|
|
||||||
|
other_server = "otherserver"
|
||||||
|
other_user = "@otheruser:" + other_server
|
||||||
|
|
||||||
|
# Create a room to backfill events into
|
||||||
|
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||||
|
room_version = self.get_success(self.store.get_room_version(room_id))
|
||||||
|
|
||||||
|
# Build an event to backfill
|
||||||
|
event = event_from_pdu_json(
|
||||||
|
{
|
||||||
|
"type": EventTypes.Message,
|
||||||
|
"content": {"body": "hello world", "msgtype": "m.text"},
|
||||||
|
"room_id": room_id,
|
||||||
|
"sender": other_user,
|
||||||
|
"depth": 32,
|
||||||
|
"prev_events": [],
|
||||||
|
"auth_events": [],
|
||||||
|
"origin_server_ts": self.clock.time_msec(),
|
||||||
|
},
|
||||||
|
room_version,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ensure the event is not already in the DB
|
||||||
|
self.get_failure(
|
||||||
|
self.store.get_event(event.event_id),
|
||||||
|
NotFoundError,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Backfill the event and check that it has entered the DB.
|
||||||
|
|
||||||
|
# We mock out the FederationClient.backfill method, to pretend that a remote
|
||||||
|
# server has returned our fake event.
|
||||||
|
federation_client_backfill_mock = Mock(return_value=make_awaitable([event]))
|
||||||
|
self.hs.get_federation_client().backfill = federation_client_backfill_mock
|
||||||
|
|
||||||
|
# We also mock the persist method with a side effect of itself. This allows us
|
||||||
|
# to track when it has been called while preserving its function.
|
||||||
|
persist_events_and_notify_mock = Mock(
|
||||||
|
side_effect=self.hs.get_federation_event_handler().persist_events_and_notify
|
||||||
|
)
|
||||||
|
self.hs.get_federation_event_handler().persist_events_and_notify = (
|
||||||
|
persist_events_and_notify_mock
|
||||||
|
)
|
||||||
|
|
||||||
|
# Small side-tangent. We populate the event cache with the event, even though
|
||||||
|
# it is not yet in the DB. This is an invalid scenario that can currently occur
|
||||||
|
# due to not properly invalidating the event cache.
|
||||||
|
# See https://github.com/matrix-org/synapse/issues/13476.
|
||||||
|
#
|
||||||
|
# As a result, backfill should not rely on the event cache to check whether
|
||||||
|
# we already have an event in the DB.
|
||||||
|
# TODO: Remove this bit when the event cache is properly invalidated.
|
||||||
|
cache_entry = EventCacheEntry(
|
||||||
|
event=event,
|
||||||
|
redacted_event=None,
|
||||||
|
)
|
||||||
|
self.store._get_event_cache.set_local((event.event_id,), cache_entry)
|
||||||
|
|
||||||
|
# We now call FederationEventHandler.backfill (a separate method) to trigger
|
||||||
|
# a backfill request. It should receive the fake event.
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler().backfill(
|
||||||
|
other_user,
|
||||||
|
room_id,
|
||||||
|
limit=10,
|
||||||
|
extremities=[],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check that our fake event was persisted.
|
||||||
|
persist_events_and_notify_mock.assert_called_once()
|
||||||
|
persist_events_and_notify_mock.reset_mock()
|
||||||
|
|
||||||
|
# Now we repeat the backfill, having the homeserver receive the fake event
|
||||||
|
# again.
|
||||||
|
self.get_success(
|
||||||
|
self.hs.get_federation_event_handler().backfill(
|
||||||
|
other_user,
|
||||||
|
room_id,
|
||||||
|
limit=10,
|
||||||
|
extremities=[],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
# This time, we expect no event persistence to have occurred, as we already
|
||||||
|
# have this event.
|
||||||
|
persist_events_and_notify_mock.assert_not_called()
|
||||||
|
|
||||||
@unittest.override_config(
|
@unittest.override_config(
|
||||||
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
|
{"rc_invites": {"per_user": {"per_second": 0.5, "burst_count": 3}}}
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user