Deduplicate some code in synapse.app (#4567)

This commit is contained in:
Amber Brown 2019-02-08 17:25:57 +00:00 committed by GitHub
parent d008330d7d
commit 9cd33d2f4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 83 additions and 158 deletions

1
changelog.d/4567.misc Normal file
View File

@ -0,0 +1 @@
Reduce duplication of ``synapse.app`` code.

View File

@ -15,7 +15,9 @@
import gc import gc
import logging import logging
import signal
import sys import sys
import traceback
import psutil import psutil
from daemonize import Daemonize from daemonize import Daemonize
@ -23,11 +25,25 @@ from daemonize import Daemonize
from twisted.internet import error, reactor from twisted.internet import error, reactor
from synapse.app import check_bind_error from synapse.app import check_bind_error
from synapse.crypto import context_factory
from synapse.util import PreserveLoggingContext from synapse.util import PreserveLoggingContext
from synapse.util.rlimit import change_resource_limit from synapse.util.rlimit import change_resource_limit
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_sighup_callbacks = []
def register_sighup(func):
"""
Register a function to be called when a SIGHUP occurs.
Args:
func (function): Function to be called when sent a SIGHUP signal.
Will be called with a single argument, the homeserver.
"""
_sighup_callbacks.append(func)
def start_worker_reactor(appname, config): def start_worker_reactor(appname, config):
""" Run the reactor in the main process """ Run the reactor in the main process
@ -189,3 +205,50 @@ def listen_ssl(
logger.info("Synapse now listening on port %d (TLS)", port) logger.info("Synapse now listening on port %d (TLS)", port)
return r return r
def refresh_certificate(hs):
"""
Refresh the TLS certificates that Synapse is using by re-reading them from
disk and updating the TLS context factories to use them.
"""
logging.info("Loading certificate from disk...")
hs.config.read_certificate_from_disk()
hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)
hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
hs.config
)
logging.info("Certificate loaded.")
def start(hs, listeners=None):
"""
Start a Synapse server or worker.
Args:
hs (synapse.server.HomeServer)
listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
"""
try:
# Set up the SIGHUP machinery.
if hasattr(signal, "SIGHUP"):
def handle_sighup(*args, **kwargs):
for i in _sighup_callbacks:
i(hs)
signal.signal(signal.SIGHUP, handle_sighup)
register_sighup(refresh_certificate)
# Load the certificate from disk.
refresh_certificate(hs)
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().start_profiling()
except Exception:
traceback.print_exc(file=sys.stderr)
reactor = hs.get_reactor()
if reactor.running:
reactor.stop()
sys.exit(1)

View File

@ -168,12 +168,7 @@ def start(config_options):
) )
ps.setup() ps.setup()
ps.start_listening(config.worker_listeners) reactor.callWhenRunning(_base.start, ps, config.worker_listeners)
def start():
ps.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-appservice", config) _base.start_worker_reactor("synapse-appservice", config)

View File

@ -25,7 +25,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
@ -173,17 +172,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-client-reader", config) _base.start_worker_reactor("synapse-client-reader", config)

View File

@ -25,7 +25,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
@ -194,17 +193,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-event-creator", config) _base.start_worker_reactor("synapse-event-creator", config)

View File

@ -26,7 +26,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
@ -160,17 +159,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-federation-reader", config) _base.start_worker_reactor("synapse-federation-reader", config)

View File

@ -25,7 +25,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.federation import send_queue from synapse.federation import send_queue
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
@ -192,17 +191,8 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-federation-sender", config) _base.start_worker_reactor("synapse-federation-sender", config)

View File

@ -26,7 +26,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
@ -250,17 +249,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-frontend-proxy", config) _base.start_worker_reactor("synapse-frontend-proxy", config)

View File

@ -17,7 +17,6 @@
import gc import gc
import logging import logging
import os import os
import signal
import sys import sys
import traceback import traceback
@ -28,7 +27,6 @@ from prometheus_client import Gauge
from twisted.application import service from twisted.application import service
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web.resource import EncodingResourceWrapper, NoResource from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File from twisted.web.static import File
@ -49,7 +47,6 @@ from synapse.app import _base
from synapse.app._base import listen_ssl, listen_tcp, quit_with_error from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer from synapse.federation.transport.server import TransportLayerServer
from synapse.http.additional_resource import AdditionalResource from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect from synapse.http.server import RootRedirect
@ -241,10 +238,10 @@ class SynapseHomeServer(HomeServer):
return resources return resources
def start_listening(self): def start_listening(self, listeners):
config = self.get_config() config = self.get_config()
for listener in config.listeners: for listener in listeners:
if listener["type"] == "http": if listener["type"] == "http":
self._listening_services.extend( self._listening_services.extend(
self._listener_http(config, listener) self._listener_http(config, listener)
@ -328,20 +325,11 @@ def setup(config_options):
# generating config files and shouldn't try to continue. # generating config files and shouldn't try to continue.
sys.exit(0) sys.exit(0)
sighup_callbacks = []
synapse.config.logger.setup_logging( synapse.config.logger.setup_logging(
config, config,
use_worker_options=False, use_worker_options=False
register_sighup=sighup_callbacks.append
) )
def handle_sighup(*args, **kwargs):
for i in sighup_callbacks:
i(*args, **kwargs)
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, handle_sighup)
events.USE_FROZEN_DICTS = config.use_frozen_dicts events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config) database_engine = create_engine(config.database_config)
@ -377,31 +365,6 @@ def setup(config_options):
hs.setup() hs.setup()
def refresh_certificate(*args):
"""
Refresh the TLS certificates that Synapse is using by re-reading them
from disk and updating the TLS context factories to use them.
"""
logging.info("Reloading certificate from disk...")
hs.config.read_certificate_from_disk()
hs.tls_server_context_factory = context_factory.ServerContextFactory(config)
hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
logging.info("Certificate reloaded.")
logging.info("Updating context factories...")
for i in hs._listening_services:
if isinstance(i.factory, TLSMemoryBIOFactory):
i.factory = TLSMemoryBIOFactory(
hs.tls_server_context_factory,
False,
i.factory.wrappedFactory
)
logging.info("Context factories updated.")
sighup_callbacks.append(refresh_certificate)
@defer.inlineCallbacks @defer.inlineCallbacks
def start(): def start():
try: try:
@ -425,18 +388,9 @@ def setup(config_options):
): ):
yield acme.provision_certificate() yield acme.provision_certificate()
# Read the certificate from disk and build the context factories for _base.start(hs, config.listeners)
# TLS.
hs.config.read_certificate_from_disk()
hs.tls_server_context_factory = context_factory.ServerContextFactory(config)
hs.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
# It is now safe to start your Synapse.
hs.start_listening()
hs.get_pusherpool().start() hs.get_pusherpool().start()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates() hs.get_datastore().start_doing_background_updates()
except Exception as e: except Exception as e:
# If a DeferredList failed (like in listening on the ACME listener), # If a DeferredList failed (like in listening on the ACME listener),

View File

@ -26,7 +26,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
@ -160,17 +159,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-media-repository", config) _base.start_worker_reactor("synapse-media-repository", config)

View File

@ -224,11 +224,10 @@ def start(config_options):
) )
ps.setup() ps.setup()
ps.start_listening(config.worker_listeners)
def start(): def start():
_base.start(ps, config.worker_listeners)
ps.get_pusherpool().start() ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
reactor.callWhenRunning(start) reactor.callWhenRunning(start)

View File

@ -445,12 +445,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
ss.start_listening(config.worker_listeners) reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-synchrotron", config) _base.start_worker_reactor("synapse-synchrotron", config)

View File

@ -26,7 +26,6 @@ from synapse.app import _base
from synapse.config._base import ConfigError from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy from synapse.metrics import RegistryProxy
@ -220,17 +219,7 @@ def start(config_options):
) )
ss.setup() ss.setup()
reactor.callWhenRunning(_base.start, ss, config.worker_listeners)
def start():
ss.config.read_certificate_from_disk()
ss.tls_server_context_factory = context_factory.ServerContextFactory(config)
ss.tls_client_options_factory = context_factory.ClientTLSOptionsFactory(
config
)
ss.start_listening(config.worker_listeners)
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
_base.start_worker_reactor("synapse-user-dir", config) _base.start_worker_reactor("synapse-user-dir", config)

View File

@ -15,7 +15,6 @@
import logging import logging
import logging.config import logging.config
import os import os
import signal
import sys import sys
from string import Template from string import Template
@ -24,6 +23,7 @@ import yaml
from twisted.logger import STDLibLogObserver, globalLogBeginner from twisted.logger import STDLibLogObserver, globalLogBeginner
import synapse import synapse
from synapse.app import _base as appbase
from synapse.util.logcontext import LoggingContextFilter from synapse.util.logcontext import LoggingContextFilter
from synapse.util.versionstring import get_version_string from synapse.util.versionstring import get_version_string
@ -127,7 +127,7 @@ class LoggingConfig(Config):
) )
def setup_logging(config, use_worker_options=False, register_sighup=None): def setup_logging(config, use_worker_options=False):
""" Set up python logging """ Set up python logging
Args: Args:
@ -140,12 +140,6 @@ def setup_logging(config, use_worker_options=False, register_sighup=None):
register_sighup (func | None): Function to call to register a register_sighup (func | None): Function to call to register a
sighup handler. sighup handler.
""" """
if not register_sighup:
if getattr(signal, "SIGHUP"):
register_sighup = lambda x: signal.signal(signal.SIGHUP, x)
else:
register_sighup = lambda x: None
log_config = (config.worker_log_config if use_worker_options log_config = (config.worker_log_config if use_worker_options
else config.log_config) else config.log_config)
log_file = (config.worker_log_file if use_worker_options log_file = (config.worker_log_file if use_worker_options
@ -187,7 +181,7 @@ def setup_logging(config, use_worker_options=False, register_sighup=None):
else: else:
handler = logging.StreamHandler() handler = logging.StreamHandler()
def sighup(signum, stack): def sighup(*args):
pass pass
handler.setFormatter(formatter) handler.setFormatter(formatter)
@ -200,14 +194,14 @@ def setup_logging(config, use_worker_options=False, register_sighup=None):
with open(log_config, 'r') as f: with open(log_config, 'r') as f:
logging.config.dictConfig(yaml.load(f)) logging.config.dictConfig(yaml.load(f))
def sighup(signum, stack): def sighup(*args):
# it might be better to use a file watcher or something for this. # it might be better to use a file watcher or something for this.
load_log_config() load_log_config()
logging.info("Reloaded log config from %s due to SIGHUP", log_config) logging.info("Reloaded log config from %s due to SIGHUP", log_config)
load_log_config() load_log_config()
register_sighup(sighup) appbase.register_sighup(sighup)
# make sure that the first thing we log is a thing we can grep backwards # make sure that the first thing we log is a thing we can grep backwards
# for # for