From 38919b521e209d5020dba3e8394d8213b07f6bf9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Apr 2020 13:34:12 +0100 Subject: [PATCH] Run replication streamers on workers (#7146) Currently we never write to streams from workers, but that will change soon --- changelog.d/7146.misc | 1 + synapse/app/generic_worker.py | 13 ++++++++---- synapse/replication/tcp/resource.py | 33 +++++++++++++---------------- 3 files changed, 25 insertions(+), 22 deletions(-) create mode 100644 changelog.d/7146.misc diff --git a/changelog.d/7146.misc b/changelog.d/7146.misc new file mode 100644 index 000000000..facde0695 --- /dev/null +++ b/changelog.d/7146.misc @@ -0,0 +1 @@ +Run replication streamers on workers. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 2a56fe0bd..d125327f0 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -960,17 +960,22 @@ def start(config_options): synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts - ss = GenericWorkerServer( + hs = GenericWorkerServer( config.server_name, config=config, version_string="Synapse/" + get_version_string(synapse), ) - setup_logging(ss, config, use_worker_options=True) + setup_logging(hs, config, use_worker_options=True) + + hs.setup() + + # Ensure the replication streamer is always started in case we write to any + # streams. Will no-op if no streams can be written to by this worker. + hs.get_replication_streamer() - ss.setup() reactor.addSystemEventTrigger( - "before", "startup", _base.start, ss, config.worker_listeners + "before", "startup", _base.start, hs, config.worker_listeners ) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index b2d6baa2a..33d2f589a 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -17,9 +17,7 @@ import logging import random -from typing import Dict - -from six import itervalues +from typing import Dict, List from prometheus_client import Counter @@ -71,29 +69,28 @@ class ReplicationStreamer(object): def __init__(self, hs): self.store = hs.get_datastore() - self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() self.notifier = hs.get_notifier() - self._server_notices_sender = hs.get_server_notices_sender() self._replication_torture_level = hs.config.replication_torture_level - # List of streams that clients can subscribe to. - # We only support federation stream if federation sending hase been - # disabled on the master. - self.streams = [ - stream(hs) - for stream in itervalues(STREAMS_MAP) - if stream != FederationStream or not hs.config.send_federation - ] + # Work out list of streams that this instance is the source of. + self.streams = [] # type: List[Stream] + if hs.config.worker_app is None: + for stream in STREAMS_MAP.values(): + if stream == FederationStream and hs.config.send_federation: + # We only support federation stream if federation sending + # hase been disabled on the master. + continue + + self.streams.append(stream(hs)) self.streams_by_name = {stream.NAME: stream for stream in self.streams} - self.federation_sender = None - if not hs.config.send_federation: - self.federation_sender = hs.get_federation_sender() - - self.notifier.add_replication_callback(self.on_notifier_poke) + # Only bother registering the notifier callback if we have streams to + # publish. + if self.streams: + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False