mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-26 02:55:56 -05:00
Fix race when persisting an event and deleting a room (#12594)
This works by taking a row level lock on the `rooms` table at the start of both transactions, ensuring that they don't run at the same time. In the event persistence transaction we also check that there is an entry still in the `rooms` table. I can't figure out how to do this in SQLite. I was just going to lock the table, but it seems that we don't support that in SQLite either, so I'm *really* confused as to how we maintain integrity in SQLite when using `lock_table`....
This commit is contained in:
parent
01dcf7532d
commit
ae7858f184
1
changelog.d/12594.bugfix
Normal file
1
changelog.d/12594.bugfix
Normal file
@ -0,0 +1 @@
|
||||
Fix race when persisting an event and deleting a room that could lead to outbound federation breaking.
|
@ -47,6 +47,7 @@ from synapse.storage.database import (
|
||||
)
|
||||
from synapse.storage.databases.main.events_worker import EventCacheEntry
|
||||
from synapse.storage.databases.main.search import SearchEntry
|
||||
from synapse.storage.engines.postgres import PostgresEngine
|
||||
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
|
||||
from synapse.storage.util.sequence import SequenceGenerator
|
||||
from synapse.types import StateMap, get_domain_from_id
|
||||
@ -364,6 +365,20 @@ class PersistEventsStore:
|
||||
min_stream_order = events_and_contexts[0][0].internal_metadata.stream_ordering
|
||||
max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
|
||||
|
||||
# We check that the room still exists for events we're trying to
|
||||
# persist. This is to protect against races with deleting a room.
|
||||
#
|
||||
# Annoyingly SQLite doesn't support row level locking.
|
||||
if isinstance(self.database_engine, PostgresEngine):
|
||||
for room_id in {e.room_id for e, _ in events_and_contexts}:
|
||||
txn.execute(
|
||||
"SELECT room_version FROM rooms WHERE room_id = ? FOR SHARE",
|
||||
(room_id,),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is None:
|
||||
raise Exception(f"Room does not exist {room_id}")
|
||||
|
||||
# stream orderings should have been assigned by now
|
||||
assert min_stream_order
|
||||
assert max_stream_order
|
||||
|
@ -324,7 +324,12 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
)
|
||||
|
||||
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
|
||||
# First we fetch all the state groups that should be deleted, before
|
||||
# We *immediately* delete the room from the rooms table. This ensures
|
||||
# that we don't race when persisting events (as that transaction checks
|
||||
# that the room exists).
|
||||
txn.execute("DELETE FROM rooms WHERE room_id = ?", (room_id,))
|
||||
|
||||
# Next, we fetch all the state groups that should be deleted, before
|
||||
# we delete that information.
|
||||
txn.execute(
|
||||
"""
|
||||
@ -403,7 +408,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
|
||||
"room_stats_state",
|
||||
"room_stats_current",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
"users_in_public_rooms",
|
||||
"users_who_share_private_rooms",
|
||||
|
Loading…
x
Reference in New Issue
Block a user