Update worker docs with recent enhancements (#7969)

This commit is contained in:
Erik Johnston 2020-07-29 23:22:13 +01:00 committed by GitHub
parent 7000a215e6
commit 2c1b9d6763
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 413 additions and 235 deletions

1
changelog.d/7969.doc Normal file
View File

@ -0,0 +1 @@
Update worker docs with latest enhancements.

View File

@ -2398,3 +2398,57 @@ opentracing:
# #
# logging: # logging:
# false # false
## Workers ##
# Disables sending of outbound federation transactions on the main process.
# Uncomment if using a federation sender worker.
#
#send_federation: false
# It is possible to run multiple federation sender workers, in which case the
# work is balanced across them.
#
# This configuration must be shared between all federation sender workers, and if
# changed all federation sender workers must be stopped at the same time and then
# started, to ensure that all instances are running with the same config (otherwise
# events may be dropped).
#
#federation_sender_instances:
# - federation_sender1
# When using workers this should be a map from `worker_name` to the
# HTTP replication listener of the worker, if configured.
#
#instance_map:
# worker1:
# host: localhost
# port: 8034
# Experimental: When using workers you can define which workers should
# handle event persistence and typing notifications. Any worker
# specified here must also be in the `instance_map`.
#
#stream_writers:
# events: worker1
# typing: worker1
# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
#
redis:
# Uncomment the below to enable Redis support.
#
#enabled: true
# Optional host and port to use to connect to redis. Defaults to
# localhost and 6379
#
#host: localhost
#port: 6379
# Optional password if configured on the Redis instance
#
#password: <secret_password>

32
docs/synctl_workers.md Normal file
View File

@ -0,0 +1,32 @@
### Using synctl with workers
If you want to use `synctl` to manage your synapse processes, you will need to
create an an additional configuration file for the main synapse process. That
configuration should look like this:
```yaml
worker_app: synapse.app.homeserver
```
Additionally, each worker app must be configured with the name of a "pid file",
to which it will write its process ID when it starts. For example, for a
synchrotron, you might write:
```yaml
worker_pid_file: /home/matrix/synapse/worker1.pid
```
Finally, to actually run your worker-based synapse, you must pass synctl the `-a`
commandline option to tell it to operate on all the worker configurations found
in the given directory, e.g.:
synctl -a $CONFIG/workers start
Currently one should always restart all workers when restarting or upgrading
synapse, unless you explicitly know it's safe not to. For instance, restarting
synapse without restarting all the synchrotrons may result in broken typing
notifications.
To manipulate a specific worker, you pass the -w option to synctl:
synctl -w $CONFIG/workers/worker1.yaml restart

View File

@ -16,69 +16,106 @@ workers only work with PostgreSQL-based Synapse deployments. SQLite should only
be used for demo purposes and any admin considering workers should already be be used for demo purposes and any admin considering workers should already be
running PostgreSQL. running PostgreSQL.
## Master/worker communication ## Main process/worker communication
The workers communicate with the master process via a Synapse-specific protocol The processes communicate with each other via a Synapse-specific protocol called
called 'replication' (analogous to MySQL- or Postgres-style database 'replication' (analogous to MySQL- or Postgres-style database replication) which
replication) which feeds a stream of relevant data from the master to the feeds streams of newly written data between processes so they can be kept in
workers so they can be kept in sync with the master process and database state. sync with the database state.
Additionally, workers may make HTTP requests to the master, to send information Additionally, processes may make HTTP requests to each other. Typically this is
in the other direction. Typically this is used for operations which need to used for operations which need to wait for a reply - such as sending an event.
wait for a reply - such as sending an event.
## Configuration As of Synapse v1.13.0, it is possible to configure Synapse to send replication
via a [Redis pub/sub channel](https://redis.io/topics/pubsub), and is now the
recommended way of configuring replication. This is an alternative to the old
direct TCP connections to the main process: rather than all the workers
connecting to the main process, all the workers and the main process connect to
Redis, which relays replication commands between processes. This can give a
significant cpu saving on the main process and will be a prerequisite for
upcoming performance improvements.
(See the [Architectural diagram](#architectural-diagram) section at the end for
a visualisation of what this looks like)
## Setting up workers
A Redis server is required to manage the communication between the processes.
(The older direct TCP connections are now deprecated.) The Redis server
should be installed following the normal procedure for your distribution (e.g.
`apt install redis-server` on Debian). It is safe to use an existing Redis
deployment if you have one.
Once installed, check that Redis is running and accessible from the host running
Synapse, for example by executing `echo PING | nc -q1 localhost 6379` and seeing
a response of `+PONG`.
The appropriate dependencies must also be installed for Synapse. If using a
virtualenv, these can be installed with:
```sh
pip install matrix-synapse[redis]
```
Note that these dependencies are included when synapse is installed with `pip
install matrix-synapse[all]`. They are also included in the debian packages from
`matrix.org` and in the docker images at
https://hub.docker.com/r/matrixdotorg/synapse/.
To make effective use of the workers, you will need to configure an HTTP To make effective use of the workers, you will need to configure an HTTP
reverse-proxy such as nginx or haproxy, which will direct incoming requests to reverse-proxy such as nginx or haproxy, which will direct incoming requests to
the correct worker, or to the main synapse instance. Note that this includes the correct worker, or to the main synapse instance. See [reverse_proxy.md](reverse_proxy.md)
requests made to the federation port. See [reverse_proxy.md](reverse_proxy.md)
for information on setting up a reverse proxy. for information on setting up a reverse proxy.
To enable workers, you need to add *two* replication listeners to the To enable workers you should create a configuration file for each worker
main Synapse configuration file (`homeserver.yaml`). For example: process. Each worker configuration file inherits the configuration of the shared
homeserver configuration file. You can then override configuration specific to
that worker, e.g. the HTTP listener that it provides (if any); logging
configuration; etc. You should minimise the number of overrides though to
maintain a usable config.
Next you need to add both a HTTP replication listener and redis config to the
shared Synapse configuration file (`homeserver.yaml`). For example:
```yaml ```yaml
# extend the existing `listeners` section. This defines the ports that the
# main process will listen on.
listeners: listeners:
# The TCP replication port
- port: 9092
bind_address: '127.0.0.1'
type: replication
# The HTTP replication port # The HTTP replication port
- port: 9093 - port: 9093
bind_address: '127.0.0.1' bind_address: '127.0.0.1'
type: http type: http
resources: resources:
- names: [replication] - names: [replication]
redis:
enabled: true
``` ```
Under **no circumstances** should these replication API listeners be exposed to See the sample config for the full documentation of each option.
the public internet; they have no authentication and are unencrypted.
You should then create a set of configs for the various worker processes. Each Under **no circumstances** should the replication listener be exposed to the
worker configuration file inherits the configuration of the main homeserver public internet; it has no authentication and is unencrypted.
configuration file. You can then override configuration specific to that
worker, e.g. the HTTP listener that it provides (if any); logging
configuration; etc. You should minimise the number of overrides though to
maintain a usable config.
In the config file for each worker, you must specify the type of worker In the config file for each worker, you must specify the type of worker
application (`worker_app`). The currently available worker applications are application (`worker_app`), and you should specify a unqiue name for the worker
listed below. You must also specify the replication endpoints that it should (`worker_name`). The currently available worker applications are listed below.
talk to on the main synapse process. `worker_replication_host` should specify You must also specify the HTTP replication endpoint that it should talk to on
the host of the main synapse, `worker_replication_port` should point to the TCP the main synapse process. `worker_replication_host` should specify the host of
replication listener port and `worker_replication_http_port` should point to the main synapse and `worker_replication_http_port` should point to the HTTP
the HTTP replication port. replication port. If the worker will handle HTTP requests then the
`worker_listeners` option should be set with a `http` listener, in the same way
as the `listeners` option in the shared config.
For example: For example:
```yaml ```yaml
worker_app: synapse.app.synchrotron worker_app: synapse.app.generic_worker
worker_name: worker1
# The replication listener on the synapse to talk to. # The replication listener on the main synapse process.
worker_replication_host: 127.0.0.1 worker_replication_host: 127.0.0.1
worker_replication_port: 9092
worker_replication_http_port: 9093 worker_replication_http_port: 9093
worker_listeners: worker_listeners:
@ -87,13 +124,14 @@ worker_listeners:
resources: resources:
- names: - names:
- client - client
- federation
worker_log_config: /home/matrix/synapse/config/synchrotron_log_config.yaml worker_log_config: /home/matrix/synapse/config/worker1_log_config.yaml
``` ```
...is a full configuration for a synchrotron worker instance, which will expose a ...is a full configuration for a generic worker instance, which will expose a
plain HTTP `/sync` endpoint on port 8083 separately from the `/sync` endpoint provided plain HTTP endpoint on port 8083 separately serving various endpoints, e.g.
by the main synapse. `/sync`, which are listed below.
Obviously you should configure your reverse-proxy to route the relevant Obviously you should configure your reverse-proxy to route the relevant
endpoints to the worker (`localhost:8083` in the above example). endpoints to the worker (`localhost:8083` in the above example).
@ -102,127 +140,24 @@ Finally, you need to start your worker processes. This can be done with either
`synctl` or your distribution's preferred service manager such as `systemd`. We `synctl` or your distribution's preferred service manager such as `systemd`. We
recommend the use of `systemd` where available: for information on setting up recommend the use of `systemd` where available: for information on setting up
`systemd` to start synapse workers, see `systemd` to start synapse workers, see
[systemd-with-workers](systemd-with-workers). To use `synctl`, see below. [systemd-with-workers](systemd-with-workers). To use `synctl`, see
[synctl_workers.md](synctl_workers.md).
### **Experimental** support for replication over redis
As of Synapse v1.13.0, it is possible to configure Synapse to send replication
via a [Redis pub/sub channel](https://redis.io/topics/pubsub). This is an
alternative to direct TCP connections to the master: rather than all the
workers connecting to the master, all the workers and the master connect to
Redis, which relays replication commands between processes. This can give a
significant cpu saving on the master and will be a prerequisite for upcoming
performance improvements.
Note that this support is currently experimental; you may experience lost
messages and similar problems! It is strongly recommended that admins setting
up workers for the first time use direct TCP replication as above.
To configure Synapse to use Redis:
1. Install Redis following the normal procedure for your distribution - for
example, on Debian, `apt install redis-server`. (It is safe to use an
existing Redis deployment if you have one: we use a pub/sub stream named
according to the `server_name` of your synapse server.)
2. Check Redis is running and accessible: you should be able to `echo PING | nc -q1
localhost 6379` and get a response of `+PONG`.
3. Install the python prerequisites. If you installed synapse into a
virtualenv, this can be done with:
```sh
pip install matrix-synapse[redis]
```
The debian packages from matrix.org already include the required
dependencies.
4. Add config to the shared configuration (`homeserver.yaml`):
```yaml
redis:
enabled: true
```
Optional parameters which can go alongside `enabled` are `host`, `port`,
`password`. Normally none of these are required.
5. Restart master and all workers.
Once redis replication is in use, `worker_replication_port` is redundant and
can be removed from the worker configuration files. Similarly, the
configuration for the `listener` for the TCP replication port can be removed
from the main configuration file. Note that the HTTP replication port is
still required.
### Using synctl
If you want to use `synctl` to manage your synapse processes, you will need to
create an an additional configuration file for the master synapse process. That
configuration should look like this:
```yaml
worker_app: synapse.app.homeserver
```
Additionally, each worker app must be configured with the name of a "pid file",
to which it will write its process ID when it starts. For example, for a
synchrotron, you might write:
```yaml
worker_pid_file: /home/matrix/synapse/synchrotron.pid
```
Finally, to actually run your worker-based synapse, you must pass synctl the `-a`
commandline option to tell it to operate on all the worker configurations found
in the given directory, e.g.:
synctl -a $CONFIG/workers start
Currently one should always restart all workers when restarting or upgrading
synapse, unless you explicitly know it's safe not to. For instance, restarting
synapse without restarting all the synchrotrons may result in broken typing
notifications.
To manipulate a specific worker, you pass the -w option to synctl:
synctl -w $CONFIG/workers/synchrotron.yaml restart
## Available worker applications ## Available worker applications
### `synapse.app.pusher` ### `synapse.app.generic_worker`
Handles sending push notifications to sygnal and email. Doesn't handle any This worker can handle API requests matching the following regular
REST endpoints itself, but you should set `start_pushers: False` in the expressions:
shared configuration file to stop the main synapse sending these notifications.
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.synchrotron`
The synchrotron handles `sync` requests from clients. In particular, it can
handle REST endpoints matching the following regular expressions:
# Sync requests
^/_matrix/client/(v2_alpha|r0)/sync$ ^/_matrix/client/(v2_alpha|r0)/sync$
^/_matrix/client/(api/v1|v2_alpha|r0)/events$ ^/_matrix/client/(api/v1|v2_alpha|r0)/events$
^/_matrix/client/(api/v1|r0)/initialSync$ ^/_matrix/client/(api/v1|r0)/initialSync$
^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$ ^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$
The above endpoints should all be routed to the synchrotron worker by the # Federation requests
reverse-proxy configuration.
It is possible to run multiple instances of the synchrotron to scale
horizontally. In this case the reverse-proxy should be configured to
load-balance across the instances, though it will be more efficient if all
requests from a particular user are routed to a single instance. Extracting
a userid from the access token is currently left as an exercise for the reader.
### `synapse.app.appservice`
Handles sending output traffic to Application Services. Doesn't handle any
REST endpoints itself, but you should set `notify_appservices: False` in the
shared configuration file to stop the main synapse sending these notifications.
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.federation_reader`
Handles a subset of federation endpoints. In particular, it can handle REST
endpoints matching the following regular expressions:
^/_matrix/federation/v1/event/ ^/_matrix/federation/v1/event/
^/_matrix/federation/v1/state/ ^/_matrix/federation/v1/state/
^/_matrix/federation/v1/state_ids/ ^/_matrix/federation/v1/state_ids/
@ -242,40 +177,145 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/event_auth/ ^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/ ^/_matrix/federation/v1/exchange_third_party_invite/
^/_matrix/federation/v1/user/devices/ ^/_matrix/federation/v1/user/devices/
^/_matrix/federation/v1/send/
^/_matrix/federation/v1/get_groups_publicised$ ^/_matrix/federation/v1/get_groups_publicised$
^/_matrix/key/v2/query ^/_matrix/key/v2/query
# Inbound federation transaction request
^/_matrix/federation/v1/send/
# Client API requests
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
# Registration/login requests
^/_matrix/client/(api/v1|r0|unstable)/login$
^/_matrix/client/(r0|unstable)/register$
^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
# Event sending requests
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
Additionally, the following REST endpoints can be handled for GET requests: Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/federation/v1/groups/ ^/_matrix/federation/v1/groups/
The above endpoints should all be routed to the federation_reader worker by the Pagination requests can also be handled, but all requests for a given
reverse-proxy configuration. room must be routed to the same instance. Additionally, care must be taken to
ensure that the purge history admin API is not used while pagination requests
for the room are in flight:
The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
instance.
Note that `federation` must be added to the listener resources in the worker config: Note that a HTTP listener with `client` and `federation` resources must be
configured in the `worker_listeners` option in the worker config.
#### Load balancing
It is possible to run multiple instances of this worker app, with incoming requests
being load-balanced between them by the reverse-proxy. However, different endpoints
have different characteristics and so admins
may wish to run multiple groups of workers handling different endpoints so that
load balancing can be done in different ways.
For `/sync` and `/initialSync` requests it will be more efficient if all
requests from a particular user are routed to a single instance. Extracting a
user ID from the access token or `Authorization` header is currently left as an
exercise for the reader. Admins may additionally wish to separate out `/sync`
requests that have a `since` query parameter from those that don't (and
`/initialSync`), as requests that don't are known as "initial sync" that happens
when a user logs in on a new device and can be *very* resource intensive, so
isolating these requests will stop them from interfering with other users ongoing
syncs.
Federation and client requests can be balanced via simple round robin.
The inbound federation transaction request `^/_matrix/federation/v1/send/`
should be balanced by source IP so that transactions from the same remote server
go to the same process.
Registration/login requests can be handled separately purely to help ensure that
unexpected load doesn't affect new logins and sign ups.
Finally, event sending requests can be balanced by the room ID in the URI (or
the full URI, or even just round robin), the room ID is the path component after
`/rooms/`. If there is a large bridge connected that is sending or may send lots
of events, then a dedicated set of workers can be provisioned to limit the
effects of bursts of events from that bridge on events sent by normal users.
#### Stream writers
Additionally, there is *experimental* support for moving writing of specific
streams (such as events) off of the main process to a particular worker. (This
is only supported with Redis-based replication.)
Currently support streams are `events` and `typing`.
To enable this, the worker must have a HTTP replication listener configured,
have a `worker_name` and be listed in the `instance_map` config. For example to
move event persistence off to a dedicated worker, the shared configuration would
include:
```yaml ```yaml
worker_app: synapse.app.federation_reader instance_map:
... event_persister1:
worker_listeners: host: localhost
- type: http port: 8034
port: <port>
resources: streams_writers:
- names: events: event_persister1
- federation
``` ```
### `synapse.app.pusher`
Handles sending push notifications to sygnal and email. Doesn't handle any
REST endpoints itself, but you should set `start_pushers: False` in the
shared configuration file to stop the main synapse sending push notifications.
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.appservice`
Handles sending output traffic to Application Services. Doesn't handle any
REST endpoints itself, but you should set `notify_appservices: False` in the
shared configuration file to stop the main synapse sending appservice notifications.
Note this worker cannot be load-balanced: only one instance should be active.
### `synapse.app.federation_sender` ### `synapse.app.federation_sender`
Handles sending federation traffic to other servers. Doesn't handle any Handles sending federation traffic to other servers. Doesn't handle any
REST endpoints itself, but you should set `send_federation: False` in the REST endpoints itself, but you should set `send_federation: False` in the
shared configuration file to stop the main synapse sending this traffic. shared configuration file to stop the main synapse sending this traffic.
Note this worker cannot be load-balanced: only one instance should be active. If running multiple federation senders then you must list each
instance in the `federation_sender_instances` option by their `worker_name`.
All instances must be stopped and started when adding or removing instances.
For example:
```yaml
federation_sender_instances:
- federation_sender1
- federation_sender2
```
### `synapse.app.media_repository` ### `synapse.app.media_repository`
@ -314,46 +354,6 @@ and you must configure a single instance to run the background tasks, e.g.:
media_instance_running_background_jobs: "media-repository-1" media_instance_running_background_jobs: "media-repository-1"
``` ```
### `synapse.app.client_reader`
Handles client API endpoints. It can handle REST endpoints matching the
following regular expressions:
^/_matrix/client/(api/v1|r0|unstable)/publicRooms$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/joined_members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/context/.*$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/members$
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state$
^/_matrix/client/(api/v1|r0|unstable)/login$
^/_matrix/client/(api/v1|r0|unstable)/account/3pid$
^/_matrix/client/(api/v1|r0|unstable)/keys/query$
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/account_data/
^/_matrix/client/(api/v1|r0|unstable)/user/[^/]*/rooms/[^/]*/account_data/
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance:
^/_matrix/client/(r0|unstable)/register$
^/_matrix/client/(r0|unstable)/auth/.*/fallback/web$
Pagination requests can also be handled, but all requests with the same path
room must be routed to the same instance. Additionally, care must be taken to
ensure that the purge history admin API is not used while pagination requests
for the room are in flight:
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
### `synapse.app.user_dir` ### `synapse.app.user_dir`
Handles searches in the user directory. It can handle REST endpoints matching Handles searches in the user directory. It can handle REST endpoints matching
@ -388,15 +388,48 @@ file. For example:
worker_main_http_uri: http://127.0.0.1:8008 worker_main_http_uri: http://127.0.0.1:8008
### `synapse.app.event_creator` ### Historical apps
Handles some event creation. It can handle REST endpoints matching: *Note:* Historically there used to be more apps, however they have been
amalgamated into a single `synapse.app.generic_worker` app. The remaining apps
are ones that do specific processing unrelated to requests, e.g. the `pusher`
that handles sending out push notifications for new events. The intention is for
all these to be folded into the `generic_worker` app and to use config to define
which processes handle the various proccessing such as push notifications.
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/state/
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
^/_matrix/client/(api/v1|r0|unstable)/join/
^/_matrix/client/(api/v1|r0|unstable)/profile/
It will create events locally and then send them on to the main synapse ## Architectural diagram
instance to be persisted and handled.
The following shows an example setup using Redis and a reverse proxy:
```
Clients & Federation
|
v
+-----------+
| |
| Reverse |
| Proxy |
| |
+-----------+
| | |
| | | HTTP requests
+-------------------+ | +-----------+
| +---+ |
| | |
v v v
+--------------+ +--------------+ +--------------+ +--------------+
| Main | | Generic | | Generic | | Event |
| Process | | Worker 1 | | Worker 2 | | Persister |
+--------------+ +--------------+ +--------------+ +--------------+
^ ^ | ^ | | ^ | ^ ^
| | | | | | | | | |
| | | | | HTTP | | | | |
| +----------+<--|---|---------+ | | | |
| | +-------------|-->+----------+ |
| | | |
| | | |
v v v v
====================================================================
Redis pub/sub channel
```

View File

@ -940,7 +940,7 @@ def start(config_options):
config.server.update_user_directory = False config.server.update_user_directory = False
if config.worker_app == "synapse.app.federation_sender": if config.worker_app == "synapse.app.federation_sender":
if config.federation.send_federation: if config.worker.send_federation:
sys.stderr.write( sys.stderr.write(
"\nThe send_federation must be disabled in the main synapse process" "\nThe send_federation must be disabled in the main synapse process"
"\nbefore they can be run in a separate worker." "\nbefore they can be run in a separate worker."
@ -950,10 +950,10 @@ def start(config_options):
sys.exit(1) sys.exit(1)
# Force the pushers to start since they will be disabled in the main config # Force the pushers to start since they will be disabled in the main config
config.federation.send_federation = True config.worker.send_federation = True
else: else:
# For other worker types we force this to off. # For other worker types we force this to off.
config.federation.send_federation = False config.worker.send_federation = False
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

View File

@ -17,23 +17,13 @@ from typing import Optional
from netaddr import IPSet from netaddr import IPSet
from ._base import Config, ConfigError, ShardedWorkerHandlingConfig from ._base import Config, ConfigError
class FederationConfig(Config): class FederationConfig(Config):
section = "federation" section = "federation"
def read_config(self, config, **kwargs): def read_config(self, config, **kwargs):
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
federation_sender_instances = config.get("federation_sender_instances") or []
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
# FIXME: federation_domain_whitelist needs sytests # FIXME: federation_domain_whitelist needs sytests
self.federation_domain_whitelist = None # type: Optional[dict] self.federation_domain_whitelist = None # type: Optional[dict]
federation_domain_whitelist = config.get("federation_domain_whitelist", None) federation_domain_whitelist = config.get("federation_domain_whitelist", None)

View File

@ -78,7 +78,6 @@ class HomeServerConfig(RootConfig):
JWTConfig, JWTConfig,
PasswordConfig, PasswordConfig,
EmailConfig, EmailConfig,
WorkerConfig,
PasswordAuthProviderConfig, PasswordAuthProviderConfig,
PushConfig, PushConfig,
SpamCheckerConfig, SpamCheckerConfig,
@ -91,6 +90,7 @@ class HomeServerConfig(RootConfig):
RoomDirectoryConfig, RoomDirectoryConfig,
ThirdPartyRulesConfig, ThirdPartyRulesConfig,
TracerConfig, TracerConfig,
WorkerConfig,
RedisConfig, RedisConfig,
FederationConfig, FederationConfig,
] ]

View File

@ -214,7 +214,7 @@ def setup_logging(
Set up the logging subsystem. Set up the logging subsystem.
Args: Args:
config (LoggingConfig | synapse.config.workers.WorkerConfig): config (LoggingConfig | synapse.config.worker.WorkerConfig):
configuration data configuration data
use_worker_options (bool): True to use the 'worker_log_config' option use_worker_options (bool): True to use the 'worker_log_config' option

View File

@ -21,7 +21,7 @@ class RedisConfig(Config):
section = "redis" section = "redis"
def read_config(self, config, **kwargs): def read_config(self, config, **kwargs):
redis_config = config.get("redis", {}) redis_config = config.get("redis") or {}
self.redis_enabled = redis_config.get("enabled", False) self.redis_enabled = redis_config.get("enabled", False)
if not self.redis_enabled: if not self.redis_enabled:
@ -32,3 +32,24 @@ class RedisConfig(Config):
self.redis_host = redis_config.get("host", "localhost") self.redis_host = redis_config.get("host", "localhost")
self.redis_port = redis_config.get("port", 6379) self.redis_port = redis_config.get("port", 6379)
self.redis_password = redis_config.get("password") self.redis_password = redis_config.get("password")
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
# Configuration for Redis when using workers. This *must* be enabled when
# using workers (unless using old style direct TCP configuration).
#
redis:
# Uncomment the below to enable Redis support.
#
#enabled: true
# Optional host and port to use to connect to redis. Defaults to
# localhost and 6379
#
#host: localhost
#port: 6379
# Optional password if configured on the Redis instance
#
#password: <secret_password>
"""

View File

@ -15,7 +15,7 @@
import attr import attr
from ._base import Config, ConfigError from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
from .server import ListenerConfig, parse_listener_def from .server import ListenerConfig, parse_listener_def
@ -85,6 +85,16 @@ class WorkerConfig(Config):
) )
) )
# Whether to send federation traffic out in this process. This only
# applies to some federation traffic, and so shouldn't be used to
# "disable" federation
self.send_federation = config.get("send_federation", True)
federation_sender_instances = config.get("federation_sender_instances") or []
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
# A map from instance name to host/port of their HTTP replication endpoint. # A map from instance name to host/port of their HTTP replication endpoint.
instance_map = config.get("instance_map") or {} instance_map = config.get("instance_map") or {}
self.instance_map = { self.instance_map = {
@ -105,6 +115,43 @@ class WorkerConfig(Config):
% (instance, stream) % (instance, stream)
) )
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Workers ##
# Disables sending of outbound federation transactions on the main process.
# Uncomment if using a federation sender worker.
#
#send_federation: false
# It is possible to run multiple federation sender workers, in which case the
# work is balanced across them.
#
# This configuration must be shared between all federation sender workers, and if
# changed all federation sender workers must be stopped at the same time and then
# started, to ensure that all instances are running with the same config (otherwise
# events may be dropped).
#
#federation_sender_instances:
# - federation_sender1
# When using workers this should be a map from `worker_name` to the
# HTTP replication listener of the worker, if configured.
#
#instance_map:
# worker1:
# host: localhost
# port: 8034
# Experimental: When using workers you can define which workers should
# handle event persistence and typing notifications. Any worker
# specified here must also be in the `instance_map`.
#
#stream_writers:
# events: worker1
# typing: worker1
"""
def read_arguments(self, args): def read_arguments(self, args):
# We support a bunch of command line arguments that override options in # We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running # the config. A lot of these options have a worker_* prefix when running

View File

@ -57,7 +57,7 @@ class FederationRemoteSendQueue(object):
# We may have multiple federation sender instances, so we need to track # We may have multiple federation sender instances, so we need to track
# their positions separately. # their positions separately.
self._sender_instances = hs.config.federation.federation_shard_config.instances self._sender_instances = hs.config.worker.federation_shard_config.instances
self._sender_positions = {} self._sender_positions = {}
# Pending presence map user_id -> UserPresenceState # Pending presence map user_id -> UserPresenceState

View File

@ -70,7 +70,7 @@ class FederationSender(object):
self._transaction_manager = TransactionManager(hs) self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.federation.federation_shard_config self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue # map from destination to PerDestinationQueue
self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]

View File

@ -75,7 +75,7 @@ class PerDestinationQueue(object):
self._store = hs.get_datastore() self._store = hs.get_datastore()
self._transaction_manager = transaction_manager self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.federation.federation_shard_config self._federation_shard_config = hs.config.worker.federation_shard_config
self._should_send_on_this_instance = True self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle( if not self._federation_shard_config.should_handle(

View File

@ -255,7 +255,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
self._instance_name = hs.get_instance_name() self._instance_name = hs.get_instance_name()
self._send_federation = hs.should_send_federation() self._send_federation = hs.should_send_federation()
self._federation_shard_config = hs.config.federation.federation_shard_config self._federation_shard_config = hs.config.worker.federation_shard_config
# If we're a process that sends federation we may need to reset the # If we're a process that sends federation we may need to reset the
# `federation_stream_position` table to match the current sharding # `federation_stream_position` table to match the current sharding