mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-01-13 07:39:31 -05:00
Fix bug in account data replication stream. (#7656)
* Ensure account data stream IDs are unique. The account data stream is shared between three tables, and the maximum allocated ID was tracked in a dedicated table. Updating the max ID happened outside the transaction that allocated the ID, leading to a race where if the server was restarted then the same ID could be allocated but the max ID failed to be updated, leading it to be reused. The ID generators have support for tracking across multiple tables, so we may as well use that instead of a dedicated table. * Fix bug in account data replication stream. If the same stream ID was used in both global and room account data then the getting updates for the replication stream would fail due to `heapq.merge(..)` trying to compare a `str` with a `None`. (This is because you'd have two rows like `(534, '!room')` and `(534, None)` from the room and global account data tables). Fix is just to order by stream ID, since we don't rely on the ordering beyond that. The bug where stream IDs can be reused should be fixed now, so this case shouldn't happen going forward. Fixes #7617
This commit is contained in:
parent
3c45a78090
commit
664409b169
1
changelog.d/7656.bugfix
Normal file
1
changelog.d/7656.bugfix
Normal file
@ -0,0 +1 @@
|
||||
Fix bug in account data replication stream.
|
@ -24,7 +24,13 @@ from synapse.storage.database import Database
|
||||
class SlavedAccountDataStore(TagsWorkerStore, AccountDataWorkerStore, BaseSlavedStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
self._account_data_id_gen = SlavedIdTracker(
|
||||
db_conn, "account_data_max_stream_id", "stream_id"
|
||||
db_conn,
|
||||
"account_data",
|
||||
"stream_id",
|
||||
extra_tables=[
|
||||
("room_account_data", "stream_id"),
|
||||
("room_tags_revisions", "stream_id"),
|
||||
],
|
||||
)
|
||||
|
||||
super(SlavedAccountDataStore, self).__init__(database, db_conn, hs)
|
||||
|
@ -600,8 +600,14 @@ class AccountDataStream(Stream):
|
||||
for stream_id, user_id, room_id, account_data_type in room_results
|
||||
)
|
||||
|
||||
# we need to return a sorted list, so merge them together.
|
||||
updates = list(heapq.merge(room_rows, global_rows))
|
||||
# We need to return a sorted list, so merge them together.
|
||||
#
|
||||
# Note: We order only by the stream ID to work around a bug where the
|
||||
# same stream ID could appear in both `global_rows` and `room_rows`,
|
||||
# leading to a comparison between the data tuples. The comparison could
|
||||
# fail due to attempting to compare the `room_id` which results in a
|
||||
# `TypeError` from comparing a `str` vs `None`.
|
||||
updates = list(heapq.merge(room_rows, global_rows, key=lambda row: row[0]))
|
||||
return updates, to_token, limited
|
||||
|
||||
|
||||
|
@ -297,7 +297,13 @@ class AccountDataWorkerStore(SQLBaseStore):
|
||||
class AccountDataStore(AccountDataWorkerStore):
|
||||
def __init__(self, database: Database, db_conn, hs):
|
||||
self._account_data_id_gen = StreamIdGenerator(
|
||||
db_conn, "account_data_max_stream_id", "stream_id"
|
||||
db_conn,
|
||||
"account_data_max_stream_id",
|
||||
"stream_id",
|
||||
extra_tables=[
|
||||
("room_account_data", "stream_id"),
|
||||
("room_tags_revisions", "stream_id"),
|
||||
],
|
||||
)
|
||||
|
||||
super(AccountDataStore, self).__init__(database, db_conn, hs)
|
||||
@ -387,6 +393,10 @@ class AccountDataStore(AccountDataWorkerStore):
|
||||
# doesn't sound any worse than the whole update getting lost,
|
||||
# which is what would happen if we combined the two into one
|
||||
# transaction.
|
||||
#
|
||||
# Note: This is only here for backwards compat to allow admins to
|
||||
# roll back to a previous Synapse version. Next time we update the
|
||||
# database version we can remove this table.
|
||||
yield self._update_max_stream_id(next_id)
|
||||
|
||||
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
|
||||
@ -405,6 +415,10 @@ class AccountDataStore(AccountDataWorkerStore):
|
||||
next_id(int): The the revision to advance to.
|
||||
"""
|
||||
|
||||
# Note: This is only here for backwards compat to allow admins to
|
||||
# roll back to a previous Synapse version. Next time we update the
|
||||
# database version we can remove this table.
|
||||
|
||||
def _update(txn):
|
||||
update_max_id_sql = (
|
||||
"UPDATE account_data_max_stream_id"
|
||||
|
@ -233,6 +233,9 @@ class TagsStore(TagsWorkerStore):
|
||||
self._account_data_stream_cache.entity_has_changed, user_id, next_id
|
||||
)
|
||||
|
||||
# Note: This is only here for backwards compat to allow admins to
|
||||
# roll back to a previous Synapse version. Next time we update the
|
||||
# database version we can remove this table.
|
||||
update_max_id_sql = (
|
||||
"UPDATE account_data_max_stream_id"
|
||||
" SET stream_id = ?"
|
||||
|
@ -33,6 +33,7 @@ logger = logging.getLogger(__name__)
|
||||
# schema files, so the users will be informed on server restarts.
|
||||
# XXX: If you're about to bump this to 59 (or higher) please create an update
|
||||
# that drops the unused `cache_invalidation_stream` table, as per #7436!
|
||||
# XXX: Also add an update to drop `account_data_max_stream_id` as per #7656!
|
||||
SCHEMA_VERSION = 58
|
||||
|
||||
dir_path = os.path.abspath(os.path.dirname(__file__))
|
||||
|
Loading…
Reference in New Issue
Block a user