Merge remote-tracking branch 'upstream/release-v1.34.0'

This commit is contained in:
Tulir Asokan 2021-05-12 19:12:46 +03:00
commit 2d57abd6b7
363 changed files with 2301 additions and 737 deletions

View file

@ -1,36 +0,0 @@
#!/usr/bin/env python
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from synapse.storage.engines import create_engine
logger = logging.getLogger("create_postgres_db")
if __name__ == "__main__":
# Create a PostgresEngine.
db_engine = create_engine({"name": "psycopg2", "args": {}})
# Connect to postgres to create the base database.
# We use "postgres" as a database because it's bound to exist and the "synapse" one
# doesn't exist yet.
db_conn = db_engine.module.connect(
user="postgres", host="postgres", password="postgres", dbname="postgres"
)
db_conn.autocommit = True
cur = db_conn.cursor()
cur.execute("CREATE DATABASE synapse;")
cur.close()
db_conn.close()

View file

@ -0,0 +1,31 @@
#!/usr/bin/env python
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import psycopg2
# a very simple replacment for `psql`, to make up for the lack of the postgres client
# libraries in the synapse docker image.
# We use "postgres" as a database because it's bound to exist and the "synapse" one
# doesn't exist yet.
db_conn = psycopg2.connect(
user="postgres", host="postgres", password="postgres", dbname="postgres"
)
db_conn.autocommit = True
cur = db_conn.cursor()
for c in sys.argv[1:]:
cur.execute(c)

View file

@ -1,10 +1,10 @@
#!/usr/bin/env bash
#
# Test script for 'synapse_port_db', which creates a virtualenv, installs Synapse along
# with additional dependencies needed for the test (such as coverage or the PostgreSQL
# driver), update the schema of the test SQLite database and run background updates on it,
# create an empty test database in PostgreSQL, then run the 'synapse_port_db' script to
# test porting the SQLite database to the PostgreSQL database (with coverage).
# Test script for 'synapse_port_db'.
# - sets up synapse and deps
# - runs the port script on a prepopulated test sqlite db
# - also runs it against an new sqlite db
set -xe
cd `dirname $0`/../..
@ -22,15 +22,32 @@ echo "--- Generate the signing key"
# Generate the server's signing key.
python -m synapse.app.homeserver --generate-keys -c .buildkite/sqlite-config.yaml
echo "--- Prepare the databases"
echo "--- Prepare test database"
# Make sure the SQLite3 database is using the latest schema and has no pending background update.
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
# Create the PostgreSQL database.
./.buildkite/scripts/create_postgres_db.py
./.buildkite/scripts/postgres_exec.py "CREATE DATABASE synapse"
echo "+++ Run synapse_port_db"
# Run the script
echo "+++ Run synapse_port_db against test database"
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml
#####
# Now do the same again, on an empty database.
echo "--- Prepare empty SQLite database"
# we do this by deleting the sqlite db, and then doing the same again.
rm .buildkite/test_db.db
scripts-dev/update_database --database-config .buildkite/sqlite-config.yaml
# re-create the PostgreSQL database.
./.buildkite/scripts/postgres_exec.py \
"DROP DATABASE synapse" \
"CREATE DATABASE synapse"
echo "+++ Run synapse_port_db against empty database"
coverage run scripts/synapse_port_db --sqlite-database .buildkite/test_db.db --postgres-config .buildkite/postgres-config.yaml

View file

@ -273,7 +273,7 @@ jobs:
python-version: ${{ matrix.python-version }}
- name: Patch Buildkite-specific test scripts
run: |
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/create_postgres_db.py
sed -i -e 's/host="postgres"/host="localhost"/' .buildkite/scripts/postgres_exec.py
sed -i -e 's/host: postgres/host: localhost/' .buildkite/postgres-config.yaml
sed -i -e 's|/src/||' .buildkite/{sqlite,postgres}-config.yaml
sed -i -e 's/\$TOP/\$GITHUB_WORKSPACE/' .coveragerc

View file

@ -1,3 +1,93 @@
Synapse 1.34.0rc1 (2021-05-12)
==============================
This release deprecates the `room_invite_state_types` configuration setting. See the [upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) for instructions on updating your configuration file to use the new `room_prejoin_state` setting.
This release also deprecates the `POST /_synapse/admin/v1/rooms/<room_id>/delete` admin API route. Server administrators are encouraged to update their scripts to use the new `DELETE /_synapse/admin/v1/rooms/<room_id>` route instead.
Features
--------
- Add experimental option to track memory usage of the caches. ([\#9881](https://github.com/matrix-org/synapse/issues/9881))
- Add support for `DELETE /_synapse/admin/v1/rooms/<room_id>`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
- Add limits to how often Synapse will GC, ensuring that large servers do not end up GC thrashing if `gc_thresholds` has not been correctly set. ([\#9902](https://github.com/matrix-org/synapse/issues/9902))
- Improve performance of sending events for worker-based deployments using Redis. ([\#9905](https://github.com/matrix-org/synapse/issues/9905), [\#9950](https://github.com/matrix-org/synapse/issues/9950), [\#9951](https://github.com/matrix-org/synapse/issues/9951))
- Improve performance after joining a large room when presence is enabled. ([\#9910](https://github.com/matrix-org/synapse/issues/9910), [\#9916](https://github.com/matrix-org/synapse/issues/9916))
- Support stable identifiers for [MSC1772](https://github.com/matrix-org/matrix-doc/pull/1772) Spaces. `m.space.child` events will now be taken into account when populating the experimental spaces summary response. Please see [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.34.0/UPGRADE.rst#upgrading-to-v1340) if you have customised `room_invite_state_types` in your configuration. ([\#9915](https://github.com/matrix-org/synapse/issues/9915), [\#9966](https://github.com/matrix-org/synapse/issues/9966))
- Improve performance of backfilling in large rooms. ([\#9935](https://github.com/matrix-org/synapse/issues/9935))
- Add a config option to allow you to prevent device display names from being shared over federation. Contributed by @aaronraimist. ([\#9945](https://github.com/matrix-org/synapse/issues/9945))
- Update support for [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946): Spaces Summary. ([\#9947](https://github.com/matrix-org/synapse/issues/9947), [\#9954](https://github.com/matrix-org/synapse/issues/9954))
Bugfixes
--------
- Fix a bug introduced in v1.32.0 where the associated connection was improperly logged for SQL logging statements. ([\#9895](https://github.com/matrix-org/synapse/issues/9895))
- Correct the type hint for the `user_may_create_room_alias` method of spam checkers. It is provided a `RoomAlias`, not a `str`. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
- Fix bug where user directory could get out of sync if room visibility and membership changed in quick succession. ([\#9910](https://github.com/matrix-org/synapse/issues/9910))
- Include the `origin_server_ts` property in the experimental [MSC2946](https://github.com/matrix-org/matrix-doc/pull/2946) support to allow clients to properly sort rooms. ([\#9928](https://github.com/matrix-org/synapse/issues/9928))
- Fix bugs introduced in v1.23.0 which made the PostgreSQL port script fail when run with a newly-created SQLite database. ([\#9930](https://github.com/matrix-org/synapse/issues/9930))
- Fix a bug introduced in Synapse 1.29.0 which caused `m.room_key_request` to-device messages sent from one user to another to be dropped. ([\#9961](https://github.com/matrix-org/synapse/issues/9961), [\#9965](https://github.com/matrix-org/synapse/issues/9965))
- Fix a bug introduced in v1.27.0 preventing users and appservices exempt from ratelimiting from creating rooms with many invitees. ([\#9968](https://github.com/matrix-org/synapse/issues/9968))
Updates to the Docker image
---------------------------
- Add `startup_delay` to docker healthcheck to reduce waiting time for coming online and update the documentation with extra options. Contributed by @Maquis196. ([\#9913](https://github.com/matrix-org/synapse/issues/9913))
Improved Documentation
----------------------
- Add `port` argument to the Postgres database sample config section. ([\#9911](https://github.com/matrix-org/synapse/issues/9911))
Deprecations and Removals
-------------------------
- Mark as deprecated `POST /_synapse/admin/v1/rooms/<room_id>/delete`. ([\#9889](https://github.com/matrix-org/synapse/issues/9889))
Internal Changes
----------------
- Reduce the length of Synapse's access tokens. ([\#5588](https://github.com/matrix-org/synapse/issues/5588))
- Export jemalloc stats to Prometheus if it is being used. ([\#9882](https://github.com/matrix-org/synapse/issues/9882))
- Add type hints to presence handler. ([\#9885](https://github.com/matrix-org/synapse/issues/9885))
- Reduce memory usage of the LRU caches. ([\#9886](https://github.com/matrix-org/synapse/issues/9886))
- Add type hints to the `synapse.handlers` module. ([\#9896](https://github.com/matrix-org/synapse/issues/9896))
- Time response time for external cache requests. ([\#9904](https://github.com/matrix-org/synapse/issues/9904))
- Minor fixes to the `make_full_schema.sh` script. ([\#9931](https://github.com/matrix-org/synapse/issues/9931))
- Move database schema files into a common directory. ([\#9932](https://github.com/matrix-org/synapse/issues/9932))
- Add debug logging for lost/delayed to-device messages. ([\#9959](https://github.com/matrix-org/synapse/issues/9959))
Synapse 1.33.2 (2021-05-11)
===========================
Due to the security issue highlighted below, server administrators are encouraged to update Synapse. We are not aware of these vulnerabilities being exploited in the wild.
Security advisory
-----------------
This release fixes a denial of service attack ([CVE-2021-29471](https://github.com/matrix-org/synapse/security/advisories/GHSA-x345-32rc-8h85)) against Synapse's push rules implementation. Server admins are encouraged to upgrade.
Internal Changes
----------------
- Unpin attrs dependency. ([\#9946](https://github.com/matrix-org/synapse/issues/9946))
Synapse 1.33.1 (2021-05-06)
===========================
Bugfixes
--------
- Fix bug where `/sync` would break if using the latest version of `attrs` dependency, by pinning to a previous version. ([\#9937](https://github.com/matrix-org/synapse/issues/9937))
Synapse 1.33.0 (2021-05-05)
===========================

View file

@ -85,6 +85,35 @@ for example:
wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
Upgrading to v1.34.0
====================
`room_invite_state_types` configuration setting
-----------------------------------------------
The ``room_invite_state_types`` configuration setting has been deprecated and
replaced with ``room_prejoin_state``. See the `sample configuration file <https://github.com/matrix-org/synapse/blob/v1.34.0/docs/sample_config.yaml#L1515>`_.
If you have set ``room_invite_state_types`` to the default value you should simply
remove it from your configuration file. The default value used to be:
.. code:: yaml
room_invite_state_types:
- "m.room.join_rules"
- "m.room.canonical_alias"
- "m.room.avatar"
- "m.room.encryption"
- "m.room.name"
If you have customised this value by adding addition state types, you should
remove ``room_invite_state_types`` and configure ``additional_event_types`` with
your customisations.
If you have customised this value by removing state types, you should rename
``room_invite_state_types`` to ``additional_event_types``, and set
``disable_default_event_types`` to ``true``.
Upgrading to v1.33.0
====================

12
debian/changelog vendored
View file

@ -1,3 +1,15 @@
matrix-synapse-py3 (1.33.2) stable; urgency=medium
* New synapse release 1.33.2.
-- Synapse Packaging team <packages@matrix.org> Tue, 11 May 2021 11:17:59 +0100
matrix-synapse-py3 (1.33.1) stable; urgency=medium
* New synapse release 1.33.1.
-- Synapse Packaging team <packages@matrix.org> Thu, 06 May 2021 14:06:33 +0100
matrix-synapse-py3 (1.33.0) stable; urgency=medium
* New synapse release 1.33.0.

View file

@ -88,5 +88,5 @@ EXPOSE 8008/tcp 8009/tcp 8448/tcp
ENTRYPOINT ["/start.py"]
HEALTHCHECK --interval=1m --timeout=5s \
HEALTHCHECK --start-period=5s --interval=15s --timeout=5s \
CMD curl -fSs http://localhost:8008/health || exit 1

View file

@ -191,6 +191,16 @@ whilst running the above `docker run` commands.
```
--no-healthcheck
```
## Disabling the healthcheck in docker-compose file
If you wish to disable the healthcheck via docker-compose, append the following to your service configuration.
```
healthcheck:
disable: true
```
## Setting custom healthcheck on docker run
If you wish to point the healthcheck at a different port with docker command, add the following
@ -202,14 +212,15 @@ If you wish to point the healthcheck at a different port with docker command, ad
## Setting the healthcheck in docker-compose file
You can add the following to set a custom healthcheck in a docker compose file.
You will need version >2.1 for this to work.
You will need docker-compose version >2.1 for this to work.
```
healthcheck:
test: ["CMD", "curl", "-fSs", "http://localhost:8008/health"]
interval: 1m
timeout: 10s
interval: 15s
timeout: 5s
retries: 3
start_period: 5s
```
## Using jemalloc

View file

@ -427,7 +427,7 @@ the new room. Users on other servers will be unaffected.
The API is:
```
POST /_synapse/admin/v1/rooms/<room_id>/delete
DELETE /_synapse/admin/v1/rooms/<room_id>
```
with a body of:
@ -528,6 +528,15 @@ You will have to manually handle, if you so choose, the following:
* Users that would have been booted from the room (and will have been force-joined to the Content Violation room).
* Removal of the Content Violation room if desired.
## Deprecated endpoint
The previous deprecated API will be removed in a future release, it was:
```
POST /_synapse/admin/v1/rooms/<room_id>/delete
```
It behaves the same way than the current endpoint except the path and the method.
# Make Room Admin API

View file

@ -152,6 +152,16 @@ presence:
#
#gc_thresholds: [700, 10, 10]
# The minimum time in seconds between each GC for a generation, regardless of
# the GC thresholds. This ensures that we don't do GC too frequently.
#
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
# generation 0 GCs, etc.
#
# Defaults to `[1s, 10s, 30s]`.
#
#gc_min_interval: [0.5s, 30s, 1m]
# Set the limit on the returned events in the timeline in the get
# and sync operations. The default value is 100. -1 means no upper limit.
#
@ -731,6 +741,12 @@ acme:
#
#allow_profile_lookup_over_federation: false
# Uncomment to disable device display name lookup over federation. By default, the
# Federation API allows other homeservers to obtain device display names of any user
# on this homeserver. Defaults to 'true'.
#
#allow_device_name_lookup_over_federation: false
## Caching ##
@ -810,6 +826,7 @@ caches:
# password: secretpassword
# database: synapse
# host: localhost
# port: 5432
# cp_min: 5
# cp_max: 10
#
@ -1504,6 +1521,7 @@ room_prejoin_state:
# - m.room.avatar
# - m.room.encryption
# - m.room.name
# - m.room.create
#
# Uncomment the following to disable these defaults (so that only the event
# types listed in 'additional_event_types' are shared). Defaults to 'false'.

View file

@ -171,3 +171,6 @@ ignore_missing_imports = True
[mypy-txacme.*]
ignore_missing_imports = True
[mypy-pympler.*]
ignore_missing_imports = True

View file

@ -1,4 +1,4 @@
#!/usr/bin/env python2
#!/usr/bin/env python
import sys

View file

@ -6,7 +6,7 @@
# It does so by having Synapse generate an up-to-date SQLite DB, then running
# synapse_port_db to convert it to Postgres. It then dumps the contents of both.
POSTGRES_HOST="localhost"
export PGHOST="localhost"
POSTGRES_DB_NAME="synapse_full_schema.$$"
SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
@ -32,7 +32,7 @@ usage() {
while getopts "p:co:h" opt; do
case $opt in
p)
POSTGRES_USERNAME=$OPTARG
export PGUSER=$OPTARG
;;
c)
# Print all commands that are being executed
@ -69,7 +69,7 @@ if [ ${#unsatisfied_requirements} -ne 0 ]; then
exit 1
fi
if [ -z "$POSTGRES_USERNAME" ]; then
if [ -z "$PGUSER" ]; then
echo "No postgres username supplied"
usage
exit 1
@ -84,8 +84,9 @@ fi
# Create the output directory if it doesn't exist
mkdir -p "$OUTPUT_DIR"
read -rsp "Postgres password for '$POSTGRES_USERNAME': " POSTGRES_PASSWORD
read -rsp "Postgres password for '$PGUSER': " PGPASSWORD
echo ""
export PGPASSWORD
# Exit immediately if a command fails
set -e
@ -131,9 +132,9 @@ report_stats: false
database:
name: "psycopg2"
args:
user: "$POSTGRES_USERNAME"
host: "$POSTGRES_HOST"
password: "$POSTGRES_PASSWORD"
user: "$PGUSER"
host: "$PGHOST"
password: "$PGPASSWORD"
database: "$POSTGRES_DB_NAME"
# Suppress the key server warning.
@ -150,7 +151,7 @@ scripts-dev/update_database --database-config "$SQLITE_CONFIG"
# Create the PostgreSQL database.
echo "Creating postgres database..."
createdb $POSTGRES_DB_NAME
createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
if [ -z "$COVERAGE" ]; then
@ -181,7 +182,7 @@ DROP TABLE user_directory_search_docsize;
DROP TABLE user_directory_search_stat;
"
sqlite3 "$SQLITE_DB" <<< "$SQL"
psql $POSTGRES_DB_NAME -U "$POSTGRES_USERNAME" -w <<< "$SQL"
psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"

View file

@ -913,6 +913,7 @@ class Porter(object):
(curr_forward_id + 1,),
)
if curr_backward_id:
txn.execute(
"ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
(curr_backward_id + 1,),
@ -954,12 +955,13 @@ class Porter(object):
(curr_chain_id,),
)
if curr_chain_id is not None:
await self.postgres_store.db_pool.runInteraction(
"_setup_event_auth_chain_id", r,
"_setup_event_auth_chain_id",
r,
)
##############################################
# The following is simply UI stuff
##############################################

View file

@ -47,7 +47,7 @@ try:
except ImportError:
pass
__version__ = "1.33.0"
__version__ = "1.34.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when

View file

@ -110,13 +110,18 @@ class EventTypes:
Dummy = "org.matrix.dummy_event"
SpaceChild = "m.space.child"
SpaceParent = "m.space.parent"
MSC1772_SPACE_CHILD = "org.matrix.msc1772.space.child"
MSC1772_SPACE_PARENT = "org.matrix.msc1772.space.parent"
class ToDeviceEventTypes:
RoomKeyRequest = "m.room_key_request"
class EduTypes:
Presence = "m.presence"
RoomKeyRequest = "m.room_key_request"
class RejectedReason:
@ -174,6 +179,7 @@ class EventContentFields:
SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
# cf https://github.com/matrix-org/matrix-doc/pull/1772
ROOM_TYPE = "type"
MSC1772_ROOM_TYPE = "org.matrix.msc1772.type"

View file

@ -57,6 +57,7 @@ class Ratelimiter:
rate_hz: Optional[float] = None,
burst_count: Optional[int] = None,
update: bool = True,
n_actions: int = 1,
_time_now_s: Optional[int] = None,
) -> Tuple[bool, float]:
"""Can the entity (e.g. user or IP address) perform the action?
@ -76,6 +77,9 @@ class Ratelimiter:
burst_count: How many actions that can be performed before being limited.
Overrides the value set during instantiation if set.
update: Whether to count this check as performing the action
n_actions: The number of times the user wants to do this action. If the user
cannot do all of the actions, the user's action count is not incremented
at all.
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
@ -124,17 +128,20 @@ class Ratelimiter:
time_delta = time_now_s - time_start
performed_count = action_count - time_delta * rate_hz
if performed_count < 0:
# Allow, reset back to count 1
allowed = True
performed_count = 0
time_start = time_now_s
action_count = 1.0
elif performed_count > burst_count - 1.0:
# This check would be easier read as performed_count + n_actions > burst_count,
# but performed_count might be a very precise float (with lots of numbers
# following the point) in which case Python might round it up when adding it to
# n_actions. Writing it this way ensures it doesn't happen.
if performed_count > burst_count - n_actions:
# Deny, we have exceeded our burst count
allowed = False
else:
# We haven't reached our limit yet
allowed = True
action_count += 1.0
action_count = performed_count + n_actions
if update:
self.actions[key] = (action_count, time_start, rate_hz)
@ -182,6 +189,7 @@ class Ratelimiter:
rate_hz: Optional[float] = None,
burst_count: Optional[int] = None,
update: bool = True,
n_actions: int = 1,
_time_now_s: Optional[int] = None,
):
"""Checks if an action can be performed. If not, raises a LimitExceededError
@ -201,6 +209,9 @@ class Ratelimiter:
burst_count: How many actions that can be performed before being limited.
Overrides the value set during instantiation if set.
update: Whether to count this check as performing the action
n_actions: The number of times the user wants to do this action. If the user
cannot do all of the actions, the user's action count is not incremented
at all.
_time_now_s: The current time. Optional, defaults to the current time according
to self.clock. Only used by tests.
@ -216,6 +227,7 @@ class Ratelimiter:
rate_hz=rate_hz,
burst_count=burst_count,
update=update,
n_actions=n_actions,
_time_now_s=time_now_s,
)

View file

@ -37,6 +37,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
@ -115,6 +116,7 @@ def start_reactor(
def run():
logger.info("Running")
setup_jemalloc_stats()
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)

View file

@ -454,6 +454,10 @@ def start(config_options):
config.server.update_user_directory = False
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
if config.server.gc_seconds:
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
hs = GenericWorkerServer(
config.server_name,

View file

@ -341,6 +341,10 @@ def setup(config_options):
sys.exit(0)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
if config.server.gc_seconds:
synapse.metrics.MIN_TIME_BETWEEN_GCS = config.server.gc_seconds
hs = SynapseHomeServer(
config.server_name,

View file

@ -88,10 +88,6 @@ class ApiConfig(Config):
if not room_prejoin_state_config.get("disable_default_event_types"):
yield from _DEFAULT_PREJOIN_STATE_TYPES
if self.spaces_enabled:
# MSC1772 suggests adding m.room.create to the prejoin state
yield EventTypes.Create
yield from room_prejoin_state_config.get("additional_event_types", [])
@ -109,6 +105,8 @@ _DEFAULT_PREJOIN_STATE_TYPES = [
EventTypes.RoomAvatar,
EventTypes.RoomEncryption,
EventTypes.Name,
# Per MSC1772.
EventTypes.Create,
]

View file

@ -17,6 +17,8 @@ import re
import threading
from typing import Callable, Dict
from synapse.python_dependencies import DependencyException, check_requirements
from ._base import Config, ConfigError
# The prefix for all cache factor-related environment variables
@ -189,6 +191,15 @@ class CacheConfig(Config):
)
self.cache_factors[cache] = factor
self.track_memory_usage = cache_config.get("track_memory_usage", False)
if self.track_memory_usage:
try:
check_requirements("cache_memory")
except DependencyException as e:
raise ConfigError(
e.message # noqa: B306, DependencyException.message is a property
)
# Resize all caches (if necessary) with the new factors we've loaded
self.resize_all_caches()

View file

@ -58,6 +58,7 @@ DEFAULT_CONFIG = """\
# password: secretpassword
# database: synapse
# host: localhost
# port: 5432
# cp_min: 5
# cp_max: 10
#

View file

@ -44,6 +44,10 @@ class FederationConfig(Config):
"allow_profile_lookup_over_federation", True
)
self.allow_device_name_lookup_over_federation = config.get(
"allow_device_name_lookup_over_federation", True
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
## Federation ##
@ -75,6 +79,12 @@ class FederationConfig(Config):
# on this homeserver. Defaults to 'true'.
#
#allow_profile_lookup_over_federation: false
# Uncomment to disable device display name lookup over federation. By default, the
# Federation API allows other homeservers to obtain device display names of any user
# on this homeserver. Defaults to 'true'.
#
#allow_device_name_lookup_over_federation: false
"""

View file

@ -19,7 +19,7 @@ import logging
import os.path
import re
from textwrap import indent
from typing import Any, Dict, Iterable, List, Optional, Set
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
import attr
import yaml
@ -572,6 +572,7 @@ class ServerConfig(Config):
_warn_if_webclient_configured(self.listeners)
self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
@attr.s
class LimitRemoteRoomsConfig:
@ -917,6 +918,16 @@ class ServerConfig(Config):
#
#gc_thresholds: [700, 10, 10]
# The minimum time in seconds between each GC for a generation, regardless of
# the GC thresholds. This ensures that we don't do GC too frequently.
#
# A value of `[1s, 10s, 30s]` indicates that a second must pass between consecutive
# generation 0 GCs, etc.
#
# Defaults to `[1s, 10s, 30s]`.
#
#gc_min_interval: [0.5s, 30s, 1m]
# Set the limit on the returned events in the timeline in the get
# and sync operations. The default value is 100. -1 means no upper limit.
#
@ -1305,6 +1316,24 @@ class ServerConfig(Config):
help="Turn on the twisted telnet manhole service on the given port.",
)
def read_gc_intervals(self, durations) -> Optional[Tuple[float, float, float]]:
"""Reads the three durations for the GC min interval option, returning seconds."""
if durations is None:
return None
try:
if len(durations) != 3:
raise ValueError()
return (
self.parse_duration(durations[0]) / 1000,
self.parse_duration(durations[1]) / 1000,
self.parse_duration(durations[2]) / 1000,
)
except Exception:
raise ConfigError(
"Value of `gc_min_interval` must be a list of three durations if set"
)
def is_threepid_reserved(reserved_threepids, threepid):
"""Check the threepid against the reserved threepid config

View file

@ -17,7 +17,7 @@ import os
import warnings
from datetime import datetime
from hashlib import sha256
from typing import List, Optional
from typing import List, Optional, Pattern
from unpaddedbase64 import encode_base64
@ -124,7 +124,7 @@ class TlsConfig(Config):
fed_whitelist_entries = []
# Support globs (*) in whitelist values
self.federation_certificate_verification_whitelist = [] # type: List[str]
self.federation_certificate_verification_whitelist = [] # type: List[Pattern]
for entry in fed_whitelist_entries:
try:
entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))

View file

@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple,
from synapse.rest.media.v1._base import FileInfo
from synapse.rest.media.v1.media_storage import ReadableFileWrapper
from synapse.spam_checker_api import RegistrationBehaviour
from synapse.types import RoomAlias
from synapse.util.async_helpers import maybe_awaitable
if TYPE_CHECKING:
@ -113,7 +114,9 @@ class SpamChecker:
return True
async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
async def user_may_create_room_alias(
self, userid: str, room_alias: RoomAlias
) -> bool:
"""Checks if a given user may create a room alias
If this method returns false, the association request will be rejected.

View file

@ -44,7 +44,6 @@ from synapse.api.errors import (
SynapseError,
UnsupportedRoomVersionError,
)
from synapse.api.ratelimiting import Ratelimiter
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
@ -865,14 +864,6 @@ class FederationHandlerRegistry:
# EDU received.
self._edu_type_to_instance = {} # type: Dict[str, List[str]]
# A rate limiter for incoming room key requests per origin.
self._room_key_request_rate_limiter = Ratelimiter(
store=hs.get_datastore(),
clock=self.clock,
rate_hz=self.config.rc_key_requests.per_second,
burst_count=self.config.rc_key_requests.burst_count,
)
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
) -> None:
@ -926,16 +917,6 @@ class FederationHandlerRegistry:
if not self.config.use_presence and edu_type == EduTypes.Presence:
return
# If the incoming room key requests from a particular origin are over
# the limit, drop them.
if (
edu_type == EduTypes.RoomKeyRequest
and not await self._room_key_request_rate_limiter.can_do_action(
None, origin
)
):
return
# Check if we have a handler on this instance
handler = self.edu_handlers.get(edu_type)
if handler:

View file

@ -28,6 +28,7 @@ from synapse.api.presence import UserPresenceState
from synapse.events import EventBase
from synapse.federation.units import Edu
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import SynapseTags, set_tag
from synapse.metrics import sent_transactions_counter
from synapse.metrics.background_process_metrics import run_as_background_process
@ -574,6 +575,14 @@ class PerDestinationQueue:
for content in contents
]
if edus:
issue9533_logger.debug(
"Sending %i to-device messages to %s, up to stream id %i",
len(edus),
self._destination,
stream_id,
)
return (edus, stream_id)
def _start_catching_up(self) -> None:

View file

@ -995,6 +995,7 @@ class TransportLayerClient:
returned per space
exclude_rooms: a list of any rooms we can skip
"""
# TODO When switching to the stable endpoint, use GET instead of POST.
path = _create_path(
FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
)

View file

@ -1376,6 +1376,32 @@ class FederationSpaceSummaryServlet(BaseFederationServlet):
PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc2946"
PATH = "/spaces/(?P<room_id>[^/]*)"
async def on_GET(
self,
origin: str,
content: JsonDict,
query: Mapping[bytes, Sequence[bytes]],
room_id: str,
) -> Tuple[int, JsonDict]:
suggested_only = parse_boolean_from_args(query, "suggested_only", default=False)
max_rooms_per_space = parse_integer_from_args(query, "max_rooms_per_space")
exclude_rooms = []
if b"exclude_rooms" in query:
try:
exclude_rooms = [
room_id.decode("ascii") for room_id in query[b"exclude_rooms"]
]
except Exception:
raise SynapseError(
400, "Bad query parameter for exclude_rooms", Codes.INVALID_PARAM
)
return 200, await self.handler.federation_space_summary(
room_id, suggested_only, max_rooms_per_space, exclude_rooms
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self,
origin: str,

View file

@ -17,6 +17,7 @@ import logging
import time
import unicodedata
import urllib.parse
from binascii import crc32
from typing import (
TYPE_CHECKING,
Any,
@ -34,6 +35,7 @@ from typing import (
import attr
import bcrypt
import pymacaroons
import unpaddedbase64
from twisted.web.server import Request
@ -66,6 +68,7 @@ from synapse.util import stringutils as stringutils
from synapse.util.async_helpers import maybe_awaitable
from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
from synapse.util.msisdn import phone_number_to_msisdn
from synapse.util.stringutils import base62_encode
from synapse.util.threepids import canonicalise_email
if TYPE_CHECKING:
@ -808,10 +811,12 @@ class AuthHandler(BaseHandler):
logger.info(
"Logging in user %s as %s%s", user_id, puppets_user_id, fmt_expiry
)
target_user_id_obj = UserID.from_string(puppets_user_id)
else:
logger.info(
"Logging in user %s on device %s%s", user_id, device_id, fmt_expiry
)
target_user_id_obj = UserID.from_string(user_id)
if (
not is_appservice_ghost
@ -819,7 +824,7 @@ class AuthHandler(BaseHandler):
):
await self.auth.check_auth_blocking(user_id)
access_token = self.macaroon_gen.generate_access_token(user_id)
access_token = self.generate_access_token(target_user_id_obj)
await self.store.add_access_token_to_user(
user_id=user_id,
token=access_token,
@ -1192,6 +1197,19 @@ class AuthHandler(BaseHandler):
return None
return user_id
def generate_access_token(self, for_user: UserID) -> str:
"""Generates an opaque string, for use as an access token"""
# we use the following format for access tokens:
# syt_<base64 local part>_<random string>_<base62 crc check>
b64local = unpaddedbase64.encode_base64(for_user.localpart.encode("utf-8"))
random_string = stringutils.random_string(20)
base = f"syt_{b64local}_{random_string}"
crc = base62_encode(crc32(base.encode("ascii")), minwidth=6)
return f"{base}_{crc}"
async def validate_short_term_login_token(
self, login_token: str
) -> LoginTokenAttributes:
@ -1585,10 +1603,7 @@ class MacaroonGenerator:
hs = attr.ib()
def generate_access_token(
self, user_id: str, extra_caveats: Optional[List[str]] = None
) -> str:
extra_caveats = extra_caveats or []
def generate_guest_access_token(self, user_id: str) -> str:
macaroon = self._generate_base_macaroon(user_id)
macaroon.add_first_party_caveat("type = access")
# Include a nonce, to make sure that each login gets a different
@ -1596,8 +1611,7 @@ class MacaroonGenerator:
macaroon.add_first_party_caveat(
"nonce = %s" % (stringutils.random_string_with_symbols(16),)
)
for caveat in extra_caveats:
macaroon.add_first_party_caveat(caveat)
macaroon.add_first_party_caveat("guest = true")
return macaroon.serialize()
def generate_short_term_login_token(

View file

@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Dict
from synapse.api.constants import EduTypes
from synapse.api.constants import ToDeviceEventTypes
from synapse.api.errors import SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
@ -79,6 +79,8 @@ class DeviceMessageHandler:
ReplicationUserDevicesResyncRestServlet.make_client(hs)
)
# a rate limiter for room key requests. The keys are
# (sending_user_id, sending_device_id).
self._ratelimiter = Ratelimiter(
store=self.store,
clock=hs.get_clock(),
@ -100,12 +102,25 @@ class DeviceMessageHandler:
for user_id, by_device in content["messages"].items():
# we use UserID.from_string to catch invalid user ids
if not self.is_mine(UserID.from_string(user_id)):
logger.warning("Request for keys for non-local user %s", user_id)
logger.warning("To-device message to non-local user %s", user_id)
raise SynapseError(400, "Not a user here")
if not by_device:
continue
# Ratelimit key requests by the sending user.
if message_type == ToDeviceEventTypes.RoomKeyRequest:
allowed, _ = await self._ratelimiter.can_do_action(
None, (sender_user_id, None)
)
if not allowed:
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
user_id,
)
continue
messages_by_device = {
device_id: {
"content": message_content,
@ -192,12 +207,18 @@ class DeviceMessageHandler:
for user_id, by_device in messages.items():
# Ratelimit local cross-user key requests by the sending device.
if (
message_type == EduTypes.RoomKeyRequest
message_type == ToDeviceEventTypes.RoomKeyRequest
and user_id != sender_user_id
and await self._ratelimiter.can_do_action(
):
allowed, _ = await self._ratelimiter.can_do_action(
requester, (sender_user_id, requester.device_id)
)
):
if not allowed:
logger.info(
"Dropping room_key_request from %s to %s due to rate limit",
sender_user_id,
user_id,
)
continue
# we use UserID.from_string to catch invalid user ids

View file

@ -14,7 +14,7 @@
import logging
import string
from typing import Iterable, List, Optional
from typing import TYPE_CHECKING, Iterable, List, Optional
from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
from synapse.api.errors import (
@ -27,15 +27,19 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.appservice import ApplicationService
from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class DirectoryHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.state = hs.get_state_handler()
@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler):
room_id: str,
servers: Optional[Iterable[str]] = None,
creator: Optional[str] = None,
):
) -> None:
# general association creation for both human users and app services
# meow: allow specific users to include anything in room aliases
@ -76,7 +80,7 @@ class DirectoryHandler(BaseHandler):
# TODO(erikj): Add transactions.
# TODO(erikj): Check if there is a current association.
if not servers:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
servers = {get_domain_from_id(u) for u in users}
if not servers:
@ -106,8 +110,9 @@ class DirectoryHandler(BaseHandler):
"""
user_id = requester.user.to_string()
room_alias_str = room_alias.to_string()
if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
if len(room_alias_str) > MAX_ALIAS_LENGTH:
raise SynapseError(
400,
"Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
@ -116,7 +121,7 @@ class DirectoryHandler(BaseHandler):
service = requester.app_service
if service:
if not service.is_interested_in_alias(room_alias.to_string()):
if not service.is_interested_in_alias(room_alias_str):
raise SynapseError(
400,
"This application service has not reserved this kind of alias.",
@ -140,7 +145,7 @@ class DirectoryHandler(BaseHandler):
raise AuthError(403, "This user is not permitted to create this alias")
if not self.config.is_alias_creation_allowed(
user_id, room_id, room_alias.to_string()
user_id, room_id, room_alias_str
):
# Lets just return a generic message, as there may be all sorts of
# reasons why we said no. TODO: Allow configurable error messages
@ -213,7 +218,7 @@ class DirectoryHandler(BaseHandler):
async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
):
) -> None:
if not service.is_interested_in_alias(room_alias.to_string()):
raise SynapseError(
400,
@ -222,7 +227,7 @@ class DirectoryHandler(BaseHandler):
)
await self._delete_association(room_alias)
async def _delete_association(self, room_alias: RoomAlias):
async def _delete_association(self, room_alias: RoomAlias) -> str:
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room alias must be local")
@ -230,17 +235,19 @@ class DirectoryHandler(BaseHandler):
return room_id
async def get_association(self, room_alias: RoomAlias):
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
room_id = None
if self.hs.is_mine(room_alias):
result = await self.get_association_from_room_alias(room_alias)
result = await self.get_association_from_room_alias(
room_alias
) # type: Optional[RoomAliasMapping]
if result:
room_id = result.room_id
servers = result.servers
else:
try:
result = await self.federation.make_query(
fed_result = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
@ -250,13 +257,13 @@ class DirectoryHandler(BaseHandler):
except CodeMessageException as e:
logging.warning("Error retrieving alias")
if e.code == 404:
result = None
fed_result = None
else:
raise
if result and "room_id" in result and "servers" in result:
room_id = result["room_id"]
servers = result["servers"]
if fed_result and "room_id" in fed_result and "servers" in fed_result:
room_id = fed_result["room_id"]
servers = fed_result["servers"]
if not room_id:
raise SynapseError(
@ -265,7 +272,7 @@ class DirectoryHandler(BaseHandler):
Codes.NOT_FOUND,
)
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_servers = {get_domain_from_id(u) for u in users}
servers = set(extra_servers) | set(servers)
@ -277,7 +284,7 @@ class DirectoryHandler(BaseHandler):
return {"room_id": room_id, "servers": servers}
async def on_directory_query(self, args):
async def on_directory_query(self, args: JsonDict) -> JsonDict:
room_alias = RoomAlias.from_string(args["room_alias"])
if not self.hs.is_mine(room_alias):
raise SynapseError(400, "Room Alias is not hosted on this homeserver")
@ -295,7 +302,7 @@ class DirectoryHandler(BaseHandler):
async def _update_canonical_alias(
self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias
):
) -> None:
"""
Send an updated canonical alias event if the removed alias was set as
the canonical alias or listed in the alt_aliases field.
@ -346,7 +353,9 @@ class DirectoryHandler(BaseHandler):
ratelimit=False,
)
async def get_association_from_room_alias(self, room_alias: RoomAlias):
async def get_association_from_room_alias(
self, room_alias: RoomAlias
) -> Optional[RoomAliasMapping]:
result = await self.store.get_association_from_room_alias(room_alias)
if not result:
# Query AS to see if it exists
@ -374,7 +383,7 @@ class DirectoryHandler(BaseHandler):
# either no interested services, or no service with an exclusive lock
return True
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
"""Determine whether a user can delete an alias.
One of the following must be true:
@ -396,14 +405,13 @@ class DirectoryHandler(BaseHandler):
if not room_id:
return False
res = await self.auth.check_can_change_room_list(
return await self.auth.check_can_change_room_list(
room_id, UserID.from_string(user_id)
)
return res
async def edit_published_room_list(
self, requester: Requester, room_id: str, visibility: str
):
) -> None:
"""Edit the entry of the room in the published room list.
requester
@ -471,7 +479,7 @@ class DirectoryHandler(BaseHandler):
async def edit_published_appservice_room_list(
self, appservice_id: str, network_id: str, room_id: str, visibility: str
):
) -> None:
"""Add or remove a room from the appservice/network specific public
room list.
@ -501,5 +509,4 @@ class DirectoryHandler(BaseHandler):
room_id, requester.user.to_string()
)
aliases = await self.store.get_aliases_for_room(room_id)
return aliases
return await self.store.get_aliases_for_room(room_id)

View file

@ -103,7 +103,7 @@ class EventStreamHandler(BaseHandler):
# Send down presence.
if event.state_key == auth_user_id:
# Send down presence for everyone in the room.
users = await self.state.get_current_users_in_room(
users = await self.store.get_users_in_room(
event.room_id
) # type: Iterable[str]
else:

View file

@ -552,8 +552,12 @@ class FederationHandler(BaseHandler):
destination: str,
room_id: str,
event_id: str,
) -> Tuple[List[EventBase], List[EventBase]]:
"""Requests all of the room state at a given event from a remote homeserver.
) -> List[EventBase]:
"""Requests all of the room state at a given event from a remote
homeserver.
Will also fetch any missing events reported in the `auth_chain_ids`
section of `/state_ids`.
Args:
destination: The remote homeserver to query for the state.
@ -561,8 +565,7 @@ class FederationHandler(BaseHandler):
event_id: The id of the event we want the state at.
Returns:
A list of events in the state, not including the event itself, and
a list of events in the auth chain for the given event.
A list of events in the state, not including the event itself.
"""
(
state_event_ids,
@ -571,68 +574,53 @@ class FederationHandler(BaseHandler):
destination, room_id, event_id=event_id
)
desired_events = set(state_event_ids + auth_event_ids)
event_map = await self._get_events_from_store_or_dest(
destination, room_id, desired_events
)
failed_to_fetch = desired_events - event_map.keys()
if failed_to_fetch:
logger.warning(
"Failed to fetch missing state/auth events for %s %s",
event_id,
failed_to_fetch,
)
remote_state = [
event_map[e_id] for e_id in state_event_ids if e_id in event_map
]
auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
auth_chain.sort(key=lambda e: e.depth)
return remote_state, auth_chain
async def _get_events_from_store_or_dest(
self, destination: str, room_id: str, event_ids: Iterable[str]
) -> Dict[str, EventBase]:
"""Fetch events from a remote destination, checking if we already have them.
Persists any events we don't already have as outliers.
If we fail to fetch any of the events, a warning will be logged, and the event
will be omitted from the result. Likewise, any events which turn out not to
be in the given room.
This function *does not* automatically get missing auth events of the
newly fetched events. Callers must include the full auth chain of
of the missing events in the `event_ids` argument, to ensure that any
missing auth events are correctly fetched.
Returns:
map from event_id to event
"""
fetched_events = await self.store.get_events(event_ids, allow_rejected=True)
missing_events = set(event_ids) - fetched_events.keys()
if missing_events:
logger.debug(
"Fetching unknown state/auth events %s for room %s",
missing_events,
room_id,
)
# Fetch the state events from the DB, and check we have the auth events.
event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
# Check for missing events. We handle state and auth event seperately,
# as we want to pull the state from the DB, but we don't for the auth
# events. (Note: we likely won't use the majority of the auth chain, and
# it can be *huge* for large rooms, so it's worth ensuring that we don't
# unnecessarily pull it from the DB).
missing_state_events = set(state_event_ids) - set(event_map)
missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
if missing_state_events or missing_auth_events:
await self._get_events_and_persist(
destination=destination, room_id=room_id, events=missing_events
destination=destination,
room_id=room_id,
events=missing_state_events | missing_auth_events,
)
# we need to make sure we re-load from the database to get the rejected
# state correct.
fetched_events.update(
(await self.store.get_events(missing_events, allow_rejected=True))
if missing_state_events:
new_events = await self.store.get_events(
missing_state_events, allow_rejected=True
)
event_map.update(new_events)
missing_state_events.difference_update(new_events)
if missing_state_events:
logger.warning(
"Failed to fetch missing state events for %s %s",
event_id,
missing_state_events,
)
if missing_auth_events:
auth_events_in_store = await self.store.have_seen_events(
missing_auth_events
)
missing_auth_events.difference_update(auth_events_in_store)
if missing_auth_events:
logger.warning(
"Failed to fetch missing auth events for %s %s",
event_id,
missing_auth_events,
)
remote_state = list(event_map.values())
# check for events which were in the wrong room.
#
@ -640,8 +628,8 @@ class FederationHandler(BaseHandler):
# auth_events at an event in room A are actually events in room B
bad_events = [
(event_id, event.room_id)
for event_id, event in fetched_events.items()
(event.event_id, event.room_id)
for event in remote_state
if event.room_id != room_id
]
@ -658,9 +646,10 @@ class FederationHandler(BaseHandler):
room_id,
)
del fetched_events[bad_event_id]
if bad_events:
remote_state = [e for e in remote_state if e.room_id == room_id]
return fetched_events
return remote_state
async def _get_state_after_missing_prev_event(
self,
@ -963,27 +952,23 @@ class FederationHandler(BaseHandler):
# For each edge get the current state.
auth_events = {}
state_events = {}
events_to_state = {}
for e_id in edges:
state, auth = await self._get_state_for_room(
state = await self._get_state_for_room(
destination=dest,
room_id=room_id,
event_id=e_id,
)
auth_events.update({a.event_id: a for a in auth})
auth_events.update({s.event_id: s for s in state})
state_events.update({s.event_id: s for s in state})
events_to_state[e_id] = state
required_auth = {
a_id
for event in events
+ list(state_events.values())
+ list(auth_events.values())
for event in events + list(state_events.values())
for a_id in event.auth_event_ids()
}
auth_events = await self.store.get_events(required_auth, allow_rejected=True)
auth_events.update(
{e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
)
@ -2446,7 +2431,9 @@ class FederationHandler(BaseHandler):
# If we are going to send this event over federation we precaclculate
# the joined hosts.
if event.internal_metadata.get_send_on_behalf_of():
await self.event_creation_handler.cache_joined_hosts_for_event(event)
await self.event_creation_handler.cache_joined_hosts_for_event(
event, context
)
return context

View file

@ -17,7 +17,7 @@
"""Utilities for interacting with Identity Servers"""
import logging
import urllib.parse
from typing import Awaitable, Callable, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple
from synapse.api.errors import (
CodeMessageException,
@ -41,13 +41,16 @@ from synapse.util.stringutils import (
from ._base import BaseHandler
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
id_server_scheme = "https://"
class IdentityHandler(BaseHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
# An HTTP client for contacting trusted URLs.
@ -80,7 +83,7 @@ class IdentityHandler(BaseHandler):
request: SynapseRequest,
medium: str,
address: str,
):
) -> None:
"""Used to ratelimit requests to `/requestToken` by IP and address.
Args:

View file

@ -15,10 +15,11 @@
# limitations under the License.
import logging
import random
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
from twisted.internet import defer
from twisted.internet.interfaces import IDelayedCall
from synapse import event_auth
@ -43,14 +44,15 @@ from synapse.events import EventBase
from synapse.events.builder import EventBuilder
from synapse.events.snapshot import EventContext
from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
from synapse.types import Requester, RoomAlias, StreamToken, UserID, create_requester
from synapse.util import json_decoder, json_encoder
from synapse.util.async_helpers import Linearizer
from synapse.util import json_decoder, json_encoder, log_failure
from synapse.util.async_helpers import Linearizer, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
@ -66,7 +68,7 @@ logger = logging.getLogger(__name__)
class MessageHandler:
"""Contains some read only APIs to get state about a room"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.state = hs.get_state_handler()
@ -91,7 +93,7 @@ class MessageHandler:
room_id: str,
event_type: str,
state_key: str,
) -> dict:
) -> Optional[EventBase]:
"""Get data from a room.
Args:
@ -115,6 +117,10 @@ class MessageHandler:
data = await self.state.get_current_state(room_id, event_type, state_key)
elif membership == Membership.LEAVE:
key = (event_type, state_key)
# If the membership is not JOIN, then the event ID should exist.
assert (
membership_event_id is not None
), "check_user_in_room_or_world_readable returned invalid data"
room_state = await self.state_store.get_state_for_events(
[membership_event_id], StateFilter.from_types([key])
)
@ -186,10 +192,12 @@ class MessageHandler:
event = last_events[0]
if visible_events:
room_state = await self.state_store.get_state_for_events(
room_state_events = await self.state_store.get_state_for_events(
[event.event_id], state_filter=state_filter
)
room_state = room_state[event.event_id]
room_state = room_state_events[
event.event_id
] # type: Mapping[Any, EventBase]
else:
raise AuthError(
403,
@ -210,10 +218,14 @@ class MessageHandler:
)
room_state = await self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = await self.state_store.get_state_for_events(
# If the membership is not JOIN, then the event ID should exist.
assert (
membership_event_id is not None
), "check_user_in_room_or_world_readable returned invalid data"
room_state_events = await self.state_store.get_state_for_events(
[membership_event_id], state_filter=state_filter
)
room_state = room_state[membership_event_id]
room_state = room_state_events[membership_event_id]
now = self.clock.time_msec()
events = await self._event_serializer.serialize_events(
@ -248,7 +260,7 @@ class MessageHandler:
"Getting joined members after leaving is not implemented"
)
users_with_profile = await self.state.get_current_users_in_room(room_id)
users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
@ -449,6 +461,19 @@ class EventCreationHandler:
self._external_cache = hs.get_external_cache()
# Stores the state groups we've recently added to the joined hosts
# external cache. Note that the timeout must be significantly less than
# the TTL on the external cache.
self._external_cache_joined_hosts_updates = (
None
) # type: Optional[ExpiringCache]
if self._external_cache.is_enabled():
self._external_cache_joined_hosts_updates = ExpiringCache(
"_external_cache_joined_hosts_updates",
self.clock,
expiry_ms=30 * 60 * 1000,
)
async def create_event(
self,
requester: Requester,
@ -957,9 +982,43 @@ class EventCreationHandler:
logger.exception("Failed to encode content: %r", event.content)
raise
await self.action_generator.handle_push_actions_for_event(event, context)
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
result = await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._persist_event,
requester=requester,
event=event,
context=context,
ratelimit=ratelimit,
extra_users=extra_users,
),
run_in_background(
self.cache_joined_hosts_for_event, event, context
).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
],
consumeErrors=True,
)
).addErrback(unwrapFirstError)
await self.cache_joined_hosts_for_event(event)
return result[0]
async def _persist_event(
self,
requester: Requester,
event: EventBase,
context: EventContext,
ratelimit: bool = True,
extra_users: Optional[List[UserID]] = None,
) -> EventBase:
"""Actually persists the event. Should only be called by
`handle_new_client_event`, and see its docstring for documentation of
the arguments.
"""
await self.action_generator.handle_push_actions_for_event(event, context)
try:
# If we're a worker we need to hit out to the master.
@ -1000,7 +1059,9 @@ class EventCreationHandler:
await self.store.remove_push_actions_from_staging(event.event_id)
raise
async def cache_joined_hosts_for_event(self, event: EventBase) -> None:
async def cache_joined_hosts_for_event(
self, event: EventBase, context: EventContext
) -> None:
"""Precalculate the joined hosts at the event, when using Redis, so that
external federation senders don't have to recalculate it themselves.
"""
@ -1008,6 +1069,9 @@ class EventCreationHandler:
if not self._external_cache.is_enabled():
return
# If external cache is enabled we should always have this.
assert self._external_cache_joined_hosts_updates is not None
# We actually store two mappings, event ID -> prev state group,
# state group -> joined hosts, which is much more space efficient
# than event ID -> joined hosts.
@ -1015,22 +1079,28 @@ class EventCreationHandler:
# Note: We have to cache event ID -> prev state group, as we don't
# store that in the DB.
#
# Note: We always set the state group -> joined hosts cache, even if
# we already set it, so that the expiry time is reset.
# Note: We set the state group -> joined hosts cache if it hasn't been
# set for a while, so that the expiry time is reset.
state_entry = await self.state.resolve_state_groups_for_events(
event.room_id, event_ids=event.prev_event_ids()
)
if state_entry.state_group:
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
await self._external_cache.set(
"event_to_prev_state_group",
event.event_id,
state_entry.state_group,
expiry_ms=60 * 60 * 1000,
)
if state_entry.state_group in self._external_cache_joined_hosts_updates:
return
joined_hosts = await self.store.get_joined_hosts(event.room_id, state_entry)
# Note that the expiry times must be larger than the expiry time in
# _external_cache_joined_hosts_updates.
await self._external_cache.set(
"get_joined_hosts",
str(state_entry.state_group),
@ -1038,6 +1108,8 @@ class EventCreationHandler:
expiry_ms=60 * 60 * 1000,
)
self._external_cache_joined_hosts_updates[state_entry.state_group] = None
async def _validate_canonical_alias(
self, directory_handler, room_alias_str: str, expected_room_id: str
) -> None:

View file

@ -28,6 +28,7 @@ from bisect import bisect
from contextlib import contextmanager
from typing import (
TYPE_CHECKING,
Callable,
Collection,
Dict,
FrozenSet,
@ -232,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
"""
async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
):
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
) -> None:
"""Update the syncing users for an external process as a delta.
This is a no-op when presence is handled by a different worker.
Args:
process_id (str): An identifier for the process the users are
process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing
user_id: The user who has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
pass
async def update_external_syncs_clear(self, process_id):
async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process
as offline.
@ -304,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
class WorkerPresenceHandler(BasePresenceHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
@ -327,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
# we haven't notified the presence writer of that yet
self.users_going_offline = {}
self.users_going_offline = {} # type: Dict[str, int]
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@ -346,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._on_shutdown,
)
def _on_shutdown(self):
def _on_shutdown(self) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_command(
ClearUserSyncsCommand(self.instance_id)
)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
if self._presence_enabled:
self.hs.get_tcp_replication().send_user_sync(
self.instance_id, user_id, is_syncing, last_sync_ms
)
def mark_as_coming_online(self, user_id):
def mark_as_coming_online(self, user_id: str) -> None:
"""A user has started syncing. Send a UserSync to the presence writer,
unless they had recently stopped syncing.
Args:
user_id (str)
"""
going_offline = self.users_going_offline.pop(user_id, None)
if not going_offline:
@ -371,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
# were offline
self.send_user_sync(user_id, True, self.clock.time_msec())
def mark_as_going_offline(self, user_id):
def mark_as_going_offline(self, user_id: str) -> None:
"""A user has stopped syncing. We wait before notifying the presence
writer as its likely they'll come back soon. This allows us to avoid
sending a stopped syncing immediately followed by a started syncing
notification to the presence writer
Args:
user_id (str)
"""
self.users_going_offline[user_id] = self.clock.time_msec()
def send_stop_syncing(self):
def send_stop_syncing(self) -> None:
"""Check if there are any users who have stopped syncing a while ago and
haven't come back yet. If there are poke the presence writer about them.
"""
@ -430,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
return _user_syncing()
async def notify_from_replication(self, states, stream_id):
async def notify_from_replication(
self, states: List[UserPresenceState], stream_id: int
) -> None:
parties = await get_interested_parties(self.store, self.presence_router, states)
room_ids_to_states, users_to_states = parties
@ -478,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
if count > 0
]
async def set_state(self, target_user, state, ignore_status_msg=False):
async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
) -> None:
"""Set the presence state of the user."""
presence = state["presence"]
@ -508,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
ignore_status_msg=ignore_status_msg,
)
async def bump_presence_active_time(self, user):
async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@ -592,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
# we assume that all the sync requests on that process have stopped.
# Stored as a dict from process_id to set of user_id, and a dict of
# process_id to millisecond timestamp last updated.
self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[int, int]
self.external_process_to_current_syncs = {} # type: Dict[str, Set[str]]
self.external_process_last_updated_ms = {} # type: Dict[str, int]
self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
@ -633,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
self._event_pos = self.store.get_current_events_token()
self._event_processing = False
async def _on_shutdown(self):
async def _on_shutdown(self) -> None:
"""Gets called when shutting down. This lets us persist any updates that
we haven't yet persisted, e.g. updates that only changes some internal
timers. This allows changes to persist across startup without having to
@ -662,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
)
logger.info("Finished _on_shutdown")
async def _persist_unpersisted_changes(self):
async def _persist_unpersisted_changes(self) -> None:
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
"""
@ -762,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
states, destinations
)
async def _handle_timeouts(self):
async def _handle_timeouts(self) -> None:
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
@ -814,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
return await self._update_states(changes)
async def bump_presence_active_time(self, user):
async def bump_presence_active_time(self, user: UserID) -> None:
"""We've seen the user do something that indicates they're interacting
with the app.
"""
@ -911,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
return []
async def update_external_syncs_row(
self, process_id, user_id, is_syncing, sync_time_msec
):
self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
) -> None:
"""Update the syncing users for an external process as a delta.
Args:
process_id (str): An identifier for the process the users are
process_id: An identifier for the process the users are
syncing against. This allows synapse to process updates
as user start and stop syncing against a given process.
user_id (str): The user who has started or stopped syncing
is_syncing (bool): Whether or not the user is now syncing
sync_time_msec(int): Time in ms when the user was last syncing
user_id: The user who has started or stopped syncing
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
with (await self.external_sync_linearizer.queue(process_id)):
prev_state = await self.current_state_for_user(user_id)
@ -958,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
async def update_external_syncs_clear(self, process_id):
async def update_external_syncs_clear(self, process_id: str) -> None:
"""Marks all users that had been marked as syncing by a given process
as offline.
@ -979,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
)
self.external_process_last_updated_ms.pop(process_id, None)
async def current_state_for_user(self, user_id):
async def current_state_for_user(self, user_id: str) -> UserPresenceState:
"""Get the current presence state for a user."""
res = await self.current_state_for_users([user_id])
return res[user_id]
async def _persist_and_notify(self, states):
async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
"""Persist states in the database, poke the notifier and send to
interested remote servers
"""
@ -1005,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
# stream (which is updated by `store.update_presence`).
await self.maybe_send_presence_to_interested_destinations(states)
async def incoming_presence(self, origin, content):
async def incoming_presence(self, origin: str, content: JsonDict) -> None:
"""Called when we receive a `m.presence` EDU from a remote server."""
if not self._presence_enabled:
return
@ -1055,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
federation_presence_counter.inc(len(updates))
await self._update_states(updates)
async def set_state(self, target_user, state, ignore_status_msg=False):
async def set_state(
self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
) -> None:
"""Set the presence state of the user."""
status_msg = state.get("status_msg", None)
presence = state["presence"]
@ -1089,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
await self._update_states([prev_state.copy_and_replace(**new_fields)])
async def is_visible(self, observed_user, observer_user):
async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
"""Returns whether a user can see another user's presence."""
observer_room_ids = await self.store.get_rooms_for_user(
observer_user.to_string()
@ -1144,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
)
return rows
def notify_new_event(self):
def notify_new_event(self) -> None:
"""Called when new events have happened. Handles users and servers
joining rooms and require being sent presence.
"""
@ -1163,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
run_as_background_process("presence.notify_new_event", _process_presence)
async def _unsafe_process(self):
async def _unsafe_process(self) -> None:
# Loop round handling deltas until we're up to date
while True:
with Measure(self.clock, "presence_delta"):
@ -1179,7 +1183,16 @@ class PresenceHandler(BasePresenceHandler):
max_pos, deltas = await self.store.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
await self._handle_state_delta(deltas)
# We may get multiple deltas for different rooms, but we want to
# handle them on a room by room basis, so we batch them up by
# room.
deltas_by_room: Dict[str, List[JsonDict]] = {}
for delta in deltas:
deltas_by_room.setdefault(delta["room_id"], []).append(delta)
for room_id, deltas_for_room in deltas_by_room.items():
await self._handle_state_delta(room_id, deltas_for_room)
self._event_pos = max_pos
@ -1188,17 +1201,21 @@ class PresenceHandler(BasePresenceHandler):
max_pos
)
async def _handle_state_delta(self, deltas):
"""Process current state deltas to find new joins that need to be
handled.
async def _handle_state_delta(self, room_id: str, deltas: List[JsonDict]) -> None:
"""Process current state deltas for the room to find new joins that need
to be handled.
"""
# A map of destination to a set of user state that they should receive
presence_destinations = {} # type: Dict[str, Set[UserPresenceState]]
# Sets of newly joined users. Note that if the local server is
# joining a remote room for the first time we'll see both the joining
# user and all remote users as newly joined.
newly_joined_users = set()
for delta in deltas:
assert room_id == delta["room_id"]
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
event_id = delta["event_id"]
prev_event_id = delta["prev_event_id"]
@ -1227,72 +1244,55 @@ class PresenceHandler(BasePresenceHandler):
# Ignore changes to join events.
continue
# Retrieve any user presence state updates that need to be sent as a result,
# and the destinations that need to receive it
destinations, user_presence_states = await self._on_user_joined_room(
room_id, state_key
)
newly_joined_users.add(state_key)
# Insert the destinations and respective updates into our destinations dict
for destination in destinations:
presence_destinations.setdefault(destination, set()).update(
user_presence_states
)
if not newly_joined_users:
# If nobody has joined then there's nothing to do.
return
# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
self._federation_queue.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)
# We want to send:
# 1. presence states of all local users in the room to newly joined
# remote servers
# 2. presence states of newly joined users to all remote servers in
# the room.
#
# TODO: Only send presence states to remote hosts that don't already
# have them (because they already share rooms).
async def _on_user_joined_room(
self, room_id: str, user_id: str
) -> Tuple[List[str], List[UserPresenceState]]:
"""Called when we detect a user joining the room via the current state
delta stream. Returns the destinations that need to be updated and the
presence updates to send to them.
# Get all the users who were already in the room, by fetching the
# current users in the room and removing the newly joined users.
users = await self.store.get_users_in_room(room_id)
prev_users = set(users) - newly_joined_users
Args:
room_id: The ID of the room that the user has joined.
user_id: The ID of the user that has joined the room.
Returns:
A tuple of destinations and presence updates to send to them.
"""
# Construct sets for all the local users and remote hosts that were
# already in the room
prev_local_users = []
prev_remote_hosts = set()
for user_id in prev_users:
if self.is_mine_id(user_id):
# If this is a local user then we need to send their presence
# out to hosts in the room (who don't already have it)
# TODO: We should be able to filter the hosts down to those that
# haven't previously seen the user
remote_hosts = await self.state.get_current_hosts_in_room(room_id)
# Filter out ourselves.
filtered_remote_hosts = [
host for host in remote_hosts if host != self.server_name
]
state = await self.current_state_for_user(user_id)
return filtered_remote_hosts, [state]
prev_local_users.append(user_id)
else:
# A remote user has joined the room, so we need to:
# 1. Check if this is a new server in the room
# 2. If so send any presence they don't already have for
# local users in the room.
prev_remote_hosts.add(get_domain_from_id(user_id))
# TODO: We should be able to filter the users down to those that
# the server hasn't previously seen
# Similarly, construct sets for all the local users and remote hosts
# that were *not* already in the room. Care needs to be taken with the
# calculating the remote hosts, as a host may have already been in the
# room even if there is a newly joined user from that host.
newly_joined_local_users = []
newly_joined_remote_hosts = set()
for user_id in newly_joined_users:
if self.is_mine_id(user_id):
newly_joined_local_users.append(user_id)
else:
host = get_domain_from_id(user_id)
if host not in prev_remote_hosts:
newly_joined_remote_hosts.add(host)
# TODO: Check that this is actually a new server joining the
# room.
remote_host = get_domain_from_id(user_id)
users = await self.state.get_current_users_in_room(room_id)
user_ids = list(filter(self.is_mine_id, users))
states_d = await self.current_state_for_users(user_ids)
# Send presence states of all local users in the room to newly joined
# remote servers. (We actually only send states for local users already
# in the room, as we'll send states for newly joined local users below.)
if prev_local_users and newly_joined_remote_hosts:
local_states = await self.current_state_for_users(prev_local_users)
# Filter out old presence, i.e. offline presence states where
# the user hasn't been active for a week. We can change this
@ -1302,16 +1302,30 @@ class PresenceHandler(BasePresenceHandler):
now = self.clock.time_msec()
states = [
state
for state in states_d.values()
for state in local_states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
]
return [remote_host], states
self._federation_queue.send_presence_to_destinations(
destinations=newly_joined_remote_hosts,
states=states,
)
# Send presence states of newly joined users to all remote servers in
# the room
if newly_joined_local_users and (
prev_remote_hosts or newly_joined_remote_hosts
):
local_states = await self.current_state_for_users(newly_joined_local_users)
self._federation_queue.send_presence_to_destinations(
destinations=prev_remote_hosts | newly_joined_remote_hosts,
states=list(local_states.values()),
)
def should_notify(old_state, new_state):
def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
"""Decides if a presence state change should be sent to interested parties."""
if old_state == new_state:
return False
@ -1347,7 +1361,9 @@ def should_notify(old_state, new_state):
return False
def format_user_presence_state(state, now, include_user_id=True):
def format_user_presence_state(
state: UserPresenceState, now: int, include_user_id: bool = True
) -> JsonDict:
"""Convert UserPresenceState to a format that can be sent down to clients
and to other servers.
@ -1385,11 +1401,11 @@ class PresenceEventSource:
@log_function
async def get_new_events(
self,
user,
from_key,
room_ids=None,
include_offline=True,
explicit_room_id=None,
user: UserID,
from_key: Optional[int],
room_ids: Optional[List[str]] = None,
include_offline: bool = True,
explicit_room_id: Optional[str] = None,
**kwargs,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
@ -1594,7 +1610,7 @@ class PresenceEventSource:
if update.state != PresenceState.OFFLINE
]
def get_current_key(self):
def get_current_key(self) -> int:
return self.store.get_current_presence_token()
@cached(num_args=2, cache_context=True)
@ -1654,15 +1670,20 @@ class PresenceEventSource:
return users_interested_in
def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
syncing_user_ids: Set[str],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
appropriate.
Args:
user_states(list): List of UserPresenceState's to check.
is_mine_fn (fn): Function that returns if a user_id is ours
syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
syncing_user_ids: Set of user_ids with active syncs.
now: Current time in ms.
Returns:
List of UserPresenceState updates
@ -1679,14 +1700,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
return list(changes.values())
def handle_timeout(state, is_mine, syncing_user_ids, now):
def handle_timeout(
state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
state (UserPresenceState)
is_mine (bool): Whether the user is ours
syncing_user_ids (set): Set of user_ids with active syncs.
now (int): Current time in ms.
state
is_mine: Whether the user is ours
syncing_user_ids: Set of user_ids with active syncs.
now: Current time in ms.
Returns:
A UserPresenceState update or None if no update.
@ -1738,23 +1761,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
return state if changed else None
def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
def handle_update(
prev_state: UserPresenceState,
new_state: UserPresenceState,
is_mine: bool,
wheel_timer: WheelTimer,
now: int,
) -> Tuple[UserPresenceState, bool, bool]:
"""Given a presence update:
1. Add any appropriate timers.
2. Check if we should notify anyone.
Args:
prev_state (UserPresenceState)
new_state (UserPresenceState)
is_mine (bool): Whether the user is ours
wheel_timer (WheelTimer)
now (int): Time now in ms
prev_state
new_state
is_mine: Whether the user is ours
wheel_timer
now: Time now in ms
Returns:
3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
- new_state: is the state to actually persist
- persist_and_notify (bool): whether to persist and notify people
- federation_ping (bool): whether we should send a ping over federation
- persist_and_notify: whether to persist and notify people
- federation_ping: whether we should send a ping over federation
"""
user_id = new_state.user_id

View file

@ -729,9 +729,7 @@ class RegistrationHandler(BaseHandler):
)
if is_guest:
assert valid_until_ms is None
access_token = self.macaroon_gen.generate_access_token(
user_id, ["guest = true"]
)
access_token = self.macaroon_gen.generate_guest_access_token(user_id)
else:
access_token = await self._auth_handler.get_access_token_for_user_id(
user_id,

View file

@ -32,7 +32,14 @@ from synapse.api.constants import (
RoomCreationPreset,
RoomEncryptionAlgorithms,
)
from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
from synapse.api.errors import (
AuthError,
Codes,
LimitExceededError,
NotFoundError,
StoreError,
SynapseError,
)
from synapse.api.filtering import Filter
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
@ -126,10 +133,6 @@ class RoomCreationHandler(BaseHandler):
self.third_party_event_rules = hs.get_third_party_event_rules()
self._invite_burst_count = (
hs.config.ratelimiting.rc_invites_per_room.burst_count
)
async def upgrade_room(
self, requester: Requester, old_room_id: str, new_version: RoomVersion
) -> str:
@ -676,7 +679,17 @@ class RoomCreationHandler(BaseHandler):
invite_3pid_list = []
invite_list = []
if len(invite_list) + len(invite_3pid_list) > self._invite_burst_count:
if invite_list or invite_3pid_list:
try:
# If there are invites in the request, see if the ratelimiting settings
# allow that number of invites to be sent from the current user.
await self.room_member_handler.ratelimit_multiple_invites(
requester,
room_id=None,
n_invites=len(invite_list) + len(invite_3pid_list),
update=False,
)
except LimitExceededError:
raise SynapseError(400, "Cannot invite so many users at once")
await self.event_creation_handler.assert_accepted_privacy_policy(requester)
@ -1337,7 +1350,7 @@ class RoomShutdownHandler:
new_room_id = None
logger.info("Shutting down room %r", room_id)
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
kicked_users = []
failed_to_kick_users = []
for user_id in users:

View file

@ -163,6 +163,31 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
async def forget(self, user: UserID, room_id: str) -> None:
raise NotImplementedError()
async def ratelimit_multiple_invites(
self,
requester: Optional[Requester],
room_id: Optional[str],
n_invites: int,
update: bool = True,
):
"""Ratelimit more than one invite sent by the given requester in the given room.
Args:
requester: The requester sending the invites.
room_id: The room the invites are being sent in.
n_invites: The amount of invites to ratelimit for.
update: Whether to update the ratelimiter's cache.
Raises:
LimitExceededError: The requester can't send that many invites in the room.
"""
await self._invites_per_room_limiter.ratelimit(
requester,
room_id,
update=update,
n_actions=n_invites,
)
async def ratelimit_invite(
self,
requester: Optional[Requester],
@ -1044,7 +1069,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
class RoomMemberMasterHandler(RoomMemberHandler):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.distributor = hs.get_distributor()

View file

@ -14,6 +14,7 @@
import itertools
import logging
import re
from collections import deque
from typing import TYPE_CHECKING, Iterable, List, Optional, Sequence, Set, Tuple, cast
@ -226,6 +227,23 @@ class SpaceSummaryHandler:
suggested_only: bool,
max_children: Optional[int],
) -> Tuple[Sequence[JsonDict], Sequence[JsonDict]]:
"""
Generate a room entry and a list of event entries for a given room.
Args:
requester: The requesting user, or None if this is over federation.
room_id: The room ID to summarize.
suggested_only: True if only suggested children should be returned.
Otherwise, all children are returned.
max_children: The maximum number of children to return for this node.
Returns:
A tuple of:
An iterable of a single value of the room.
An iterable of the sorted children events. This may be limited
to a maximum size or may include all children.
"""
if not await self._is_room_accessible(room_id, requester):
return (), ()
@ -288,6 +306,7 @@ class SpaceSummaryHandler:
ev.data
for ev in res.events
if ev.event_type == EventTypes.MSC1772_SPACE_CHILD
or ev.event_type == EventTypes.SpaceChild
)
async def _is_room_accessible(self, room_id: str, requester: Optional[str]) -> bool:
@ -331,6 +350,8 @@ class SpaceSummaryHandler:
)
# TODO: update once MSC1772 lands
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
if not room_type:
room_type = create_event.content.get(EventContentFields.MSC1772_ROOM_TYPE)
entry = {
@ -344,6 +365,7 @@ class SpaceSummaryHandler:
stats["history_visibility"] == HistoryVisibility.WORLD_READABLE
),
"guest_can_join": stats["guest_access"] == "can_join",
"creation_ts": create_event.origin_server_ts,
"room_type": room_type,
}
@ -353,6 +375,18 @@ class SpaceSummaryHandler:
return room_entry
async def _get_child_events(self, room_id: str) -> Iterable[EventBase]:
"""
Get the child events for a given room.
The returned results are sorted for stability.
Args:
room_id: The room id to get the children of.
Returns:
An iterable of sorted child events.
"""
# look for child rooms/spaces.
current_state_ids = await self._store.get_current_state_ids(room_id)
@ -360,13 +394,15 @@ class SpaceSummaryHandler:
[
event_id
for key, event_id in current_state_ids.items()
# TODO: update once MSC1772 lands
# TODO: update once MSC1772 has been FCP for a period of time.
if key[0] == EventTypes.MSC1772_SPACE_CHILD
or key[0] == EventTypes.SpaceChild
]
)
# filter out any events without a "via" (which implies it has been redacted)
return (e for e in events if _has_valid_via(e))
# filter out any events without a "via" (which implies it has been redacted),
# and order to ensure we return stable results.
return sorted(filter(_has_valid_via, events), key=_child_events_comparison_key)
@attr.s(frozen=True, slots=True)
@ -392,3 +428,39 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool:
return True
logger.debug("Ignorning not-suggested child %s", edge_event.state_key)
return False
# Order may only contain characters in the range of \x20 (space) to \x7F (~).
_INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7F]")
def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], str]:
"""
Generate a value for comparing two child events for ordering.
The rules for ordering are supposed to be:
1. The 'order' key, if it is valid.
2. The 'origin_server_ts' of the 'm.room.create' event.
3. The 'room_id'.
But we skip step 2 since we may not have any state from the room.
Args:
child: The event for generating a comparison key.
Returns:
The comparison key as a tuple of:
False if the ordering is valid.
The ordering field.
The room ID.
"""
order = child.content.get("order")
# If order is not a string or doesn't meet the requirements, ignore it.
if not isinstance(order, str):
order = None
elif len(order) > 50 or _INVALID_ORDER_CHARS_RE.search(order):
order = None
# Items without an order come last.
return (order is None, order, child.room_id)

View file

@ -1189,7 +1189,7 @@ class SyncHandler:
# Step 1b, check for newly joined rooms
for room_id in newly_joined_rooms:
joined_users = await self.state.get_current_users_in_room(room_id)
joined_users = await self.store.get_users_in_room(room_id)
newly_joined_or_invited_users.update(joined_users)
# TODO: Check that these users are actually new, i.e. either they
@ -1205,7 +1205,7 @@ class SyncHandler:
# Now find users that we no longer track
for room_id in newly_left_rooms:
left_users = await self.state.get_current_users_in_room(room_id)
left_users = await self.store.get_users_in_room(room_id)
newly_left_users.update(left_users)
# Remove any users that we still share a room with.
@ -1360,7 +1360,7 @@ class SyncHandler:
extra_users_ids = set(newly_joined_or_invited_users)
for room_id in newly_joined_rooms:
users = await self.state.get_current_users_in_room(room_id)
users = await self.store.get_users_in_room(room_id)
extra_users_ids.update(users)
extra_users_ids.discard(user.to_string())

View file

@ -13,7 +13,7 @@
# limitations under the License.
import logging
from typing import Any
from typing import TYPE_CHECKING, Any
from twisted.web.client import PartialDownloadError
@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.util import json_decoder
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class UserInteractiveAuthChecker:
"""Abstract base class for an interactive auth checker"""
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
pass
def is_enabled(self) -> bool:
@ -57,10 +60,10 @@ class UserInteractiveAuthChecker:
class DummyAuthChecker(UserInteractiveAuthChecker):
AUTH_TYPE = LoginType.DUMMY
def is_enabled(self):
def is_enabled(self) -> bool:
return True
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return True
@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
def is_enabled(self):
return True
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return True
class RecaptchaAuthChecker(UserInteractiveAuthChecker):
AUTH_TYPE = LoginType.RECAPTCHA
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self._enabled = bool(hs.config.recaptcha_private_key)
self._http_client = hs.get_proxied_http_client()
self._url = hs.config.recaptcha_siteverify_api
self._secret = hs.config.recaptcha_private_key
def is_enabled(self):
def is_enabled(self) -> bool:
return self._enabled
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
try:
user_response = authdict["response"]
except KeyError:
@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
class _BaseThreepidAuthChecker:
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastore()
async def _check_threepid(self, medium, authdict):
async def _check_threepid(self, medium: str, authdict: dict) -> dict:
if "threepid_creds" not in authdict:
raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker:
class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
AUTH_TYPE = LoginType.EMAIL_IDENTITY
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
UserInteractiveAuthChecker.__init__(self, hs)
_BaseThreepidAuthChecker.__init__(self, hs)
def is_enabled(self):
def is_enabled(self) -> bool:
return self.hs.config.threepid_behaviour_email in (
ThreepidBehaviour.REMOTE,
ThreepidBehaviour.LOCAL,
)
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return await self._check_threepid("email", authdict)
class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
AUTH_TYPE = LoginType.MSISDN
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
UserInteractiveAuthChecker.__init__(self, hs)
_BaseThreepidAuthChecker.__init__(self, hs)
def is_enabled(self):
def is_enabled(self) -> bool:
return bool(self.hs.config.account_threepid_delegate_msisdn)
async def check_auth(self, authdict, clientip):
async def check_auth(self, authdict: dict, clientip: str) -> Any:
return await self._check_threepid("msisdn", authdict)

View file

@ -12,8 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# These are imported to allow for nicer logging configuration files.
import logging
from synapse.logging._remote import RemoteHandler
from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
# These are imported to allow for nicer logging configuration files.
__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
# Debug logger for https://github.com/matrix-org/synapse/issues/9533 etc
issue9533_logger = logging.getLogger("synapse.9533_debug")

View file

@ -535,6 +535,13 @@ class ReactorLastSeenMetric:
REGISTRY.register(ReactorLastSeenMetric())
# The minimum time in seconds between GCs for each generation, regardless of the current GC
# thresholds and counts.
MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
# The time (in seconds since the epoch) of the last time we did a GC for each generation.
_last_gc = [0.0, 0.0, 0.0]
def runUntilCurrentTimer(reactor, func):
@functools.wraps(func)
@ -575,11 +582,16 @@ def runUntilCurrentTimer(reactor, func):
return ret
# Check if we need to do a manual GC (since its been disabled), and do
# one if necessary.
# one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
# promote an object into gen 2, and we don't want to handle the same
# object multiple times.
threshold = gc.get_threshold()
counts = gc.get_count()
for i in (2, 1, 0):
if threshold[i] < counts[i]:
# We check if we need to do one based on a straightforward
# comparison between the threshold and count. We also do an extra
# check to make sure that we don't a GC too often.
if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
if i == 0:
logger.debug("Collecting gc %d", i)
else:
@ -589,6 +601,8 @@ def runUntilCurrentTimer(reactor, func):
unreachable = gc.collect(i)
end = time.time()
_last_gc[i] = end
gc_time.labels(i).observe(end - start)
gc_unreachable.labels(i).set(unreachable)
@ -615,6 +629,7 @@ try:
except AttributeError:
pass
__all__ = [
"MetricsResource",
"generate_latest",

196
synapse/metrics/jemalloc.py Normal file
View file

@ -0,0 +1,196 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ctypes
import logging
import os
import re
from typing import Optional
from synapse.metrics import REGISTRY, GaugeMetricFamily
logger = logging.getLogger(__name__)
def _setup_jemalloc_stats():
"""Checks to see if jemalloc is loaded, and hooks up a collector to record
statistics exposed by jemalloc.
"""
# Try to find the loaded jemalloc shared library, if any. We need to
# introspect into what is loaded, rather than loading whatever is on the
# path, as if we load a *different* jemalloc version things will seg fault.
# We look in `/proc/self/maps`, which only exists on linux.
if not os.path.exists("/proc/self/maps"):
logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
return
# We're looking for a path at the end of the line that includes
# "libjemalloc".
regex = re.compile(r"/\S+/libjemalloc.*$")
jemalloc_path = None
with open("/proc/self/maps") as f:
for line in f:
match = regex.search(line.strip())
if match:
jemalloc_path = match.group()
if not jemalloc_path:
# No loaded jemalloc was found.
logger.debug("jemalloc not found")
return
logger.debug("Found jemalloc at %s", jemalloc_path)
jemalloc = ctypes.CDLL(jemalloc_path)
def _mallctl(
name: str, read: bool = True, write: Optional[int] = None
) -> Optional[int]:
"""Wrapper around `mallctl` for reading and writing integers to
jemalloc.
Args:
name: The name of the option to read from/write to.
read: Whether to try and read the value.
write: The value to write, if given.
Returns:
The value read if `read` is True, otherwise None.
Raises:
An exception if `mallctl` returns a non-zero error code.
"""
input_var = None
input_var_ref = None
input_len_ref = None
if read:
input_var = ctypes.c_size_t(0)
input_len = ctypes.c_size_t(ctypes.sizeof(input_var))
input_var_ref = ctypes.byref(input_var)
input_len_ref = ctypes.byref(input_len)
write_var_ref = None
write_len = ctypes.c_size_t(0)
if write is not None:
write_var = ctypes.c_size_t(write)
write_len = ctypes.c_size_t(ctypes.sizeof(write_var))
write_var_ref = ctypes.byref(write_var)
# The interface is:
#
# int mallctl(
# const char *name,
# void *oldp,
# size_t *oldlenp,
# void *newp,
# size_t newlen
# )
#
# Where oldp/oldlenp is a buffer where the old value will be written to
# (if not null), and newp/newlen is the buffer with the new value to set
# (if not null). Note that they're all references *except* newlen.
result = jemalloc.mallctl(
name.encode("ascii"),
input_var_ref,
input_len_ref,
write_var_ref,
write_len,
)
if result != 0:
raise Exception("Failed to call mallctl")
if input_var is None:
return None
return input_var.value
def _jemalloc_refresh_stats() -> None:
"""Request that jemalloc updates its internal statistics. This needs to
be called before querying for stats, otherwise it will return stale
values.
"""
try:
_mallctl("epoch", read=False, write=1)
except Exception as e:
logger.warning("Failed to reload jemalloc stats: %s", e)
class JemallocCollector:
"""Metrics for internal jemalloc stats."""
def collect(self):
_jemalloc_refresh_stats()
g = GaugeMetricFamily(
"jemalloc_stats_app_memory_bytes",
"The stats reported by jemalloc",
labels=["type"],
)
# Read the relevant global stats from jemalloc. Note that these may
# not be accurate if python is configured to use its internal small
# object allocator (which is on by default, disable by setting the
# env `PYTHONMALLOC=malloc`).
#
# See the jemalloc manpage for details about what each value means,
# roughly:
# - allocated ─ Total number of bytes allocated by the app
# - active ─ Total number of bytes in active pages allocated by
# the application, this is bigger than `allocated`.
# - resident ─ Maximum number of bytes in physically resident data
# pages mapped by the allocator, comprising all pages dedicated
# to allocator metadata, pages backing active allocations, and
# unused dirty pages. This is bigger than `active`.
# - mapped ─ Total number of bytes in active extents mapped by the
# allocator.
# - metadata ─ Total number of bytes dedicated to jemalloc
# metadata.
for t in (
"allocated",
"active",
"resident",
"mapped",
"metadata",
):
try:
value = _mallctl(f"stats.{t}")
except Exception as e:
# There was an error fetching the value, skip.
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
continue
g.add_metric([t], value=value)
yield g
REGISTRY.register(JemallocCollector())
logger.debug("Added jemalloc stats")
def setup_jemalloc_stats():
"""Try to setup jemalloc stats, if jemalloc is loaded."""
try:
_setup_jemalloc_stats()
except Exception as e:
# This should only happen if we find the loaded jemalloc library, but
# fail to load it somehow (e.g. we somehow picked the wrong version).
logger.info("Failed to setup collector to record jemalloc stats: %s", e)

View file

@ -38,6 +38,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
from synapse.api.errors import AuthError
from synapse.events import EventBase
from synapse.handlers.presence import format_user_presence_state
from synapse.logging import issue9533_logger
from synapse.logging.context import PreserveLoggingContext
from synapse.logging.opentracing import log_kv, start_active_span
from synapse.logging.utils import log_function
@ -426,6 +427,13 @@ class Notifier:
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())
if stream_key == "to_device_key":
issue9533_logger.debug(
"to-device messages stream id %s, awaking streams for %s",
new_token,
users,
)
time_now_ms = self.clock.time_msec()
for user_stream in user_streams:
try:

View file

@ -19,6 +19,7 @@ from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
from synapse.events import EventBase
from synapse.types import UserID
from synapse.util import glob_to_regex, re_word_boundary
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@ -183,7 +184,7 @@ class PushRuleEvaluatorForEvent:
r = regex_cache.get((display_name, False, True), None)
if not r:
r1 = re.escape(display_name)
r1 = _re_word_boundary(r1)
r1 = re_word_boundary(r1)
r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
@ -212,7 +213,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
try:
r = regex_cache.get((glob, True, word_boundary), None)
if not r:
r = _glob_to_re(glob, word_boundary)
r = glob_to_regex(glob, word_boundary)
regex_cache[(glob, True, word_boundary)] = r
return bool(r.search(value))
except re.error:
@ -220,56 +221,6 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
return False
def _glob_to_re(glob: str, word_boundary: bool) -> Pattern:
"""Generates regex for a given glob.
Args:
glob
word_boundary: Whether to match against word boundaries or entire string.
"""
if IS_GLOB.search(glob):
r = re.escape(glob)
r = r.replace(r"\*", ".*?")
r = r.replace(r"\?", ".")
# handle [abc], [a-z] and [!a-z] style ranges.
r = GLOB_REGEX.sub(
lambda x: (
"[%s%s]" % (x.group(1) and "^" or "", x.group(2).replace(r"\\\-", "-"))
),
r,
)
if word_boundary:
r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
r = "^" + r + "$"
return re.compile(r, flags=re.IGNORECASE)
elif word_boundary:
r = re.escape(glob)
r = _re_word_boundary(r)
return re.compile(r, flags=re.IGNORECASE)
else:
r = "^" + re.escape(glob) + "$"
return re.compile(r, flags=re.IGNORECASE)
def _re_word_boundary(r: str) -> str:
"""
Adds word boundary characters to the start and end of an
expression to require that the match occur as a whole word,
but do so respecting the fact that strings starting or ending
with non-word characters will change word boundaries.
"""
# we can't use \b as it chokes on unicode. however \W seems to be okay
# as shorthand for [^0-9A-Za-z_].
return r"(^|\W)%s(\W|$)" % (r,)
def _flatten_dict(
d: Union[EventBase, dict],
prefix: Optional[List[str]] = None,

View file

@ -78,7 +78,8 @@ REQUIREMENTS = [
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
"attrs>=19.1.0",
# Note: 21.1.0 broke `/sync`, see #9936
"attrs>=19.1.0,!=21.1.0",
"netaddr>=0.7.18",
"Jinja2>=2.9",
"bleach>=1.4.3",
@ -116,6 +117,8 @@ CONDITIONAL_REQUIREMENTS = {
# hiredis is not a *strict* dependency, but it makes things much faster.
# (if it is not installed, we fall back to slow code.)
"redis": ["txredisapi>=1.4.7", "hiredis"],
# Required to use experimental `caches.track_memory_usage` config option.
"cache_memory": ["pympler"],
}
ALL_OPTIONAL_REQUIREMENTS = set() # type: Set[str]

View file

@ -51,7 +51,6 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# How long we allow callers to wait for replication updates before timing out.
_WAIT_FOR_REPLICATION_TIMEOUT_SECONDS = 30

View file

@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, Any, Optional
from prometheus_client import Counter
from prometheus_client import Counter, Histogram
from synapse.logging.context import make_deferred_yieldable
from synapse.util import json_decoder, json_encoder
@ -35,6 +35,20 @@ get_counter = Counter(
labelnames=["cache_name", "hit"],
)
response_timer = Histogram(
"synapse_external_cache_response_time_seconds",
"Time taken to get a response from Redis for a cache get/set request",
labelnames=["method"],
buckets=(
0.001,
0.002,
0.005,
0.01,
0.02,
0.05,
),
)
logger = logging.getLogger(__name__)
@ -72,6 +86,7 @@ class ExternalCache:
logger.debug("Caching %s %s: %r", cache_name, key, encoded_value)
with response_timer.labels("set").time():
return await make_deferred_yieldable(
self._redis_connection.set(
self._get_redis_key(cache_name, key),
@ -86,6 +101,7 @@ class ExternalCache:
if self._redis_connection is None:
return None
with response_timer.labels("get").time():
result = await make_deferred_yieldable(
self._redis_connection.get(self._get_redis_key(cache_name, key))
)

View file

@ -37,9 +37,11 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester
from synapse.util import json_decoder
if TYPE_CHECKING:
from synapse.api.auth import Auth
from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.room import RoomShutdownHandler
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@ -146,50 +148,14 @@ class DeleteRoomRestServlet(RestServlet):
async def on_POST(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
await assert_user_is_admin(self.auth, requester.user)
content = parse_json_object_from_request(request)
block = content.get("block", False)
if not isinstance(block, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'block' must be a boolean, if given",
Codes.BAD_JSON,
return await _delete_room(
request,
room_id,
self.auth,
self.room_shutdown_handler,
self.pagination_handler,
)
purge = content.get("purge", True)
if not isinstance(purge, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'purge' must be a boolean, if given",
Codes.BAD_JSON,
)
force_purge = content.get("force_purge", False)
if not isinstance(force_purge, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'force_purge' must be a boolean, if given",
Codes.BAD_JSON,
)
ret = await self.room_shutdown_handler.shutdown_room(
room_id=room_id,
new_room_user_id=content.get("new_room_user_id"),
new_room_name=content.get("room_name"),
message=content.get("message"),
requester_user_id=requester.user.to_string(),
block=block,
)
# Purge room
if purge:
await self.pagination_handler.purge_room(room_id, force=force_purge)
return (200, ret)
class ListRoomRestServlet(RestServlet):
"""
@ -282,7 +248,22 @@ class ListRoomRestServlet(RestServlet):
class RoomRestServlet(RestServlet):
"""Get room details.
"""Manage a room.
On GET : Get details of a room.
On DELETE : Delete a room from server.
It is a combination and improvement of shutdown and purge room.
Shuts down a room by removing all local users from the room.
Blocking all future invites and joins to the room is optional.
If desired any local aliases will be repointed to a new room
created by `new_room_user_id` and kicked users will be auto-
joined to the new room.
If 'purge' is true, it will remove all traces of a room from the database.
TODO: Add on_POST to allow room creation without joining the room
"""
@ -293,6 +274,8 @@ class RoomRestServlet(RestServlet):
self.hs = hs
self.auth = hs.get_auth()
self.store = hs.get_datastore()
self.room_shutdown_handler = hs.get_room_shutdown_handler()
self.pagination_handler = hs.get_pagination_handler()
async def on_GET(
self, request: SynapseRequest, room_id: str
@ -308,6 +291,17 @@ class RoomRestServlet(RestServlet):
return (200, ret)
async def on_DELETE(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
return await _delete_room(
request,
room_id,
self.auth,
self.room_shutdown_handler,
self.pagination_handler,
)
class RoomMembersRestServlet(RestServlet):
"""
@ -694,3 +688,55 @@ class RoomEventContextServlet(RestServlet):
)
return 200, results
async def _delete_room(
request: SynapseRequest,
room_id: str,
auth: "Auth",
room_shutdown_handler: "RoomShutdownHandler",
pagination_handler: "PaginationHandler",
) -> Tuple[int, JsonDict]:
requester = await auth.get_user_by_req(request)
await assert_user_is_admin(auth, requester.user)
content = parse_json_object_from_request(request)
block = content.get("block", False)
if not isinstance(block, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'block' must be a boolean, if given",
Codes.BAD_JSON,
)
purge = content.get("purge", True)
if not isinstance(purge, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'purge' must be a boolean, if given",
Codes.BAD_JSON,
)
force_purge = content.get("force_purge", False)
if not isinstance(force_purge, bool):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Param 'force_purge' must be a boolean, if given",
Codes.BAD_JSON,
)
ret = await room_shutdown_handler.shutdown_room(
room_id=room_id,
new_room_user_id=content.get("new_room_user_id"),
new_room_name=content.get("room_name"),
message=content.get("message"),
requester_user_id=requester.user.to_string(),
block=block,
)
# Purge room
if purge:
await pagination_handler.purge_room(room_id, force=force_purge)
return (200, ret)

View file

@ -1023,6 +1023,7 @@ class RoomSpaceSummaryRestServlet(RestServlet):
max_rooms_per_space=parse_integer(request, "max_rooms_per_space"),
)
# TODO When switching to the stable endpoint, remove the POST handler.
async def on_POST(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:

View file

@ -213,19 +213,23 @@ class StateHandler:
return ret.state
async def get_current_users_in_room(
self, room_id: str, latest_event_ids: Optional[List[str]] = None
self, room_id: str, latest_event_ids: List[str]
) -> Dict[str, ProfileInfo]:
"""
Get the users who are currently in a room.
Note: This is much slower than using the equivalent method
`DataStore.get_users_in_room` or `DataStore.get_users_in_room_with_profiles`,
so this should only be used when wanting the users at a particular point
in the room.
Args:
room_id: The ID of the room.
latest_event_ids: Precomputed list of latest event IDs. Will be computed if None.
Returns:
Dictionary of user IDs to their profileinfo.
"""
if not latest_event_ids:
latest_event_ids = await self.store.get_latest_event_ids_in_room(room_id)
assert latest_event_ids is not None
logger.debug("calling resolve_state_groups from get_current_users_in_room")

View file

@ -69,6 +69,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))

View file

@ -715,7 +715,9 @@ class DatabasePool:
# pool).
assert not self.engine.in_transaction(conn)
with LoggingContext("runWithConnection", parent_context) as context:
with LoggingContext(
str(curr_context), parent_context=parent_context
) as context:
sched_duration_sec = monotonic_time() - start_time
sql_scheduling_timer.observe(sched_duration_sec)
context.add_database_scheduled(sched_duration_sec)

View file

@ -15,6 +15,7 @@
import logging
from typing import List, Optional, Tuple
from synapse.logging import issue9533_logger
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.replication.tcp.streams import ToDeviceStream
from synapse.storage._base import SQLBaseStore, db_to_json
@ -404,6 +405,13 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
if remote_messages_by_destination:
issue9533_logger.debug(
"Queued outgoing to-device messages with stream_id %i for %s",
stream_id,
list(remote_messages_by_destination.keys()),
)
async with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
await self.db_pool.runInteraction(
@ -533,6 +541,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
],
)
issue9533_logger.debug(
"Stored to-device messages with stream_id %i for %s",
stream_id,
[
(user_id, device_id)
for (user_id, messages_by_device) in local_by_user_then_device.items()
for device_id in messages_by_device.keys()
],
)
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"

View file

@ -84,6 +84,8 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore):
if keys:
result["keys"] = keys
device_display_name = None
if self.hs.config.allow_device_name_lookup_over_federation:
device_display_name = device.display_name
if device_display_name:
result["device_display_name"] = device_display_name

View file

@ -205,8 +205,12 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]:
sql = """
SELECT user_id, display_name, avatar_url FROM room_memberships
WHERE room_id = ? AND membership = ?
SELECT state_key, display_name, avatar_url FROM room_memberships as m
INNER JOIN current_state_events as c
ON m.event_id = c.event_id
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
"""
txn.execute(sql, (room_id, Membership.JOIN))

View file

@ -1,21 +0,0 @@
# Synapse Database Schemas
These schemas are used as a basis to create brand new Synapse databases, on both
SQLite3 and Postgres.
## Building full schema dumps
If you want to recreate these schemas, they need to be made from a database that
has had all background updates run.
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
`full.sql.postgres ` and `full.sql.sqlite` files.
Ensure postgres is installed and your user has the ability to run bash commands
such as `createdb`, then call
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
There are currently two folders with full-schema snapshots. `16` is a snapshot
from 2015, for historical reference. The other contains the most recent full
schema snapshot.

View file

@ -142,8 +142,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
batch_size (int): Maximum number of state events to process
per cycle.
"""
state = self.hs.get_state_handler()
# If we don't have progress filed, delete everything.
if not progress:
await self.delete_all_from_user_dir()
@ -197,7 +195,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
room_id
)
users_with_profile = await state.get_current_users_in_room(room_id)
users_with_profile = await self.get_users_in_room_with_profiles(room_id)
user_ids = set(users_with_profile)
# Update each user in the user directory.

View file

@ -26,16 +26,13 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection
from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.engines.postgres import PostgresEngine
from synapse.storage.schema import SCHEMA_VERSION
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 59
dir_path = os.path.abspath(os.path.dirname(__file__))
schema_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), "schema")
class PrepareDatabaseException(Exception):
@ -168,6 +165,13 @@ def _setup_new_database(
Example directory structure:
schema/
common/
delta/
...
full_schemas/
11/
foo.sql
main/
delta/
...
full_schemas/
@ -175,15 +179,14 @@ def _setup_new_database(
test.sql
...
11/
foo.sql
bar.sql
...
In the example foo.sql and bar.sql would be run, and then any delta files
for versions strictly greater than 11.
Note: we apply the full schemas and deltas from the top level `schema/`
folder as well those in the data stores specified.
Note: we apply the full schemas and deltas from the `schema/common`
folder as well those in the databases specified.
Args:
cur: a database cursor
@ -195,12 +198,12 @@ def _setup_new_database(
# configured to our liking.
database_engine.check_new_database(cur)
current_dir = os.path.join(dir_path, "schema", "full_schemas")
full_schemas_dir = os.path.join(schema_path, "common", "full_schemas")
# First we find the highest full schema version we have
valid_versions = []
for filename in os.listdir(current_dir):
for filename in os.listdir(full_schemas_dir):
try:
ver = int(filename)
except ValueError:
@ -218,15 +221,13 @@ def _setup_new_database(
logger.debug("Initialising schema v%d", max_current_ver)
# Now lets find all the full schema files, both in the global schema and
# in data store schemas.
directories = [os.path.join(current_dir, str(max_current_ver))]
# Now let's find all the full schema files, both in the common schema and
# in database schemas.
directories = [os.path.join(full_schemas_dir, str(max_current_ver))]
directories.extend(
os.path.join(
dir_path,
"databases",
schema_path,
database,
"schema",
"full_schemas",
str(max_current_ver),
)
@ -357,6 +358,9 @@ def _upgrade_existing_database(
check_database_before_upgrade(cur, database_engine, config)
start_ver = current_version
# if we got to this schema version by running a full_schema rather than a series
# of deltas, we should not run the deltas for this version.
if not upgraded:
start_ver += 1
@ -385,12 +389,10 @@ def _upgrade_existing_database(
# directories for schema updates.
# First we find the directories to search in
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
delta_dir = os.path.join(schema_path, "common", "delta", str(v))
directories = [delta_dir]
for database in databases:
directories.append(
os.path.join(dir_path, "databases", database, "schema", "delta", str(v))
)
directories.append(os.path.join(schema_path, database, "delta", str(v)))
# Used to check if we have any duplicate file names
file_name_counter = Counter() # type: CounterType[str]
@ -621,8 +623,8 @@ def _get_or_create_schema_state(
txn: Cursor, database_engine: BaseDatabaseEngine
) -> Optional[Tuple[int, List[str], bool]]:
# Bluntly try creating the schema_version tables.
schema_path = os.path.join(dir_path, "schema", "schema_version.sql")
executescript(txn, schema_path)
sql_path = os.path.join(schema_path, "common", "schema_version.sql")
executescript(txn, sql_path)
txn.execute("SELECT version, upgraded FROM schema_version")
row = txn.fetchone()

View file

@ -0,0 +1,37 @@
# Synapse Database Schemas
This directory contains the schema files used to build Synapse databases.
Synapse supports splitting its datastore across multiple physical databases (which can
be useful for large installations), and the schema files are therefore split according
to the logical database they are apply to.
At the time of writing, the following "logical" databases are supported:
* `state` - used to store Matrix room state (more specifically, `state_groups`,
their relationships and contents.)
* `main` - stores everything else.
Addionally, the `common` directory contains schema files for tables which must be
present on *all* physical databases.
## Full schema dumps
In the `full_schemas` directories, only the most recently-numbered snapshot is useful
(`54` at the time of writing). Older snapshots (eg, `16`) are present for historical
reference only.
## Building full schema dumps
If you want to recreate these schemas, they need to be made from a database that
has had all background updates run.
To do so, use `scripts-dev/make_full_schema.sh`. This will produce new
`full.sql.postgres` and `full.sql.sqlite` files.
Ensure postgres is installed, then run:
./scripts-dev/make_full_schema.sh -p postgres_username -o output_dir/
NB at the time of writing, this script predates the split into separate `state`/`main`
databases so will require updates to handle that correctly.

View file

@ -0,0 +1,17 @@
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
SCHEMA_VERSION = 59

Some files were not shown because too many files have changed in this diff Show more