This commit is contained in:
deathrow 2022-12-05 15:09:17 -05:00
parent 1f08a237a3
commit 8c69553db7
No known key found for this signature in database
GPG Key ID: FF39D67A22069F73
2 changed files with 18 additions and 26 deletions

View File

@ -20,7 +20,7 @@
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver. # * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats. # * SYNAPSE_REPORT_STATS: Whether to report stats.
# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG # * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
# below. Leave empty for no workers, or set to '*' for all possible workers. # below. Leave empty for no workers.
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files # * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files. # will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format. # * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@ -58,10 +58,10 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
# have to attach by instance_map to the master process and have client endpoints. # have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": { "pusher": {
"app": "synapse.app.pusher", "app": "synapse.app.generic_worker",
"listener_resources": [], "listener_resources": [],
"endpoint_patterns": [], "endpoint_patterns": [],
"shared_extra_conf": {"start_pushers": False}, "shared_extra_conf": {},
"worker_extra_conf": "", "worker_extra_conf": "",
}, },
"user_dir": { "user_dir": {
@ -84,7 +84,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_synapse/admin/v1/media/.*$", "^/_synapse/admin/v1/media/.*$",
"^/_synapse/admin/v1/quarantine_media/.*$", "^/_synapse/admin/v1/quarantine_media/.*$",
], ],
"shared_extra_conf": {"enable_media_repo": False}, # The first configured media worker will run the media background jobs
"shared_extra_conf": {
"enable_media_repo": False,
"media_instance_running_background_jobs": "media_repository1",
},
"worker_extra_conf": "enable_media_repo: true", "worker_extra_conf": "enable_media_repo: true",
}, },
"appservice": { "appservice": {
@ -95,10 +99,10 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"worker_extra_conf": "", "worker_extra_conf": "",
}, },
"federation_sender": { "federation_sender": {
"app": "synapse.app.federation_sender", "app": "synapse.app.generic_worker",
"listener_resources": [], "listener_resources": [],
"endpoint_patterns": [], "endpoint_patterns": [],
"shared_extra_conf": {"send_federation": False}, "shared_extra_conf": {},
"worker_extra_conf": "", "worker_extra_conf": "",
}, },
"synchrotron": { "synchrotron": {
@ -136,6 +140,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/event",
"^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms", "^/_matrix/client/(api/v1|r0|v3|unstable)/joined_rooms",
"^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases", "^/_matrix/client/(api/v1|r0|v3|unstable/.*)/rooms/.*/aliases",
"^/_matrix/client/v1/rooms/.*/timestamp_to_event$",
"^/_matrix/client/(api/v1|r0|v3|unstable)/search", "^/_matrix/client/(api/v1|r0|v3|unstable)/search",
], ],
"shared_extra_conf": {}, "shared_extra_conf": {},
@ -159,6 +164,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"^/_matrix/federation/(v1|v2)/invite/", "^/_matrix/federation/(v1|v2)/invite/",
"^/_matrix/federation/(v1|v2)/query_auth/", "^/_matrix/federation/(v1|v2)/query_auth/",
"^/_matrix/federation/(v1|v2)/event_auth/", "^/_matrix/federation/(v1|v2)/event_auth/",
"^/_matrix/federation/v1/timestamp_to_event/",
"^/_matrix/federation/(v1|v2)/exchange_third_party_invite/", "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
"^/_matrix/federation/(v1|v2)/user/devices/", "^/_matrix/federation/(v1|v2)/user/devices/",
"^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
@ -205,14 +211,11 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"worker_extra_conf": "", "worker_extra_conf": "",
}, },
"frontend_proxy": { "frontend_proxy": {
"app": "synapse.app.frontend_proxy", "app": "synapse.app.generic_worker",
"listener_resources": ["client", "replication"], "listener_resources": ["client", "replication"],
"endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
"shared_extra_conf": {}, "shared_extra_conf": {},
"worker_extra_conf": ( "worker_extra_conf": "",
"worker_main_http_uri: http://127.0.0.1:%d"
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
}, },
"account_data": { "account_data": {
"app": "synapse.app.generic_worker", "app": "synapse.app.generic_worker",
@ -293,7 +296,6 @@ def flush_buffers() -> None:
def convert(src: str, dst: str, **template_vars: object) -> None: def convert(src: str, dst: str, **template_vars: object) -> None:
"""Generate a file from a template """Generate a file from a template
Args: Args:
src: Path to the input file. src: Path to the input file.
dst: Path to write to. dst: Path to write to.
@ -326,8 +328,7 @@ def add_worker_roles_to_shared_config(
worker_port: int, worker_port: int,
) -> None: ) -> None:
"""Given a dictionary representing a config file shared across all workers, """Given a dictionary representing a config file shared across all workers,
append sharded worker information to it for the current worker_type instance. append appropriate worker information to it for the current worker_type instance.
Args: Args:
shared_config: The config dict that all worker instances share (after being converted to YAML) shared_config: The config dict that all worker instances share (after being converted to YAML)
worker_type: The type of worker (one of those defined in WORKERS_CONFIG). worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
@ -359,7 +360,7 @@ def add_worker_roles_to_shared_config(
elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]: elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
# Update the list of stream writers # Update the list of stream writers
# It's convienent that the name of the worker type is the same as the event stream # It's convenient that the name of the worker type is the same as the stream to write
shared_config.setdefault("stream_writers", {}).setdefault( shared_config.setdefault("stream_writers", {}).setdefault(
worker_type, [] worker_type, []
).append(worker_name) ).append(worker_name)
@ -371,15 +372,10 @@ def add_worker_roles_to_shared_config(
"port": worker_port, "port": worker_port,
} }
elif worker_type == "media_repository":
# The first configured media worker will run the media background jobs
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
def generate_base_homeserver_config() -> None: def generate_base_homeserver_config() -> None:
"""Starts Synapse and generates a basic homeserver config, which will later be """Starts Synapse and generates a basic homeserver config, which will later be
modified for worker support. modified for worker support.
Raises: CalledProcessError if calling start.py returned a non-zero exit code. Raises: CalledProcessError if calling start.py returned a non-zero exit code.
""" """
# start.py already does this for us, so just call that. # start.py already does this for us, so just call that.
@ -393,7 +389,6 @@ def generate_worker_files(
) -> None: ) -> None:
"""Read the desired list of workers from environment variables and generate """Read the desired list of workers from environment variables and generate
shared homeserver, nginx and supervisord configs. shared homeserver, nginx and supervisord configs.
Args: Args:
environ: os.environ instance. environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file. config_path: The location of the generated Synapse main worker config file.
@ -483,8 +478,7 @@ def generate_worker_files(
if worker_config: if worker_config:
worker_config = worker_config.copy() worker_config = worker_config.copy()
else: else:
log(worker_type + " is an unknown worker type! It will be ignored") error(worker_type + " is an unknown worker type! Please fix!")
continue
new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1 new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
worker_type_counter[worker_type] = new_worker_count worker_type_counter[worker_type] = new_worker_count
@ -636,7 +630,6 @@ def generate_worker_log_config(
environ: Mapping[str, str], worker_name: str, data_dir: str environ: Mapping[str, str], worker_name: str, data_dir: str
) -> str: ) -> str:
"""Generate a log.config file for the given worker. """Generate a log.config file for the given worker.
Returns: the path to the generated file Returns: the path to the generated file
""" """
# Check whether we should write worker logs to disk, in addition to the console # Check whether we should write worker logs to disk, in addition to the console
@ -710,4 +703,4 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
if __name__ == "__main__": if __name__ == "__main__":
main(sys.argv, os.environ) main(sys.argv, os.environ)

View File

@ -1,4 +1,3 @@
#!/usr/local/bin/python #!/usr/local/bin/python
import codecs import codecs