diff --git a/.gitignore b/.gitignore
index 2cef1b0a5..295a18b53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,13 +6,14 @@
 *.egg
 *.egg-info
 *.lock
-*.pyc
+*.py[cod]
 *.snap
 *.tac
 _trial_temp/
 _trial_temp*/
 /out
 .DS_Store
+__pycache__/
 
 # stuff that is likely to exist when you run a server locally
 /*.db
diff --git a/CHANGES.md b/CHANGES.md
index d584d342d..15d2894bc 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,60 @@
+Synapse 1.29.0rc1 (2021-03-04)
+==============================
+
+Note that Synapse now expects an `X-Forwarded-Proto` header when used with a reverse proxy. Please see [UPGRADE.rst](UPGRADE.rst#upgrading-to-v1290) for more details on this change.
+
+
+Features
+--------
+
+- Add rate limiters to cross-user key sharing requests. ([\#8957](https://github.com/matrix-org/synapse/issues/8957))
+- Add `order_by` to the admin API `GET /_synapse/admin/v1/users/<user_id>/media`. Contributed by @dklimpel. ([\#8978](https://github.com/matrix-org/synapse/issues/8978))
+- Add some configuration settings to make users' profile data more private. ([\#9203](https://github.com/matrix-org/synapse/issues/9203))
+- The `no_proxy` and `NO_PROXY` environment variables are now respected in proxied HTTP clients, with the lowercase form taking precedence if both are present. Additionally, the lowercase `https_proxy` environment variable is now respected in proxied HTTP clients on top of the existing support for the uppercase `HTTPS_PROXY` form, and takes precedence if both are present. Contributed by Timothy Leung. ([\#9372](https://github.com/matrix-org/synapse/issues/9372))
+- Add a configuration option, `user_directory.prefer_local_users`, which when enabled will make it more likely for users on the same server as you to appear above other users. ([\#9383](https://github.com/matrix-org/synapse/issues/9383), [\#9385](https://github.com/matrix-org/synapse/issues/9385))
+- Add support for regenerating thumbnails if they have been deleted but the original image is still stored. ([\#9438](https://github.com/matrix-org/synapse/issues/9438))
+- Add support for the `X-Forwarded-Proto` header when using a reverse proxy. ([\#9472](https://github.com/matrix-org/synapse/issues/9472), [\#9501](https://github.com/matrix-org/synapse/issues/9501), [\#9512](https://github.com/matrix-org/synapse/issues/9512), [\#9539](https://github.com/matrix-org/synapse/issues/9539))
+
+
+Bugfixes
+--------
+
+- Fix a bug where users' pushers were not all deleted when they deactivated their account. ([\#9285](https://github.com/matrix-org/synapse/issues/9285), [\#9516](https://github.com/matrix-org/synapse/issues/9516))
+- Fix a bug where a lot of unnecessary presence updates were sent when joining a room. ([\#9402](https://github.com/matrix-org/synapse/issues/9402))
+- Fix a bug that caused multiple calls to the experimental `shared_rooms` endpoint to return stale results. ([\#9416](https://github.com/matrix-org/synapse/issues/9416))
+- Fix a bug in single sign-on which could cause a "No session cookie found" error. ([\#9436](https://github.com/matrix-org/synapse/issues/9436))
+- Fix a bug introduced in v1.27.0 where allowing a user to choose their own username when logging in via single sign-on did not work unless an `idp_icon` was defined. ([\#9440](https://github.com/matrix-org/synapse/issues/9440))
+- Fix a bug introduced in v1.26.0 where some sequences were not properly configured when running `synapse_port_db`. ([\#9449](https://github.com/matrix-org/synapse/issues/9449))
+- Fix deleting pushers when using sharded pushers. ([\#9465](https://github.com/matrix-org/synapse/issues/9465), [\#9466](https://github.com/matrix-org/synapse/issues/9466), [\#9479](https://github.com/matrix-org/synapse/issues/9479), [\#9536](https://github.com/matrix-org/synapse/issues/9536))
+- Fix missing startup checks for the consistency of certain PostgreSQL sequences. ([\#9470](https://github.com/matrix-org/synapse/issues/9470))
+- Fix a long-standing bug where the media repository could leak file descriptors while previewing media. ([\#9497](https://github.com/matrix-org/synapse/issues/9497))
+- Properly purge the event chain cover index when purging history. ([\#9498](https://github.com/matrix-org/synapse/issues/9498))
+- Fix missing chain cover index due to a schema delta not being applied correctly. Only affected servers that ran development versions. ([\#9503](https://github.com/matrix-org/synapse/issues/9503))
+- Fix a bug introduced in v1.25.0 where `/_synapse/admin/join/<room_id_or_alias>` would fail when given a room alias. ([\#9506](https://github.com/matrix-org/synapse/issues/9506))
+- Prevent presence background jobs from running when presence is disabled. ([\#9530](https://github.com/matrix-org/synapse/issues/9530))
+- Fix a rare edge case that caused a background update to fail if the server had rejected an event that had duplicate auth events. ([\#9537](https://github.com/matrix-org/synapse/issues/9537))
+
+
+Improved Documentation
+----------------------
+
+- Update the example systemd config to propagate reloads to individual units. ([\#9463](https://github.com/matrix-org/synapse/issues/9463))
+
+
+Internal Changes
+----------------
+
+- Add documentation and type hints to `parse_duration`. ([\#9432](https://github.com/matrix-org/synapse/issues/9432))
+- Remove vestiges of the `uploads_path` configuration setting. ([\#9462](https://github.com/matrix-org/synapse/issues/9462))
+- Add a comment about systemd-python. ([\#9464](https://github.com/matrix-org/synapse/issues/9464))
+- Test that we require validated email for email pushers. ([\#9496](https://github.com/matrix-org/synapse/issues/9496))
+- Allow Python to generate bytecode for Synapse. ([\#9502](https://github.com/matrix-org/synapse/issues/9502))
+- Fix incorrect type hints. ([\#9515](https://github.com/matrix-org/synapse/issues/9515), [\#9518](https://github.com/matrix-org/synapse/issues/9518))
+- Add type hints to the device and event report admin APIs. ([\#9519](https://github.com/matrix-org/synapse/issues/9519))
+- Add type hints to the user admin API. ([\#9521](https://github.com/matrix-org/synapse/issues/9521))
+- Bump the versions of mypy and mypy-zope used for static type checking. ([\#9529](https://github.com/matrix-org/synapse/issues/9529))
+
+
 Synapse 1.28.0 (2021-02-25)
 ===========================
 
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 6f628a694..031e02bda 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -85,6 +85,26 @@ for example:
       wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
       dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 
+Upgrading to v1.29.0
+====================
+
+Requirement for X-Forwarded-Proto header
+----------------------------------------
+
+When using Synapse with a reverse proxy (in particular, when using the
+`x_forwarded` option on an HTTP listener), Synapse now expects to receive an
+`X-Forwarded-Proto` header on incoming HTTP requests. If it is not set, Synapse
+will log a warning on each received request.
+
+To avoid the warning, administrators using a reverse proxy should ensure that
+the reverse proxy sets the `X-Forwarded-Proto` header to `https` or `http` to
+indicate the protocol used by the client. See the `reverse proxy documentation
+<docs/reverse_proxy.md>`_, where the example configurations have been updated to
+show how to set this header.
+
+(Users of `Caddy <https://caddyserver.com/>`_ are unaffected, since we believe it
+sets `X-Forwarded-Proto` by default.)
+
 Upgrading to v1.27.0
 ====================
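
To make the new requirement concrete, here is a minimal, hypothetical sketch of how a Twisted request subclass can derive the client's original protocol from `X-Forwarded-Proto`. It is not Synapse's actual implementation (the real logic lives in `XForwardedForRequest` and also handles `X-Forwarded-For` and the missing-header warning described above):

```python
from twisted.web.server import Request


class ForwardedProtoRequest(Request):
    """Illustrative only: trust the reverse proxy's X-Forwarded-Proto header."""

    def isSecure(self) -> bool:
        proto = self.getHeader(b"X-Forwarded-Proto")
        if proto is not None:
            # The reverse proxy told us which protocol the client used.
            return proto.lower() == b"https"
        # No header: fall back to whether our own transport is TLS.
        return super().isSecure()
```
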
diff --git a/debian/build_virtualenv b/debian/build_virtualenv
index cf19084a9..cad7d1688 100755
--- a/debian/build_virtualenv
+++ b/debian/build_virtualenv
@@ -58,10 +58,10 @@ trap "rm -r $tmpdir" EXIT
 cp -r tests "$tmpdir"
 
 PYTHONPATH="$tmpdir" \
-    "${TARGET_PYTHON}" -B -m twisted.trial --reporter=text -j2 tests
+    "${TARGET_PYTHON}" -m twisted.trial --reporter=text -j2 tests
 
 # build the config file
-"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_config" \
+"${TARGET_PYTHON}" "${VIRTUALENV_DIR}/bin/generate_config" \
         --config-dir="/etc/matrix-synapse" \
         --data-dir="/var/lib/matrix-synapse" |
     perl -pe '
@@ -87,7 +87,7 @@ PYTHONPATH="$tmpdir" \
 ' > "${PACKAGE_BUILD_DIR}/etc/matrix-synapse/homeserver.yaml"
 
 # build the log config file
-"${TARGET_PYTHON}" -B "${VIRTUALENV_DIR}/bin/generate_log_config" \
+"${TARGET_PYTHON}" "${VIRTUALENV_DIR}/bin/generate_log_config" \
         --output-file="${PACKAGE_BUILD_DIR}/etc/matrix-synapse/log.yaml"
 
 # add a dependency on the right version of python to substvars.
diff --git a/debian/changelog b/debian/changelog
index 642e4d381..04cd0d0d6 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,10 @@
+matrix-synapse-py3 (1.29.0) UNRELEASED; urgency=medium
+
+  [ Jonathan de Jong ]
+  * Remove the python -B flag (don't generate bytecode) in scripts and documentation.
+
+ -- Synapse Packaging team <packages@matrix.org>  Fri, 26 Feb 2021 14:41:31 +0100
+
 matrix-synapse-py3 (1.28.0) stable; urgency=medium
 
   * New synapse release 1.28.0.
diff --git a/debian/synctl.1 b/debian/synctl.1
index 437f8f9e0..af58c8d22 100644
--- a/debian/synctl.1
+++ b/debian/synctl.1
@@ -44,7 +44,7 @@ Configuration file may be generated as follows:
 .
 .nf
 
-$ python \-B \-m synapse\.app\.homeserver \-c config\.yaml \-\-generate\-config \-\-server\-name=<server name>
+$ python \-m synapse\.app\.homeserver \-c config\.yaml \-\-generate\-config \-\-server\-name=<server name>
 .
 .fi
 .
diff --git a/debian/synctl.ronn b/debian/synctl.ronn
index 1bad6094f..10cbda988 100644
--- a/debian/synctl.ronn
+++ b/debian/synctl.ronn
@@ -41,7 +41,7 @@ process.
 
 Configuration file may be generated as follows:
 
-    $ python -B -m synapse.app.homeserver -c config.yaml --generate-config --server-name=<server name>
+    $ python -m synapse.app.homeserver -c config.yaml --generate-config --server-name=<server name>
 
 ## ENVIRONMENT
 
diff --git a/docker/README.md b/docker/README.md
index c8f27b856..7b138df4d 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -11,7 +11,6 @@ The image also does *not* provide a TURN server.
 By default, the image expects a single volume, located at ``/data``, that will hold:
 
 * configuration files;
-* temporary files during uploads;
 * uploaded media and thumbnails;
 * the SQLite database if you do not configure postgres;
 * the appservices configuration.
diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml
index 2ed570a5d..0dea62a87 100644
--- a/docker/conf/homeserver.yaml
+++ b/docker/conf/homeserver.yaml
@@ -89,7 +89,6 @@ federation_rc_concurrent: 3
 ## Files ##
 
 media_store_path: "/data/media"
-uploads_path: "/data/uploads"
 max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "50M" }}"
 max_image_pixels: "32M"
 dynamic_thumbnails: false
diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst
index 33dfbcfb4..8d4ec5a6f 100644
--- a/docs/admin_api/user_admin_api.rst
+++ b/docs/admin_api/user_admin_api.rst
@@ -379,11 +379,12 @@ The following fields are returned in the JSON response body:
 
 - ``total`` - Number of rooms.
 
-List media of an user
-================================
+List media of a user
+====================
 
 Gets a list of all local media that a specific ``user_id`` has created.
-The response is ordered by creation date descending and media ID descending.
-The newest media is on top.
+By default, the response is ordered by descending creation date and ascending media ID.
+The newest media is on top. You can change the order with the parameters
+``order_by`` and ``dir``.
 
 The API is::
 
@@ -440,6 +441,35 @@ The following parameters should be set in the URL:
   denoting the offset in the returned results. This should be treated as an opaque value and
   not explicitly set to anything other than the return value of ``next_token`` from a previous call.
   Defaults to ``0``.
+- ``order_by`` - The method by which to sort the returned list of media.
+  If the ordered field has duplicates, the second order is always by ascending ``media_id``,
+  which guarantees a stable ordering. Valid values are:
+
+  - ``media_id`` - Media are ordered alphabetically by ``media_id``.
+  - ``upload_name`` - Media are ordered alphabetically by the name the media was uploaded with.
+  - ``created_ts`` - Media are ordered by when the content was uploaded in ms.
+    Smallest to largest. This is the default.
+  - ``last_access_ts`` - Media are ordered by when the content was last accessed in ms.
+    Smallest to largest.
+  - ``media_length`` - Media are ordered by length of the media in bytes.
+    Smallest to largest.
+  - ``media_type`` - Media are ordered alphabetically by MIME-type.
+  - ``quarantined_by`` - Media are ordered alphabetically by the user ID that
+    initiated the quarantine request for this media.
+  - ``safe_from_quarantine`` - Media are ordered by whether the media is marked as safe
+    from quarantining.
+
+- ``dir`` - Direction of media order. Either ``f`` for forwards or ``b`` for backwards.
+  Setting this value to ``b`` will reverse the above sort order. Defaults to ``f``.
+
+If neither ``order_by`` nor ``dir`` is set, the default order is newest media on top
+(corresponding to ``order_by`` = ``created_ts`` and ``dir`` = ``b``).
+
+Caution: the database only has indexes on the columns ``media_id``,
+``user_id`` and ``created_ts``. This means that if a different sort order is used
+(``upload_name``, ``last_access_ts``, ``media_length``, ``media_type``,
+``quarantined_by`` or ``safe_from_quarantine``), this can cause a large load on the
+database, especially for large environments.
 
 **Response**
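
A usage sketch for the new `order_by` and `dir` parameters. The host, user ID and token are illustrative placeholders, and the `media` key in the response is assumed from the admin API's documented response shape:

```python
# Hypothetical example: list a user's media, most recently accessed first.
import requests

BASE = "https://matrix.example.com"  # illustrative homeserver
HEADERS = {"Authorization": "Bearer <admin_access_token>"}

resp = requests.get(
    f"{BASE}/_synapse/admin/v1/users/@alice:example.com/media",
    headers=HEADERS,
    # Note the caution above: last_access_ts is not indexed, so prefer
    # created_ts on large deployments.
    params={"order_by": "last_access_ts", "dir": "b", "limit": 10},
)
resp.raise_for_status()
for item in resp.json().get("media", []):
    print(item["media_id"], item.get("upload_name"))
```
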
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index 04b6e2412..81e5a68a3 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -9,23 +9,23 @@ of doing so is that it means that you can expose the default https port
 (443) to Matrix clients without needing to run Synapse with root
 privileges.
 
-**NOTE**: Your reverse proxy must not `canonicalise` or `normalise`
-the requested URI in any way (for example, by decoding `%xx` escapes).
-Beware that Apache *will* canonicalise URIs unless you specify
-`nocanon`.
+You should configure your reverse proxy to forward requests to `/_matrix` or
+`/_synapse/client` to Synapse, and have it set the `X-Forwarded-For` and
+`X-Forwarded-Proto` request headers.
 
-When setting up a reverse proxy, remember that Matrix clients and other
-Matrix servers do not necessarily need to connect to your server via the
-same server name or port. Indeed, clients will use port 443 by default,
-whereas servers default to port 8448. Where these are different, we
-refer to the 'client port' and the 'federation port'. See [the Matrix
+You should remember that Matrix clients and other Matrix servers do not
+necessarily need to connect to your server via the same server name or
+port. Indeed, clients will use port 443 by default, whereas servers default to
+port 8448. Where these are different, we refer to the 'client port' and the
+'federation port'. See [the Matrix
 specification](https://matrix.org/docs/spec/server_server/latest#resolving-server-names)
 for more details of the algorithm used for federation connections, and
 [delegate.md](delegate.md) for instructions on setting up delegation.
 
-Endpoints that are part of the standardised Matrix specification are
-located under `/_matrix`, whereas endpoints specific to Synapse are
-located under `/_synapse/client`.
+**NOTE**: Your reverse proxy must not `canonicalise` or `normalise`
+the requested URI in any way (for example, by decoding `%xx` escapes).
+Beware that Apache *will* canonicalise URIs unless you specify
+`nocanon`.
 
 Let's assume that we expect clients to connect to our server at
 `https://matrix.example.com`, and other servers to connect at
@@ -52,6 +52,9 @@ server {
     location ~* ^(\/_matrix|\/_synapse\/client) {
         proxy_pass http://localhost:8008;
         proxy_set_header X-Forwarded-For $remote_addr;
+        proxy_set_header X-Forwarded-Proto $scheme;
+        proxy_set_header Host $host;
+
         # Nginx by default only allows file uploads up to 1M in size
         # Increase client_max_body_size to match max_upload_size defined in homeserver.yaml
         client_max_body_size 50M;
@@ -102,6 +105,7 @@ example.com:8448 {
     SSLEngine on
     ServerName matrix.example.com;
 
+    RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
     AllowEncodedSlashes NoDecode
     ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
     ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
@@ -113,6 +117,7 @@ example.com:8448 {
     SSLEngine on
     ServerName example.com;
 
+    RequestHeader set "X-Forwarded-Proto" expr=%{REQUEST_SCHEME}
     AllowEncodedSlashes NoDecode
    ProxyPass /_matrix http://127.0.0.1:8008/_matrix nocanon
    ProxyPassReverse /_matrix http://127.0.0.1:8008/_matrix
@@ -134,6 +139,9 @@ example.com:8448 {
 ```
 frontend https
   bind :::443 v4v6 ssl crt /etc/ssl/haproxy/ strict-sni alpn h2,http/1.1
+  http-request set-header X-Forwarded-Proto https if { ssl_fc }
+  http-request set-header X-Forwarded-Proto http if !{ ssl_fc }
+  http-request set-header X-Forwarded-For %[src]
 
   # Matrix client traffic
   acl matrix-host hdr(host) -i matrix.example.com
@@ -144,6 +152,10 @@ frontend https
 
 frontend matrix-federation
   bind :::8448 v4v6 ssl crt /etc/ssl/haproxy/synapse.pem alpn h2,http/1.1
+  http-request set-header X-Forwarded-Proto https if { ssl_fc }
+  http-request set-header X-Forwarded-Proto http if !{ ssl_fc }
+  http-request set-header X-Forwarded-For %[src]
+
  default_backend matrix
 
 backend matrix
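
When validating a proxy configuration like the examples above, it can help to point the proxy's backend at a throwaway server that echoes the headers it receives. A sketch, assuming the backend port 8008 used in the examples:

```python
# Minimal echo server: temporarily substitute this for Synapse, request any
# URL through the proxy, and inspect which X-Forwarded-* headers arrive.
from http.server import BaseHTTPRequestHandler, HTTPServer


class EchoHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        body = "\n".join(f"{k}: {v}" for k, v in self.headers.items()).encode()
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)


if __name__ == "__main__":
    HTTPServer(("127.0.0.1", 8008), EchoHandler).serve_forever()
```
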
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 52380dfb0..4dbef41b7 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -101,6 +101,14 @@ pid_file: DATADIR/homeserver.pid
 #
 #limit_profile_requests_to_users_who_share_rooms: true
 
+# Uncomment to prevent a user's profile data from being retrieved and
+# displayed in a room until they have joined it. By default, a user's
+# profile data is included in an invite event, regardless of the values
+# of the above two settings, and whether or not the users share a server.
+# Defaults to 'true'.
+#
+#include_profile_data_on_invite: false
+
 # If set to 'true', removes the need for authentication to access the server's
 # public rooms directory through the client API, meaning that anyone can
 # query the room directory. Defaults to 'false'.
@@ -699,6 +707,12 @@ acme:
 #federation_metrics_domains:
 #  - matrix.org
 #  - example.com
 
+# Uncomment to disable profile lookup over federation. By default, the
+# Federation API allows other homeservers to obtain profile data of any user
+# on this homeserver. Defaults to 'true'.
+#
+#allow_profile_lookup_over_federation: false
+
 
 ## Caching ##
 
@@ -2530,19 +2544,35 @@ spam_checker:
 
 # User Directory configuration
 #
-# 'enabled' defines whether users can search the user directory. If
-# false then empty responses are returned to all queries. Defaults to
-# true.
-#
-# 'search_all_users' defines whether to search all users visible to your HS
-# when searching the user directory, rather than limiting to users visible
-# in public rooms. Defaults to false. If you set it True, you'll have to
-# rebuild the user_directory search indexes, see
-# https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
-#
-#user_directory:
-#  enabled: true
-#  search_all_users: false
+user_directory:
+    # Defines whether users can search the user directory. If false then
+    # empty responses are returned to all queries. Defaults to true.
+    #
+    # Uncomment to disable the user directory.
+    #
+    #enabled: false
+
+    # Defines whether to search all users visible to your HS when searching
+    # the user directory, rather than limiting to users visible in public
+    # rooms. Defaults to false.
+    #
+    # If you set it true, you'll have to rebuild the user_directory search
+    # indexes, see:
+    # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
+    #
+    # Uncomment to return search results containing all known users, even if that
+    # user does not share a room with the requester.
+    #
+    #search_all_users: true
+
+    # Defines whether to prefer local users in search query results.
+    # If set to true, local users are more likely to appear above remote users
+    # when searching the user directory. Defaults to false.
+    #
+    # Uncomment to prefer local over remote users in user directory search
+    # results.
+    #
+    #prefer_local_users: true
 
 
 # User Consent configuration
diff --git a/docs/spam_checker.md b/docs/spam_checker.md
index 47a27bf85..e615ac991 100644
--- a/docs/spam_checker.md
+++ b/docs/spam_checker.md
@@ -25,7 +25,7 @@ well as some specific methods:
 * `check_username_for_spam`
 * `check_registration_for_spam`
 
-The details of the each of these methods (as well as their inputs and outputs)
+The details of each of these methods (as well as their inputs and outputs)
 are documented in the `synapse.events.spamcheck.SpamChecker` class.
 
 The `ModuleApi` class provides a way for the custom spam checker class to
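
For orientation, a skeletal spam checker module in the shape described here might look as follows. The method names follow the list above, but the authoritative signatures are those on `synapse.events.spamcheck.SpamChecker`, so treat this as a hypothetical sketch:

```python
class ExampleSpamChecker:
    """Hypothetical minimal spam checker module."""

    def __init__(self, config, api):
        self._config = config
        self._api = api  # the ModuleApi mentioned above

    @staticmethod
    def parse_config(config):
        # Validate/normalise the module's config block here.
        return config

    async def check_event_for_spam(self, event):
        # Returning True marks the event as spam; `event` is treated as a
        # dict-like object here for illustration.
        content = event.get("content", {})
        return "buy cheap watches" in content.get("body", "")

    async def check_username_for_spam(self, user_profile):
        return False
```
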
diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
index cb5ac0ac8..d164e8ce1 100644
--- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service
+++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
@@ -4,6 +4,7 @@ AssertPathExists=/etc/matrix-synapse/workers/%i.yaml
 
 # This service should be restarted when the synapse target is restarted.
 PartOf=matrix-synapse.target
+ReloadPropagatedFrom=matrix-synapse.target
 
 # if this is started at the same time as the main, let the main process start
 # first, to initialise the database schema.
diff --git a/docs/systemd-with-workers/system/matrix-synapse.service b/docs/systemd-with-workers/system/matrix-synapse.service
index c7b5ddfa4..f6b6dfd3c 100644
--- a/docs/systemd-with-workers/system/matrix-synapse.service
+++ b/docs/systemd-with-workers/system/matrix-synapse.service
@@ -3,6 +3,7 @@ Description=Synapse master
 
 # This service should be restarted when the synapse target is restarted.
 PartOf=matrix-synapse.target
+ReloadPropagatedFrom=matrix-synapse.target
 
 [Service]
 Type=notify
diff --git a/docs/tcp_replication.md b/docs/tcp_replication.md
index ad145439b..15df949de 100644
--- a/docs/tcp_replication.md
+++ b/docs/tcp_replication.md
@@ -220,10 +220,6 @@ Asks the server for the current position of all streams.
 
    Acknowledge receipt of some federation data
 
-#### REMOVE_PUSHER (C)
-
-   Inform the server a pusher should be removed
-
 ### REMOTE_SERVER_UP (S, C)
 
    Inform other processes that a remote server may have come back online.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 69bf9110a..58edf6af6 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging
 import sys
 import time
 import traceback
-from typing import Dict, Optional, Set
+from typing import Dict, Iterable, Optional, Set
 
 import yaml
 
@@ -47,6 +47,7 @@ from synapse.storage.databases.main.events_bg_updates import (
 from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
+from synapse.storage.databases.main.pusher import PusherWorkerStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -177,6 +178,7 @@ class Store(
     UserDirectoryBackgroundUpdateStore,
     EndToEndKeyBackgroundStore,
     StatsStore,
+    PusherWorkerStore,
 ):
     def execute(self, f, *args, **kwargs):
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
@@ -629,7 +631,13 @@ class Porter(object):
             await self._setup_state_group_id_seq()
             await self._setup_user_id_seq()
             await self._setup_events_stream_seqs()
-            await self._setup_device_inbox_seq()
+            await self._setup_sequence(
+                "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+            )
+            await self._setup_sequence(
+                "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
+            await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
+            await self._setup_auth_chain_sequence()
 
             # Step 3. Get tables.
             self.progress.set_state("Fetching tables")
@@ -854,7 +862,7 @@ class Porter(object):
 
         return done, remaining + done
 
-    async def _setup_state_group_id_seq(self):
+    async def _setup_state_group_id_seq(self) -> None:
         curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
             table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
         )
@@ -868,7 +876,7 @@ class Porter(object):
 
         await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
 
-    async def _setup_user_id_seq(self):
+    async def _setup_user_id_seq(self) -> None:
         curr_id = await self.sqlite_store.db_pool.runInteraction(
             "setup_user_id_seq", find_max_generated_user_id_localpart
         )
@@ -877,9 +885,9 @@ class Porter(object):
             next_id = curr_id + 1
             txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
 
-        return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+        await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
 
-    async def _setup_events_stream_seqs(self):
+    async def _setup_events_stream_seqs(self) -> None:
         """Set the event stream sequences to the correct values.
         """
@@ -908,35 +916,46 @@ class Porter(object):
                 (curr_backward_id + 1,),
             )
 
-        return await self.postgres_store.db_pool.runInteraction(
+        await self.postgres_store.db_pool.runInteraction(
             "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
         )
 
-    async def _setup_device_inbox_seq(self):
-        """Set the device inbox sequence to the correct value.
+    async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
+        """Set a sequence to the correct value.
         """
-        curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
-            table="device_inbox",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), 1)",
-            allow_none=True,
-        )
+        current_stream_ids = []
+        for stream_id_table in stream_id_tables:
+            max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+                table=stream_id_table,
+                keyvalues={},
+                retcol="COALESCE(MAX(stream_id), 1)",
+                allow_none=True,
+            )
+            current_stream_ids.append(max_stream_id)
 
-        curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
-            table="device_federation_outbox",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), 1)",
-            allow_none=True,
-        )
+        next_id = max(current_stream_ids) + 1
 
-        next_id = max(curr_local_id, curr_federation_id) + 1
+        def r(txn):
+            sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
+            txn.execute(sql + " %s", (next_id, ))
+
+        await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
+
+    async def _setup_auth_chain_sequence(self) -> None:
+        curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
+        )
 
         def r(txn):
             txn.execute(
-                "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
+                "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
+                (curr_chain_id,),
             )
 
-        return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
+        await self.postgres_store.db_pool.runInteraction(
+            "_setup_event_auth_chain_id", r,
+        )
+
 
 ##############################################
diff --git a/setup.py b/setup.py
index 08ba4eb76..bbd9e7862 100755
--- a/setup.py
+++ b/setup.py
@@ -102,7 +102,7 @@ CONDITIONAL_REQUIREMENTS["lint"] = [
     "flake8",
 ]
 
-CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.790", "mypy-zope==0.2.8"]
+CONDITIONAL_REQUIREMENTS["mypy"] = ["mypy==0.812", "mypy-zope==0.2.11"]
 
 # Dependencies which are exclusively required by unit test code. This is
 # NOT a list of all modules that are necessary to run the unit tests.
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 869e860fb..2c24d4ae0 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -48,7 +48,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.28.0"
+__version__ = "1.29.0rc1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index af8d59cf8..691f8f9ad 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -98,11 +98,14 @@ class EventTypes:
 
     Retention = "m.room.retention"
 
-    Presence = "m.presence"
-
     Dummy = "org.matrix.dummy_event"
 
 
+class EduTypes:
+    Presence = "m.presence"
+    RoomKeyRequest = "m.room_key_request"
+
+
 class RejectedReason:
     AUTH_ERROR = "auth_error"
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index 5d9d5a228..c3f07bc1a 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from collections import OrderedDict
-from typing import Any, Optional, Tuple
+from typing import Hashable, Optional, Tuple
 
 from synapse.api.errors import LimitExceededError
 from synapse.types import Requester
@@ -42,7 +42,9 @@ class Ratelimiter:
         #   * How many times an action has occurred since a point in time
         #   * The point in time
         #   * The rate_hz of this particular entry. This can vary per request
-        self.actions = OrderedDict()  # type: OrderedDict[Any, Tuple[float, int, float]]
+        self.actions = (
+            OrderedDict()
+        )  # type: OrderedDict[Hashable, Tuple[float, int, float]]
 
     def can_requester_do_action(
         self,
@@ -82,7 +84,7 @@ class Ratelimiter:
 
     def can_do_action(
         self,
-        key: Any,
+        key: Hashable,
         rate_hz: Optional[float] = None,
         burst_count: Optional[int] = None,
         update: bool = True,
@@ -175,7 +177,7 @@ class Ratelimiter:
 
     def ratelimit(
         self,
-        key: Any,
+        key: Hashable,
         rate_hz: Optional[float] = None,
         burst_count: Optional[int] = None,
         update: bool = True,
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index a01bac299..4a9b0129c 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -17,8 +17,6 @@ import sys
 
 from synapse import python_dependencies  # noqa: E402
 
-sys.dont_write_bytecode = True
-
 logger = logging.getLogger(__name__)
 
 try:
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b4bd4d8e7..9f99651aa 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -210,7 +210,9 @@ def start(config_options):
     config.update_user_directory = False
     config.run_background_tasks = False
     config.start_pushers = False
+    config.pusher_shard_config.instances = []
     config.send_federation = False
+    config.federation_shard_config.instances = []
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6526acb2f..274d582d0 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -23,6 +23,7 @@ from typing_extensions import ContextManager
 
 from twisted.internet import address
 from twisted.web.resource import IResource
+from twisted.web.server import Request
 
 import synapse
 import synapse.events
@@ -190,7 +191,7 @@ class KeyUploadServlet(RestServlet):
         self.http_client = hs.get_simple_http_client()
         self.main_uri = hs.config.worker_main_http_uri
 
-    async def on_POST(self, request, device_id):
+    async def on_POST(self, request: Request, device_id: Optional[str]):
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
@@ -223,10 +224,12 @@ class KeyUploadServlet(RestServlet):
             header: request.requestHeaders.getRawHeaders(header, [])
             for header in (b"Authorization", b"User-Agent")
         }
-        # Add the previous hop the the X-Forwarded-For header.
+        # Add the previous hop to the X-Forwarded-For header.
         x_forwarded_for = request.requestHeaders.getRawHeaders(
             b"X-Forwarded-For", []
         )
+        # we use request.client here, since we want the previous hop, not the
+        # original client (as returned by request.getClientAddress()).
         if isinstance(request.client, (address.IPv4Address, address.IPv6Address)):
             previous_host = request.client.host.encode("ascii")
             # If the header exists, add to the comma-separated list of the first
@@ -239,6 +242,14 @@ class KeyUploadServlet(RestServlet):
                 x_forwarded_for = [previous_host]
         headers[b"X-Forwarded-For"] = x_forwarded_for
 
+        # Replicate the original X-Forwarded-Proto header. Note that
+        # XForwardedForRequest overrides isSecure() to give us the original protocol
+        # used by the client, as opposed to the protocol used by our upstream proxy
+        # - which is what we want here.
+        headers[b"X-Forwarded-Proto"] = [
+            b"https" if request.isSecure() else b"http"
+        ]
+
         try:
             result = await self.http_client.post_json_get_json(
                 self.main_uri + request.uri.decode("ascii"), body, headers=headers
@@ -645,9 +656,6 @@ class GenericWorkerServer(HomeServer):
 
             self.get_tcp_replication().start_replication(self)
 
-    async def remove_pusher(self, app_id, push_key, user_id):
-        self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
-
     @cache_in_self
     def get_replication_data_handler(self):
         return GenericWorkerReplicationHandler(self)
@@ -922,22 +930,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.appservice.notify_appservices = False
 
-    if config.worker_app == "synapse.app.pusher":
-        if config.server.start_pushers:
-            sys.stderr.write(
-                "\nThe pushers must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``start_pushers: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.server.start_pushers = True
-    else:
-        # For other worker types we force this to off.
-        config.server.start_pushers = False
-
     if config.worker_app == "synapse.app.user_dir":
         if config.server.update_user_directory:
             sys.stderr.write(
@@ -954,22 +946,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.server.update_user_directory = False
 
-    if config.worker_app == "synapse.app.federation_sender":
-        if config.worker.send_federation:
-            sys.stderr.write(
-                "\nThe send_federation must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``send_federation: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.worker.send_federation = True
-    else:
-        # For other worker types we force this to off.
-        config.worker.send_federation = False
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     hs = GenericWorkerServer(
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 97399eb9b..402696671 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -21,7 +21,7 @@ import os
 from collections import OrderedDict
 from hashlib import sha256
 from textwrap import dedent
-from typing import Any, Iterable, List, MutableMapping, Optional
+from typing import Any, Iterable, List, MutableMapping, Optional, Union
 
 import attr
 import jinja2
@@ -147,7 +147,20 @@ class Config:
         return int(value) * size
 
     @staticmethod
-    def parse_duration(value):
+    def parse_duration(value: Union[str, int]) -> int:
+        """Convert a duration as a string or integer to a number of milliseconds.
+
+        If an integer is provided it is treated as milliseconds and is unchanged.
+
+        String durations can have a suffix of 's', 'm', 'h', 'd', 'w', or 'y'.
+        No suffix is treated as milliseconds.
+
+        Args:
+            value: The duration to parse.
+
+        Returns:
+            The number of milliseconds in the duration.
+        """
         if isinstance(value, int):
             return value
         second = 1000
@@ -831,22 +844,23 @@ class ShardedWorkerHandlingConfig:
 
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key."""
-        # If multiple instances are not defined we always return true
-        if not self.instances or len(self.instances) == 1:
-            return True
+        # If no instances are defined we assume some other worker is handling
+        # this.
+        if not self.instances:
+            return False
 
-        return self.get_instance(key) == instance_name
+        return self._get_instance(key) == instance_name
 
-    def get_instance(self, key: str) -> str:
+    def _get_instance(self, key: str) -> str:
         """Get the instance responsible for handling the given key.
 
-        Note: For things like federation sending the config for which instance
-        is sending is known only to the sender instance if there is only one.
-        Therefore `should_handle` should be used where possible.
+        Note: For federation sending and pushers the config for which instance
+        is sending is known only to the sender instance, so we don't expose this
+        method by default.
         """
 
         if not self.instances:
-            return "master"
+            raise Exception("Unknown worker")
 
         if len(self.instances) == 1:
             return self.instances[0]
@@ -863,4 +877,21 @@ class ShardedWorkerHandlingConfig:
         return self.instances[remainder]
 
 
+@attr.s
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
+    """A version of `ShardedWorkerHandlingConfig` that is used for config
+    options where all instances know which instances are responsible for the
+    sharded work.
+    """
+
+    def __attrs_post_init__(self):
+        # We require that `self.instances` is non-empty.
+        if not self.instances:
+            raise Exception("Got empty list of instances for shard config")
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key."""
+        return self._get_instance(key)
+
+
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
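
A few worked examples of the behaviour the new docstring documents (a sketch, assuming `Config` is importable as below):

```python
from synapse.config._base import Config

assert Config.parse_duration(500) == 500             # ints are already ms
assert Config.parse_duration("500") == 500           # no suffix: ms
assert Config.parse_duration("2s") == 2 * 1000
assert Config.parse_duration("5m") == 5 * 60 * 1000
assert Config.parse_duration("1h") == 60 * 60 * 1000
```
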
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 70025b5d6..db16c86f5 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
     def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 9f3c57e6a..55e4db544 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -41,6 +41,10 @@ class FederationConfig(Config):
         )
         self.federation_metrics_domains = set(federation_metrics_domains)
 
+        self.allow_profile_lookup_over_federation = config.get(
+            "allow_profile_lookup_over_federation", True
+        )
+
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
         ## Federation ##
@@ -66,6 +70,12 @@ class FederationConfig(Config):
         #federation_metrics_domains:
         #  - matrix.org
         #  - example.com
+
+        # Uncomment to disable profile lookup over federation. By default, the
+        # Federation API allows other homeservers to obtain profile data of any user
+        # on this homeserver. Defaults to 'true'.
+        #
+        #allow_profile_lookup_over_federation: false
         """
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3adbfb73e..7831a2ef7 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import Config, ShardedWorkerHandlingConfig
+from ._base import Config
 
 
 class PushConfig(Config):
@@ -27,9 +27,6 @@ class PushConfig(Config):
             "group_unread_count_by_room", True
         )
 
-        pusher_instances = config.get("pusher_instances") or []
-        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
-
         # There was a 'redact_content' setting but mistakenly read from the
         # 'email' section. Check for the flag in the 'push' section, and log,
         # but do not honour it to avoid nasty surprises when people upgrade.
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index def33a60a..847d25122 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -102,6 +102,16 @@ class RatelimitConfig(Config):
             defaults={"per_second": 0.01, "burst_count": 3},
         )
 
+        # Ratelimit cross-user key requests:
+        # * For local requests this is keyed by the sending device.
+        # * For requests received over federation this is keyed by the origin.
+        #
+        # Note that this isn't exposed in the configuration as it is obscure.
+        self.rc_key_requests = RateLimitConfig(
+            config.get("rc_key_requests", {}),
+            defaults={"per_second": 20, "burst_count": 100},
+        )
+
         self.rc_3pid_validation = RateLimitConfig(
             config.get("rc_3pid_validation") or {},
             defaults={"per_second": 0.003, "burst_count": 5},
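
For intuition about the `rc_key_requests` defaults above (20 per second with a burst of 100), here is a self-contained token-bucket sketch of the same idea; it is not Synapse's `Ratelimiter`, whose bookkeeping differs:

```python
import time
from typing import Dict, Hashable, Tuple


class TokenBucketLimiter:
    """Allow bursts up to burst_count, refilling at rate_hz tokens/second."""

    def __init__(self, rate_hz: float, burst_count: int):
        self.rate_hz = rate_hz
        self.burst_count = burst_count
        # key -> (tokens remaining, last refill timestamp)
        self.buckets: Dict[Hashable, Tuple[float, float]] = {}

    def can_do_action(self, key: Hashable) -> bool:
        now = time.monotonic()
        tokens, last = self.buckets.get(key, (float(self.burst_count), now))
        tokens = min(float(self.burst_count), tokens + (now - last) * self.rate_hz)
        allowed = tokens >= 1.0
        if allowed:
            tokens -= 1.0
        self.buckets[key] = (tokens, now)
        return allowed


# Keyed per origin for federation, or per (user_id, device_id) locally.
limiter = TokenBucketLimiter(rate_hz=20, burst_count=100)
assert limiter.can_do_action("remote.example.com")
```
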
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 9416f95a6..5ff64cda9 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -207,7 +207,6 @@ class ContentRepositoryConfig(Config):
 
     def generate_config_section(self, data_dir_path, **kwargs):
         media_store = os.path.join(data_dir_path, "media_store")
-        uploads_path = os.path.join(data_dir_path, "uploads")
 
         formatted_thumbnail_sizes = "".join(
             THUMBNAIL_SIZE_YAML % s for s in DEFAULT_THUMBNAIL_SIZES
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 6f3325ff8..2afca36e7 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -263,6 +263,12 @@ class ServerConfig(Config):
             False,
         )
 
+        # Whether to retrieve and display profile data for a user when they
+        # are invited to a room
+        self.include_profile_data_on_invite = config.get(
+            "include_profile_data_on_invite", True
+        )
+
         if "restrict_public_rooms_to_local_users" in config and (
             "allow_public_rooms_without_auth" in config
             or "allow_public_rooms_over_federation" in config
@@ -391,7 +397,6 @@ class ServerConfig(Config):
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != "/":
                 self.public_baseurl += "/"
-        self.start_pushers = config.get("start_pushers", True)
 
         # (undocumented) option for torturing the worker-mode replication a bit,
         # for testing. The value defines the number of milliseconds to pause before
@@ -848,6 +853,14 @@ class ServerConfig(Config):
         #
         #limit_profile_requests_to_users_who_share_rooms: true
 
+        # Uncomment to prevent a user's profile data from being retrieved and
+        # displayed in a room until they have joined it. By default, a user's
+        # profile data is included in an invite event, regardless of the values
+        # of the above two settings, and whether or not the users share a server.
+        # Defaults to 'true'.
+        #
+        #include_profile_data_on_invite: false
+
         # If set to 'true', removes the need for authentication to access the server's
         # public rooms directory through the client API, meaning that anyone can
         # query the room directory. Defaults to 'false'.
diff --git a/synapse/config/user_directory.py b/synapse/config/user_directory.py
index c8d19c5d6..8d05ef173 100644
--- a/synapse/config/user_directory.py
+++ b/synapse/config/user_directory.py
@@ -24,32 +24,46 @@ class UserDirectoryConfig(Config):
     section = "userdirectory"
 
     def read_config(self, config, **kwargs):
-        self.user_directory_search_enabled = True
-        self.user_directory_search_all_users = False
-        user_directory_config = config.get("user_directory", None)
-        if user_directory_config:
-            self.user_directory_search_enabled = user_directory_config.get(
-                "enabled", True
-            )
-            self.user_directory_search_all_users = user_directory_config.get(
-                "search_all_users", False
-            )
+        user_directory_config = config.get("user_directory") or {}
+        self.user_directory_search_enabled = user_directory_config.get("enabled", True)
+        self.user_directory_search_all_users = user_directory_config.get(
+            "search_all_users", False
+        )
+        self.user_directory_search_prefer_local_users = user_directory_config.get(
+            "prefer_local_users", False
+        )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """
         # User Directory configuration
         #
-        # 'enabled' defines whether users can search the user directory. If
-        # false then empty responses are returned to all queries. Defaults to
-        # true.
-        #
-        # 'search_all_users' defines whether to search all users visible to your HS
-        # when searching the user directory, rather than limiting to users visible
-        # in public rooms. Defaults to false. If you set it True, you'll have to
-        # rebuild the user_directory search indexes, see
-        # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
-        #
-        #user_directory:
-        #  enabled: true
-        #  search_all_users: false
+        user_directory:
+            # Defines whether users can search the user directory. If false then
+            # empty responses are returned to all queries. Defaults to true.
+            #
+            # Uncomment to disable the user directory.
+            #
+            #enabled: false
+
+            # Defines whether to search all users visible to your HS when searching
+            # the user directory, rather than limiting to users visible in public
+            # rooms. Defaults to false.
+            #
+            # If you set it true, you'll have to rebuild the user_directory search
+            # indexes, see:
+            # https://github.com/matrix-org/synapse/blob/master/docs/user_directory.md
+            #
+            # Uncomment to return search results containing all known users, even if that
+            # user does not share a room with the requester.
+            #
+            #search_all_users: true
+
+            # Defines whether to prefer local users in search query results.
+            # If set to true, local users are more likely to appear above remote users
+            # when searching the user directory. Defaults to false.
+            #
+            # Uncomment to prefer local over remote users in user directory search
+            # results.
+            #
+            #prefer_local_users: true
         """
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da..ac92375a8 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
 
 import attr
 
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+    Config,
+    ConfigError,
+    RoutableShardedWorkerHandlingConfig,
+    ShardedWorkerHandlingConfig,
+)
 from .server import ListenerConfig, parse_listener_def
 
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before federation sending can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before pushers can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
 
 def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
     """Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
         self.worker_replication_secret = config.get("worker_replication_secret", None)
 
         self.worker_name = config.get("worker_name", self.worker_app)
+        self.instance_name = self.worker_name or "master"
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
 
@@ -118,12 +138,41 @@
             )
         )
 
-        # Whether to send federation traffic out in this process. This only
-        # applies to some federation traffic, and so shouldn't be used to
-        # "disable" federation
-        self.send_federation = config.get("send_federation", True)
+        # Handle federation sender configuration.
+        #
+        # There are two ways of configuring which instances handle federation
+        # sending:
+        #   1. The old way where "send_federation" is set to false and running a
+        #      `synapse.app.federation_sender` worker app.
+        #   2. Specifying the workers sending federation in
+        #      `federation_sender_instances`.
+        #
 
-        federation_sender_instances = config.get("federation_sender_instances") or []
+        send_federation = config.get("send_federation", True)
+
+        federation_sender_instances = config.get("federation_sender_instances")
+        if federation_sender_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            federation_sender_instances = []
+
+            # If no federation sender instances are set we check if
+            # `send_federation` is set, which means use master
+            if send_federation:
+                federation_sender_instances = ["master"]
+
+            if self.worker_app == "synapse.app.federation_sender":
+                if send_federation:
+                    # If we're running federation senders, and not using
+                    # `federation_sender_instances`, then we should have
+                    # explicitly set `send_federation` to false.
+                    raise ConfigError(
+                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+                    )
+
+                federation_sender_instances = [self.worker_name]
+
+        self.send_federation = self.instance_name in federation_sender_instances
 
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
@@ -164,7 +213,37 @@
                     "Must only specify one instance to handle `receipts` messages."
                 )
 
-        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+        if len(self.writers.events) == 0:
+            raise ConfigError("Must specify at least one instance to handle `events`.")
+
+        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+            self.writers.events
+        )
+
+        # Handle sharded push
+        start_pushers = config.get("start_pushers", True)
+        pusher_instances = config.get("pusher_instances")
+        if pusher_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            pusher_instances = []
+
+            # If no pusher instances are set we check if `start_pushers` is
+            # set, which means use master
+            if start_pushers:
+                pusher_instances = ["master"]
+
+            if self.worker_app == "synapse.app.pusher":
+                if start_pushers:
+                    # If we're running pushers, and not using
+                    # `pusher_instances`, then we should have explicitly set
+                    # `start_pushers` to false.
+                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+                pusher_instances = [self.instance_name]
+
+        self.start_pushers = self.instance_name in pusher_instances
+        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
 
         # Whether this worker should run background tasks or not.
         #
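
The federation-sender and pusher branches above follow the same resolution order. A condensed, hypothetical restatement (not the actual helper; the real logic is inline in `WorkerConfig`):

```python
from typing import List, Optional


def resolve_shard_instances(
    worker_app: Optional[str],
    legacy_flag: bool,               # send_federation / start_pushers
    instances: Optional[List[str]],  # federation_sender_instances / pusher_instances
    instance_name: str,
) -> List[str]:
    if instances is not None:
        # An explicit instance list always wins.
        return instances
    # No list: the legacy flag means "master does it"; if unset, some
    # other, unknown worker is assumed to be responsible.
    resolved = ["master"] if legacy_flag else []
    if worker_app in ("synapse.app.federation_sender", "synapse.app.pusher"):
        if legacy_flag:
            raise ValueError("disable the legacy flag in the main config first")
        resolved = [instance_name]
    return resolved
```
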
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8d4bb621e..2f832b47f 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -34,7 +34,7 @@ from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -44,6 +44,7 @@ from synapse.api.errors import (
     SynapseError,
     UnsupportedRoomVersionError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
@@ -869,6 +870,13 @@ class FederationHandlerRegistry:
         # EDU received.
         self._edu_type_to_instance = {}  # type: Dict[str, List[str]]
 
+        # A rate limiter for incoming room key requests per origin.
+        self._room_key_request_rate_limiter = Ratelimiter(
+            clock=self.clock,
+            rate_hz=self.config.rc_key_requests.per_second,
+            burst_count=self.config.rc_key_requests.burst_count,
+        )
+
     def register_edu_handler(
         self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
     ):
@@ -917,7 +925,15 @@ class FederationHandlerRegistry:
         self._edu_type_to_instance[edu_type] = instance_names
 
     async def on_edu(self, edu_type: str, origin: str, content: dict):
-        if not self.config.use_presence and edu_type == "m.presence":
+        if not self.config.use_presence and edu_type == EduTypes.Presence:
+            return
+
+        # If the incoming room key requests from a particular origin are over
+        # the limit, drop them.
+        if (
+            edu_type == EduTypes.RoomKeyRequest
+            and not self._room_key_request_rate_limiter.can_do_action(origin)
+        ):
             return
 
         # Check if we have a handler on this instance
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 97fc4d0a8..24ebc4b80 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -474,7 +474,7 @@ class FederationSender:
             self._processing_pending_presence = False
 
     def send_presence_to_destinations(
-        self, states: List[UserPresenceState], destinations: List[str]
+        self, states: Iterable[UserPresenceState], destinations: Iterable[str]
     ) -> None:
         """Send the given presence states to the given destinations.
 
            destinations (list[str])
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index cce83704d..2cf935f38 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -484,10 +484,9 @@ class FederationQueryServlet(BaseFederationServlet):
 
     # This is when we receive a server-server Query
     async def on_GET(self, origin, content, query, query_type):
-        return await self.handler.on_query_request(
-            query_type,
-            {k.decode("utf8"): v[0].decode("utf-8") for k, v in query.items()},
-        )
+        args = {k.decode("utf8"): v[0].decode("utf-8") for k, v in query.items()}
+        args["origin"] = origin
+        return await self.handler.on_query_request(query_type, args)
 
 
 class FederationMakeJoinServlet(BaseFederationServlet):
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 9ba9f591d..3978e4151 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -36,7 +36,7 @@ import attr
 import bcrypt
 import pymacaroons
 
-from twisted.web.http import Request
+from twisted.web.server import Request
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
@@ -481,7 +481,7 @@ class AuthHandler(BaseHandler):
         sid = authdict["session"]
 
         # Convert the URI and method to strings.
-        uri = request.uri.decode("utf-8")
+        uri = request.uri.decode("utf-8")  # type: ignore
         method = request.method.decode("utf-8")
 
         # If there's no session ID, create a new session.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 94f3f3163..3886d3124 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -120,6 +120,11 @@ class DeactivateAccountHandler(BaseHandler):
 
         await self.store.user_set_password_hash(user_id, None)
 
+        # Most of the pushers will have been deleted when we logged out the
+        # associated devices above, but we still need to delete pushers not
+        # associated with devices, e.g. email pushers.
+        await self.store.delete_all_pushers_for_user(user_id)
+
         # Add the user to a table of users pending deactivation (ie.
         # removal from all the rooms they're a member of)
         await self.store.add_user_pending_deactivation(user_id)
 
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 1aa7d803b..7db4f4896 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -16,7 +16,9 @@
 import logging
 from typing import TYPE_CHECKING, Any, Dict
 
+from synapse.api.constants import EduTypes
 from synapse.api.errors import SynapseError
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
     get_active_span_text_map,
@@ -25,7 +27,7 @@ from synapse.logging.opentracing import (
     start_active_span,
 )
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
-from synapse.types import JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, UserID, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.stringutils import random_string
 
@@ -78,6 +80,12 @@ class DeviceMessageHandler:
                 ReplicationUserDevicesResyncRestServlet.make_client(hs)
             )
 
+        self._ratelimiter = Ratelimiter(
+            clock=hs.get_clock(),
+            rate_hz=hs.config.rc_key_requests.per_second,
+            burst_count=hs.config.rc_key_requests.burst_count,
+        )
+
     async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
         local_messages = {}
         sender_user_id = content["sender"]
@@ -168,15 +176,27 @@ class DeviceMessageHandler:
 
     async def send_device_message(
         self,
-        sender_user_id: str,
+        requester: Requester,
         message_type: str,
         messages: Dict[str, Dict[str, JsonDict]],
     ) -> None:
+        sender_user_id = requester.user.to_string()
+
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
         local_messages = {}
         remote_messages = {}  # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
         for user_id, by_device in messages.items():
+            # Ratelimit local cross-user key requests by the sending device.
+            if (
+                message_type == EduTypes.RoomKeyRequest
+                and user_id != sender_user_id
+                and self._ratelimiter.can_do_action(
+                    (sender_user_id, requester.device_id)
+                )
+            ):
+                continue
+
             # we use UserID.from_string to catch invalid user ids
             if self.is_mine(UserID.from_string(user_id)):
                 messages_by_device = {
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 3e23f82cf..f46cab732 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,7 +17,7 @@ import logging
 import random
 from typing import TYPE_CHECKING, Iterable, List, Optional
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
 from synapse.handlers.presence import format_user_presence_state
@@ -113,7 +113,7 @@ class EventStreamHandler(BaseHandler):
                     states = await presence_handler.get_states(users)
                     to_add.extend(
                         {
-                            "type": EventTypes.Presence,
+                            "type": EduTypes.Presence,
                             "content": format_user_presence_state(state, time_now),
                         }
                         for state in states
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 78c3e5a10..71a507667 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Optional, Tuple
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -412,7 +412,7 @@ class InitialSyncHandler(BaseHandler):
 
             return [
                 {
-                    "type": EventTypes.Presence,
+                    "type": EduTypes.Presence,
                     "content": format_user_presence_state(s, time_now),
                 }
                 for s in states
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 048e45f4a..42d4ea30b 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -387,6 +387,12 @@ class EventCreationHandler:
 
         self.room_invite_state_types = self.hs.config.room_invite_state_types
 
+        self.membership_types_to_include_profile_data_in = (
+            {Membership.JOIN, Membership.INVITE}
+            if self.hs.config.include_profile_data_on_invite
+            else {Membership.JOIN}
+        )
+
         self.send_event = ReplicationSendEventRestServlet.make_client(hs)
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
@@ -502,7 +508,7 @@ class EventCreationHandler:
             membership = builder.content.get("membership", None)
             target = UserID.from_string(builder.state_key)
 
-            if membership in {Membership.JOIN, Membership.INVITE}:
+            if membership in self.membership_types_to_include_profile_data_in:
                 # If event doesn't include a display name, add one.
                 profile = self.profile_handler
                 content = builder.content
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fb85b1977..54631b4ee 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -274,22 +274,25 @@ class PresenceHandler(BasePresenceHandler):
 
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
-        # Start a LoopingCall in 30s that fires every 5s.
-        # The initial delay is to allow disconnected clients a chance to
-        # reconnect before we treat them as offline.
- def run_timeout_handler(): - return run_as_background_process( - "handle_presence_timeouts", self._handle_timeouts + if self._presence_enabled: + # Start a LoopingCall in 30s that fires every 5s. + # The initial delay is to allow disconnected clients a chance to + # reconnect before we treat them as offline. + def run_timeout_handler(): + return run_as_background_process( + "handle_presence_timeouts", self._handle_timeouts + ) + + self.clock.call_later( + 30, self.clock.looping_call, run_timeout_handler, 5000 ) - self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000) + def run_persister(): + return run_as_background_process( + "persist_presence_changes", self._persist_unpersisted_changes + ) - def run_persister(): - return run_as_background_process( - "persist_presence_changes", self._persist_unpersisted_changes - ) - - self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000) + self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000) LaterGauge( "synapse_handlers_presence_wheel_timer_size", @@ -299,7 +302,7 @@ class PresenceHandler(BasePresenceHandler): ) # Used to handle sending of presence to newly joined users/servers - if hs.config.use_presence: + if self._presence_enabled: self.notifier.add_replication_callback(self.notify_new_event) # Presence is best effort and quickly heals itself, so lets just always @@ -849,6 +852,9 @@ class PresenceHandler(BasePresenceHandler): """Process current state deltas to find new joins that need to be handled. """ + # A map of destination to a set of user state that they should receive + presence_destinations = {} # type: Dict[str, Set[UserPresenceState]] + for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -858,6 +864,7 @@ class PresenceHandler(BasePresenceHandler): logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + # Drop any event that isn't a membership join if typ != EventTypes.Member: continue @@ -880,13 +887,38 @@ class PresenceHandler(BasePresenceHandler): # Ignore changes to join events. continue - await self._on_user_joined_room(room_id, state_key) + # Retrieve any user presence state updates that need to be sent as a result, + # and the destinations that need to receive it + destinations, user_presence_states = await self._on_user_joined_room( + room_id, state_key + ) - async def _on_user_joined_room(self, room_id: str, user_id: str) -> None: + # Insert the destinations and respective updates into our destinations dict + for destination in destinations: + presence_destinations.setdefault(destination, set()).update( + user_presence_states + ) + + # Send out user presence updates for each destination + for destination, user_state_set in presence_destinations.items(): + self.federation.send_presence_to_destinations( + destinations=[destination], states=user_state_set + ) + + async def _on_user_joined_room( + self, room_id: str, user_id: str + ) -> Tuple[List[str], List[UserPresenceState]]: """Called when we detect a user joining the room via the current state - delta stream. - """ + delta stream. Returns the destinations that need to be updated and the + presence updates to send to them. + Args: + room_id: The ID of the room that the user has joined. + user_id: The ID of the user that has joined the room. + + Returns: + A tuple of destinations and presence updates to send to them. 
+ """ if self.is_mine_id(user_id): # If this is a local user then we need to send their presence # out to hosts in the room (who don't already have it) @@ -894,15 +926,15 @@ class PresenceHandler(BasePresenceHandler): # TODO: We should be able to filter the hosts down to those that # haven't previously seen the user - state = await self.current_state_for_user(user_id) - hosts = await self.state.get_current_hosts_in_room(room_id) + remote_hosts = await self.state.get_current_hosts_in_room(room_id) # Filter out ourselves. - hosts = {host for host in hosts if host != self.server_name} + filtered_remote_hosts = [ + host for host in remote_hosts if host != self.server_name + ] - self.federation.send_presence_to_destinations( - states=[state], destinations=hosts - ) + state = await self.current_state_for_user(user_id) + return filtered_remote_hosts, [state] else: # A remote user has joined the room, so we need to: # 1. Check if this is a new server in the room @@ -915,6 +947,8 @@ class PresenceHandler(BasePresenceHandler): # TODO: Check that this is actually a new server joining the # room. + remote_host = get_domain_from_id(user_id) + users = await self.state.get_current_users_in_room(room_id) user_ids = list(filter(self.is_mine_id, users)) @@ -934,10 +968,7 @@ class PresenceHandler(BasePresenceHandler): or state.status_msg is not None ] - if states: - self.federation.send_presence_to_destinations( - states=states, destinations=[get_domain_from_id(user_id)] - ) + return [remote_host], states def should_notify(old_state, new_state): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 2f62d84fb..dd59392bd 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -310,6 +310,15 @@ class ProfileHandler(BaseHandler): await self._update_join_states(requester, target_user) async def on_profile_query(self, args: JsonDict) -> JsonDict: + """Handles federation profile query requests.""" + + if not self.hs.config.allow_profile_lookup_over_federation: + raise SynapseError( + 403, + "Profile lookup over federation is disabled on this homeserver", + Codes.FORBIDDEN, + ) + user = UserID.from_string(args["user_id"]) if not self.hs.is_mine(user): raise SynapseError(400, "User is not hosted on this homeserver") diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 514b1f69d..80e28bdcb 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -31,8 +31,8 @@ from urllib.parse import urlencode import attr from typing_extensions import NoReturn, Protocol -from twisted.web.http import Request from twisted.web.iweb import IRequest +from twisted.web.server import Request from synapse.api.constants import LoginType from synapse.api.errors import Codes, NotFoundError, RedirectException, SynapseError diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index c658862fe..142b007d0 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -14,8 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import re +from typing import Union -from twisted.internet import task +from twisted.internet import address, task from twisted.web.client import FileBodyProducer from twisted.web.iweb import IRequest @@ -53,6 +54,40 @@ class QuieterFileBodyProducer(FileBodyProducer): pass +def get_request_uri(request: IRequest) -> bytes: + """Return the full URI that was requested by the client""" + return b"%s://%s%s" % ( + b"https" if request.isSecure() else b"http", + _get_requested_host(request), + # despite its name, "request.uri" is only the path and query-string. + request.uri, + ) + + +def _get_requested_host(request: IRequest) -> bytes: + hostname = request.getHeader(b"host") + if hostname: + return hostname + + # no Host header, use the address/port that the request arrived on + host = request.getHost() # type: Union[address.IPv4Address, address.IPv6Address] + + hostname = host.host.encode("ascii") + + if request.isSecure() and host.port == 443: + # default port for https + return hostname + + if not request.isSecure() and host.port == 80: + # default port for http + return hostname + + return b"%s:%i" % ( + hostname, + host.port, + ) + + def get_request_user_agent(request: IRequest, default: str = "") -> str: """Return the last User-Agent header, or the given default.""" # There could be raw utf-8 bytes in the User-Agent header. diff --git a/synapse/http/client.py b/synapse/http/client.py index 31a05c4bc..813c4728d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -289,9 +289,8 @@ class SimpleHttpClient: treq_args: Dict[str, Any] = {}, ip_whitelist: Optional[IPSet] = None, ip_blacklist: Optional[IPSet] = None, - http_proxy: Optional[bytes] = None, - https_proxy: Optional[bytes] = None, user_agent: Optional[str] = None, + use_proxy: bool = False, ): """ Args: @@ -301,8 +300,8 @@ class SimpleHttpClient: we may not request. ip_whitelist: The whitelisted IP addresses, that we can request if it were otherwise caught in a blacklist. - http_proxy: proxy server to use for http connections. host[:port] - https_proxy: proxy server to use for https connections. host[:port] + use_proxy: Whether proxy settings should be discovered and used + from conventional environment variables. """ self.hs = hs @@ -346,8 +345,7 @@ class SimpleHttpClient: connectTimeout=15, contextFactory=self.hs.get_http_client_context_factory(), pool=pool, - http_proxy=http_proxy, - https_proxy=https_proxy, + use_proxy=use_proxy, ) if self._ip_blacklist: @@ -751,7 +749,32 @@ class BodyExceededMaxSize(Exception): """The maximum allowed size of the HTTP body was exceeded.""" +class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol): + """A protocol which immediately errors upon receiving data.""" + + def __init__(self, deferred: defer.Deferred): + self.deferred = deferred + + def _maybe_fail(self): + """ + Report a max size exceed error and disconnect the first time this is called. + """ + if not self.deferred.called: + self.deferred.errback(BodyExceededMaxSize()) + # Close the connection (forcefully) since all the data will get + # discarded anyway. 
+ self.transport.abortConnection() + + def dataReceived(self, data: bytes) -> None: + self._maybe_fail() + + def connectionLost(self, reason: Failure) -> None: + self._maybe_fail() + + class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): + """A protocol which reads body to a stream, erroring if the body exceeds a maximum size.""" + def __init__( self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] ): @@ -808,13 +831,15 @@ def read_body_with_max_size( Returns: A Deferred which resolves to the length of the read body. """ + d = defer.Deferred() + # If the Content-Length header gives a size larger than the maximum allowed # size, do not bother downloading the body. if max_size is not None and response.length != UNKNOWN_LENGTH: if response.length > max_size: - return defer.fail(BodyExceededMaxSize()) + response.deliverBody(_DiscardBodyWithMaxSizeProtocol(d)) + return d - d = defer.Deferred() response.deliverBody(_ReadBodyWithMaxSizeProtocol(stream, d, max_size)) return d diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 2e83fa677..b07aa59c0 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -14,7 +14,7 @@ # limitations under the License. import logging import urllib.parse -from typing import List, Optional +from typing import Any, Generator, List, Optional from netaddr import AddrFormatError, IPAddress, IPSet from zope.interface import implementer @@ -116,7 +116,7 @@ class MatrixFederationAgent: uri: bytes, headers: Optional[Headers] = None, bodyProducer: Optional[IBodyProducer] = None, - ) -> defer.Deferred: + ) -> Generator[defer.Deferred, Any, defer.Deferred]: """ Args: method: HTTP method: GET/POST/etc @@ -177,17 +177,17 @@ class MatrixFederationAgent: # We need to make sure the host header is set to the netloc of the # server and that a user-agent is provided. 
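The new `get_request_uri` helper rebuilds the full URI a client asked for: it prefers the `Host` header and falls back to the listening address, eliding the port when it is the scheme's default. The same logic as a pure function, under the assumption that only ports 80 and 443 are ever elided:

```python
from typing import Optional

def requested_host(
    host_header: Optional[bytes], addr_host: str, addr_port: int, secure: bool
) -> bytes:
    """Prefer the Host header; otherwise use the listening address,
    omitting the port when it is the scheme's default."""
    if host_header:
        return host_header
    hostname = addr_host.encode("ascii")
    if addr_port == (443 if secure else 80):
        return hostname
    return b"%s:%i" % (hostname, addr_port)

def request_uri(host: bytes, path_and_query: bytes, secure: bool) -> bytes:
    # Like twisted's request.uri, path_and_query excludes scheme and host.
    return b"%s://%s%s" % (b"https" if secure else b"http", host, path_and_query)

print(request_uri(requested_host(None, "synapse.local", 8448, True),
                  b"/_matrix/federation/v1/version", True))
# b'https://synapse.local:8448/_matrix/federation/v1/version'
print(request_uri(requested_host(b"example.com", "10.0.0.1", 443, True), b"/", True))
# b'https://example.com/'
```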
if headers is None: - headers = Headers() + request_headers = Headers() else: - headers = headers.copy() + request_headers = headers.copy() - if not headers.hasHeader(b"host"): - headers.addRawHeader(b"host", parsed_uri.netloc) - if not headers.hasHeader(b"user-agent"): - headers.addRawHeader(b"user-agent", self.user_agent) + if not request_headers.hasHeader(b"host"): + request_headers.addRawHeader(b"host", parsed_uri.netloc) + if not request_headers.hasHeader(b"user-agent"): + request_headers.addRawHeader(b"user-agent", self.user_agent) res = yield make_deferred_yieldable( - self._agent.request(method, uri, headers, bodyProducer) + self._agent.request(method, uri, request_headers, bodyProducer) ) return res diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cde42e9f5..0f107714e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1049,14 +1049,14 @@ def check_content_type_is_json(headers: Headers) -> None: RequestSendFailed: if the Content-Type header is missing or isn't JSON """ - c_type = headers.getRawHeaders(b"Content-Type") - if c_type is None: + content_type_headers = headers.getRawHeaders(b"Content-Type") + if content_type_headers is None: raise RequestSendFailed( RuntimeError("No Content-Type header received from remote server"), can_retry=False, ) - c_type = c_type[0].decode("ascii") # only the first header + c_type = content_type_headers[0].decode("ascii") # only the first header val, options = cgi.parse_header(c_type) if val != "application/json": raise RequestSendFailed( diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index b730d2c63..3d553ae23 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -14,6 +14,7 @@ # limitations under the License. import logging import re +from urllib.request import getproxies_environment, proxy_bypass_environment from zope.interface import implementer @@ -58,6 +59,9 @@ class ProxyAgent(_AgentBase): pool (HTTPConnectionPool|None): connection pool to be used. If None, a non-persistent pool instance will be created. + + use_proxy (bool): Whether proxy settings should be discovered and used + from conventional environment variables. 
""" def __init__( @@ -68,8 +72,7 @@ class ProxyAgent(_AgentBase): connectTimeout=None, bindAddress=None, pool=None, - http_proxy=None, - https_proxy=None, + use_proxy=False, ): _AgentBase.__init__(self, reactor, pool) @@ -84,6 +87,15 @@ class ProxyAgent(_AgentBase): if bindAddress is not None: self._endpoint_kwargs["bindAddress"] = bindAddress + http_proxy = None + https_proxy = None + no_proxy = None + if use_proxy: + proxies = getproxies_environment() + http_proxy = proxies["http"].encode() if "http" in proxies else None + https_proxy = proxies["https"].encode() if "https" in proxies else None + no_proxy = proxies["no"] if "no" in proxies else None + self.http_proxy_endpoint = _http_proxy_endpoint( http_proxy, self.proxy_reactor, **self._endpoint_kwargs ) @@ -92,6 +104,8 @@ class ProxyAgent(_AgentBase): https_proxy, self.proxy_reactor, **self._endpoint_kwargs ) + self.no_proxy = no_proxy + self._policy_for_https = contextFactory self._reactor = reactor @@ -139,13 +153,28 @@ class ProxyAgent(_AgentBase): pool_key = (parsed_uri.scheme, parsed_uri.host, parsed_uri.port) request_path = parsed_uri.originForm - if parsed_uri.scheme == b"http" and self.http_proxy_endpoint: + should_skip_proxy = False + if self.no_proxy is not None: + should_skip_proxy = proxy_bypass_environment( + parsed_uri.host.decode(), + proxies={"no": self.no_proxy}, + ) + + if ( + parsed_uri.scheme == b"http" + and self.http_proxy_endpoint + and not should_skip_proxy + ): # Cache *all* connections under the same key, since we are only # connecting to a single destination, the proxy: pool_key = ("http-proxy", self.http_proxy_endpoint) endpoint = self.http_proxy_endpoint request_path = uri - elif parsed_uri.scheme == b"https" and self.https_proxy_endpoint: + elif ( + parsed_uri.scheme == b"https" + and self.https_proxy_endpoint + and not should_skip_proxy + ): endpoint = HTTPConnectProxyEndpoint( self.proxy_reactor, self.https_proxy_endpoint, diff --git a/synapse/http/server.py b/synapse/http/server.py index 845db9b78..fa8926085 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -21,6 +21,7 @@ import logging import types import urllib from http import HTTPStatus +from inspect import isawaitable from io import BytesIO from typing import ( Any, @@ -30,6 +31,7 @@ from typing import ( Iterable, Iterator, List, + Optional, Pattern, Tuple, Union, @@ -79,10 +81,12 @@ def return_json_error(f: failure.Failure, request: SynapseRequest) -> None: """Sends a JSON error response to clients.""" if f.check(SynapseError): - error_code = f.value.code - error_dict = f.value.error_dict() + # mypy doesn't understand that f.check asserts the type. 
+ exc = f.value # type: SynapseError # type: ignore + error_code = exc.code + error_dict = exc.error_dict() - logger.info("%s SynapseError: %s - %s", request, error_code, f.value.msg) + logger.info("%s SynapseError: %s - %s", request, error_code, exc.msg) else: error_code = 500 error_dict = {"error": "Internal server error", "errcode": Codes.UNKNOWN} @@ -91,7 +95,7 @@ def return_json_error(f: failure.Failure, request: SynapseRequest) -> None: "Failed handle request via %r: %r", request.request_metrics.name, request, - exc_info=(f.type, f.value, f.getTracebackObject()), + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) # Only respond with an error response if we haven't already started writing, @@ -128,7 +132,8 @@ def return_html_error( `{msg}` placeholders), or a jinja2 template """ if f.check(CodeMessageException): - cme = f.value + # mypy doesn't understand that f.check asserts the type. + cme = f.value # type: CodeMessageException # type: ignore code = cme.code msg = cme.msg @@ -142,7 +147,7 @@ def return_html_error( logger.error( "Failed handle request %r", request, - exc_info=(f.type, f.value, f.getTracebackObject()), + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) else: code = HTTPStatus.INTERNAL_SERVER_ERROR @@ -151,7 +156,7 @@ def return_html_error( logger.error( "Failed handle request %r", request, - exc_info=(f.type, f.value, f.getTracebackObject()), + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) if isinstance(error_template, str): @@ -278,7 +283,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): raw_callback_return = method_handler(request) # Is it synchronous? We'll allow this for now. - if isinstance(raw_callback_return, (defer.Deferred, types.CoroutineType)): + if isawaitable(raw_callback_return): callback_return = await raw_callback_return else: callback_return = raw_callback_return # type: ignore @@ -399,8 +404,10 @@ class JsonResource(DirectServeJsonResource): A tuple of the callback to use, the name of the servlet, and the key word arguments to pass to the callback """ + # At this point the path must be bytes. + request_path_bytes = request.path # type: bytes # type: ignore + request_path = request_path_bytes.decode("ascii") # Treat HEAD requests as GET requests. - request_path = request.path.decode("ascii") request_method = request.method if request_method == b"HEAD": request_method = b"GET" @@ -551,7 +558,7 @@ class _ByteProducer: request: Request, iterator: Iterator[bytes], ): - self._request = request + self._request = request # type: Optional[Request] self._iterator = iterator self._paused = False @@ -563,7 +570,7 @@ class _ByteProducer: """ Send a list of bytes as a chunk of a response. 
""" - if not data: + if not data or not self._request: return self._request.write(b"".join(data)) diff --git a/synapse/http/site.py b/synapse/http/site.py index 4a4fb5ef2..47754aff4 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,8 +14,12 @@ import contextlib import logging import time -from typing import Optional, Union +from typing import Optional, Type, Union +import attr +from zope.interface import implementer + +from twisted.internet.interfaces import IAddress from twisted.python.failure import Failure from twisted.web.server import Request, Site @@ -53,7 +57,7 @@ class SynapseRequest(Request): def __init__(self, channel, *args, **kw): Request.__init__(self, channel, *args, **kw) - self.site = channel.site + self.site = channel.site # type: SynapseSite self._channel = channel # this is used by the tests self.start_time = 0.0 @@ -92,25 +96,34 @@ class SynapseRequest(Request): def get_request_id(self): return "%s-%i" % (self.get_method(), self.request_seq) - def get_redacted_uri(self): - uri = self.uri + def get_redacted_uri(self) -> str: + """Gets the redacted URI associated with the request (or placeholder if the URI + has not yet been received). + + Note: This is necessary as the placeholder value in twisted is str + rather than bytes, so we need to sanitise `self.uri`. + + Returns: + The redacted URI as a string. + """ + uri = self.uri # type: Union[bytes, str] if isinstance(uri, bytes): - uri = self.uri.decode("ascii", errors="replace") + uri = uri.decode("ascii", errors="replace") return redact_uri(uri) - def get_method(self): - """Gets the method associated with the request (or placeholder if not - method has yet been received). + def get_method(self) -> str: + """Gets the method associated with the request (or placeholder if method + has not yet been received). Note: This is necessary as the placeholder value in twisted is str rather than bytes, so we need to sanitise `self.method`. Returns: - str + The request method as a string. """ - method = self.method + method = self.method # type: Union[bytes, str] if isinstance(method, bytes): - method = self.method.decode("ascii") + return self.method.decode("ascii") return method def render(self, resrc): @@ -333,27 +346,78 @@ class SynapseRequest(Request): class XForwardedForRequest(SynapseRequest): - def __init__(self, *args, **kw): - SynapseRequest.__init__(self, *args, **kw) + """Request object which honours proxy headers - """ - Add a layer on top of another request that only uses the value of an - X-Forwarded-For header as the result of C{getClientIP}. + Extends SynapseRequest to replace getClientIP, getClientAddress, and isSecure with + information from request headers. """ - def getClientIP(self): - """ - @return: The client address (the first address) in the value of the - I{X-Forwarded-For header}. If the header is not present, return - C{b"-"}. - """ - return ( - self.requestHeaders.getRawHeaders(b"x-forwarded-for", [b"-"])[0] - .split(b",")[0] - .strip() - .decode("ascii") + # the client IP and ssl flag, as extracted from the headers. + _forwarded_for = None # type: Optional[_XForwardedForAddress] + _forwarded_https = False # type: bool + + def requestReceived(self, command, path, version): + # this method is called by the Channel once the full request has been + # received, to dispatch the request to a resource. + # We can use it to set the IP address and protocol according to the + # headers. 
+ self._process_forwarded_headers() + return super().requestReceived(command, path, version) + + def _process_forwarded_headers(self): + headers = self.requestHeaders.getRawHeaders(b"x-forwarded-for") + if not headers: + return + + # for now, we just use the first x-forwarded-for header. Really, we ought + # to start from the client IP address, and check whether it is trusted; if it + # is, work backwards through the headers until we find an untrusted address. + # see https://github.com/matrix-org/synapse/issues/9471 + self._forwarded_for = _XForwardedForAddress( + headers[0].split(b",")[0].strip().decode("ascii") ) + # if we got an x-forwarded-for header, also look for an x-forwarded-proto header + header = self.getHeader(b"x-forwarded-proto") + if header is not None: + self._forwarded_https = header.lower() == b"https" + else: + # this is done largely for backwards-compatibility so that people that + # haven't set an x-forwarded-proto header don't get a redirect loop. + logger.warning( + "forwarded request lacks an x-forwarded-proto header: assuming https" + ) + self._forwarded_https = True + + def isSecure(self): + if self._forwarded_https: + return True + return super().isSecure() + + def getClientIP(self) -> str: + """ + Return the IP address of the client who submitted this request. + + This method is deprecated. Use getClientAddress() instead. + """ + if self._forwarded_for is not None: + return self._forwarded_for.host + return super().getClientIP() + + def getClientAddress(self) -> IAddress: + """ + Return the address of the client who submitted this request. + """ + if self._forwarded_for is not None: + return self._forwarded_for + return super().getClientAddress() + + +@implementer(IAddress) +@attr.s(frozen=True, slots=True) +class _XForwardedForAddress: + host = attr.ib(type=str) + class SynapseSite(Site): """ @@ -377,7 +441,9 @@ class SynapseSite(Site): assert config.http_options is not None proxied = config.http_options.x_forwarded - self.requestFactory = XForwardedForRequest if proxied else SynapseRequest + self.requestFactory = ( + XForwardedForRequest if proxied else SynapseRequest + ) # type: Type[Request] self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index f8e9112b5..174ca7be5 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -32,7 +32,7 @@ from twisted.internet.endpoints import ( TCP4ClientEndpoint, TCP6ClientEndpoint, ) -from twisted.internet.interfaces import IPushProducer, ITransport +from twisted.internet.interfaces import IPushProducer, IStreamClientEndpoint, ITransport from twisted.internet.protocol import Factory, Protocol from twisted.python.failure import Failure @@ -121,7 +121,9 @@ class RemoteHandler(logging.Handler): try: ip = ip_address(self.host) if isinstance(ip, IPv4Address): - endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port) + endpoint = TCP4ClientEndpoint( + _reactor, self.host, self.port + ) # type: IStreamClientEndpoint elif isinstance(ip, IPv6Address): endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port) else: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a8cb49d5b..3b499efc0 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -527,7 +527,7 @@ class ReactorLastSeenMetric: REGISTRY.register(ReactorLastSeenMetric()) -def runUntilCurrentTimer(func): +def runUntilCurrentTimer(reactor, func): 
     @functools.wraps(func)
     def f(*args, **kwargs):
         now = reactor.seconds()
@@ -590,13 +590,14 @@
 
 try:
     # Ensure the reactor has all the attributes we expect
-    reactor.runUntilCurrent
-    reactor._newTimedCalls
-    reactor.threadCallQueue
+    reactor.seconds  # type: ignore
+    reactor.runUntilCurrent  # type: ignore
+    reactor._newTimedCalls  # type: ignore
+    reactor.threadCallQueue  # type: ignore
 
     # runUntilCurrent is called when we have pending calls. It is called once
     # per iteration after fd polling.
-    reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+    reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 2e3b311c4..db2d400b7 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Iterable, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Generator, Iterable, Optional, Tuple
 
 from twisted.internet import defer
 
@@ -307,7 +307,7 @@ class ModuleApi:
     @defer.inlineCallbacks
     def get_state_events_in_room(
         self, room_id: str, types: Iterable[Tuple[str, Optional[str]]]
-    ) -> defer.Deferred:
+    ) -> Generator[defer.Deferred, Any, defer.Deferred]:
         """Gets current state events for the given room.
 
         (This is exposed for compatibility with the old SpamCheckerApi. We should
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index d2d0bf48e..a279f0839 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,11 +15,12 @@
 # limitations under the License.
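`runUntilCurrentTimer` above now receives the reactor explicitly instead of closing over a module-level global, which keeps the wrapper self-contained. A sketch of the wrapping pattern, with a fake reactor standing in for Twisted's:

```python
import functools
import time

class FakeReactor:
    """Stand-in exposing only what the wrapper needs."""

    def seconds(self) -> float:
        return time.monotonic()

    def runUntilCurrent(self):
        time.sleep(0.01)  # pretend to drain pending calls

def run_until_current_timer(reactor, func):
    """Wrap `func`, measuring how long each call (one reactor tick) takes."""

    @functools.wraps(func)
    def f(*args, **kwargs):
        start = reactor.seconds()
        ret = func(*args, **kwargs)
        print("tick took %.4fs" % (reactor.seconds() - start))
        return ret

    return f

reactor = FakeReactor()
# Monkeypatch the instance, as the metrics module does with the real reactor.
reactor.runUntilCurrent = run_until_current_timer(reactor, reactor.runUntilCurrent)
reactor.runUntilCurrent()
```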
import logging import urllib.parse -from typing import TYPE_CHECKING, Any, Dict, Iterable, Union +from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Union from prometheus_client import Counter from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from twisted.internet.interfaces import IDelayedCall from synapse.api.constants import EventTypes from synapse.events import EventBase @@ -71,9 +72,10 @@ class HttpPusher(Pusher): self.data = pusher_config.data self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.failing_since = pusher_config.failing_since - self.timed_call = None + self.timed_call = None # type: Optional[IDelayedCall] self._is_processing = False self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room + self._pusherpool = hs.get_pusherpool() self.data = pusher_config.data if self.data is None: @@ -292,7 +294,7 @@ class HttpPusher(Pusher): ) else: logger.info("Pushkey %s was rejected: removing", pk) - await self.hs.remove_pusher(self.app_id, pk, self.user_id) + await self._pusherpool.remove_pusher(self.app_id, pk, self.user_id) return True async def _build_notification_dict( diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index ae1145be0..4c7f5fece 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -19,12 +19,14 @@ from typing import TYPE_CHECKING, Dict, Iterable, Optional from prometheus_client import Gauge +from synapse.api.errors import Codes, SynapseError from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) from synapse.push import Pusher, PusherConfig, PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.replication.http.push import ReplicationRemovePusherRestServlet from synapse.types import JsonDict, RoomStreamToken from synapse.util.async_helpers import concurrently_execute @@ -58,7 +60,6 @@ class PusherPool: def __init__(self, hs: "HomeServer"): self.hs = hs self.pusher_factory = PusherFactory(hs) - self._should_start_pushers = hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() @@ -67,6 +68,16 @@ class PusherPool: # We shard the handling of push notifications by user ID. self._pusher_shard_config = hs.config.push.pusher_shard_config self._instance_name = hs.get_instance_name() + self._should_start_pushers = ( + self._instance_name in self._pusher_shard_config.instances + ) + + # We can only delete pushers on master. + self._remove_pusher_client = None + if hs.config.worker.worker_app: + self._remove_pusher_client = ReplicationRemovePusherRestServlet.make_client( + hs + ) # Record the last stream ID that we were poked about so we can get # changes since then. We set this to the current max stream ID on @@ -103,6 +114,11 @@ class PusherPool: The newly created pusher. 
""" + if kind == "email": + email_owner = await self.store.get_user_id_by_threepid("email", pushkey) + if email_owner != user_id: + raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND) + time_now_msec = self.clock.time_msec() # create the pusher setting last_stream_ordering to the current maximum @@ -175,9 +191,6 @@ class PusherPool: user_id: user to remove pushers for access_tokens: access token *ids* to remove pushers for """ - if not self._pusher_shard_config.should_handle(self._instance_name, user_id): - return - tokens = set(access_tokens) for p in await self.store.get_pushers_by_user_id(user_id): if p.access_token in tokens: @@ -380,6 +393,12 @@ class PusherPool: synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec() - await self.store.delete_pusher_by_app_id_pushkey_user_id( - app_id, pushkey, user_id - ) + # We can only delete pushers on master. + if self._remove_pusher_client: + await self._remove_pusher_client( + app_id=app_id, pushkey=pushkey, user_id=user_id + ) + else: + await self.store.delete_pusher_by_app_id_pushkey_user_id( + app_id, pushkey, user_id + ) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 8a2b73b75..321a33382 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -106,6 +106,9 @@ CONDITIONAL_REQUIREMENTS = { "pysaml2>=4.5.0;python_version>='3.6'", ], "oidc": ["authlib>=0.14.0"], + # systemd-python is necessary for logging to the systemd journal via + # `systemd.journal.JournalHandler`, as is documented in + # `contrib/systemd/log_config.yaml`. "systemd": ["systemd-python>=231"], "url_preview": ["lxml>=3.5.0"], "sentry": ["sentry-sdk>=0.7.2"], diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index dd527e807..cb4a52dbe 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -21,6 +21,7 @@ from synapse.replication.http import ( login, membership, presence, + push, register, send_event, streams, @@ -42,6 +43,7 @@ class ReplicationRestResource(JsonResource): membership.register_servlets(hs, self) streams.register_servlets(hs, self) account_data.register_servlets(hs, self) + push.register_servlets(hs, self) # The following can't currently be instantiated on workers. 
if hs.config.worker.worker_app is None: diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 7a0dbb5b1..8af53b4f2 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -213,8 +213,9 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint): content = parse_json_object_from_request(request) args = content["args"] + args["origin"] = content["origin"] - logger.info("Got %r query", query_type) + logger.info("Got %r query from %s", query_type, args["origin"]) result = await self.registry.on_query(query_type, args) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 439881be6..c10992ff5 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -15,9 +15,10 @@ import logging from typing import TYPE_CHECKING, List, Optional, Tuple -from twisted.web.http import Request +from twisted.web.server import Request from synapse.http.servlet import parse_json_object_from_request +from synapse.http.site import SynapseRequest from synapse.replication.http._base import ReplicationEndpoint from synapse.types import JsonDict, Requester, UserID from synapse.util.distributor import user_left_room @@ -78,7 +79,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): } async def _handle_request( # type: ignore - self, request: Request, room_id: str, user_id: str + self, request: SynapseRequest, room_id: str, user_id: str ) -> Tuple[int, JsonDict]: content = parse_json_object_from_request(request) @@ -86,7 +87,6 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) - request.requester = requester logger.info("remote_join: %s into room: %s", user_id, room_id) @@ -147,7 +147,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): } async def _handle_request( # type: ignore - self, request: Request, invite_event_id: str + self, request: SynapseRequest, invite_event_id: str ) -> Tuple[int, JsonDict]: content = parse_json_object_from_request(request) @@ -155,7 +155,6 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) - request.requester = requester # hopefully we're now on the master, so this won't recurse! diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py new file mode 100644 index 000000000..054ed64d3 --- /dev/null +++ b/synapse/replication/http/push.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
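The new servlet below is addressed by its `NAME` and `PATH_ARGS` attributes. Assuming `ReplicationEndpoint` composes its URL as `/_synapse/replication/<NAME>/<path args...>` (cached endpoints also append a transaction id), a hypothetical reconstruction of the routing:

```python
from typing import Dict, Tuple

def replication_path(
    name: str, path_args: Tuple[str, ...], values: Dict[str, str]
) -> str:
    """Hypothetical sketch of the URL a ReplicationEndpoint serves."""
    parts = ["/_synapse/replication", name] + [values[arg] for arg in path_args]
    return "/".join(parts)

print(replication_path("remove_pusher", ("user_id",), {"user_id": "@alice:hs"}))
# /_synapse/replication/remove_pusher/@alice:hs
```

This is why the servlet's `NAME` has to match the path documented in its docstring: the constant is what both the client stub and the registered route are built from.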
+
+import logging
+from typing import TYPE_CHECKING
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
+    """Deletes the given pusher.
+
+    Request format:
+
+        POST /_synapse/replication/remove_pusher/:user_id
+
+        {
+            "app_id": "<app_id>",
+            "pushkey": "<pushkey>"
+        }
+
+    """
+
+    NAME = "remove_pusher"
+    PATH_ARGS = ("user_id",)
+    CACHE = False
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.pusher_pool = hs.get_pusherpool()
+
+    @staticmethod
+    async def _serialize_payload(app_id, pushkey, user_id):
+        payload = {
+            "app_id": app_id,
+            "pushkey": pushkey,
+        }
+
+        return payload
+
+    async def _handle_request(self, request, user_id):
+        content = parse_json_object_from_request(request)
+
+        app_id = content["app_id"]
+        pushkey = content["pushkey"]
+
+        await self.pusher_pool.remove_pusher(app_id, pushkey, user_id)
+
+        return 200, {}
+
+
+def register_servlets(hs, http_server):
+    ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2618eb1e5..3455839d6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -108,9 +108,7 @@ class ReplicationDataHandler:
 
         # Map from stream to list of deferreds waiting for the stream to
         # arrive at a particular position. The lists are sorted by stream position.
-        self._streams_to_waiters = (
-            {}
-        )  # type: Dict[str, List[Tuple[int, Deferred[None]]]]
+        self._streams_to_waiters = {}  # type: Dict[str, List[Tuple[int, Deferred]]]
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 0a9da79c3..bb447f75b 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -325,31 +325,6 @@ class FederationAckCommand(Command):
         return "%s %s" % (self.instance_name, self.token)
 
 
-class RemovePusherCommand(Command):
-    """Sent by the client to request the master remove the given pusher.
-
-    Format::
-
-        REMOVE_PUSHER <app_id> <push_key> <user_id>
-    """
-
-    NAME = "REMOVE_PUSHER"
-
-    def __init__(self, app_id, push_key, user_id):
-        self.user_id = user_id
-        self.app_id = app_id
-        self.push_key = push_key
-
-    @classmethod
-    def from_line(cls, line):
-        app_id, push_key, user_id = line.split(" ", 2)
-
-        return cls(app_id, push_key, user_id)
-
-    def to_line(self):
-        return " ".join((self.app_id, self.push_key, self.user_id))
-
-
 class UserIpCommand(Command):
     """Sent periodically when a worker sees activity from a client.
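The deleted `RemovePusherCommand` serialized its three fields as a single space-separated line; `maxsplit=2` is what lets the final field absorb any remaining text. A standalone copy of the deleted class, showing the round trip:

```python
class RemovePusherCommand:
    """Minimal copy of the removed line-based command, for illustration."""

    NAME = "REMOVE_PUSHER"

    def __init__(self, app_id: str, push_key: str, user_id: str):
        self.app_id = app_id
        self.push_key = push_key
        self.user_id = user_id

    @classmethod
    def from_line(cls, line: str) -> "RemovePusherCommand":
        # maxsplit=2: everything after the second space belongs to user_id.
        app_id, push_key, user_id = line.split(" ", 2)
        return cls(app_id, push_key, user_id)

    def to_line(self) -> str:
        return " ".join((self.app_id, self.push_key, self.user_id))

cmd = RemovePusherCommand.from_line("m.http key123 @alice:example.com")
assert cmd.to_line() == "m.http key123 @alice:example.com"
print(cmd.app_id, cmd.push_key, cmd.user_id)
```

Replacing this with the HTTP replication endpoint above trades the bespoke line format for a structured JSON payload, which is what makes per-field validation and logging straightforward.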
@@ -416,7 +391,6 @@ _COMMANDS = ( ReplicateCommand, UserSyncCommand, FederationAckCommand, - RemovePusherCommand, UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, @@ -443,7 +417,6 @@ VALID_CLIENT_COMMANDS = ( UserSyncCommand.NAME, ClearUserSyncsCommand.NAME, FederationAckCommand.NAME, - RemovePusherCommand.NAME, UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d1d00c371..a7245da15 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -44,7 +44,6 @@ from synapse.replication.tcp.commands import ( PositionCommand, RdataCommand, RemoteServerUpCommand, - RemovePusherCommand, ReplicateCommand, UserIpCommand, UserSyncCommand, @@ -373,23 +372,6 @@ class ReplicationCommandHandler: if self._federation_sender: self._federation_sender.federation_ack(cmd.instance_name, cmd.token) - def on_REMOVE_PUSHER( - self, conn: AbstractConnection, cmd: RemovePusherCommand - ) -> Optional[Awaitable[None]]: - remove_pusher_counter.inc() - - if self._is_master: - return self._handle_remove_pusher(cmd) - else: - return None - - async def _handle_remove_pusher(self, cmd: RemovePusherCommand): - await self._store.delete_pusher_by_app_id_pushkey_user_id( - app_id=cmd.app_id, pushkey=cmd.push_key, user_id=cmd.user_id - ) - - self._notifier.on_new_replication_data() - def on_USER_IP( self, conn: AbstractConnection, cmd: UserIpCommand ) -> Optional[Awaitable[None]]: @@ -684,11 +666,6 @@ class ReplicationCommandHandler: UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) ) - def send_remove_pusher(self, app_id: str, push_key: str, user_id: str): - """Poke the master to remove a pusher for a user""" - cmd = RemovePusherCommand(app_id, push_key, user_id) - self.send_command(cmd) - def send_user_ip( self, user_id: str, diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 38809b5b7..f45e7a8c8 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -502,7 +502,7 @@ class AccountDataStream(Stream): """Global or per room account data was changed""" AccountDataStreamRow = namedtuple( - "AccountDataStream", + "AccountDataStreamRow", ("user_id", "room_id", "data_type"), # str # Optional[str] # str ) diff --git a/synapse/res/templates/sso_auth_account_details.html b/synapse/res/templates/sso_auth_account_details.html index f4fdc40b2..00e1dcdbb 100644 --- a/synapse/res/templates/sso_auth_account_details.html +++ b/synapse/res/templates/sso_auth_account_details.html @@ -145,7 +145,7 @@ {% if user_attributes.avatar_url or user_attributes.display_name or user_attributes.emails %}
-        <h2><img src="{{ idp.idp_icon | mxc_to_http(24, 24) }}"/>Information from {{ idp.idp_name }}</h2>
+        <h2>{% if idp.idp_icon %}<img src="{{ idp.idp_icon | mxc_to_http(24, 24) }}"/>{% endif %}Information from {{ idp.idp_name }}</h2>
{% if user_attributes.avatar_url %}
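The template change above guards the identity-provider icon with `{% if idp.idp_icon %}`, so an `<img>` tag is only emitted when an icon is configured. A cut-down render with jinja2 (the real template also pipes the `mxc://` URI through the `mxc_to_http` filter, omitted here):

```python
from jinja2 import Template

snippet = Template(
    '<h2>{% if idp.idp_icon %}<img src="{{ idp.idp_icon }}"/>{% endif %}'
    "Information from {{ idp.idp_name }}</h2>"
)

# Without an icon: no empty or broken <img> element is produced.
print(snippet.render(idp={"idp_name": "SAML", "idp_icon": None}))
# <h2>Information from SAML</h2>

# With an icon configured:
print(snippet.render(idp={"idp_name": "SAML", "idp_icon": "mxc://hs/abc"}))
# <h2><img src="mxc://hs/abc"/>Information from SAML</h2>
```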