mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-31 20:34:54 -05:00
Add a config option for torture-testing worker replication. (#4902)
Setting this to 50 or so makes a bunch of sytests fail in worker mode.
This commit is contained in:
parent
a902d13180
commit
cdb8036161
1
changelog.d/4902.misc
Normal file
1
changelog.d/4902.misc
Normal file
@ -0,0 +1 @@
|
|||||||
|
Add a config option for torture-testing worker replication.
|
@ -126,6 +126,11 @@ class ServerConfig(Config):
|
|||||||
self.public_baseurl += '/'
|
self.public_baseurl += '/'
|
||||||
self.start_pushers = config.get("start_pushers", True)
|
self.start_pushers = config.get("start_pushers", True)
|
||||||
|
|
||||||
|
# (undocumented) option for torturing the worker-mode replication a bit,
|
||||||
|
# for testing. The value defines the number of milliseconds to pause before
|
||||||
|
# sending out any replication updates.
|
||||||
|
self.replication_torture_level = config.get("replication_torture_level")
|
||||||
|
|
||||||
self.listeners = []
|
self.listeners = []
|
||||||
for listener in config.get("listeners", []):
|
for listener in config.get("listeners", []):
|
||||||
if not isinstance(listener.get("port", None), int):
|
if not isinstance(listener.get("port", None), int):
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import random
|
||||||
|
|
||||||
from six import itervalues
|
from six import itervalues
|
||||||
|
|
||||||
@ -74,6 +75,8 @@ class ReplicationStreamer(object):
|
|||||||
self.notifier = hs.get_notifier()
|
self.notifier = hs.get_notifier()
|
||||||
self._server_notices_sender = hs.get_server_notices_sender()
|
self._server_notices_sender = hs.get_server_notices_sender()
|
||||||
|
|
||||||
|
self._replication_torture_level = hs.config.replication_torture_level
|
||||||
|
|
||||||
# Current connections.
|
# Current connections.
|
||||||
self.connections = []
|
self.connections = []
|
||||||
|
|
||||||
@ -157,10 +160,23 @@ class ReplicationStreamer(object):
|
|||||||
for stream in self.streams:
|
for stream in self.streams:
|
||||||
stream.advance_current_token()
|
stream.advance_current_token()
|
||||||
|
|
||||||
for stream in self.streams:
|
all_streams = self.streams
|
||||||
|
|
||||||
|
if self._replication_torture_level is not None:
|
||||||
|
# there is no guarantee about ordering between the streams,
|
||||||
|
# so let's shuffle them around a bit when we are in torture mode.
|
||||||
|
all_streams = list(all_streams)
|
||||||
|
random.shuffle(all_streams)
|
||||||
|
|
||||||
|
for stream in all_streams:
|
||||||
if stream.last_token == stream.upto_token:
|
if stream.last_token == stream.upto_token:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
if self._replication_torture_level:
|
||||||
|
yield self.clock.sleep(
|
||||||
|
self._replication_torture_level / 1000.0
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Getting stream: %s: %s -> %s",
|
"Getting stream: %s: %s -> %s",
|
||||||
stream.NAME, stream.last_token, stream.upto_token
|
stream.NAME, stream.last_token, stream.upto_token
|
||||||
|
Loading…
x
Reference in New Issue
Block a user