Add Initial Code (Not production ready)

This commit is contained in:
deathrow 2022-10-31 19:41:26 -04:00
commit 1a6c426720
No known key found for this signature in database
GPG Key ID: FF39D67A22069F73
11 changed files with 1220 additions and 0 deletions

115
Worker.dockerfile Normal file
View File

@ -0,0 +1,115 @@
ARG SYNAPSE_VERSION=1.70.1
ARG HARDENED_MALLOC_VERSION=11
ARG UID=991
ARG GID=991
### Build Hardened Malloc
FROM alpine:latest as build-malloc
ARG HARDENED_MALLOC_VERSION
ARG CONFIG_NATIVE=false
ARG VARIANT=default
RUN apk --no-cache add build-base git gnupg && cd /tmp \
&& wget -q https://github.com/thestinger.gpg && gpg --import thestinger.gpg \
&& git clone --depth 1 --branch ${HARDENED_MALLOC_VERSION} https://github.com/GrapheneOS/hardened_malloc \
&& cd hardened_malloc && git verify-tag $(git describe --tags) \
&& make CONFIG_NATIVE=${CONFIG_NATIVE} VARIANT=${VARIANT}
### Nginx & Redis
FROM alpine:latest as deps_base
RUN apk --no-cache add nginx redis
### Redis Base
FROM redis:6-alpine AS redis_base
### Build Synapse
FROM python:alpine as builder
ARG SYNAPSE_VERSION
RUN apk -U upgrade \
&& apk add -t build-deps \
build-base \
libffi-dev \
libjpeg-turbo-dev \
libressl-dev \
libxslt-dev \
linux-headers \
postgresql-dev \
rustup \
zlib-dev \
&& rustup-init -y && source $HOME/.cargo/env \
&& pip install --upgrade pip \
&& pip install --prefix="/install" --no-warn-script-location \
matrix-synapse[all]==${SYNAPSE_VERSION}
### Worker Build Configuration
FROM python:alpine as worker_build
RUN --mount=type=cache,target=/root/.cache/pip \
pip install supervisor~=4.2
RUN mkdir -p /etc/supervisor/conf.d
RUN rm /etc/nginx/sites-enabled/default
RUN mkdir /var/log/nginx /var/lib/nginx
RUN chown www-data /var/lib/nginx
RUN ln -sf /dev/stdout /var/log/nginx/access.log
RUN ln -sf /dev/stderr /var/log/nginx/error.log
### Build Production
FROM python:alpine
ARG UID
ARG GID
RUN apk -U upgrade \
&& apk add -t run-deps \
libffi \
libgcc \
libjpeg-turbo \
libressl \
libstdc++ \
libxslt \
libpq \
zlib \
tzdata \
xmlsec \
git \
curl \
&& adduser -g ${GID} -u ${UID} --disabled-password --gecos "" synapse \
&& rm -rf /var/cache/apk/*
RUN pip install --upgrade pip \
&& pip install -e "git+https://github.com/matrix-org/mjolnir.git#egg=mjolnir&subdirectory=synapse_antispam"
COPY --from=build-malloc /tmp/hardened_malloc/out/libhardened_malloc.so /usr/local/lib/
COPY --from=builder /install /usr/local
COPY --chown=synapse:synapse rootfs /
COPY --from=redis_base /usr/local/bin/redis-server /usr/local/bin
COPY --from=deps_base /usr/sbin/nginx /usr/sbin
COPY --from=deps_base /usr/share/nginx /usr/share/nginx
COPY --from=deps_base /usr/lib/nginx /usr/lib/nginx
COPY --from=deps_base /etc/nginx /etc/nginx
COPY ./rootfs/conf-workers/* /conf/
# Copy a script to prefix log lines with the supervisor program name
COPY ./rootfs/prefix-log /usr/local/bin/
ENV LD_PRELOAD="/usr/local/lib/libhardened_malloc.so"
USER synapse
VOLUME /data
EXPOSE 8008/tcp
ENTRYPOINT ["python3", "start.py"]
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD /bin/sh /healthcheck.sh

16
readme.md Normal file
View File

@ -0,0 +1,16 @@
# Synapse Worker Docker
A docker image for synapse workers based off [Synapse-Docker](https://github.com/tommytran732/Synapse-Docker).
This is designed for users using Synapse inside of docker and wish to use workers.
It would be advisable to use this in conjunction with Synapse-Docker.
Uses alpine as the base images and features a hardened memory allocator and has mjonir support.
## Links
- [Synapse Docker](https://github.com/matrix-org/synapse/tree/develop/docker)
- [Docker Compose Workers](https://github.com/matrix-org/synapse/tree/develop/contrib/docker_compose_workers)

View File

@ -0,0 +1,6 @@
#!/bin/sh
# This healthcheck script is designed to return OK when every
# host involved returns OK
{%- for healthcheck_url in healthcheck_urls %}
curl -fSs {{ healthcheck_url }} || exit 1
{%- endfor %}

View File

@ -0,0 +1,43 @@
# This file contains the base config for the reverse proxy, as part of ../Dockerfile-workers.
# configure_workers_and_start.py uses and amends to this file depending on the workers
# that have been selected.
{{ upstream_directives }}
server {
# Listen on an unoccupied port number
listen 8008;
listen [::]:8008;
{% if tls_cert_path is not none and tls_key_path is not none %}
listen 8448 ssl;
listen [::]:8448 ssl;
ssl_certificate {{ tls_cert_path }};
ssl_certificate_key {{ tls_key_path }};
# Some directives from cipherlist.eu (fka cipherli.st):
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3;
ssl_prefer_server_ciphers on;
ssl_ciphers "EECDH+AESGCM:EDH+AESGCM:AES256+EECDH:AES256+EDH";
ssl_ecdh_curve secp384r1; # Requires nginx >= 1.1.0
ssl_session_cache shared:SSL:10m;
ssl_session_tickets off; # Requires nginx >= 1.5.9
{% endif %}
server_name localhost;
# Nginx by default only allows file uploads up to 1M in size
# Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
client_max_body_size 100M;
{{ worker_locations }}
# Send all other traffic to the main process
location ~* ^(\\/_matrix|\\/_synapse) {
proxy_pass http://localhost:8080;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
}
}

View File

@ -0,0 +1,20 @@
# This file contains the base for the shared homeserver config file between Synapse workers,
# as part of ./Dockerfile-workers.
# configure_workers_and_start.py uses and amends to this file depending on the workers
# that have been selected.
{% if enable_redis %}
redis:
enabled: true
{% endif %}
{% if appservice_registrations is not none %}
## Application Services ##
# A list of application service config files to use.
app_service_config_files:
{%- for path in appservice_registrations %}
- "{{ path }}"
{%- endfor %}
{%- endif %}
{{ shared_worker_config }}

View File

@ -0,0 +1,32 @@
# This file contains the base config for supervisord, as part of ../Dockerfile-workers.
# configure_workers_and_start.py uses and amends to this file depending on the workers
# that have been selected.
[supervisord]
nodaemon=true
user=root
[include]
files = /etc/supervisor/conf.d/*.conf
[program:nginx]
command=/usr/local/bin/prefix-log /usr/sbin/nginx -g "daemon off;"
priority=500
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=www-data
autorestart=true
[program:redis]
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
username=redis
autorestart=true
# Redis can be disabled if the image is being used without workers
autostart={{ enable_redis }}

View File

@ -0,0 +1,52 @@
{% if use_forking_launcher %}
[program:synapse_fork]
command=/usr/local/bin/python -m synapse.app.complement_fork_starter
{{ main_config_path }}
synapse.app.homeserver
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
{%- for worker in workers %}
-- {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
{%- endfor %}
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0
{% else %}
[program:synapse_main]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
priority=10
# Log startup failures to supervisord's stdout/err
# Regular synapse logs will still go in the configured data directory
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
autorestart=unexpected
exitcodes=0
{% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
--config-path="{{ main_config_path }}"
--config-path=/conf/workers/shared.yaml
--config-path=/conf/workers/{{ worker.name }}.yaml
autorestart=unexpected
priority=500
exitcodes=0
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
{% endfor %}
{% endif %}

View File

@ -0,0 +1,26 @@
# This is a configuration template for a single worker instance, and is
# used by Dockerfile-workers.
# Values will be change depending on whichever workers are selected when
# running that image.
worker_app: "{{ app }}"
worker_name: "{{ name }}"
# The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1
worker_replication_http_port: 9093
worker_listeners:
- type: http
port: {{ port }}
{% if listener_resources %}
resources:
- names:
{%- for resource in listener_resources %}
- {{ resource }}
{%- endfor %}
{% endif %}
worker_log_config: {{ worker_log_config_filepath }}
{{ worker_extra_conf }}

186
rootfs/conf/homeserver.yaml Normal file
View File

@ -0,0 +1,186 @@
# vim:ft=yaml
## TLS ##
{% if not SYNAPSE_NO_TLS %}
tls_certificate_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.crt"
tls_private_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.tls.key"
{% endif %}
## Server ##
server_name: "{{ SYNAPSE_SERVER_NAME }}"
pid_file: /homeserver.pid
web_client: False
soft_file_limit: 0
log_config: "{{ SYNAPSE_LOG_CONFIG }}"
## Ports ##
listeners:
{% if not SYNAPSE_NO_TLS %}
-
port: 8448
bind_addresses: ['::']
type: http
tls: true
x_forwarded: false
resources:
- names: [client]
compress: true
- names: [federation] # Federation APIs
compress: false
{% endif %}
# Allow configuring in case we want to reverse proxy 8008
# using another process in the same container
- port: {{ SYNAPSE_HTTP_PORT or 8008 }}
tls: false
bind_addresses: ['::']
type: http
x_forwarded: false
resources:
- names: [client]
compress: true
- names: [federation]
compress: false
## Database ##
{% if POSTGRES_PASSWORD %}
database:
name: "psycopg2"
args:
user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}"
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
cp_min: 5
cp_max: 10
{% else %}
database:
name: "sqlite3"
args:
database: "/data/homeserver.db"
{% endif %}
## Performance ##
event_cache_size: "{{ SYNAPSE_EVENT_CACHE_SIZE or "10K" }}"
## Ratelimiting ##
rc_messages_per_second: 0.2
rc_message_burst_count: 10.0
federation_rc_window_size: 1000
federation_rc_sleep_limit: 10
federation_rc_sleep_delay: 500
federation_rc_reject_limit: 50
federation_rc_concurrent: 3
## Files ##
media_store_path: "/data/media"
max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "50M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false
# List of thumbnail to precalculate when an image is uploaded.
thumbnail_sizes:
- width: 32
height: 32
method: crop
- width: 96
height: 96
method: crop
- width: 320
height: 240
method: scale
- width: 640
height: 480
method: scale
- width: 800
height: 600
method: scale
url_preview_enabled: False
max_spider_size: "10M"
## Captcha ##
{% if SYNAPSE_RECAPTCHA_PUBLIC_KEY %}
recaptcha_public_key: "{{ SYNAPSE_RECAPTCHA_PUBLIC_KEY }}"
recaptcha_private_key: "{{ SYNAPSE_RECAPTCHA_PRIVATE_KEY }}"
enable_registration_captcha: True
recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
{% else %}
recaptcha_public_key: "YOUR_PUBLIC_KEY"
recaptcha_private_key: "YOUR_PRIVATE_KEY"
enable_registration_captcha: False
recaptcha_siteverify_api: "https://www.google.com/recaptcha/api/siteverify"
{% endif %}
## Turn ##
{% if SYNAPSE_TURN_URIS %}
turn_uris:
{% for uri in SYNAPSE_TURN_URIS.split(',') %} - "{{ uri }}"
{% endfor %}
turn_shared_secret: "{{ SYNAPSE_TURN_SECRET }}"
turn_user_lifetime: "1h"
turn_allow_guests: True
{% else %}
turn_uris: []
turn_shared_secret: "YOUR_SHARED_SECRET"
turn_user_lifetime: "1h"
turn_allow_guests: True
{% endif %}
## Registration ##
enable_registration: {{ "True" if SYNAPSE_ENABLE_REGISTRATION else "False" }}
registration_shared_secret: "{{ SYNAPSE_REGISTRATION_SHARED_SECRET }}"
bcrypt_rounds: 12
allow_guest_access: {{ "True" if SYNAPSE_ALLOW_GUEST else "False" }}
enable_group_creation: true
## Metrics ###
{% if SYNAPSE_REPORT_STATS.lower() == "yes" %}
enable_metrics: True
report_stats: True
{% else %}
enable_metrics: False
report_stats: False
{% endif %}
## API Configuration ##
{% if SYNAPSE_APPSERVICES %}
app_service_config_files:
{% for appservice in SYNAPSE_APPSERVICES %} - "{{ appservice }}"
{% endfor %}
{% endif %}
macaroon_secret_key: "{{ SYNAPSE_MACAROON_SECRET_KEY }}"
expire_access_token: False
## Signing Keys ##
signing_key_path: "/data/{{ SYNAPSE_SERVER_NAME }}.signing.key"
old_signing_keys: {}
key_refresh_interval: "1d" # 1 Day.
# The trusted servers to download signing keys from.
trusted_key_servers:
- server_name: matrix.org
verify_keys:
"ed25519:auto": "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
password_config:
enabled: true

73
rootfs/conf/log.config Normal file
View File

@ -0,0 +1,73 @@
version: 1
formatters:
precise:
{% if include_worker_name_in_log_line %}
format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
{% else %}
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
{% endif %}
handlers:
{% if LOG_FILE_PATH %}
file:
class: logging.handlers.TimedRotatingFileHandler
formatter: precise
filename: {{ LOG_FILE_PATH }}
when: "midnight"
backupCount: 6 # Does not include the current log file.
encoding: utf8
# Default to buffering writes to log file for efficiency.
# WARNING/ERROR logs will still be flushed immediately, but there will be a
# delay (of up to `period` seconds, or until the buffer is full with
# `capacity` messages) before INFO/DEBUG logs get written.
buffer:
class: synapse.logging.handlers.PeriodicallyFlushingMemoryHandler
target: file
# The capacity is the maximum number of log lines that are buffered
# before being written to disk. Increasing this will lead to better
# performance, at the expensive of it taking longer for log lines to
# be written to disk.
# This parameter is required.
capacity: 10
# Logs with a level at or above the flush level will cause the buffer to
# be flushed immediately.
# Default value: 40 (ERROR)
# Other values: 50 (CRITICAL), 30 (WARNING), 20 (INFO), 10 (DEBUG)
flushLevel: 30 # Flush immediately for WARNING logs and higher
# The period of time, in seconds, between forced flushes.
# Messages will not be delayed for longer than this time.
# Default value: 5 seconds
period: 5
{% endif %}
console:
class: logging.StreamHandler
formatter: precise
{% if not SYNAPSE_LOG_SENSITIVE %}
{#
If SYNAPSE_LOG_SENSITIVE is unset, then override synapse.storage.SQL to INFO
so that DEBUG entries (containing sensitive information) are not emitted.
#}
loggers:
synapse.storage.SQL:
# beware: increasing this to DEBUG will make synapse log sensitive
# information such as access tokens.
level: INFO
{% endif %}
root:
level: {{ SYNAPSE_LOG_LEVEL or "INFO" }}
{% if LOG_FILE_PATH %}
handlers: [console, buffer]
{% else %}
handlers: [console]
{% endif %}
disable_existing_loggers: false

651
rootfs/start.py Normal file
View File

@ -0,0 +1,651 @@
#!/usr/bin/env python
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This script reads environment variables and generates a shared Synapse worker,
# nginx and supervisord configs depending on the workers requested.
#
# The environment variables it reads are:
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
# below. Leave empty for no workers, or set to '*' for all possible workers.
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
# * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
# Nginx will be configured to serve TLS on port 8448.
# * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
# only intended for usage in Complement at the moment.
# No stability guarantees are provided.
# * SYNAPSE_LOG_LEVEL: Set this to DEBUG, INFO, WARNING or ERROR to change the
# log level. INFO is the default.
# * SYNAPSE_LOG_SENSITIVE: If unset, SQL and SQL values won't be logged,
# regardless of the SYNAPSE_LOG_LEVEL setting.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
# continue to work if so.
import os
import platform
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
import yaml
from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"app": "synapse.app.pusher",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"start_pushers": False},
"worker_extra_conf": "",
},
"user_dir": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
"shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
"worker_extra_conf": "",
},
"media_repository": {
"app": "synapse.app.media_repository",
"listener_resources": ["media"],
"endpoint_patterns": [
"^/_matrix/media/",
"^/_synapse/admin/v1/purge_media_cache$",
"^/_synapse/admin/v1/room/.*/media.*$",
"^/_synapse/admin/v1/user/.*/media.*$",
"^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$",
],
"shared_extra_conf": {"enable_media_repo": False},
"worker_extra_conf": "enable_media_repo: true",
},
"appservice": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
"worker_extra_conf": "",
},
"federation_sender": {
"app": "synapse.app.federation_sender",
"listener_resources": [],
"endpoint_patterns": [],
"shared_extra_conf": {"send_federation": False},
"worker_extra_conf": "",
},
"synchrotron": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(v2_alpha|r0|v3)/sync$",
"^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
"^/_matrix/client/(api/v1|r0|v3)/initialSync$",
"^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"client_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/members$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/state$",
"^/_matrix/client/v1/rooms/.*/hierarchy$",
"^/_matrix/client/(v1|unstable)/rooms/.*/relations/",
"^/_matrix/client/v1/rooms/.*/threads$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/login$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/3pid$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/account/whoami$",
"^/_matrix/client/versions$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/voip/turnServer$",
"^/_matrix/client/(r0|v3|unstable)/register$",
"^/_matrix/client/(r0|v3|unstable)/auth/.*/fallback/web$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/messages$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
"^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
"^/_matrix/client/(api/v1|r0|v3|unstable)/search",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_reader": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": [
"^/_matrix/federation/(v1|v2)/event/",
"^/_matrix/federation/(v1|v2)/state/",
"^/_matrix/federation/(v1|v2)/state_ids/",
"^/_matrix/federation/(v1|v2)/backfill/",
"^/_matrix/federation/(v1|v2)/get_missing_events/",
"^/_matrix/federation/(v1|v2)/publicRooms",
"^/_matrix/federation/(v1|v2)/query/",
"^/_matrix/federation/(v1|v2)/make_join/",
"^/_matrix/federation/(v1|v2)/make_leave/",
"^/_matrix/federation/(v1|v2)/send_join/",
"^/_matrix/federation/(v1|v2)/send_leave/",
"^/_matrix/federation/(v1|v2)/invite/",
"^/_matrix/federation/(v1|v2)/query_auth/",
"^/_matrix/federation/(v1|v2)/event_auth/",
"^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
"^/_matrix/federation/(v1|v2)/user/devices/",
"^/_matrix/federation/(v1|v2)/get_groups_publicised$",
"^/_matrix/key/v2/query",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"federation_inbound": {
"app": "synapse.app.generic_worker",
"listener_resources": ["federation"],
"endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"event_persister": {
"app": "synapse.app.generic_worker",
"listener_resources": ["replication"],
"endpoint_patterns": [],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"background_worker": {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
# This worker cannot be sharded. Therefore there should only ever be one background
# worker, and it should be named background_worker1
"shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
"worker_extra_conf": "",
},
"event_creator": {
"app": "synapse.app.generic_worker",
"listener_resources": ["client"],
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
"^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
"^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
],
"shared_extra_conf": {},
"worker_extra_conf": "",
},
"frontend_proxy": {
"app": "synapse.app.frontend_proxy",
"listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {},
"worker_extra_conf": (
"worker_main_http_uri: http://127.0.0.1:%d"
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
},
}
# Templates for sections that may be inserted multiple times in config files
NGINX_LOCATION_CONFIG_BLOCK = """
location ~* {endpoint} {{
proxy_pass {upstream};
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
}}
"""
NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
{body}
}}
"""
# Utility functions
def log(txt: str) -> None:
print(txt)
def error(txt: str) -> NoReturn:
print(txt, file=sys.stderr)
sys.exit(2)
def flush_buffers() -> None:
sys.stdout.flush()
sys.stderr.flush()
def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template
Args:
src: Path to the input file.
dst: Path to write to.
template_vars: The arguments to replace placeholder variables in the template with.
"""
# Read the template file
# We disable autoescape to prevent template variables from being escaped,
# as we're not using HTML.
env = Environment(loader=FileSystemLoader(os.path.dirname(src)), autoescape=False)
template = env.get_template(os.path.basename(src))
# Generate a string from the template.
rendered = template.render(**template_vars)
# Write the generated contents to a file
#
# We use append mode in case the files have already been written to by something else
# (for instance, as part of the instructions in a dockerfile).
with open(dst, "a") as outfile:
# In case the existing file doesn't end with a newline
outfile.write("\n")
outfile.write(rendered)
def add_sharding_to_shared_config(
shared_config: dict,
worker_type: str,
worker_name: str,
worker_port: int,
) -> None:
"""Given a dictionary representing a config file shared across all workers,
append sharded worker information to it for the current worker_type instance.
Args:
shared_config: The config dict that all worker instances share (after being converted to YAML)
worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
# The instance_map config field marks the workers that write to various replication streams
instance_map = shared_config.setdefault("instance_map", {})
# Worker-type specific sharding config
if worker_type == "pusher":
shared_config.setdefault("pusher_instances", []).append(worker_name)
elif worker_type == "federation_sender":
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
elif worker_type == "event_persister":
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
worker_name
)
# Map of stream writer instance names to host/ports combos
instance_map[worker_name] = {
"host": "localhost",
"port": worker_port,
}
elif worker_type == "media_repository":
# The first configured media worker will run the media background jobs
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
def generate_base_homeserver_config() -> None:
"""Starts Synapse and generates a basic homeserver config, which will later be
modified for worker support.
Raises: CalledProcessError if calling start.py returned a non-zero exit code.
"""
# start.py already does this for us, so just call that.
# note that this script is copied in in the official, monolith dockerfile
os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
def generate_worker_files(
environ: Mapping[str, str], config_path: str, data_dir: str
) -> None:
"""Read the desired list of workers from environment variables and generate
shared homeserver, nginx and supervisord configs.
Args:
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
"""
# Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below.
# shared_config is the contents of a Synapse config file that will be shared amongst
# the main Synapse process as well as all workers.
# It is intended mainly for disabling functionality when certain workers are spun up,
# and adding a replication listener.
# First read the original config file and extract the listeners block. Then we'll add
# another listener for replication. Later we'll write out the result to the shared
# config file.
listeners = [
{
"port": 9093,
"bind_address": "127.0.0.1",
"type": "http",
"resources": [{"names": ["replication"]}],
}
]
with open(config_path) as file_stream:
original_config = yaml.safe_load(file_stream)
original_listeners = original_config.get("listeners")
if original_listeners:
listeners += original_listeners
# The shared homeserver config. The contents of which will be inserted into the
# base shared worker jinja2 template.
#
# This config file will be passed to all workers, included Synapse's main process.
shared_config: Dict[str, Any] = {"listeners": listeners}
# List of dicts that describe workers.
# We pass this to the Supervisor template later to generate the appropriate
# program blocks.
worker_descriptors: List[Dict[str, Any]] = []
# Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
# ports of each worker. For example:
# {
# worker_type: {1234, 1235, ...}}
# }
# and will be used to construct 'upstream' nginx directives.
nginx_upstreams: Dict[str, Set[int]] = {}
# A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
# placed after the proxy_pass directive. The main benefit to representing this data as a
# dict over a str is that we can easily deduplicate endpoints across multiple instances
# of the same worker.
#
# An nginx site config that will be amended to depending on the workers that are
# spun up. To be placed in /etc/nginx/conf.d.
nginx_locations = {}
# Read the desired worker configuration from the environment
worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
if not worker_types_env:
# No workers, just the main process
worker_types = []
else:
# Split type names by comma, ignoring whitespace.
worker_types = [x.strip() for x in worker_types_env.split(",")]
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
# Start worker ports from this arbitrary port
worker_port = 18009
# A counter of worker_type -> int. Used for determining the name for a given
# worker type when generating its config file, as each worker's name is just
# worker_type + instance #
worker_type_counter: Dict[str, int] = {}
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"]
# For each worker type specified by the user, create config values
for worker_type in worker_types:
worker_config = WORKERS_CONFIG.get(worker_type)
if worker_config:
worker_config = worker_config.copy()
else:
log(worker_type + " is an unknown worker type! It will be ignored")
continue
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count
# Name workers by their type concatenated with an incrementing number
# e.g. federation_reader1
worker_name = worker_type + str(new_worker_count)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
# Update the shared config with any worker-type specific options
shared_config.update(worker_config["shared_extra_conf"])
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
if worker_type_total_count > 1:
# Update the shared config with sharding-related options if necessary
add_sharding_to_shared_config(
shared_config, worker_type, worker_name, worker_port
)
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
# Add nginx location blocks for this worker's endpoints (if any are defined)
for pattern in worker_config["endpoint_patterns"]:
# Determine whether we need to load-balance this worker
if worker_type_total_count > 1:
# Create or add to a load-balanced upstream for this worker
nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
# Upstreams are named after the worker_type
upstream = "http://" + worker_type
else:
upstream = "http://localhost:%d" % (worker_port,)
# Note that this endpoint should proxy to this upstream
nginx_locations[pattern] = upstream
# Write out the worker's logging config file
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
# Then a worker config file
convert(
"/conf/worker.yaml.j2",
"/conf/workers/{name}.yaml".format(name=worker_name),
**worker_config,
worker_log_config_filepath=log_config_filepath,
)
worker_port += 1
# Build the nginx location config blocks
nginx_location_config = ""
for endpoint, upstream in nginx_locations.items():
nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
endpoint=endpoint,
upstream=upstream,
)
# Determine the load-balancing upstreams to configure
nginx_upstream_config = ""
for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
body = ""
for port in upstream_worker_ports:
body += " server localhost:%d;\n" % (port,)
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
upstream_worker_type=upstream_worker_type,
body=body,
)
# Finally, we'll write out the config files.
# log config for the master process
master_log_config = generate_worker_log_config(environ, "master", data_dir)
shared_config["log_config"] = master_log_config
# Find application service registrations
appservice_registrations = None
appservice_registration_dir = os.environ.get("SYNAPSE_AS_REGISTRATION_DIR")
if appservice_registration_dir:
# Scan for all YAML files that should be application service registrations.
appservice_registrations = [
str(reg_path.resolve())
for reg_path in Path(appservice_registration_dir).iterdir()
if reg_path.suffix.lower() in (".yaml", ".yml")
]
workers_in_use = len(worker_types) > 0
# Shared homeserver config
convert(
"/conf/shared.yaml.j2",
"/conf/workers/shared.yaml",
shared_worker_config=yaml.dump(shared_config),
appservice_registrations=appservice_registrations,
enable_redis=workers_in_use,
workers_in_use=workers_in_use,
)
# Nginx config
convert(
"/conf/nginx.conf.j2",
"/etc/nginx/conf.d/matrix-synapse.conf",
worker_locations=nginx_location_config,
upstream_directives=nginx_upstream_config,
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
)
# Supervisord config
os.makedirs("/etc/supervisor", exist_ok=True)
convert(
"/conf/supervisord.conf.j2",
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
enable_redis=workers_in_use,
)
convert(
"/conf/synapse.supervisord.conf.j2",
"/etc/supervisor/conf.d/synapse.conf",
workers=worker_descriptors,
main_config_path=config_path,
use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
)
# healthcheck config
convert(
"/conf/healthcheck.sh.j2",
"/healthcheck.sh",
healthcheck_urls=healthcheck_urls,
)
# Ensure the logging directory exists
log_dir = data_dir + "/logs"
if not os.path.exists(log_dir):
os.mkdir(log_dir)
def generate_worker_log_config(
environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str:
"""Generate a log.config file for the given worker.
Returns: the path to the generated file
"""
# Check whether we should write worker logs to disk, in addition to the console
extra_log_template_args: Dict[str, Optional[str]] = {}
if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
extra_log_template_args["LOG_FILE_PATH"] = f"{data_dir}/logs/{worker_name}.log"
extra_log_template_args["SYNAPSE_LOG_LEVEL"] = environ.get("SYNAPSE_LOG_LEVEL")
extra_log_template_args["SYNAPSE_LOG_SENSITIVE"] = environ.get(
"SYNAPSE_LOG_SENSITIVE"
)
# Render and write the file
log_config_filepath = f"/conf/workers/{worker_name}.log.config"
convert(
"/conf/log.config",
log_config_filepath,
worker_name=worker_name,
**extra_log_template_args,
include_worker_name_in_log_line=environ.get(
"SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"
),
)
return log_config_filepath
def main(args: List[str], environ: MutableMapping[str, str]) -> None:
config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
# override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
# this needs to be handled by a frontend proxy
environ["SYNAPSE_NO_TLS"] = "yes"
# Generate the base homeserver config if one does not yet exist
if not os.path.exists(config_path):
log("Generating base homeserver config")
generate_base_homeserver_config()
# This script may be run multiple times (mostly by Complement, see note at top of file).
# Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
# Always regenerate all other config files
generate_worker_files(environ, config_path, data_dir)
# Mark workers as being configured
with open(mark_filepath, "w") as f:
f.write("")
# Lifted right out of start.py
jemallocpath = "/usr/lib/%s-linux-gnu/libjemalloc.so.2" % (platform.machine(),)
if os.path.isfile(jemallocpath):
environ["LD_PRELOAD"] = jemallocpath
else:
log("Could not find %s, will not use" % (jemallocpath,))
# Start supervisord, which will start Synapse, all of the configured worker
# processes, redis, nginx etc. according to the config we created above.
log("Starting supervisord")
flush_buffers()
os.execle(
"/usr/local/bin/supervisord",
"supervisord",
"-c",
"/etc/supervisor/supervisord.conf",
environ,
)
if __name__ == "__main__":
main(sys.argv, os.environ)