mirror of
https://git.anonymousland.org/anonymousland/synapse-worker-docker.git
synced 2024-12-26 02:59:21 -05:00
Merge remote-tracking branch 'anonymousland/main'
This commit is contained in:
commit
8fb732d9a5
@ -110,7 +110,7 @@ VOLUME /data
|
|||||||
|
|
||||||
EXPOSE 8008/tcp
|
EXPOSE 8008/tcp
|
||||||
|
|
||||||
ENTRYPOINT ["python3", "workers.py"]
|
ENTRYPOINT ["python3", "configure_workers_and_start.py"]
|
||||||
|
|
||||||
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
|
||||||
CMD /bin/sh /healthcheck.sh
|
CMD /bin/sh /healthcheck.sh
|
713
rootfs/configure_workers_and_start.py.py
Normal file
713
rootfs/configure_workers_and_start.py.py
Normal file
@ -0,0 +1,713 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# This script reads environment variables and generates a shared Synapse worker,
|
||||||
|
# nginx and supervisord configs depending on the workers requested.
|
||||||
|
#
|
||||||
|
# The environment variables it reads are:
|
||||||
|
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
|
||||||
|
# * SYNAPSE_REPORT_STATS: Whether to report stats.
|
||||||
|
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
|
||||||
|
# below. Leave empty for no workers, or set to '*' for all possible workers.
|
||||||
|
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
|
||||||
|
# will be treated as Application Service registration files.
|
||||||
|
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
|
||||||
|
# * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
|
||||||
|
# Nginx will be configured to serve TLS on port 8448.
|
||||||
|
# * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
|
||||||
|
# only intended for usage in Complement at the moment.
|
||||||
|
# No stability guarantees are provided.
|
||||||
|
# * SYNAPSE_LOG_LEVEL: Set this to DEBUG, INFO, WARNING or ERROR to change the
|
||||||
|
# log level. INFO is the default.
|
||||||
|
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
|
||||||
|
# regardless of the SYNAPSE_LOG_LEVEL setting.
|
||||||
|
#
|
||||||
|
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
|
||||||
|
# in the project's README), this script may be run multiple times, and functionality should
|
||||||
|
# continue to work if so.
|
||||||
|
|
||||||
|
import os
|
||||||
|
import platform
|
||||||
|
import subprocess
|
||||||
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
from jinja2 import Environment, FileSystemLoader
|
||||||
|
|
||||||
|
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
||||||
|
|
||||||
|
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
|
||||||
|
# Watching /_matrix/client needs a "client" listener
|
||||||
|
# Watching /_matrix/federation needs a "federation" listener
|
||||||
|
# Watching /_matrix/media and related needs a "media" listener
|
||||||
|
# Stream Writers require "client" and "replication" listeners because they
|
||||||
|
# have to attach by instance_map to the master process and have client endpoints.
|
||||||
|
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
||||||
|
"pusher": {
|
||||||
|
"app": "synapse.app.pusher",
|
||||||
|
"listener_resources": [],
|
||||||
|
"endpoint_patterns": [],
|
||||||
|
"shared_extra_conf": {"start_pushers": False},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"user_dir": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"media_repository": {
|
||||||
|
"app": "synapse.app.media_repository",
|
||||||
|
"listener_resources": ["media"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/media/",
|
||||||
|
"^/_synapse/admin/v1/purge_media_cache$",
|
||||||
|
"^/_synapse/admin/v1/room/.*/media.*$",
|
||||||
|
"^/_synapse/admin/v1/user/.*/media.*$",
|
||||||
|
"^/_synapse/admin/v1/media/.*$",
|
||||||
|
"^/_synapse/admin/v1/quarantine_media/.*$",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {"enable_media_repo": False},
|
||||||
|
"worker_extra_conf": "enable_media_repo: true",
|
||||||
|
},
|
||||||
|
"appservice": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": [],
|
||||||
|
"endpoint_patterns": [],
|
||||||
|
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"federation_sender": {
|
||||||
|
"app": "synapse.app.federation_sender",
|
||||||
|
"listener_resources": [],
|
||||||
|
"endpoint_patterns": [],
|
||||||
|
"shared_extra_conf": {"send_federation": False},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"synchrotron": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
|
||||||
|
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"client_reader": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
|
||||||
|
"^/_matrix/client/v1/rooms/.*/hierarchy$",
|
||||||
|
"^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
|
||||||
|
"^/_matrix/client/v1/rooms/.*/threads$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
|
||||||
|
"^/_matrix/client/versions$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/register$",
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"federation_reader": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["federation"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/federation/(v1|v2)/event/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/state/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/state_ids/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/backfill/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/get_missing_events/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/publicRooms",
|
||||||
|
"^/_matrix/federation/(v1|v2)/query/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/make_join/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/make_leave/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/send_join/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/send_leave/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/invite/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/query_auth/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/event_auth/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/user/devices/",
|
||||||
|
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
|
||||||
|
"^/_matrix/key/v2/query",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"federation_inbound": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["federation"],
|
||||||
|
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"event_persister": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["replication"],
|
||||||
|
"endpoint_patterns": [],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"background_worker": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": [],
|
||||||
|
"endpoint_patterns": [],
|
||||||
|
# This worker cannot be sharded. Therefore there should only ever be one background
|
||||||
|
# worker, and it should be named background_worker1
|
||||||
|
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"event_creator": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
|
||||||
|
"^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"frontend_proxy": {
|
||||||
|
"app": "synapse.app.frontend_proxy",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": (
|
||||||
|
"worker_main_http_uri: http://127.0.0.1:%d"
|
||||||
|
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
|
||||||
|
),
|
||||||
|
},
|
||||||
|
"account_data": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/.*/tags",
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/.*/account_data",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"presence": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"receipts": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
|
||||||
|
"^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"to_device": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
"typing": {
|
||||||
|
"app": "synapse.app.generic_worker",
|
||||||
|
"listener_resources": ["client", "replication"],
|
||||||
|
"endpoint_patterns": [
|
||||||
|
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
|
||||||
|
],
|
||||||
|
"shared_extra_conf": {},
|
||||||
|
"worker_extra_conf": "",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
# Templates for sections that may be inserted multiple times in config files
|
||||||
|
NGINX_LOCATION_CONFIG_BLOCK = """
|
||||||
|
location ~* {endpoint} {{
|
||||||
|
proxy_pass {upstream};
|
||||||
|
proxy_set_header X-Forwarded-For $remote_addr;
|
||||||
|
proxy_set_header X-Forwarded-Proto $scheme;
|
||||||
|
proxy_set_header Host $host;
|
||||||
|
}}
|
||||||
|
"""
|
||||||
|
|
||||||
|
NGINX_UPSTREAM_CONFIG_BLOCK = """
|
||||||
|
upstream {upstream_worker_type} {{
|
||||||
|
{body}
|
||||||
|
}}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
# Utility functions
|
||||||
|
def log(txt: str) -> None:
|
||||||
|
print(txt)
|
||||||
|
|
||||||
|
|
||||||
|
def error(txt: str) -> NoReturn:
|
||||||
|
print(txt, file=sys.stderr)
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
|
||||||
|
def flush_buffers() -> None:
|
||||||
|
sys.stdout.flush()
|
||||||
|
sys.stderr.flush()
|
||||||
|
|
||||||
|
|
||||||
|
def convert(src: str, dst: str, **template_vars: object) -> None:
|
||||||
|
"""Generate a file from a template
|
||||||
|
|
||||||
|
Args:
|
||||||
|
src: Path to the input file.
|
||||||
|
dst: Path to write to.
|
||||||
|
template_vars: The arguments to replace placeholder variables in the template with.
|
||||||
|
"""
|
||||||
|
# Read the template file
|
||||||
|
# We disable autoescape to prevent template variables from being escaped,
|
||||||
|
# as we're not using HTML.
|
||||||
|
env = Environment(loader=FileSystemLoader(os.path.dirname(src)), autoescape=False)
|
||||||
|
template = env.get_template(os.path.basename(src))
|
||||||
|
|
||||||
|
# Generate a string from the template.
|
||||||
|
rendered = template.render(**template_vars)
|
||||||
|
|
||||||
|
# Write the generated contents to a file
|
||||||
|
#
|
||||||
|
# We use append mode in case the files have already been written to by something else
|
||||||
|
# (for instance, as part of the instructions in a dockerfile).
|
||||||
|
with open(dst, "a") as outfile:
|
||||||
|
# In case the existing file doesn't end with a newline
|
||||||
|
outfile.write("\n")
|
||||||
|
|
||||||
|
outfile.write(rendered)
|
||||||
|
|
||||||
|
|
||||||
|
def add_worker_roles_to_shared_config(
|
||||||
|
shared_config: dict,
|
||||||
|
worker_type: str,
|
||||||
|
worker_name: str,
|
||||||
|
worker_port: int,
|
||||||
|
) -> None:
|
||||||
|
"""Given a dictionary representing a config file shared across all workers,
|
||||||
|
append sharded worker information to it for the current worker_type instance.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
shared_config: The config dict that all worker instances share (after being converted to YAML)
|
||||||
|
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
|
||||||
|
worker_name: The name of the worker instance.
|
||||||
|
worker_port: The HTTP replication port that the worker instance is listening on.
|
||||||
|
"""
|
||||||
|
# The instance_map config field marks the workers that write to various replication streams
|
||||||
|
instance_map = shared_config.setdefault("instance_map", {})
|
||||||
|
|
||||||
|
# Worker-type specific sharding config
|
||||||
|
if worker_type == "pusher":
|
||||||
|
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
||||||
|
|
||||||
|
elif worker_type == "federation_sender":
|
||||||
|
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
||||||
|
|
||||||
|
elif worker_type == "event_persister":
|
||||||
|
# Event persisters write to the events stream, so we need to update
|
||||||
|
# the list of event stream writers
|
||||||
|
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
|
||||||
|
worker_name
|
||||||
|
)
|
||||||
|
|
||||||
|
# Map of stream writer instance names to host/ports combos
|
||||||
|
instance_map[worker_name] = {
|
||||||
|
"host": "localhost",
|
||||||
|
"port": worker_port,
|
||||||
|
}
|
||||||
|
|
||||||
|
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
|
||||||
|
# Update the list of stream writers
|
||||||
|
# It's convienent that the name of the worker type is the same as the event stream
|
||||||
|
shared_config.setdefault("stream_writers", {}).setdefault(
|
||||||
|
worker_type, []
|
||||||
|
).append(worker_name)
|
||||||
|
|
||||||
|
# Map of stream writer instance names to host/ports combos
|
||||||
|
# For now, all stream writers need http replication ports
|
||||||
|
instance_map[worker_name] = {
|
||||||
|
"host": "localhost",
|
||||||
|
"port": worker_port,
|
||||||
|
}
|
||||||
|
|
||||||
|
elif worker_type == "media_repository":
|
||||||
|
# The first configured media worker will run the media background jobs
|
||||||
|
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_base_homeserver_config() -> None:
|
||||||
|
"""Starts Synapse and generates a basic homeserver config, which will later be
|
||||||
|
modified for worker support.
|
||||||
|
|
||||||
|
Raises: CalledProcessError if calling start.py returned a non-zero exit code.
|
||||||
|
"""
|
||||||
|
# start.py already does this for us, so just call that.
|
||||||
|
# note that this script is copied in in the official, monolith dockerfile
|
||||||
|
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
|
||||||
|
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_worker_files(
|
||||||
|
environ: Mapping[str, str], config_path: str, data_dir: str
|
||||||
|
) -> None:
|
||||||
|
"""Read the desired list of workers from environment variables and generate
|
||||||
|
shared homeserver, nginx and supervisord configs.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
environ: os.environ instance.
|
||||||
|
config_path: The location of the generated Synapse main worker config file.
|
||||||
|
data_dir: The location of the synapse data directory. Where log and
|
||||||
|
user-facing config files live.
|
||||||
|
"""
|
||||||
|
# Note that yaml cares about indentation, so care should be taken to insert lines
|
||||||
|
# into files at the correct indentation below.
|
||||||
|
|
||||||
|
# shared_config is the contents of a Synapse config file that will be shared amongst
|
||||||
|
# the main Synapse process as well as all workers.
|
||||||
|
# It is intended mainly for disabling functionality when certain workers are spun up,
|
||||||
|
# and adding a replication listener.
|
||||||
|
|
||||||
|
# First read the original config file and extract the listeners block. Then we'll add
|
||||||
|
# another listener for replication. Later we'll write out the result to the shared
|
||||||
|
# config file.
|
||||||
|
listeners = [
|
||||||
|
{
|
||||||
|
"port": 9093,
|
||||||
|
"bind_address": "127.0.0.1",
|
||||||
|
"type": "http",
|
||||||
|
"resources": [{"names": ["replication"]}],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
with open(config_path) as file_stream:
|
||||||
|
original_config = yaml.safe_load(file_stream)
|
||||||
|
original_listeners = original_config.get("listeners")
|
||||||
|
if original_listeners:
|
||||||
|
listeners += original_listeners
|
||||||
|
|
||||||
|
# The shared homeserver config. The contents of which will be inserted into the
|
||||||
|
# base shared worker jinja2 template.
|
||||||
|
#
|
||||||
|
# This config file will be passed to all workers, included Synapse's main process.
|
||||||
|
shared_config: Dict[str, Any] = {"listeners": listeners}
|
||||||
|
|
||||||
|
# List of dicts that describe workers.
|
||||||
|
# We pass this to the Supervisor template later to generate the appropriate
|
||||||
|
# program blocks.
|
||||||
|
worker_descriptors: List[Dict[str, Any]] = []
|
||||||
|
|
||||||
|
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
|
||||||
|
# ports of each worker. For example:
|
||||||
|
# {
|
||||||
|
# worker_type: {1234, 1235, ...}}
|
||||||
|
# }
|
||||||
|
# and will be used to construct 'upstream' nginx directives.
|
||||||
|
nginx_upstreams: Dict[str, Set[int]] = {}
|
||||||
|
|
||||||
|
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
|
||||||
|
# placed after the proxy_pass directive. The main benefit to representing this data as a
|
||||||
|
# dict over a str is that we can easily deduplicate endpoints across multiple instances
|
||||||
|
# of the same worker.
|
||||||
|
#
|
||||||
|
# An nginx site config that will be amended to depending on the workers that are
|
||||||
|
# spun up. To be placed in /etc/nginx/conf.d.
|
||||||
|
nginx_locations = {}
|
||||||
|
|
||||||
|
# Read the desired worker configuration from the environment
|
||||||
|
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
|
||||||
|
if not worker_types_env:
|
||||||
|
# No workers, just the main process
|
||||||
|
worker_types = []
|
||||||
|
else:
|
||||||
|
# Split type names by comma, ignoring whitespace.
|
||||||
|
worker_types = [x.strip() for x in worker_types_env.split(",")]
|
||||||
|
|
||||||
|
# Create the worker configuration directory if it doesn't already exist
|
||||||
|
os.makedirs("/conf/workers", exist_ok=True)
|
||||||
|
|
||||||
|
# Start worker ports from this arbitrary port
|
||||||
|
worker_port = 18009
|
||||||
|
|
||||||
|
# A counter of worker_type -> int. Used for determining the name for a given
|
||||||
|
# worker type when generating its config file, as each worker's name is just
|
||||||
|
# worker_type + instance #
|
||||||
|
worker_type_counter: Dict[str, int] = {}
|
||||||
|
|
||||||
|
# A list of internal endpoints to healthcheck, starting with the main process
|
||||||
|
# which exists even if no workers do.
|
||||||
|
healthcheck_urls = ["http://localhost:8080/health"]
|
||||||
|
|
||||||
|
# For each worker type specified by the user, create config values
|
||||||
|
for worker_type in worker_types:
|
||||||
|
worker_config = WORKERS_CONFIG.get(worker_type)
|
||||||
|
if worker_config:
|
||||||
|
worker_config = worker_config.copy()
|
||||||
|
else:
|
||||||
|
log(worker_type + " is an unknown worker type! It will be ignored")
|
||||||
|
continue
|
||||||
|
|
||||||
|
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
|
||||||
|
worker_type_counter[worker_type] = new_worker_count
|
||||||
|
|
||||||
|
# Name workers by their type concatenated with an incrementing number
|
||||||
|
# e.g. federation_reader1
|
||||||
|
worker_name = worker_type + str(new_worker_count)
|
||||||
|
worker_config.update(
|
||||||
|
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update the shared config with any worker-type specific options
|
||||||
|
shared_config.update(worker_config["shared_extra_conf"])
|
||||||
|
|
||||||
|
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
|
||||||
|
|
||||||
|
# Check if more than one instance of this worker type has been specified
|
||||||
|
worker_type_total_count = worker_types.count(worker_type)
|
||||||
|
|
||||||
|
# Update the shared config with sharding-related options if necessary
|
||||||
|
add_worker_roles_to_shared_config(
|
||||||
|
shared_config, worker_type, worker_name, worker_port
|
||||||
|
)
|
||||||
|
|
||||||
|
# Enable the worker in supervisord
|
||||||
|
worker_descriptors.append(worker_config)
|
||||||
|
|
||||||
|
# Add nginx location blocks for this worker's endpoints (if any are defined)
|
||||||
|
for pattern in worker_config["endpoint_patterns"]:
|
||||||
|
# Determine whether we need to load-balance this worker
|
||||||
|
if worker_type_total_count > 1:
|
||||||
|
# Create or add to a load-balanced upstream for this worker
|
||||||
|
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
|
||||||
|
|
||||||
|
# Upstreams are named after the worker_type
|
||||||
|
upstream = "http://" + worker_type
|
||||||
|
else:
|
||||||
|
upstream = "http://localhost:%d" % (worker_port,)
|
||||||
|
|
||||||
|
# Note that this endpoint should proxy to this upstream
|
||||||
|
nginx_locations[pattern] = upstream
|
||||||
|
|
||||||
|
# Write out the worker's logging config file
|
||||||
|
|
||||||
|
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
|
||||||
|
|
||||||
|
# Then a worker config file
|
||||||
|
convert(
|
||||||
|
"/conf/worker.yaml.j2",
|
||||||
|
"/conf/workers/{name}.yaml".format(name=worker_name),
|
||||||
|
**worker_config,
|
||||||
|
worker_log_config_filepath=log_config_filepath,
|
||||||
|
)
|
||||||
|
|
||||||
|
worker_port += 1
|
||||||
|
|
||||||
|
# Build the nginx location config blocks
|
||||||
|
nginx_location_config = ""
|
||||||
|
for endpoint, upstream in nginx_locations.items():
|
||||||
|
nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
|
||||||
|
endpoint=endpoint,
|
||||||
|
upstream=upstream,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Determine the load-balancing upstreams to configure
|
||||||
|
nginx_upstream_config = ""
|
||||||
|
|
||||||
|
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
|
||||||
|
body = ""
|
||||||
|
for port in upstream_worker_ports:
|
||||||
|
body += " server localhost:%d;\n" % (port,)
|
||||||
|
|
||||||
|
# Add to the list of configured upstreams
|
||||||
|
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
|
||||||
|
upstream_worker_type=upstream_worker_type,
|
||||||
|
body=body,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Finally, we'll write out the config files.
|
||||||
|
|
||||||
|
# log config for the master process
|
||||||
|
master_log_config = generate_worker_log_config(environ, "master", data_dir)
|
||||||
|
shared_config["log_config"] = master_log_config
|
||||||
|
|
||||||
|
# Find application service registrations
|
||||||
|
appservice_registrations = None
|
||||||
|
appservice_registration_dir = os.environ.get("SYNAPSE_AS_REGISTRATION_DIR")
|
||||||
|
if appservice_registration_dir:
|
||||||
|
# Scan for all YAML files that should be application service registrations.
|
||||||
|
appservice_registrations = [
|
||||||
|
str(reg_path.resolve())
|
||||||
|
for reg_path in Path(appservice_registration_dir).iterdir()
|
||||||
|
if reg_path.suffix.lower() in (".yaml", ".yml")
|
||||||
|
]
|
||||||
|
|
||||||
|
workers_in_use = len(worker_types) > 0
|
||||||
|
|
||||||
|
# Shared homeserver config
|
||||||
|
convert(
|
||||||
|
"/conf/shared.yaml.j2",
|
||||||
|
"/conf/workers/shared.yaml",
|
||||||
|
shared_worker_config=yaml.dump(shared_config),
|
||||||
|
appservice_registrations=appservice_registrations,
|
||||||
|
enable_redis=workers_in_use,
|
||||||
|
workers_in_use=workers_in_use,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Nginx config
|
||||||
|
convert(
|
||||||
|
"/conf/nginx.conf.j2",
|
||||||
|
"/etc/nginx/conf.d/matrix-synapse.conf",
|
||||||
|
worker_locations=nginx_location_config,
|
||||||
|
upstream_directives=nginx_upstream_config,
|
||||||
|
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
|
||||||
|
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Supervisord config
|
||||||
|
os.makedirs("/etc/supervisor", exist_ok=True)
|
||||||
|
convert(
|
||||||
|
"/conf/supervisord.conf.j2",
|
||||||
|
"/etc/supervisor/supervisord.conf",
|
||||||
|
main_config_path=config_path,
|
||||||
|
enable_redis=workers_in_use,
|
||||||
|
)
|
||||||
|
|
||||||
|
convert(
|
||||||
|
"/conf/synapse.supervisord.conf.j2",
|
||||||
|
"/etc/supervisor/conf.d/synapse.conf",
|
||||||
|
workers=worker_descriptors,
|
||||||
|
main_config_path=config_path,
|
||||||
|
use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# healthcheck config
|
||||||
|
convert(
|
||||||
|
"/conf/healthcheck.sh.j2",
|
||||||
|
"/healthcheck.sh",
|
||||||
|
healthcheck_urls=healthcheck_urls,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ensure the logging directory exists
|
||||||
|
log_dir = data_dir + "/logs"
|
||||||
|
if not os.path.exists(log_dir):
|
||||||
|
os.mkdir(log_dir)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_worker_log_config(
|
||||||
|
environ: Mapping[str, str], worker_name: str, data_dir: str
|
||||||
|
) -> str:
|
||||||
|
"""Generate a log.config file for the given worker.
|
||||||
|
|
||||||
|
Returns: the path to the generated file
|
||||||
|
"""
|
||||||
|
# Check whether we should write worker logs to disk, in addition to the console
|
||||||
|
extra_log_template_args: Dict[str, Optional[str]] = {}
|
||||||
|
if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
|
||||||
|
extra_log_template_args["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"
|
||||||
|
|
||||||
|
extra_log_template_args["SYNAPSE_LOG_LEVEL"] = environ.get("SYNAPSE_LOG_LEVEL")
|
||||||
|
extra_log_template_args["SYNAPSE_LOG_SENSITIVE"] = environ.get(
|
||||||
|
"SYNAPSE_LOG_SENSITIVE"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Render and write the file
|
||||||
|
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
|
||||||
|
convert(
|
||||||
|
"/conf/log.config",
|
||||||
|
log_config_filepath,
|
||||||
|
worker_name=worker_name,
|
||||||
|
**extra_log_template_args,
|
||||||
|
include_worker_name_in_log_line=environ.get(
|
||||||
|
"SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"
|
||||||
|
),
|
||||||
|
)
|
||||||
|
return log_config_filepath
|
||||||
|
|
||||||
|
|
||||||
|
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
||||||
|
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
||||||
|
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
|
||||||
|
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
|
||||||
|
|
||||||
|
# override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
|
||||||
|
# this needs to be handled by a frontend proxy
|
||||||
|
environ["SYNAPSE_NO_TLS"] = "yes"
|
||||||
|
|
||||||
|
# Generate the base homeserver config if one does not yet exist
|
||||||
|
if not os.path.exists(config_path):
|
||||||
|
log("Generating base homeserver config")
|
||||||
|
generate_base_homeserver_config()
|
||||||
|
|
||||||
|
# This script may be run multiple times (mostly by Complement, see note at top of file).
|
||||||
|
# Don't re-configure workers in this instance.
|
||||||
|
mark_filepath = "/conf/workers_have_been_configured"
|
||||||
|
if not os.path.exists(mark_filepath):
|
||||||
|
# Always regenerate all other config files
|
||||||
|
generate_worker_files(environ, config_path, data_dir)
|
||||||
|
|
||||||
|
# Mark workers as being configured
|
||||||
|
with open(mark_filepath, "w") as f:
|
||||||
|
f.write("")
|
||||||
|
|
||||||
|
# Lifted right out of start.py
|
||||||
|
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
|
||||||
|
|
||||||
|
if os.path.isfile(jemallocpath):
|
||||||
|
environ["LD_PRELOAD"] = jemallocpath
|
||||||
|
else:
|
||||||
|
log("Could not find %s, will not use" % (jemallocpath,))
|
||||||
|
|
||||||
|
# Start supervisord, which will start Synapse, all of the configured worker
|
||||||
|
# processes, redis, nginx etc. according to the config we created above.
|
||||||
|
log("Starting supervisord")
|
||||||
|
flush_buffers()
|
||||||
|
os.execle(
|
||||||
|
"/usr/local/bin/supervisord",
|
||||||
|
"supervisord",
|
||||||
|
"-c",
|
||||||
|
"/etc/supervisor/supervisord.conf",
|
||||||
|
environ,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main(sys.argv, os.environ)
|
@ -1,650 +1,218 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/local/bin/python
|
||||||
# Copyright 2021 The Matrix.org Foundation C.I.C.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
# This script reads environment variables and generates a shared Synapse worker,
|
|
||||||
# nginx and supervisord configs depending on the workers requested.
|
|
||||||
#
|
|
||||||
# The environment variables it reads are:
|
|
||||||
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
|
|
||||||
# * SYNAPSE_REPORT_STATS: Whether to report stats.
|
|
||||||
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
|
|
||||||
# below. Leave empty for no workers, or set to '*' for all possible workers.
|
|
||||||
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
|
|
||||||
# will be treated as Application Service registration files.
|
|
||||||
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
|
|
||||||
# * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
|
|
||||||
# Nginx will be configured to serve TLS on port 8448.
|
|
||||||
# * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
|
|
||||||
# only intended for usage in Complement at the moment.
|
|
||||||
# No stability guarantees are provided.
|
|
||||||
# * SYNAPSE_LOG_LEVEL: Set this to DEBUG, INFO, WARNING or ERROR to change the
|
|
||||||
# log level. INFO is the default.
|
|
||||||
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
|
|
||||||
# regardless of the SYNAPSE_LOG_LEVEL setting.
|
|
||||||
#
|
|
||||||
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
|
|
||||||
# in the project's README), this script may be run multiple times, and functionality should
|
|
||||||
# continue to work if so.
|
|
||||||
|
|
||||||
|
import codecs
|
||||||
|
import glob
|
||||||
import os
|
import os
|
||||||
import platform
|
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
from pathlib import Path
|
|
||||||
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
|
|
||||||
|
|
||||||
import yaml
|
import jinja2
|
||||||
from jinja2 import Environment, FileSystemLoader
|
|
||||||
|
|
||||||
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
|
|
||||||
|
|
||||||
|
|
||||||
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
|
|
||||||
"pusher": {
|
|
||||||
"app": "synapse.app.pusher",
|
|
||||||
"listener_resources": [],
|
|
||||||
"endpoint_patterns": [],
|
|
||||||
"shared_extra_conf": {"start_pushers": False},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"user_dir": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["client"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"media_repository": {
|
|
||||||
"app": "synapse.app.media_repository",
|
|
||||||
"listener_resources": ["media"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/media/",
|
|
||||||
"^/_synapse/admin/v1/purge_media_cache$",
|
|
||||||
"^/_synapse/admin/v1/room/.*/media.*$",
|
|
||||||
"^/_synapse/admin/v1/user/.*/media.*$",
|
|
||||||
"^/_synapse/admin/v1/media/.*$",
|
|
||||||
"^/_synapse/admin/v1/quarantine_media/.*$",
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {"enable_media_repo": False},
|
|
||||||
"worker_extra_conf": "enable_media_repo: true",
|
|
||||||
},
|
|
||||||
"appservice": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": [],
|
|
||||||
"endpoint_patterns": [],
|
|
||||||
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"federation_sender": {
|
|
||||||
"app": "synapse.app.federation_sender",
|
|
||||||
"listener_resources": [],
|
|
||||||
"endpoint_patterns": [],
|
|
||||||
"shared_extra_conf": {"send_federation": False},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"synchrotron": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["client"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
|
|
||||||
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"client_reader": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["client"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
|
|
||||||
"^/_matrix/client/v1/rooms/.*/hierarchy$",
|
|
||||||
"^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
|
|
||||||
"^/_matrix/client/v1/rooms/.*/threads$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
|
|
||||||
"^/_matrix/client/versions$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
|
|
||||||
"^/_matrix/client/(r0|v3|unstable)/register$",
|
|
||||||
"^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"federation_reader": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["federation"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/federation/(v1|v2)/event/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/state/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/state_ids/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/backfill/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/get_missing_events/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/publicRooms",
|
|
||||||
"^/_matrix/federation/(v1|v2)/query/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/make_join/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/make_leave/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/send_join/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/send_leave/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/invite/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/query_auth/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/event_auth/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/user/devices/",
|
|
||||||
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
|
|
||||||
"^/_matrix/key/v2/query",
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"federation_inbound": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["federation"],
|
|
||||||
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"event_persister": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["replication"],
|
|
||||||
"endpoint_patterns": [],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"background_worker": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": [],
|
|
||||||
"endpoint_patterns": [],
|
|
||||||
# This worker cannot be sharded. Therefore there should only ever be one background
|
|
||||||
# worker, and it should be named background_worker1
|
|
||||||
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"event_creator": {
|
|
||||||
"app": "synapse.app.generic_worker",
|
|
||||||
"listener_resources": ["client"],
|
|
||||||
"endpoint_patterns": [
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
|
|
||||||
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
|
|
||||||
"^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
|
|
||||||
],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": "",
|
|
||||||
},
|
|
||||||
"frontend_proxy": {
|
|
||||||
"app": "synapse.app.frontend_proxy",
|
|
||||||
"listener_resources": ["client", "replication"],
|
|
||||||
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
|
|
||||||
"shared_extra_conf": {},
|
|
||||||
"worker_extra_conf": (
|
|
||||||
"worker_main_http_uri: http://127.0.0.1:%d"
|
|
||||||
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
# Templates for sections that may be inserted multiple times in config files
|
|
||||||
NGINX_LOCATION_CONFIG_BLOCK = """
|
|
||||||
location ~* {endpoint} {{
|
|
||||||
proxy_pass {upstream};
|
|
||||||
proxy_set_header X-Forwarded-For $remote_addr;
|
|
||||||
proxy_set_header X-Forwarded-Proto $scheme;
|
|
||||||
proxy_set_header Host $host;
|
|
||||||
}}
|
|
||||||
"""
|
|
||||||
|
|
||||||
NGINX_UPSTREAM_CONFIG_BLOCK = """
|
|
||||||
upstream {upstream_worker_type} {{
|
|
||||||
{body}
|
|
||||||
}}
|
|
||||||
"""
|
|
||||||
|
|
||||||
|
|
||||||
# Utility functions
|
# Utility functions
|
||||||
def log(txt: str) -> None:
|
def log(txt):
|
||||||
print(txt)
|
|
||||||
|
|
||||||
|
|
||||||
def error(txt: str) -> NoReturn:
|
|
||||||
print(txt, file=sys.stderr)
|
print(txt, file=sys.stderr)
|
||||||
|
|
||||||
|
|
||||||
|
def error(txt):
|
||||||
|
log(txt)
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
|
|
||||||
def flush_buffers() -> None:
|
def convert(src, dst, environ):
|
||||||
sys.stdout.flush()
|
|
||||||
sys.stderr.flush()
|
|
||||||
|
|
||||||
|
|
||||||
def convert(src: str, dst: str, **template_vars: object) -> None:
|
|
||||||
"""Generate a file from a template
|
"""Generate a file from a template
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
src: Path to the input file.
|
src (str): path to input file
|
||||||
dst: Path to write to.
|
dst (str): path to file to write
|
||||||
template_vars: The arguments to replace placeholder variables in the template with.
|
environ (dict): environment dictionary, for replacement mappings.
|
||||||
"""
|
"""
|
||||||
# Read the template file
|
with open(src) as infile:
|
||||||
# We disable autoescape to prevent template variables from being escaped,
|
template = infile.read()
|
||||||
# as we're not using HTML.
|
rendered = jinja2.Template(template).render(**environ)
|
||||||
env = Environment(loader=FileSystemLoader(os.path.dirname(src)), autoescape=False)
|
with open(dst, "w") as outfile:
|
||||||
template = env.get_template(os.path.basename(src))
|
|
||||||
|
|
||||||
# Generate a string from the template.
|
|
||||||
rendered = template.render(**template_vars)
|
|
||||||
|
|
||||||
# Write the generated contents to a file
|
|
||||||
#
|
|
||||||
# We use append mode in case the files have already been written to by something else
|
|
||||||
# (for instance, as part of the instructions in a dockerfile).
|
|
||||||
with open(dst, "a") as outfile:
|
|
||||||
# In case the existing file doesn't end with a newline
|
|
||||||
outfile.write("\n")
|
|
||||||
|
|
||||||
outfile.write(rendered)
|
outfile.write(rendered)
|
||||||
|
|
||||||
|
|
||||||
def add_sharding_to_shared_config(
|
def generate_config_from_template(config_dir, config_path, environ):
|
||||||
shared_config: dict,
|
"""Generate a homeserver.yaml from environment variables
|
||||||
worker_type: str,
|
|
||||||
worker_name: str,
|
|
||||||
worker_port: int,
|
|
||||||
) -> None:
|
|
||||||
"""Given a dictionary representing a config file shared across all workers,
|
|
||||||
append sharded worker information to it for the current worker_type instance.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
shared_config: The config dict that all worker instances share (after being converted to YAML)
|
config_dir (str): where to put generated config files
|
||||||
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
|
config_path (str): where to put the main config file
|
||||||
worker_name: The name of the worker instance.
|
environ (dict): environment dictionary
|
||||||
worker_port: The HTTP replication port that the worker instance is listening on.
|
|
||||||
"""
|
"""
|
||||||
# The instance_map config field marks the workers that write to various replication streams
|
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
|
||||||
instance_map = shared_config.setdefault("instance_map", {})
|
if v not in environ:
|
||||||
|
error(
|
||||||
# Worker-type specific sharding config
|
"Environment variable '%s' is mandatory when generating a config file."
|
||||||
if worker_type == "pusher":
|
% (v,)
|
||||||
shared_config.setdefault("pusher_instances", []).append(worker_name)
|
|
||||||
|
|
||||||
elif worker_type == "federation_sender":
|
|
||||||
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
|
|
||||||
|
|
||||||
elif worker_type == "event_persister":
|
|
||||||
# Event persisters write to the events stream, so we need to update
|
|
||||||
# the list of event stream writers
|
|
||||||
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
|
|
||||||
worker_name
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# Map of stream writer instance names to host/ports combos
|
# populate some params from data files (if they exist, else create new ones)
|
||||||
instance_map[worker_name] = {
|
environ = environ.copy()
|
||||||
"host": "localhost",
|
secrets = {
|
||||||
"port": worker_port,
|
"registration": "SYNAPSE_REGISTRATION_SHARED_SECRET",
|
||||||
|
"macaroon": "SYNAPSE_MACAROON_SECRET_KEY",
|
||||||
}
|
}
|
||||||
|
|
||||||
elif worker_type == "media_repository":
|
for name, secret in secrets.items():
|
||||||
# The first configured media worker will run the media background jobs
|
if secret not in environ:
|
||||||
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
|
filename = "/data/%s.%s.key" % (environ["SYNAPSE_SERVER_NAME"], name)
|
||||||
|
|
||||||
|
# if the file already exists, load in the existing value; otherwise,
|
||||||
|
# generate a new secret and write it to a file
|
||||||
|
|
||||||
|
if os.path.exists(filename):
|
||||||
|
log("Reading %s from %s" % (secret, filename))
|
||||||
|
with open(filename) as handle:
|
||||||
|
value = handle.read()
|
||||||
|
else:
|
||||||
|
log("Generating a random secret for {}".format(secret))
|
||||||
|
value = codecs.encode(os.urandom(32), "hex").decode()
|
||||||
|
with open(filename, "w") as handle:
|
||||||
|
handle.write(value)
|
||||||
|
environ[secret] = value
|
||||||
|
|
||||||
|
environ["SYNAPSE_APPSERVICES"] = glob.glob("/data/appservices/*.yaml")
|
||||||
|
if not os.path.exists(config_dir):
|
||||||
|
os.mkdir(config_dir)
|
||||||
|
|
||||||
|
# Convert SYNAPSE_NO_TLS to boolean if exists
|
||||||
|
if "SYNAPSE_NO_TLS" in environ:
|
||||||
|
tlsanswerstring = str.lower(environ["SYNAPSE_NO_TLS"])
|
||||||
|
if tlsanswerstring in ("true", "on", "1", "yes"):
|
||||||
|
environ["SYNAPSE_NO_TLS"] = True
|
||||||
|
else:
|
||||||
|
if tlsanswerstring in ("false", "off", "0", "no"):
|
||||||
|
environ["SYNAPSE_NO_TLS"] = False
|
||||||
|
else:
|
||||||
|
error(
|
||||||
|
'Environment variable "SYNAPSE_NO_TLS" found but value "'
|
||||||
|
+ tlsanswerstring
|
||||||
|
+ '" unrecognized; exiting.'
|
||||||
|
)
|
||||||
|
|
||||||
|
if "SYNAPSE_LOG_CONFIG" not in environ:
|
||||||
|
environ["SYNAPSE_LOG_CONFIG"] = config_dir + "/log.config"
|
||||||
|
|
||||||
|
log("Generating synapse config file " + config_path)
|
||||||
|
convert("/conf/homeserver.yaml", config_path, environ)
|
||||||
|
|
||||||
|
log_config_file = environ["SYNAPSE_LOG_CONFIG"]
|
||||||
|
log("Generating log config file " + log_config_file)
|
||||||
|
convert("/conf/log.config", log_config_file, environ)
|
||||||
|
|
||||||
|
# Hopefully we already have a signing key, but generate one if not.
|
||||||
|
args = [
|
||||||
|
"python",
|
||||||
|
"-m",
|
||||||
|
"synapse.app.homeserver",
|
||||||
|
"--config-path",
|
||||||
|
config_path,
|
||||||
|
# tell synapse to put generated keys in /data rather than /compiled
|
||||||
|
"--keys-directory",
|
||||||
|
config_dir,
|
||||||
|
"--generate-keys",
|
||||||
|
]
|
||||||
|
|
||||||
|
subprocess.check_output(args)
|
||||||
|
|
||||||
|
|
||||||
def generate_base_homeserver_config() -> None:
|
def run_generate_config(environ):
|
||||||
"""Starts Synapse and generates a basic homeserver config, which will later be
|
"""Run synapse with a --generate-config param to generate a template config file
|
||||||
modified for worker support.
|
|
||||||
|
|
||||||
Raises: CalledProcessError if calling start.py returned a non-zero exit code.
|
|
||||||
"""
|
|
||||||
# start.py already does this for us, so just call that.
|
|
||||||
# note that this script is copied in in the official, monolith dockerfile
|
|
||||||
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
|
|
||||||
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
|
|
||||||
|
|
||||||
|
|
||||||
def generate_worker_files(
|
|
||||||
environ: Mapping[str, str], config_path: str, data_dir: str
|
|
||||||
) -> None:
|
|
||||||
"""Read the desired list of workers from environment variables and generate
|
|
||||||
shared homeserver, nginx and supervisord configs.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
environ: os.environ instance.
|
environ (dict): env var dict
|
||||||
config_path: The location of the generated Synapse main worker config file.
|
|
||||||
data_dir: The location of the synapse data directory. Where log and
|
Never returns.
|
||||||
user-facing config files live.
|
|
||||||
"""
|
"""
|
||||||
# Note that yaml cares about indentation, so care should be taken to insert lines
|
for v in ("SYNAPSE_SERVER_NAME", "SYNAPSE_REPORT_STATS"):
|
||||||
# into files at the correct indentation below.
|
if v not in environ:
|
||||||
|
error("Environment variable '%s' is mandatory in `generate` mode." % (v,))
|
||||||
|
|
||||||
# shared_config is the contents of a Synapse config file that will be shared amongst
|
server_name = environ["SYNAPSE_SERVER_NAME"]
|
||||||
# the main Synapse process as well as all workers.
|
|
||||||
# It is intended mainly for disabling functionality when certain workers are spun up,
|
|
||||||
# and adding a replication listener.
|
|
||||||
|
|
||||||
# First read the original config file and extract the listeners block. Then we'll add
|
|
||||||
# another listener for replication. Later we'll write out the result to the shared
|
|
||||||
# config file.
|
|
||||||
listeners = [
|
|
||||||
{
|
|
||||||
"port": 9093,
|
|
||||||
"bind_address": "127.0.0.1",
|
|
||||||
"type": "http",
|
|
||||||
"resources": [{"names": ["replication"]}],
|
|
||||||
}
|
|
||||||
]
|
|
||||||
with open(config_path) as file_stream:
|
|
||||||
original_config = yaml.safe_load(file_stream)
|
|
||||||
original_listeners = original_config.get("listeners")
|
|
||||||
if original_listeners:
|
|
||||||
listeners += original_listeners
|
|
||||||
|
|
||||||
# The shared homeserver config. The contents of which will be inserted into the
|
|
||||||
# base shared worker jinja2 template.
|
|
||||||
#
|
|
||||||
# This config file will be passed to all workers, included Synapse's main process.
|
|
||||||
shared_config: Dict[str, Any] = {"listeners": listeners}
|
|
||||||
|
|
||||||
# List of dicts that describe workers.
|
|
||||||
# We pass this to the Supervisor template later to generate the appropriate
|
|
||||||
# program blocks.
|
|
||||||
worker_descriptors: List[Dict[str, Any]] = []
|
|
||||||
|
|
||||||
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
|
|
||||||
# ports of each worker. For example:
|
|
||||||
# {
|
|
||||||
# worker_type: {1234, 1235, ...}}
|
|
||||||
# }
|
|
||||||
# and will be used to construct 'upstream' nginx directives.
|
|
||||||
nginx_upstreams: Dict[str, Set[int]] = {}
|
|
||||||
|
|
||||||
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
|
|
||||||
# placed after the proxy_pass directive. The main benefit to representing this data as a
|
|
||||||
# dict over a str is that we can easily deduplicate endpoints across multiple instances
|
|
||||||
# of the same worker.
|
|
||||||
#
|
|
||||||
# An nginx site config that will be amended to depending on the workers that are
|
|
||||||
# spun up. To be placed in /etc/nginx/conf.d.
|
|
||||||
nginx_locations = {}
|
|
||||||
|
|
||||||
# Read the desired worker configuration from the environment
|
|
||||||
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
|
|
||||||
if not worker_types_env:
|
|
||||||
# No workers, just the main process
|
|
||||||
worker_types = []
|
|
||||||
else:
|
|
||||||
# Split type names by comma, ignoring whitespace.
|
|
||||||
worker_types = [x.strip() for x in worker_types_env.split(",")]
|
|
||||||
|
|
||||||
# Create the worker configuration directory if it doesn't already exist
|
|
||||||
os.makedirs("/conf/workers", exist_ok=True)
|
|
||||||
|
|
||||||
# Start worker ports from this arbitrary port
|
|
||||||
worker_port = 18009
|
|
||||||
|
|
||||||
# A counter of worker_type -> int. Used for determining the name for a given
|
|
||||||
# worker type when generating its config file, as each worker's name is just
|
|
||||||
# worker_type + instance #
|
|
||||||
worker_type_counter: Dict[str, int] = {}
|
|
||||||
|
|
||||||
# A list of internal endpoints to healthcheck, starting with the main process
|
|
||||||
# which exists even if no workers do.
|
|
||||||
healthcheck_urls = ["http://localhost:8080/health"]
|
|
||||||
|
|
||||||
# For each worker type specified by the user, create config values
|
|
||||||
for worker_type in worker_types:
|
|
||||||
worker_config = WORKERS_CONFIG.get(worker_type)
|
|
||||||
if worker_config:
|
|
||||||
worker_config = worker_config.copy()
|
|
||||||
else:
|
|
||||||
log(worker_type + " is an unknown worker type! It will be ignored")
|
|
||||||
continue
|
|
||||||
|
|
||||||
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
|
|
||||||
worker_type_counter[worker_type] = new_worker_count
|
|
||||||
|
|
||||||
# Name workers by their type concatenated with an incrementing number
|
|
||||||
# e.g. federation_reader1
|
|
||||||
worker_name = worker_type + str(new_worker_count)
|
|
||||||
worker_config.update(
|
|
||||||
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Update the shared config with any worker-type specific options
|
|
||||||
shared_config.update(worker_config["shared_extra_conf"])
|
|
||||||
|
|
||||||
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
|
|
||||||
|
|
||||||
# Check if more than one instance of this worker type has been specified
|
|
||||||
worker_type_total_count = worker_types.count(worker_type)
|
|
||||||
if worker_type_total_count > 1:
|
|
||||||
# Update the shared config with sharding-related options if necessary
|
|
||||||
add_sharding_to_shared_config(
|
|
||||||
shared_config, worker_type, worker_name, worker_port
|
|
||||||
)
|
|
||||||
|
|
||||||
# Enable the worker in supervisord
|
|
||||||
worker_descriptors.append(worker_config)
|
|
||||||
|
|
||||||
# Add nginx location blocks for this worker's endpoints (if any are defined)
|
|
||||||
for pattern in worker_config["endpoint_patterns"]:
|
|
||||||
# Determine whether we need to load-balance this worker
|
|
||||||
if worker_type_total_count > 1:
|
|
||||||
# Create or add to a load-balanced upstream for this worker
|
|
||||||
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
|
|
||||||
|
|
||||||
# Upstreams are named after the worker_type
|
|
||||||
upstream = "http://" + worker_type
|
|
||||||
else:
|
|
||||||
upstream = "http://localhost:%d" % (worker_port,)
|
|
||||||
|
|
||||||
# Note that this endpoint should proxy to this upstream
|
|
||||||
nginx_locations[pattern] = upstream
|
|
||||||
|
|
||||||
# Write out the worker's logging config file
|
|
||||||
|
|
||||||
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
|
|
||||||
|
|
||||||
# Then a worker config file
|
|
||||||
convert(
|
|
||||||
"/conf/worker.yaml.j2",
|
|
||||||
"/conf/workers/{name}.yaml".format(name=worker_name),
|
|
||||||
**worker_config,
|
|
||||||
worker_log_config_filepath=log_config_filepath,
|
|
||||||
)
|
|
||||||
|
|
||||||
worker_port += 1
|
|
||||||
|
|
||||||
# Build the nginx location config blocks
|
|
||||||
nginx_location_config = ""
|
|
||||||
for endpoint, upstream in nginx_locations.items():
|
|
||||||
nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
|
|
||||||
endpoint=endpoint,
|
|
||||||
upstream=upstream,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Determine the load-balancing upstreams to configure
|
|
||||||
nginx_upstream_config = ""
|
|
||||||
|
|
||||||
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
|
|
||||||
body = ""
|
|
||||||
for port in upstream_worker_ports:
|
|
||||||
body += " server localhost:%d;\n" % (port,)
|
|
||||||
|
|
||||||
# Add to the list of configured upstreams
|
|
||||||
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
|
|
||||||
upstream_worker_type=upstream_worker_type,
|
|
||||||
body=body,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Finally, we'll write out the config files.
|
|
||||||
|
|
||||||
# log config for the master process
|
|
||||||
master_log_config = generate_worker_log_config(environ, "master", data_dir)
|
|
||||||
shared_config["log_config"] = master_log_config
|
|
||||||
|
|
||||||
# Find application service registrations
|
|
||||||
appservice_registrations = None
|
|
||||||
appservice_registration_dir = os.environ.get("SYNAPSE_AS_REGISTRATION_DIR")
|
|
||||||
if appservice_registration_dir:
|
|
||||||
# Scan for all YAML files that should be application service registrations.
|
|
||||||
appservice_registrations = [
|
|
||||||
str(reg_path.resolve())
|
|
||||||
for reg_path in Path(appservice_registration_dir).iterdir()
|
|
||||||
if reg_path.suffix.lower() in (".yaml", ".yml")
|
|
||||||
]
|
|
||||||
|
|
||||||
workers_in_use = len(worker_types) > 0
|
|
||||||
|
|
||||||
# Shared homeserver config
|
|
||||||
convert(
|
|
||||||
"/conf/shared.yaml.j2",
|
|
||||||
"/conf/workers/shared.yaml",
|
|
||||||
shared_worker_config=yaml.dump(shared_config),
|
|
||||||
appservice_registrations=appservice_registrations,
|
|
||||||
enable_redis=workers_in_use,
|
|
||||||
workers_in_use=workers_in_use,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Nginx config
|
|
||||||
convert(
|
|
||||||
"/conf/nginx.conf.j2",
|
|
||||||
"/etc/nginx/conf.d/matrix-synapse.conf",
|
|
||||||
worker_locations=nginx_location_config,
|
|
||||||
upstream_directives=nginx_upstream_config,
|
|
||||||
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
|
|
||||||
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Supervisord config
|
|
||||||
os.makedirs("/etc/supervisor", exist_ok=True)
|
|
||||||
convert(
|
|
||||||
"/conf/supervisord.conf.j2",
|
|
||||||
"/etc/supervisor/supervisord.conf",
|
|
||||||
main_config_path=config_path,
|
|
||||||
enable_redis=workers_in_use,
|
|
||||||
)
|
|
||||||
|
|
||||||
convert(
|
|
||||||
"/conf/synapse.supervisord.conf.j2",
|
|
||||||
"/etc/supervisor/conf.d/synapse.conf",
|
|
||||||
workers=worker_descriptors,
|
|
||||||
main_config_path=config_path,
|
|
||||||
use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
|
|
||||||
)
|
|
||||||
|
|
||||||
# healthcheck config
|
|
||||||
convert(
|
|
||||||
"/conf/healthcheck.sh.j2",
|
|
||||||
"/healthcheck.sh",
|
|
||||||
healthcheck_urls=healthcheck_urls,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Ensure the logging directory exists
|
|
||||||
log_dir = data_dir + "/logs"
|
|
||||||
if not os.path.exists(log_dir):
|
|
||||||
os.mkdir(log_dir)
|
|
||||||
|
|
||||||
|
|
||||||
def generate_worker_log_config(
|
|
||||||
environ: Mapping[str, str], worker_name: str, data_dir: str
|
|
||||||
) -> str:
|
|
||||||
"""Generate a log.config file for the given worker.
|
|
||||||
|
|
||||||
Returns: the path to the generated file
|
|
||||||
"""
|
|
||||||
# Check whether we should write worker logs to disk, in addition to the console
|
|
||||||
extra_log_template_args: Dict[str, Optional[str]] = {}
|
|
||||||
if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
|
|
||||||
extra_log_template_args["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"
|
|
||||||
|
|
||||||
extra_log_template_args["SYNAPSE_LOG_LEVEL"] = environ.get("SYNAPSE_LOG_LEVEL")
|
|
||||||
extra_log_template_args["SYNAPSE_LOG_SENSITIVE"] = environ.get(
|
|
||||||
"SYNAPSE_LOG_SENSITIVE"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Render and write the file
|
|
||||||
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
|
|
||||||
convert(
|
|
||||||
"/conf/log.config",
|
|
||||||
log_config_filepath,
|
|
||||||
worker_name=worker_name,
|
|
||||||
**extra_log_template_args,
|
|
||||||
include_worker_name_in_log_line=environ.get(
|
|
||||||
"SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"
|
|
||||||
),
|
|
||||||
)
|
|
||||||
return log_config_filepath
|
|
||||||
|
|
||||||
|
|
||||||
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
|
|
||||||
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
||||||
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
|
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
|
||||||
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
|
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
|
||||||
|
|
||||||
# override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
|
# create a suitable log config from our template
|
||||||
# this needs to be handled by a frontend proxy
|
log_config_file = "%s/%s.log.config" % (config_dir, server_name)
|
||||||
environ["SYNAPSE_NO_TLS"] = "yes"
|
if not os.path.exists(log_config_file):
|
||||||
|
log("Creating log config %s" % (log_config_file,))
|
||||||
|
convert("/conf/log.config", log_config_file, environ)
|
||||||
|
|
||||||
# Generate the base homeserver config if one does not yet exist
|
args = [
|
||||||
if not os.path.exists(config_path):
|
"python",
|
||||||
log("Generating base homeserver config")
|
"-m",
|
||||||
generate_base_homeserver_config()
|
"synapse.app.homeserver",
|
||||||
|
"--server-name",
|
||||||
|
server_name,
|
||||||
|
"--report-stats",
|
||||||
|
environ["SYNAPSE_REPORT_STATS"],
|
||||||
|
"--config-path",
|
||||||
|
config_path,
|
||||||
|
"--config-directory",
|
||||||
|
config_dir,
|
||||||
|
"--data-directory",
|
||||||
|
data_dir,
|
||||||
|
"--generate-config",
|
||||||
|
"--open-private-ports",
|
||||||
|
]
|
||||||
|
# log("running %s" % (args, ))
|
||||||
|
|
||||||
# This script may be run multiple times (mostly by Complement, see note at top of file).
|
os.execv("/usr/local/bin/python", args)
|
||||||
# Don't re-configure workers in this instance.
|
|
||||||
mark_filepath = "/conf/workers_have_been_configured"
|
|
||||||
if not os.path.exists(mark_filepath):
|
|
||||||
# Always regenerate all other config files
|
|
||||||
generate_worker_files(environ, config_path, data_dir)
|
|
||||||
|
|
||||||
# Mark workers as being configured
|
|
||||||
with open(mark_filepath, "w") as f:
|
|
||||||
f.write("")
|
|
||||||
|
|
||||||
# Lifted right out of start.py
|
def main(args, environ):
|
||||||
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
|
mode = args[1] if len(args) > 1 else None
|
||||||
|
synapse_worker = environ.get("SYNAPSE_WORKER", "synapse.app.homeserver")
|
||||||
|
|
||||||
if os.path.isfile(jemallocpath):
|
# In generate mode, generate a configuration and missing keys, then exit
|
||||||
environ["LD_PRELOAD"] = jemallocpath
|
if mode == "generate":
|
||||||
else:
|
return run_generate_config(environ)
|
||||||
log("Could not find %s, will not use" % (jemallocpath,))
|
|
||||||
|
|
||||||
# Start supervisord, which will start Synapse, all of the configured worker
|
if mode == "migrate_config":
|
||||||
# processes, redis, nginx etc. according to the config we created above.
|
# generate a config based on environment vars.
|
||||||
log("Starting supervisord")
|
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
||||||
flush_buffers()
|
config_path = environ.get(
|
||||||
os.execle(
|
"SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml"
|
||||||
"/usr/local/bin/supervisord",
|
|
||||||
"supervisord",
|
|
||||||
"-c",
|
|
||||||
"/etc/supervisor/supervisord.conf",
|
|
||||||
environ,
|
|
||||||
)
|
)
|
||||||
|
return generate_config_from_template(
|
||||||
|
config_dir, config_path, environ
|
||||||
|
)
|
||||||
|
|
||||||
|
if mode is not None:
|
||||||
|
error("Unknown execution mode '%s'" % (mode,))
|
||||||
|
|
||||||
|
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
|
||||||
|
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
|
||||||
|
|
||||||
|
if not os.path.exists(config_path):
|
||||||
|
if "SYNAPSE_SERVER_NAME" in environ:
|
||||||
|
error(
|
||||||
|
"""\
|
||||||
|
Config file '%s' does not exist.
|
||||||
|
|
||||||
|
The synapse docker image no longer supports generating a config file on-the-fly
|
||||||
|
based on environment variables. You can migrate to a static config file by
|
||||||
|
running with 'migrate_config'. See the README for more details.
|
||||||
|
"""
|
||||||
|
% (config_path,)
|
||||||
|
)
|
||||||
|
|
||||||
|
error(
|
||||||
|
"Config file '%s' does not exist. You should either create a new "
|
||||||
|
"config file by running with the `generate` argument (and then edit "
|
||||||
|
"the resulting file before restarting) or specify the path to an "
|
||||||
|
"existing config file with the SYNAPSE_CONFIG_PATH variable."
|
||||||
|
% (config_path,)
|
||||||
|
)
|
||||||
|
|
||||||
|
log("Starting synapse with config file " + config_path)
|
||||||
|
|
||||||
|
args = ["python", "-m", synapse_worker, "--config-path", config_path]
|
||||||
|
os.execv("/usr/local/bin/python", args)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user