Merge remote-tracking branch 'upstream/release-v1.110'

This commit is contained in:
Tulir Asokan 2024-07-02 21:28:58 +03:00
commit 685822f0c2
23 changed files with 455 additions and 50 deletions

View File

@ -1,3 +1,26 @@
# Synapse 1.110.0rc3 (2024-07-02)
### Bugfixes
- Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0. ([\#17386](https://github.com/element-hq/synapse/issues/17386), [\#17391](https://github.com/element-hq/synapse/issues/17391))
### Internal Changes
- Limit size of presence EDUs to 50 entries. ([\#17371](https://github.com/element-hq/synapse/issues/17371))
- Fix building debian package for debian sid. ([\#17389](https://github.com/element-hq/synapse/issues/17389))
# Synapse 1.110.0rc2 (2024-06-26)
### Internal Changes
- Fix uploading packages to PyPi. ([\#17363](https://github.com/element-hq/synapse/issues/17363))
# Synapse 1.110.0rc1 (2024-06-26) # Synapse 1.110.0rc1 (2024-06-26)
### Features ### Features

View File

@ -5,9 +5,9 @@
|support| |development| |documentation| |license| |pypi| |python| |support| |development| |documentation| |license| |pypi| |python|
Synapse is an open source `Matrix <https://matrix.org>`_ homeserver Synapse is an open source `Matrix <https://matrix.org>`__ homeserver
implementation, written and maintained by `Element <https://element.io>`_. implementation, written and maintained by `Element <https://element.io>`_.
`Matrix <https://github.com/matrix-org>`_ is the open standard for `Matrix <https://github.com/matrix-org>`__ is the open standard for
secure and interoperable real time communications. You can directly run secure and interoperable real time communications. You can directly run
and manage the source code in this repository, available under an AGPL and manage the source code in this repository, available under an AGPL
license. There is no support provided from Element unless you have a license. There is no support provided from Element unless you have a
@ -119,7 +119,7 @@ impact to other applications will be minimal.
🧪 Testing a new installation 🧪 Testing a new installation
============================ =============================
The easiest way to try out your new Synapse installation is by connecting to it The easiest way to try out your new Synapse installation is by connecting to it
from a web client. from a web client.
@ -173,10 +173,10 @@ As when logging in, you will need to specify a "Custom server". Specify your
desired ``localpart`` in the 'User name' box. desired ``localpart`` in the 'User name' box.
🎯 Troubleshooting and support 🎯 Troubleshooting and support
============================= ==============================
🚀 Professional support 🚀 Professional support
---------------------- -----------------------
Enterprise quality support for Synapse including SLAs is available as part of an Enterprise quality support for Synapse including SLAs is available as part of an
`Element Server Suite (ESS) <https://element.io/pricing>` subscription. `Element Server Suite (ESS) <https://element.io/pricing>` subscription.
@ -185,7 +185,7 @@ If you are an existing ESS subscriber then you can raise a `support request <htt
and access the `knowledge base <https://ems-docs.element.io>`. and access the `knowledge base <https://ems-docs.element.io>`.
🤝 Community support 🤝 Community support
------------------- --------------------
The `Admin FAQ <https://element-hq.github.io/synapse/latest/usage/administration/admin_faq.html>`_ The `Admin FAQ <https://element-hq.github.io/synapse/latest/usage/administration/admin_faq.html>`_
includes tips on dealing with some common problems. For more details, see includes tips on dealing with some common problems. For more details, see
@ -202,7 +202,7 @@ issues for support requests, only for bug reports and feature requests.
.. _docs: docs .. _docs: docs
🪪 Identity Servers 🪪 Identity Servers
================== ===================
Identity servers have the job of mapping email addresses and other 3rd Party Identity servers have the job of mapping email addresses and other 3rd Party
IDs (3PIDs) to Matrix user IDs, as well as verifying the ownership of 3PIDs IDs (3PIDs) to Matrix user IDs, as well as verifying the ownership of 3PIDs

12
debian/changelog vendored
View File

@ -1,3 +1,15 @@
matrix-synapse-py3 (1.110.0~rc3) stable; urgency=medium
* New Synapse release 1.110.0rc3.
-- Synapse Packaging team <packages@matrix.org> Tue, 02 Jul 2024 08:28:56 -0600
matrix-synapse-py3 (1.110.0~rc2) stable; urgency=medium
* New Synapse release 1.110.0rc2.
-- Synapse Packaging team <packages@matrix.org> Wed, 26 Jun 2024 18:14:48 +0200
matrix-synapse-py3 (1.110.0~rc1) stable; urgency=medium matrix-synapse-py3 (1.110.0~rc1) stable; urgency=medium
* `register_new_matrix_user` now supports a --password-file and a --exists-ok flag. * `register_new_matrix_user` now supports a --password-file and a --exists-ok flag.

View File

@ -73,6 +73,8 @@ RUN apt-get update -qq -o Acquire::Languages=none \
curl \ curl \
debhelper \ debhelper \
devscripts \ devscripts \
# Required for building cffi from source.
libffi-dev \
libsystemd-dev \ libsystemd-dev \
lsb-release \ lsb-release \
pkg-config \ pkg-config \

View File

@ -96,7 +96,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry] [tool.poetry]
name = "matrix-synapse" name = "matrix-synapse"
version = "1.110.0rc1" version = "1.110.0rc3"
description = "Homeserver for the Matrix decentralised comms protocol" description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"] authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later" license = "AGPL-3.0-or-later"

View File

@ -21,6 +21,7 @@
# #
import datetime import datetime
import logging import logging
from collections import OrderedDict
from types import TracebackType from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
@ -68,6 +69,10 @@ sent_edus_by_type = Counter(
# If the retry interval is larger than this then we enter "catchup" mode # If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000 CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
# Limit how many presence states we add to each presence EDU, to ensure that
# they are bounded in size.
MAX_PRESENCE_STATES_PER_EDU = 50
class PerDestinationQueue: class PerDestinationQueue:
""" """
@ -144,7 +149,7 @@ class PerDestinationQueue:
# Map of user_id -> UserPresenceState of pending presence to be sent to this # Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination # destination
self._pending_presence: Dict[str, UserPresenceState] = {} self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
# List of room_id -> receipt_type -> user_id -> receipt_dict, # List of room_id -> receipt_type -> user_id -> receipt_dict,
# #
@ -399,7 +404,7 @@ class PerDestinationQueue:
# through another mechanism, because this is all volatile! # through another mechanism, because this is all volatile!
self._pending_edus = [] self._pending_edus = []
self._pending_edus_keyed = {} self._pending_edus_keyed = {}
self._pending_presence = {} self._pending_presence.clear()
self._pending_receipt_edus = [] self._pending_receipt_edus = []
self._start_catching_up() self._start_catching_up()
@ -721,22 +726,26 @@ class _TransactionQueueManager:
# Add presence EDU. # Add presence EDU.
if self.queue._pending_presence: if self.queue._pending_presence:
# Only send max 50 presence entries in the EDU, to bound the amount
# of data we're sending.
presence_to_add: List[JsonDict] = []
while (
self.queue._pending_presence
and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
):
_, presence = self.queue._pending_presence.popitem(last=False)
presence_to_add.append(
format_user_presence_state(presence, self.queue._clock.time_msec())
)
pending_edus.append( pending_edus.append(
Edu( Edu(
origin=self.queue._server_name, origin=self.queue._server_name,
destination=self.queue._destination, destination=self.queue._destination,
edu_type=EduTypes.PRESENCE, edu_type=EduTypes.PRESENCE,
content={ content={"push": presence_to_add},
"push": [
format_user_presence_state(
presence, self.queue._clock.time_msec()
)
for presence in self.queue._pending_presence.values()
]
},
) )
) )
self.queue._pending_presence = {}
# Add read receipt EDUs. # Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))

View File

@ -764,6 +764,13 @@ class Notifier:
async def wait_for_stream_token(self, stream_token: StreamToken) -> bool: async def wait_for_stream_token(self, stream_token: StreamToken) -> bool:
"""Wait for this worker to catch up with the given stream token.""" """Wait for this worker to catch up with the given stream token."""
current_token = self.event_sources.get_current_token()
if stream_token.is_before_or_eq(current_token):
return True
# Work around a bug where older Synapse versions gave out tokens "from
# the future", i.e. that are ahead of the tokens persisted in the DB.
stream_token = await self.event_sources.bound_future_token(stream_token)
start = self.clock.time_msec() start = self.clock.time_msec()
while True: while True:

View File

@ -43,10 +43,7 @@ from synapse.storage.database import (
) )
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.push_rule import PushRulesWorkerStore from synapse.storage.databases.main.push_rule import PushRulesWorkerStore
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import MultiWriterIdGenerator
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict, JsonMapping from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached from synapse.util.caches.descriptors import cached
@ -71,7 +68,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
self._instance_name in hs.config.worker.writers.account_data self._instance_name in hs.config.worker.writers.account_data
) )
self._account_data_id_gen: AbstractStreamIdGenerator self._account_data_id_gen: MultiWriterIdGenerator
self._account_data_id_gen = MultiWriterIdGenerator( self._account_data_id_gen = MultiWriterIdGenerator(
db_conn=db_conn, db_conn=db_conn,
@ -113,6 +110,9 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
""" """
return self._account_data_id_gen.get_current_token() return self._account_data_id_gen.get_current_token()
def get_account_data_id_generator(self) -> MultiWriterIdGenerator:
return self._account_data_id_gen
@cached() @cached()
async def get_global_account_data_for_user( async def get_global_account_data_for_user(
self, user_id: str self, user_id: str

View File

@ -50,10 +50,7 @@ from synapse.storage.database import (
LoggingTransaction, LoggingTransaction,
make_in_list_sql_clause, make_in_list_sql_clause,
) )
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import MultiWriterIdGenerator
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict from synapse.types import JsonDict
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.expiringcache import ExpiringCache
@ -92,7 +89,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
self._instance_name in hs.config.worker.writers.to_device self._instance_name in hs.config.worker.writers.to_device
) )
self._to_device_msg_id_gen: AbstractStreamIdGenerator = MultiWriterIdGenerator( self._to_device_msg_id_gen: MultiWriterIdGenerator = MultiWriterIdGenerator(
db_conn=db_conn, db_conn=db_conn,
db=database, db=database,
notifier=hs.get_replication_notifier(), notifier=hs.get_replication_notifier(),
@ -169,6 +166,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
def get_to_device_stream_token(self) -> int: def get_to_device_stream_token(self) -> int:
return self._to_device_msg_id_gen.get_current_token() return self._to_device_msg_id_gen.get_current_token()
def get_to_device_id_generator(self) -> MultiWriterIdGenerator:
return self._to_device_msg_id_gen
async def get_messages_for_user_devices( async def get_messages_for_user_devices(
self, self,
user_ids: Collection[str], user_ids: Collection[str],

View File

@ -243,6 +243,9 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
def get_device_stream_token(self) -> int: def get_device_stream_token(self) -> int:
return self._device_list_id_gen.get_current_token() return self._device_list_id_gen.get_current_token()
def get_device_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._device_list_id_gen
async def count_devices_by_users( async def count_devices_by_users(
self, user_ids: Optional[Collection[str]] = None self, user_ids: Optional[Collection[str]] = None
) -> int: ) -> int:

View File

@ -192,8 +192,8 @@ class EventsWorkerStore(SQLBaseStore):
): ):
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._stream_id_gen: AbstractStreamIdGenerator self._stream_id_gen: MultiWriterIdGenerator
self._backfill_id_gen: AbstractStreamIdGenerator self._backfill_id_gen: MultiWriterIdGenerator
self._stream_id_gen = MultiWriterIdGenerator( self._stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn, db_conn=db_conn,

View File

@ -42,10 +42,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.engines._base import IsolationLevel from synapse.storage.engines._base import IsolationLevel
from synapse.storage.types import Connection from synapse.storage.types import Connection
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import MultiWriterIdGenerator
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.iterutils import batch_iter from synapse.util.iterutils import batch_iter
@ -83,7 +80,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
super().__init__(database, db_conn, hs) super().__init__(database, db_conn, hs)
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self._presence_id_gen: AbstractStreamIdGenerator self._presence_id_gen: MultiWriterIdGenerator
self._can_persist_presence = ( self._can_persist_presence = (
self._instance_name in hs.config.worker.writers.presence self._instance_name in hs.config.worker.writers.presence
@ -455,6 +452,9 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
def get_current_presence_token(self) -> int: def get_current_presence_token(self) -> int:
return self._presence_id_gen.get_current_token() return self._presence_id_gen.get_current_token()
def get_presence_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._presence_id_gen
def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]: def _get_active_presence(self, db_conn: Connection) -> List[UserPresenceState]:
"""Fetch non-offline presence from the database so that we can register """Fetch non-offline presence from the database so that we can register
the appropriate time outs. the appropriate time outs.

View File

@ -178,6 +178,9 @@ class PushRulesWorkerStore(
""" """
return self._push_rules_stream_id_gen.get_current_token() return self._push_rules_stream_id_gen.get_current_token()
def get_push_rules_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._push_rules_stream_id_gen
def process_replication_rows( def process_replication_rows(
self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
) -> None: ) -> None:

View File

@ -45,10 +45,7 @@ from synapse.storage.database import (
LoggingTransaction, LoggingTransaction,
) )
from synapse.storage.engines._base import IsolationLevel from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import MultiWriterIdGenerator
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import ( from synapse.types import (
JsonDict, JsonDict,
JsonMapping, JsonMapping,
@ -76,7 +73,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# In the worker store this is an ID tracker which we overwrite in the non-worker # In the worker store this is an ID tracker which we overwrite in the non-worker
# class below that is used on the main process. # class below that is used on the main process.
self._receipts_id_gen: AbstractStreamIdGenerator self._receipts_id_gen: MultiWriterIdGenerator
self._can_write_to_receipts = ( self._can_write_to_receipts = (
self._instance_name in hs.config.worker.writers.receipts self._instance_name in hs.config.worker.writers.receipts
@ -136,6 +133,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
def get_receipt_stream_id_for_instance(self, instance_name: str) -> int: def get_receipt_stream_id_for_instance(self, instance_name: str) -> int:
return self._receipts_id_gen.get_current_token_for_writer(instance_name) return self._receipts_id_gen.get_current_token_for_writer(instance_name)
def get_receipts_stream_id_gen(self) -> MultiWriterIdGenerator:
return self._receipts_id_gen
def get_last_unthreaded_receipt_for_user_txn( def get_last_unthreaded_receipt_for_user_txn(
self, self,
txn: LoggingTransaction, txn: LoggingTransaction,

View File

@ -59,11 +59,7 @@ from synapse.storage.database import (
) )
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.types import Cursor from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import ( from synapse.storage.util.id_generators import IdGenerator, MultiWriterIdGenerator
AbstractStreamIdGenerator,
IdGenerator,
MultiWriterIdGenerator,
)
from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.descriptors import cached, cachedList
@ -151,7 +147,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
self.config: HomeServerConfig = hs.config self.config: HomeServerConfig = hs.config
self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator self._un_partial_stated_rooms_stream_id_gen: MultiWriterIdGenerator
self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator( self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
db_conn=db_conn, db_conn=db_conn,
@ -1409,6 +1405,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
instance_name instance_name
) )
def get_un_partial_stated_rooms_id_generator(self) -> MultiWriterIdGenerator:
return self._un_partial_stated_rooms_stream_id_gen
async def get_un_partial_stated_rooms_between( async def get_un_partial_stated_rooms_between(
self, last_id: int, current_id: int, room_ids: Collection[str] self, last_id: int, current_id: int, room_ids: Collection[str]
) -> Set[str]: ) -> Set[str]:

View File

@ -577,6 +577,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions))
def get_events_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._stream_id_gen
async def get_room_events_stream_for_rooms( async def get_room_events_stream_for_rooms(
self, self,
room_ids: Collection[str], room_ids: Collection[str],

View File

@ -812,6 +812,11 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator):
pos = self.get_current_token_for_writer(self._instance_name) pos = self.get_current_token_for_writer(self._instance_name)
txn.execute(sql, (self._stream_name, self._instance_name, pos)) txn.execute(sql, (self._stream_name, self._instance_name, pos))
async def get_max_allocated_token(self) -> int:
return await self._db.runInteraction(
"get_max_allocated_token", self._sequence_gen.get_max_allocated
)
@attr.s(frozen=True, auto_attribs=True) @attr.s(frozen=True, auto_attribs=True)
class _AsyncCtxManagerWrapper(Generic[T]): class _AsyncCtxManagerWrapper(Generic[T]):

View File

@ -88,6 +88,10 @@ class SequenceGenerator(metaclass=abc.ABCMeta):
""" """
... ...
@abc.abstractmethod
def get_max_allocated(self, txn: Cursor) -> int:
"""Get the maximum ID that we have allocated"""
class PostgresSequenceGenerator(SequenceGenerator): class PostgresSequenceGenerator(SequenceGenerator):
"""An implementation of SequenceGenerator which uses a postgres sequence""" """An implementation of SequenceGenerator which uses a postgres sequence"""
@ -190,6 +194,17 @@ class PostgresSequenceGenerator(SequenceGenerator):
% {"seq": self._sequence_name, "stream_name": stream_name} % {"seq": self._sequence_name, "stream_name": stream_name}
) )
def get_max_allocated(self, txn: Cursor) -> int:
# We just read from the sequence what the last value we fetched was.
txn.execute(f"SELECT last_value, is_called FROM {self._sequence_name}")
row = txn.fetchone()
assert row is not None
last_value, is_called = row
if not is_called:
last_value -= 1
return last_value
GetFirstCallbackType = Callable[[Cursor], int] GetFirstCallbackType = Callable[[Cursor], int]
@ -248,6 +263,15 @@ class LocalSequenceGenerator(SequenceGenerator):
# There is nothing to do for in memory sequences # There is nothing to do for in memory sequences
pass pass
def get_max_allocated(self, txn: Cursor) -> int:
with self._lock:
if self._current_max_id is None:
assert self._callback is not None
self._current_max_id = self._callback(txn)
self._callback = None
return self._current_max_id
def build_sequence_generator( def build_sequence_generator(
db_conn: "LoggingDatabaseConnection", db_conn: "LoggingDatabaseConnection",

View File

@ -19,6 +19,7 @@
# #
# #
import logging
from typing import TYPE_CHECKING, Sequence, Tuple from typing import TYPE_CHECKING, Sequence, Tuple
import attr import attr
@ -30,12 +31,20 @@ from synapse.handlers.room import RoomEventSource
from synapse.handlers.typing import TypingNotificationEventSource from synapse.handlers.typing import TypingNotificationEventSource
from synapse.logging.opentracing import trace from synapse.logging.opentracing import trace
from synapse.streams import EventSource from synapse.streams import EventSource
from synapse.types import MultiWriterStreamToken, StreamKeyType, StreamToken from synapse.types import (
AbstractMultiWriterStreamToken,
MultiWriterStreamToken,
StreamKeyType,
StreamToken,
)
if TYPE_CHECKING: if TYPE_CHECKING:
from synapse.server import HomeServer from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@attr.s(frozen=True, slots=True, auto_attribs=True) @attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner: class _EventSourcesInner:
room: RoomEventSource room: RoomEventSource
@ -91,6 +100,77 @@ class EventSources:
) )
return token return token
async def bound_future_token(self, token: StreamToken) -> StreamToken:
"""Bound a token that is ahead of the current token to the maximum
persisted values.
This ensures that if we wait for the given token we know the stream will
eventually advance to that point.
This works around a bug where older Synapse versions will give out
tokens for streams, and then after a restart will give back tokens where
the stream has "gone backwards".
"""
current_token = self.get_current_token()
stream_key_to_id_gen = {
StreamKeyType.ROOM: self.store.get_events_stream_id_generator(),
StreamKeyType.PRESENCE: self.store.get_presence_stream_id_gen(),
StreamKeyType.RECEIPT: self.store.get_receipts_stream_id_gen(),
StreamKeyType.ACCOUNT_DATA: self.store.get_account_data_id_generator(),
StreamKeyType.PUSH_RULES: self.store.get_push_rules_stream_id_gen(),
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
}
for _, key in StreamKeyType.__members__.items():
if key == StreamKeyType.TYPING:
# Typing stream is allowed to "reset", and so comparisons don't
# really make sense as is.
# TODO: Figure out a better way of tracking resets.
continue
token_value = token.get_field(key)
current_value = current_token.get_field(key)
if isinstance(token_value, AbstractMultiWriterStreamToken):
assert type(current_value) is type(token_value)
if not token_value.is_before_or_eq(current_value): # type: ignore[arg-type]
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()
if max_token < token_value.get_max_stream_pos():
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(
key, token_value.bound_stream_token(max_token)
)
else:
assert isinstance(current_value, int)
if current_value < token_value:
max_token = await stream_key_to_id_gen[
key
].get_max_allocated_token()
if max_token < token_value:
logger.error(
"Bounding token from the future '%s': token: %s, bound: %s",
key,
token_value,
max_token,
)
token = token.copy_and_replace(key, max_token)
return token
@trace @trace
async def get_start_token_for_pagination(self, room_id: str) -> StreamToken: async def get_start_token_for_pagination(self, room_id: str) -> StreamToken:
"""Get the start token for a given room to be used to paginate """Get the start token for a given room to be used to paginate

View File

@ -536,6 +536,16 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
return True return True
def bound_stream_token(self, max_stream: int) -> "Self":
"""Bound the stream positions to a maximum value"""
return type(self)(
stream=min(self.stream, max_stream),
instance_map=immutabledict(
{k: min(s, max_stream) for k, s in self.instance_map.items()}
),
)
@attr.s(frozen=True, slots=True, order=False) @attr.s(frozen=True, slots=True, order=False)
class RoomStreamToken(AbstractMultiWriterStreamToken): class RoomStreamToken(AbstractMultiWriterStreamToken):
@ -722,6 +732,14 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
else: else:
return "s%d" % (self.stream,) return "s%d" % (self.stream,)
def bound_stream_token(self, max_stream: int) -> "RoomStreamToken":
"""See super class"""
# This only makes sense for stream tokens.
assert self.topological is None
return super().bound_stream_token(max_stream)
@attr.s(frozen=True, slots=True, order=False) @attr.s(frozen=True, slots=True, order=False)
class MultiWriterStreamToken(AbstractMultiWriterStreamToken): class MultiWriterStreamToken(AbstractMultiWriterStreamToken):

View File

@ -27,6 +27,8 @@ from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
from synapse.api.presence import UserPresenceState
from synapse.federation.sender.per_destination_queue import MAX_PRESENCE_STATES_PER_EDU
from synapse.federation.units import Transaction from synapse.federation.units import Transaction
from synapse.handlers.device import DeviceHandler from synapse.handlers.device import DeviceHandler
from synapse.rest import admin from synapse.rest import admin
@ -266,6 +268,123 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
) )
class FederationSenderPresenceTestCases(HomeserverTestCase):
"""
Test federation sending for presence updates.
"""
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.federation_transport_client = Mock(spec=["send_transaction"])
self.federation_transport_client.send_transaction = AsyncMock()
hs = self.setup_test_homeserver(
federation_transport_client=self.federation_transport_client,
)
return hs
def default_config(self) -> JsonDict:
config = super().default_config()
config["federation_sender_instances"] = None
return config
def test_presence_simple(self) -> None:
"Test that sending a single presence update works"
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
self.get_success(
sender.send_presence_to_destinations(
[UserPresenceState.default("@user:test")],
["server"],
)
)
self.pump()
# expect a call to send_transaction
mock_send_transaction.assert_awaited_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(
data["edus"],
[
{
"edu_type": EduTypes.PRESENCE,
"content": {
"push": [
{
"presence": "offline",
"user_id": "@user:test",
}
]
},
}
],
)
def test_presence_batched(self) -> None:
"""Test that sending lots of presence updates to a destination are
batched, rather than having them all sent in one EDU."""
mock_send_transaction: AsyncMock = (
self.federation_transport_client.send_transaction
)
mock_send_transaction.return_value = {}
sender = self.hs.get_federation_sender()
# We now send lots of presence updates to force the federation sender to
# batch the mup.
number_presence_updates_to_send = MAX_PRESENCE_STATES_PER_EDU * 2
self.get_success(
sender.send_presence_to_destinations(
[
UserPresenceState.default(f"@user{i}:test")
for i in range(number_presence_updates_to_send)
],
["server"],
)
)
self.pump()
# We should have seen at least one transcation be sent by now.
mock_send_transaction.assert_called()
# We don't want to specify exactly how the presence EDUs get sent out,
# could be one per transaction or multiple per transaction. We just want
# to assert that a) each presence EDU has bounded number of updates, and
# b) that all updates get sent out.
presence_edus = []
for transaction_call in mock_send_transaction.call_args_list:
json_cb = transaction_call[0][1]
data = json_cb()
for edu in data["edus"]:
self.assertEqual(edu.get("edu_type"), EduTypes.PRESENCE)
presence_edus.append(edu)
# A set of all user presence we see, this should end up matching the
# number we sent out above.
seen_users: Set[str] = set()
for edu in presence_edus:
presence_states = edu["content"]["push"]
# This is where we actually check that the number of presence
# updates is bounded.
self.assertLessEqual(len(presence_states), MAX_PRESENCE_STATES_PER_EDU)
seen_users.update(p["user_id"] for p in presence_states)
self.assertEqual(len(seen_users), number_presence_updates_to_send)
class FederationSenderDevicesTestCases(HomeserverTestCase): class FederationSenderDevicesTestCases(HomeserverTestCase):
""" """
Test federation sending to update devices. Test federation sending to update devices.

View File

@ -22,6 +22,7 @@ from unittest.mock import AsyncMock, Mock, patch
from parameterized import parameterized from parameterized import parameterized
from twisted.internet import defer
from twisted.test.proto_helpers import MemoryReactor from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules
@ -35,7 +36,14 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import knock, login, room from synapse.rest.client import knock, login, room
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.types import JsonDict, UserID, create_requester from synapse.types import (
JsonDict,
MultiWriterStreamToken,
RoomStreamToken,
StreamKeyType,
UserID,
create_requester,
)
from synapse.util import Clock from synapse.util import Clock
import tests.unittest import tests.unittest
@ -959,6 +967,94 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.fail("No push rules found") self.fail("No push rules found")
def test_wait_for_future_sync_token(self) -> None:
"""Test that if we receive a token that is ahead of our current token,
we'll wait until the stream position advances.
This can happen if replication streams start lagging, and the client's
previous sync request was serviced by a worker ahead of ours.
"""
user = self.register_user("alice", "password")
# We simulate a lagging stream by getting a stream ID from the ID gen
# and then waiting to mark it as "persisted".
presence_id_gen = self.store.get_presence_stream_id_gen()
ctx_mgr = presence_id_gen.get_next()
stream_id = self.get_success(ctx_mgr.__aenter__())
# Create the new token based on the stream ID above.
current_token = self.hs.get_event_sources().get_current_token()
since_token = current_token.copy_and_advance(StreamKeyType.PRESENCE, stream_id)
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
)
)
# This should block waiting for the presence stream to update
self.pump()
self.assertFalse(sync_d.called)
# Marking the stream ID as persisted should unblock the request.
self.get_success(ctx_mgr.__aexit__(None, None, None))
self.get_success(sync_d, by=1.0)
@parameterized.expand(
[(key,) for key in StreamKeyType.__members__.values()],
name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
)
def test_wait_for_invalid_future_sync_token(
self, stream_key: StreamKeyType
) -> None:
"""Like the previous test, except we give a token that has a stream
position ahead of what is in the DB, i.e. its invalid and we shouldn't
wait for the stream to advance (as it may never do so).
This can happen due to older versions of Synapse giving out stream
positions without persisting them in the DB, and so on restart the
stream would get reset back to an older position.
"""
user = self.register_user("alice", "password")
# Create a token and advance one of the streams.
current_token = self.hs.get_event_sources().get_current_token()
token_value = current_token.get_field(stream_key)
# How we advance the streams depends on the type.
if isinstance(token_value, int):
since_token = current_token.copy_and_advance(stream_key, token_value + 1)
elif isinstance(token_value, MultiWriterStreamToken):
since_token = current_token.copy_and_advance(
stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
)
elif isinstance(token_value, RoomStreamToken):
since_token = current_token.copy_and_advance(
stream_key, RoomStreamToken(stream=token_value.stream + 1)
)
else:
raise Exception("Unreachable")
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
create_requester(user),
generate_sync_config(user),
sync_version=SyncVersion.SYNC_V2,
request_key=generate_request_key(),
since_token=since_token,
timeout=0,
)
)
# We should return without waiting for the presence stream to advance.
self.get_success(sync_d)
def generate_sync_config( def generate_sync_config(
user_id: str, user_id: str,

View File

@ -1386,10 +1386,12 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
# Create a future token that will cause us to wait. Since we never send a new # Create a future token that will cause us to wait. Since we never send a new
# event to reach that future stream_ordering, the worker will wait until the # event to reach that future stream_ordering, the worker will wait until the
# full timeout. # full timeout.
stream_id_gen = self.store.get_events_stream_id_generator()
stream_id = self.get_success(stream_id_gen.get_next().__aenter__())
current_token = self.event_sources.get_current_token() current_token = self.event_sources.get_current_token()
future_position_token = current_token.copy_and_replace( future_position_token = current_token.copy_and_replace(
StreamKeyType.ROOM, StreamKeyType.ROOM,
RoomStreamToken(stream=current_token.room_key.stream + 1), RoomStreamToken(stream=stream_id),
) )
future_position_token_serialized = self.get_success( future_position_token_serialized = self.get_success(