Merge remote-tracking branch 'upstream/release-v1.115'

This commit is contained in:
Tulir Asokan 2024-09-10 17:57:07 +03:00
commit a38b12f81a
178 changed files with 10584 additions and 1193 deletions

View File

@ -29,10 +29,14 @@ jobs:
with:
install-project: "false"
- name: Run ruff
- name: Run ruff check
continue-on-error: true
run: poetry run ruff check --fix .
- name: Run ruff format
continue-on-error: true
run: poetry run ruff format --quiet .
- run: cargo clippy --all-features --fix -- -D warnings
continue-on-error: true

View File

@ -131,9 +131,12 @@ jobs:
with:
install-project: "false"
- name: Check style
- name: Run ruff check
run: poetry run ruff check --output-format=github .
- name: Run ruff format
run: poetry run ruff format --check .
lint-mypy:
runs-on: ubuntu-latest
name: Typechecking

View File

@ -1,3 +1,53 @@
# Synapse 1.115.0rc1 (2024-09-10)
### Features
- Improve cross-signing upload when using [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) to use a custom UIA flow stage, with web fallback support. ([\#17509](https://github.com/element-hq/synapse/issues/17509))
### Bugfixes
- Return `400 M_BAD_JSON` upon attempting to complete various room actions with a non-local user ID and unknown room ID, rather than an internal server error. ([\#17607](https://github.com/element-hq/synapse/issues/17607))
- Fix authenticated media responses using a wrong limit when following redirects over federation. ([\#17626](https://github.com/element-hq/synapse/issues/17626))
- Fix bug where we returned the wrong `bump_stamp` for invites in sliding sync response, causing incorrect ordering of invites in the room list. ([\#17674](https://github.com/element-hq/synapse/issues/17674))
### Improved Documentation
- Clarify that the admin api resource is only loaded on the main process and not workers. ([\#17590](https://github.com/element-hq/synapse/issues/17590))
- Fixed typo in `saml2_config` config [example](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#saml2_config). ([\#17594](https://github.com/element-hq/synapse/issues/17594))
### Deprecations and Removals
- Stabilise [MSC4156](https://github.com/matrix-org/matrix-spec-proposals/pull/4156) by removing the `msc4156_enabled` config setting and defaulting it to `true`. ([\#17650](https://github.com/element-hq/synapse/issues/17650))
### Internal Changes
- Update [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) implementation: load the issuer and account management URLs from OIDC discovery. ([\#17407](https://github.com/element-hq/synapse/issues/17407))
- Pre-populate room data used in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint for quick filtering/sorting. ([\#17512](https://github.com/element-hq/synapse/issues/17512), [\#17632](https://github.com/element-hq/synapse/issues/17632), [\#17633](https://github.com/element-hq/synapse/issues/17633), [\#17634](https://github.com/element-hq/synapse/issues/17634), [\#17635](https://github.com/element-hq/synapse/issues/17635), [\#17636](https://github.com/element-hq/synapse/issues/17636), [\#17641](https://github.com/element-hq/synapse/issues/17641), [\#17654](https://github.com/element-hq/synapse/issues/17654), [\#17673](https://github.com/element-hq/synapse/issues/17673))
- Store sliding sync per-connection state in the database. ([\#17599](https://github.com/element-hq/synapse/issues/17599), [\#17631](https://github.com/element-hq/synapse/issues/17631))
- Make the sliding sync `PerConnectionState` class immutable. ([\#17600](https://github.com/element-hq/synapse/issues/17600))
- Replace `isort` and `black` with `ruff`. ([\#17620](https://github.com/element-hq/synapse/issues/17620), [\#17643](https://github.com/element-hq/synapse/issues/17643))
- Sliding Sync: Split up `get_room_membership_for_user_at_to_token`. ([\#17629](https://github.com/element-hq/synapse/issues/17629))
- Use new database tables for sliding sync. ([\#17630](https://github.com/element-hq/synapse/issues/17630), [\#17649](https://github.com/element-hq/synapse/issues/17649))
- Prevent duplicate tags being added to Sliding Sync traces. ([\#17655](https://github.com/element-hq/synapse/issues/17655))
- Get `bump_stamp` from [new sliding sync tables](https://github.com/element-hq/synapse/pull/17512) which should be faster. ([\#17658](https://github.com/element-hq/synapse/issues/17658))
- Speed up incremental Sliding Sync requests by avoiding extra work. ([\#17665](https://github.com/element-hq/synapse/issues/17665))
- Small performance improvement in speeding up sliding sync. ([\#17666](https://github.com/element-hq/synapse/issues/17666), [\#17670](https://github.com/element-hq/synapse/issues/17670), [\#17672](https://github.com/element-hq/synapse/issues/17672))
- Speed up sliding sync by reducing number of database calls. ([\#17684](https://github.com/element-hq/synapse/issues/17684))
- Speed up sync by pulling out fewer events from the database. ([\#17688](https://github.com/element-hq/synapse/issues/17688))
### Updates to locked dependencies
* Bump authlib from 1.3.1 to 1.3.2. ([\#17679](https://github.com/element-hq/synapse/issues/17679))
* Bump idna from 3.7 to 3.8. ([\#17682](https://github.com/element-hq/synapse/issues/17682))
* Bump ruff from 0.6.2 to 0.6.4. ([\#17680](https://github.com/element-hq/synapse/issues/17680))
* Bump towncrier from 24.7.1 to 24.8.0. ([\#17645](https://github.com/element-hq/synapse/issues/17645))
* Bump twisted from 24.7.0rc1 to 24.7.0. ([\#17647](https://github.com/element-hq/synapse/issues/17647))
* Bump types-pillow from 10.2.0.20240520 to 10.2.0.20240822. ([\#17644](https://github.com/element-hq/synapse/issues/17644))
* Bump types-psycopg2 from 2.9.21.20240417 to 2.9.21.20240819. ([\#17646](https://github.com/element-hq/synapse/issues/17646))
* Bump types-setuptools from 71.1.0.20240818 to 74.1.0.20240907. ([\#17681](https://github.com/element-hq/synapse/issues/17681))
# Synapse 1.114.0 (2024-09-02)
This release enables support for

View File

@ -22,6 +22,7 @@
#
"""Starts a synapse client console."""
import argparse
import binascii
import cmd

6
debian/changelog vendored
View File

@ -1,3 +1,9 @@
matrix-synapse-py3 (1.115.0~rc1) stable; urgency=medium
* New Synapse release 1.115.0rc1.
-- Synapse Packaging team <packages@matrix.org> Tue, 10 Sep 2024 08:39:09 -0600
matrix-synapse-py3 (1.114.0) stable; urgency=medium
* New Synapse release 1.114.0.

View File

@ -509,7 +509,8 @@ Unix socket support (_Added in Synapse 1.89.0_):
Valid resource names are:
* `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`.
* `client`: the client-server API (/_matrix/client). Also implies `media` and `static`.
If configuring the main process, the Synapse Admin API (/_synapse/admin) is also implied.
* `consent`: user consent forms (/_matrix/consent). See [here](../../consent_tracking.md) for more.
@ -3302,8 +3303,8 @@ saml2_config:
contact_person:
- given_name: Bob
sur_name: "the Sysadmin"
email_address": ["admin@example.com"]
contact_type": technical
email_address: ["admin@example.com"]
contact_type: technical
saml_session_lifetime: 5m

84
poetry.lock generated
View File

@ -35,13 +35,13 @@ tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"]
[[package]]
name = "authlib"
version = "1.3.1"
version = "1.3.2"
description = "The ultimate Python library in building OAuth and OpenID Connect servers and clients."
optional = true
python-versions = ">=3.8"
files = [
{file = "Authlib-1.3.1-py2.py3-none-any.whl", hash = "sha256:d35800b973099bbadc49b42b256ecb80041ad56b7fe1216a362c7943c088f377"},
{file = "authlib-1.3.1.tar.gz", hash = "sha256:7ae843f03c06c5c0debd63c9db91f9fda64fa62a42a77419fa15fbb7e7a58917"},
{file = "Authlib-1.3.2-py2.py3-none-any.whl", hash = "sha256:ede026a95e9f5cdc2d4364a52103f5405e75aa156357e831ef2bfd0bc5094dfc"},
{file = "authlib-1.3.2.tar.gz", hash = "sha256:4b16130117f9eb82aa6eec97f6dd4673c3f960ac0283ccdae2897ee4bc030ba2"},
]
[package.dependencies]
@ -608,13 +608,13 @@ idna = ">=2.5"
[[package]]
name = "idna"
version = "3.7"
version = "3.8"
description = "Internationalized Domain Names in Applications (IDNA)"
optional = false
python-versions = ">=3.5"
python-versions = ">=3.6"
files = [
{file = "idna-3.7-py3-none-any.whl", hash = "sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0"},
{file = "idna-3.7.tar.gz", hash = "sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
{file = "idna-3.8-py3-none-any.whl", hash = "sha256:050b4e5baadcd44d760cedbd2b8e639f2ff89bbc7a5730fcc662954303377aac"},
{file = "idna-3.8.tar.gz", hash = "sha256:d838c2c0ed6fced7693d5e8ab8e734d5f8fda53a039c0164afb0b82e771e3603"},
]
[[package]]
@ -2268,29 +2268,29 @@ files = [
[[package]]
name = "ruff"
version = "0.6.2"
version = "0.6.4"
description = "An extremely fast Python linter and code formatter, written in Rust."
optional = false
python-versions = ">=3.7"
files = [
{file = "ruff-0.6.2-py3-none-linux_armv6l.whl", hash = "sha256:5c8cbc6252deb3ea840ad6a20b0f8583caab0c5ef4f9cca21adc5a92b8f79f3c"},
{file = "ruff-0.6.2-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:17002fe241e76544448a8e1e6118abecbe8cd10cf68fde635dad480dba594570"},
{file = "ruff-0.6.2-py3-none-macosx_11_0_arm64.whl", hash = "sha256:3dbeac76ed13456f8158b8f4fe087bf87882e645c8e8b606dd17b0b66c2c1158"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:094600ee88cda325988d3f54e3588c46de5c18dae09d683ace278b11f9d4d534"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:316d418fe258c036ba05fbf7dfc1f7d3d4096db63431546163b472285668132b"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d72b8b3abf8a2d51b7b9944a41307d2f442558ccb3859bbd87e6ae9be1694a5d"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:2aed7e243be68487aa8982e91c6e260982d00da3f38955873aecd5a9204b1d66"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:d371f7fc9cec83497fe7cf5eaf5b76e22a8efce463de5f775a1826197feb9df8"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a8f310d63af08f583363dfb844ba8f9417b558199c58a5999215082036d795a1"},
{file = "ruff-0.6.2-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7db6880c53c56addb8638fe444818183385ec85eeada1d48fc5abe045301b2f1"},
{file = "ruff-0.6.2-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:1175d39faadd9a50718f478d23bfc1d4da5743f1ab56af81a2b6caf0a2394f23"},
{file = "ruff-0.6.2-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:5b939f9c86d51635fe486585389f54582f0d65b8238e08c327c1534844b3bb9a"},
{file = "ruff-0.6.2-py3-none-musllinux_1_2_i686.whl", hash = "sha256:d0d62ca91219f906caf9b187dea50d17353f15ec9bb15aae4a606cd697b49b4c"},
{file = "ruff-0.6.2-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:7438a7288f9d67ed3c8ce4d059e67f7ed65e9fe3aa2ab6f5b4b3610e57e3cb56"},
{file = "ruff-0.6.2-py3-none-win32.whl", hash = "sha256:279d5f7d86696df5f9549b56b9b6a7f6c72961b619022b5b7999b15db392a4da"},
{file = "ruff-0.6.2-py3-none-win_amd64.whl", hash = "sha256:d9f3469c7dd43cd22eb1c3fc16926fb8258d50cb1b216658a07be95dd117b0f2"},
{file = "ruff-0.6.2-py3-none-win_arm64.whl", hash = "sha256:f28fcd2cd0e02bdf739297516d5643a945cc7caf09bd9bcb4d932540a5ea4fa9"},
{file = "ruff-0.6.2.tar.gz", hash = "sha256:239ee6beb9e91feb8e0ec384204a763f36cb53fb895a1a364618c6abb076b3be"},
{file = "ruff-0.6.4-py3-none-linux_armv6l.whl", hash = "sha256:c4b153fc152af51855458e79e835fb6b933032921756cec9af7d0ba2aa01a258"},
{file = "ruff-0.6.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:bedff9e4f004dad5f7f76a9d39c4ca98af526c9b1695068198b3bda8c085ef60"},
{file = "ruff-0.6.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:d02a4127a86de23002e694d7ff19f905c51e338c72d8e09b56bfb60e1681724f"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7862f42fc1a4aca1ea3ffe8a11f67819d183a5693b228f0bb3a531f5e40336fc"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:eebe4ff1967c838a1a9618a5a59a3b0a00406f8d7eefee97c70411fefc353617"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:932063a03bac394866683e15710c25b8690ccdca1cf192b9a98260332ca93408"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:50e30b437cebef547bd5c3edf9ce81343e5dd7c737cb36ccb4fe83573f3d392e"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c44536df7b93a587de690e124b89bd47306fddd59398a0fb12afd6133c7b3818"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0ea086601b22dc5e7693a78f3fcfc460cceabfdf3bdc36dc898792aba48fbad6"},
{file = "ruff-0.6.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b52387d3289ccd227b62102c24714ed75fbba0b16ecc69a923a37e3b5e0aaaa"},
{file = "ruff-0.6.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:0308610470fcc82969082fc83c76c0d362f562e2f0cdab0586516f03a4e06ec6"},
{file = "ruff-0.6.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:803b96dea21795a6c9d5bfa9e96127cc9c31a1987802ca68f35e5c95aed3fc0d"},
{file = "ruff-0.6.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:66dbfea86b663baab8fcae56c59f190caba9398df1488164e2df53e216248baa"},
{file = "ruff-0.6.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:34d5efad480193c046c86608dbba2bccdc1c5fd11950fb271f8086e0c763a5d1"},
{file = "ruff-0.6.4-py3-none-win32.whl", hash = "sha256:f0f8968feea5ce3777c0d8365653d5e91c40c31a81d95824ba61d871a11b8523"},
{file = "ruff-0.6.4-py3-none-win_amd64.whl", hash = "sha256:549daccee5227282289390b0222d0fbee0275d1db6d514550d65420053021a58"},
{file = "ruff-0.6.4-py3-none-win_arm64.whl", hash = "sha256:ac4b75e898ed189b3708c9ab3fc70b79a433219e1e87193b4f2b77251d058d14"},
{file = "ruff-0.6.4.tar.gz", hash = "sha256:ac3b5bfbee99973f80aa1b7cbd1c9cbce200883bdd067300c22a6cc1c7fba212"},
]
[[package]]
@ -2557,13 +2557,13 @@ files = [
[[package]]
name = "towncrier"
version = "24.7.1"
version = "24.8.0"
description = "Building newsfiles for your project."
optional = false
python-versions = ">=3.8"
files = [
{file = "towncrier-24.7.1-py3-none-any.whl", hash = "sha256:685e2a94335b5dc47537b4d3b449a25b18571ea85b07dcf6e8df31ba40f692dd"},
{file = "towncrier-24.7.1.tar.gz", hash = "sha256:57a057faedabcadf1a62f6f9bad726ae566c1f31a411338ddb8316993f583b3d"},
{file = "towncrier-24.8.0-py3-none-any.whl", hash = "sha256:9343209592b839209cdf28c339ba45792fbfe9775b5f9c177462fd693e127d8d"},
{file = "towncrier-24.8.0.tar.gz", hash = "sha256:013423ee7eed102b2f393c287d22d95f66f1a3ea10a4baa82d298001a7f18af3"},
]
[package.dependencies]
@ -2622,13 +2622,13 @@ urllib3 = ">=1.26.0"
[[package]]
name = "twisted"
version = "24.7.0rc1"
version = "24.7.0"
description = "An asynchronous networking framework written in Python"
optional = false
python-versions = ">=3.8.0"
files = [
{file = "twisted-24.7.0rc1-py3-none-any.whl", hash = "sha256:f37d6656fe4e2871fab29d8952ae90bd6ca8b48a9e4dfa1b348f4cd62e6ba0bb"},
{file = "twisted-24.7.0rc1.tar.gz", hash = "sha256:bbc4a2193ca34cfa32f626300746698a6d70fcd77d9c0b79a664c347e39634fc"},
{file = "twisted-24.7.0-py3-none-any.whl", hash = "sha256:734832ef98108136e222b5230075b1079dad8a3fc5637319615619a7725b0c81"},
{file = "twisted-24.7.0.tar.gz", hash = "sha256:5a60147f044187a127ec7da96d170d49bcce50c6fd36f594e60f4587eff4d394"},
]
[package.dependencies]
@ -2761,24 +2761,24 @@ files = [
[[package]]
name = "types-pillow"
version = "10.2.0.20240520"
version = "10.2.0.20240822"
description = "Typing stubs for Pillow"
optional = false
python-versions = ">=3.8"
files = [
{file = "types-Pillow-10.2.0.20240520.tar.gz", hash = "sha256:130b979195465fa1e1676d8e81c9c7c30319e8e95b12fae945e8f0d525213107"},
{file = "types_Pillow-10.2.0.20240520-py3-none-any.whl", hash = "sha256:33c36494b380e2a269bb742181bea5d9b00820367822dbd3760f07210a1da23d"},
{file = "types-Pillow-10.2.0.20240822.tar.gz", hash = "sha256:559fb52a2ef991c326e4a0d20accb3bb63a7ba8d40eb493e0ecb0310ba52f0d3"},
{file = "types_Pillow-10.2.0.20240822-py3-none-any.whl", hash = "sha256:d9dab025aba07aeb12fd50a6799d4eac52a9603488eca09d7662543983f16c5d"},
]
[[package]]
name = "types-psycopg2"
version = "2.9.21.20240417"
version = "2.9.21.20240819"
description = "Typing stubs for psycopg2"
optional = false
python-versions = ">=3.8"
files = [
{file = "types-psycopg2-2.9.21.20240417.tar.gz", hash = "sha256:05db256f4a459fb21a426b8e7fca0656c3539105ff0208eaf6bdaf406a387087"},
{file = "types_psycopg2-2.9.21.20240417-py3-none-any.whl", hash = "sha256:644d6644d64ebbe37203229b00771012fb3b3bddd507a129a2e136485990e4f8"},
{file = "types-psycopg2-2.9.21.20240819.tar.gz", hash = "sha256:4ed6b47464d6374fa64e5e3b234cea0f710e72123a4596d67ab50b7415a84666"},
{file = "types_psycopg2-2.9.21.20240819-py3-none-any.whl", hash = "sha256:c9192311c27d7ad561eef705f1b2df1074f2cdcf445a98a6a2fcaaaad43278cf"},
]
[[package]]
@ -2823,13 +2823,13 @@ urllib3 = ">=2"
[[package]]
name = "types-setuptools"
version = "71.1.0.20240818"
version = "74.1.0.20240907"
description = "Typing stubs for setuptools"
optional = false
python-versions = ">=3.8"
files = [
{file = "types-setuptools-71.1.0.20240818.tar.gz", hash = "sha256:f62eaffaa39774462c65fbb49368c4dc1d91a90a28371cb14e1af090ff0e41e3"},
{file = "types_setuptools-71.1.0.20240818-py3-none-any.whl", hash = "sha256:c4f95302f88369ac0ac46c67ddbfc70c6c4dbbb184d9fed356244217a2934025"},
{file = "types-setuptools-74.1.0.20240907.tar.gz", hash = "sha256:0abdb082552ca966c1e5fc244e4853adc62971f6cd724fb1d8a3713b580e5a65"},
{file = "types_setuptools-74.1.0.20240907-py3-none-any.whl", hash = "sha256:15b38c8e63ca34f42f6063ff4b1dd662ea20086166d5ad6a102e670a52574120"},
]
[[package]]
@ -3104,4 +3104,4 @@ user-search = ["pyicu"]
[metadata]
lock-version = "2.0"
python-versions = "^3.8.0"
content-hash = "2bf09e2b68f3abd1a0f9ff2227eb3026ac3d034845acfc120d0b1cb8167ea43b"
content-hash = "26ff23a6cafd8593141cb3d54d7b1e94328a02b863d347578d2b6e666ee2bc93"

View File

@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
[tool.poetry]
name = "matrix-synapse"
version = "1.114.0"
version = "1.115.0rc1"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "AGPL-3.0-or-later"
@ -320,7 +320,7 @@ all = [
# failing on new releases. Keeping lower bounds loose here means that dependabot
# can bump versions without having to update the content-hash in the lockfile.
# This helps prevents merge conflicts when running a batch of dependabot updates.
ruff = "0.6.2"
ruff = "0.6.4"
# Type checking only works with the pydantic.v1 compat module from pydantic v2
pydantic = "^2"

View File

@ -4,9 +4,9 @@ annotated-types==0.5.0 ; python_version >= "3.8" and python_full_version < "4.0.
attrs==24.2.0 ; python_version >= "3.8" and python_full_version < "4.0.0" \
--hash=sha256:5cfb1b9148b5b086569baec03f20d7b6bf3bcacc9a42bebf87ffaaca362f6346 \
--hash=sha256:81921eb96de3191c8258c199618104dd27ac608d9366f5e35d011eae1867ede2
authlib==1.3.1 ; python_version >= "3.8" and python_full_version < "4.0.0" \
--hash=sha256:7ae843f03c06c5c0debd63c9db91f9fda64fa62a42a77419fa15fbb7e7a58917 \
--hash=sha256:d35800b973099bbadc49b42b256ecb80041ad56b7fe1216a362c7943c088f377
authlib==1.3.2 ; python_version >= "3.8" and python_full_version < "4.0.0" \
--hash=sha256:4b16130117f9eb82aa6eec97f6dd4673c3f960ac0283ccdae2897ee4bc030ba2 \
--hash=sha256:ede026a95e9f5cdc2d4364a52103f5405e75aa156357e831ef2bfd0bc5094dfc
automat==22.10.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:c3164f8742b9dc440f3682482d32aaff7bb53f71740dd018533f9de286b64180 \
--hash=sha256:e56beb84edad19dcc11d30e8d9b895f75deeb5ef5e96b84a467066b3b84bb04e
@ -317,9 +317,9 @@ hiredis==3.0.0 ; python_version >= "3.8" and python_full_version < "4.0.0" \
hyperlink==21.0.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:427af957daa58bc909471c6c40f74c5450fa123dd093fc53efd2e91d2705a56b \
--hash=sha256:e6b14c37ecb73e89c77d78cdb4c2cc8f3fb59a885c5b3f819ff4ed80f25af1b4
idna==3.7 ; python_version >= "3.8" and python_full_version < "4.0.0" \
--hash=sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc \
--hash=sha256:82fee1fc78add43492d3a1898bfa6d8a904cc97d8427f683ed8e798d07761aa0
idna==3.8 ; python_version >= "3.8" and python_full_version < "4.0.0" \
--hash=sha256:050b4e5baadcd44d760cedbd2b8e639f2ff89bbc7a5730fcc662954303377aac \
--hash=sha256:d838c2c0ed6fced7693d5e8ab8e734d5f8fda53a039c0164afb0b82e771e3603
ijson==3.3.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:0015354011303175eae7e2ef5136414e91de2298e5a2e9580ed100b728c07e51 \
--hash=sha256:034642558afa57351a0ffe6de89e63907c4cf6849070cc10a3b2542dccda1afe \
@ -1107,12 +1107,12 @@ tomli==2.0.1 ; python_version >= "3.8" and python_version < "3.11" \
treq==23.11.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:0914ff929fd1632ce16797235260f8bc19d20ff7c459c1deabd65b8c68cbeac5 \
--hash=sha256:f494c2218d61cab2cabbee37cd6606d3eea9d16cf14190323095c95d22c467e9
twisted==24.7.0rc1 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:bbc4a2193ca34cfa32f626300746698a6d70fcd77d9c0b79a664c347e39634fc \
--hash=sha256:f37d6656fe4e2871fab29d8952ae90bd6ca8b48a9e4dfa1b348f4cd62e6ba0bb
twisted[tls]==24.7.0rc1 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:bbc4a2193ca34cfa32f626300746698a6d70fcd77d9c0b79a664c347e39634fc \
--hash=sha256:f37d6656fe4e2871fab29d8952ae90bd6ca8b48a9e4dfa1b348f4cd62e6ba0bb
twisted==24.7.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:5a60147f044187a127ec7da96d170d49bcce50c6fd36f594e60f4587eff4d394 \
--hash=sha256:734832ef98108136e222b5230075b1079dad8a3fc5637319615619a7725b0c81
twisted[tls]==24.7.0 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:5a60147f044187a127ec7da96d170d49bcce50c6fd36f594e60f4587eff4d394 \
--hash=sha256:734832ef98108136e222b5230075b1079dad8a3fc5637319615619a7725b0c81
txredisapi==1.4.10 ; python_full_version >= "3.8.0" and python_full_version < "4.0.0" \
--hash=sha256:0a6ea77f27f8cf092f907654f08302a97b48fa35f24e0ad99dfb74115f018161 \
--hash=sha256:7609a6af6ff4619a3189c0adfb86aeda789afba69eb59fc1e19ac0199e725395

View File

@ -31,6 +31,7 @@ Pydantic does not yet offer a strict mode, but it is planned for pydantic v2. Se
until then, this script is a best effort to stop us from introducing type coersion bugs
(like the infamous stringy power levels fixed in room version 10).
"""
import argparse
import contextlib
import functools

View File

@ -109,6 +109,9 @@ set -x
# --quiet suppresses the update check.
ruff check --quiet --fix "${files[@]}"
# Reformat Python code.
ruff format --quiet "${files[@]}"
# Catch any common programming mistakes in Rust code.
#
# --bins, --examples, --lib, --tests combined explicitly disable checking

View File

@ -20,8 +20,7 @@
#
#
"""An interactive script for doing a release. See `cli()` below.
"""
"""An interactive script for doing a release. See `cli()` below."""
import glob
import json

View File

@ -13,8 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Contains *incomplete* type hints for txredisapi.
"""
"""Contains *incomplete* type hints for txredisapi."""
from typing import Any, List, Optional, Type, Union
from twisted.internet import protocol

View File

@ -20,8 +20,7 @@
#
#
""" This is an implementation of a Matrix homeserver.
"""
"""This is an implementation of a Matrix homeserver."""
import os
import sys

View File

@ -171,7 +171,7 @@ def elide_http_methods_if_unconflicting(
"""
def paths_to_methods_dict(
methods_and_paths: Iterable[Tuple[str, str]]
methods_and_paths: Iterable[Tuple[str, str]],
) -> Dict[str, Set[str]]:
"""
Given (method, path) pairs, produces a dict from path to set of methods
@ -201,7 +201,7 @@ def elide_http_methods_if_unconflicting(
def simplify_path_regexes(
registrations: Dict[Tuple[str, str], EndpointDescription]
registrations: Dict[Tuple[str, str], EndpointDescription],
) -> Dict[Tuple[str, str], EndpointDescription]:
"""
Simplify all the path regexes for the dict of endpoint descriptions,

View File

@ -40,6 +40,7 @@ from synapse.storage.engines import create_engine
class ReviewConfig(RootConfig):
"A config class that just pulls out the database config"
config_classes = [DatabaseConfig]
@ -160,7 +161,11 @@ def main() -> None:
with make_conn(database_config, engine, "review_recent_signups") as db_conn:
# This generates a type of Cursor, not LoggingTransaction.
user_infos = get_recent_users(db_conn.cursor(), since_ms, exclude_users_with_appservice) # type: ignore[arg-type]
user_infos = get_recent_users(
db_conn.cursor(),
since_ms, # type: ignore[arg-type]
exclude_users_with_appservice,
)
for user_info in user_infos:
if exclude_users_with_email and user_info.emails:

View File

@ -129,6 +129,11 @@ BOOLEAN_COLUMNS = {
"remote_media_cache": ["authenticated"],
"room_stats_state": ["is_federatable"],
"rooms": ["is_public", "has_auth_chain_index"],
"sliding_sync_joined_rooms": ["is_encrypted"],
"sliding_sync_membership_snapshots": [
"has_known_state",
"is_encrypted",
],
"users": ["shadow_banned", "approved", "locked", "suspended"],
"un_partial_stated_event_stream": ["rejection_status_changed"],
"users_who_share_rooms": ["share_private"],
@ -712,9 +717,7 @@ class Porter:
return
# Check if all background updates are done, abort if not.
updates_complete = (
await self.sqlite_store.db_pool.updates.has_completed_background_updates()
)
updates_complete = await self.sqlite_store.db_pool.updates.has_completed_background_updates()
if not updates_complete:
end_error = (
"Pending background updates exist in the SQLite3 database."
@ -1090,11 +1093,11 @@ class Porter:
return done, remaining + done
async def _setup_state_group_id_seq(self) -> None:
curr_id: Optional[int] = (
await self.sqlite_store.db_pool.simple_select_one_onecol(
curr_id: Optional[
int
] = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
)
)
if not curr_id:
return
@ -1181,14 +1184,14 @@ class Porter:
)
async def _setup_auth_chain_sequence(self) -> None:
curr_chain_id: Optional[int] = (
await self.sqlite_store.db_pool.simple_select_one_onecol(
curr_chain_id: Optional[
int
] = await self.sqlite_store.db_pool.simple_select_one_onecol(
table="event_auth_chains",
keyvalues={},
retcol="MAX(chain_id)",
allow_none=True,
)
)
def r(txn: LoggingTransaction) -> None:
# Presumably there is at least one row in event_auth_chains.

View File

@ -230,6 +230,8 @@ class EventContentFields:
ROOM_NAME: Final = "name"
MEMBERSHIP: Final = "membership"
# Used in m.room.guest_access events.
GUEST_ACCESS: Final = "guest_access"
@ -245,6 +247,8 @@ class EventContentFields:
# `m.room.encryption`` algorithm field
ENCRYPTION_ALGORITHM: Final = "algorithm"
TOMBSTONE_SUCCESSOR_ROOM: Final = "replacement_room"
class EventUnsignedContentFields:
"""Fields found inside the 'unsigned' data on events"""

View File

@ -20,6 +20,7 @@
#
"""Contains the URL paths to prefix various aspects of the server with."""
import hmac
from hashlib import sha256
from urllib.parse import urlencode

View File

@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
from typing import (
TYPE_CHECKING,

View File

@ -447,6 +447,3 @@ class ExperimentalConfig(Config):
# MSC4151: Report room API (Client-Server API)
self.msc4151_enabled: bool = experimental.get("msc4151_enabled", False)
# MSC4156: Migrate server_name to via
self.msc4156_enabled: bool = experimental.get("msc4156_enabled", False)

View File

@ -200,16 +200,13 @@ class KeyConfig(Config):
)
form_secret = 'form_secret: "%s"' % random_string_with_symbols(50)
return (
"""\
return """\
%(macaroon_secret_key)s
%(form_secret)s
signing_key_path: "%(base_key_name)s.signing.key"
trusted_key_servers:
- server_name: "matrix.org"
"""
% locals()
)
""" % locals()
def read_signing_keys(self, signing_key_path: str, name: str) -> List[SigningKey]:
"""Read the signing keys in the given path.
@ -249,7 +246,9 @@ class KeyConfig(Config):
if is_signing_algorithm_supported(key_id):
key_base64 = key_data["key"]
key_bytes = decode_base64(key_base64)
verify_key: "VerifyKeyWithExpiry" = decode_verify_key_bytes(key_id, key_bytes) # type: ignore[assignment]
verify_key: "VerifyKeyWithExpiry" = decode_verify_key_bytes(
key_id, key_bytes
) # type: ignore[assignment]
verify_key.expired = key_data["expired_ts"]
keys[key_id] = verify_key
else:

View File

@ -157,12 +157,9 @@ class LoggingConfig(Config):
self, config_dir_path: str, server_name: str, **kwargs: Any
) -> str:
log_config = os.path.join(config_dir_path, server_name + ".log.config")
return (
"""\
return """\
log_config: "%(log_config)s"
"""
% locals()
)
""" % locals()
def read_arguments(self, args: argparse.Namespace) -> None:
if args.no_redirect_stdio is not None:

View File

@ -828,13 +828,10 @@ class ServerConfig(Config):
).lstrip()
if not unsecure_listeners:
unsecure_http_bindings = (
"""- port: %(unsecure_port)s
unsecure_http_bindings = """- port: %(unsecure_port)s
tls: false
type: http
x_forwarded: true"""
% locals()
)
x_forwarded: true""" % locals()
if not open_private_ports:
unsecure_http_bindings += (
@ -853,16 +850,13 @@ class ServerConfig(Config):
if not secure_listeners:
secure_http_bindings = ""
return (
"""\
return """\
server_name: "%(server_name)s"
pid_file: %(pid_file)s
listeners:
%(secure_http_bindings)s
%(unsecure_http_bindings)s
"""
% locals()
)
""" % locals()
def read_arguments(self, args: argparse.Namespace) -> None:
if args.manhole is not None:

View File

@ -328,10 +328,11 @@ class WorkerConfig(Config):
)
# type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[
str, InstanceLocationConfig
] = parse_and_validate_mapping(
instance_map, InstanceLocationConfig # type: ignore[arg-type]
self.instance_map: Dict[str, InstanceLocationConfig] = (
parse_and_validate_mapping(
instance_map,
InstanceLocationConfig, # type: ignore[arg-type]
)
)
# Map from type of streams to source, c.f. WriterLocations.

View File

@ -887,7 +887,8 @@ def _check_power_levels(
raise SynapseError(400, f"{v!r} must be an integer.")
if k in {"events", "notifications", "users"}:
if not isinstance(v, collections.abc.Mapping) or not all(
type(v) is int for v in v.values() # noqa: E721
type(v) is int
for v in v.values() # noqa: E721
):
raise SynapseError(
400,

View File

@ -80,7 +80,7 @@ def load_legacy_presence_router(hs: "HomeServer") -> None:
# All methods that the module provides should be async, but this wasn't enforced
# in the old module system, so we wrap them if needed
def async_wrapper(
f: Optional[Callable[P, R]]
f: Optional[Callable[P, R]],
) -> Optional[Callable[P, Awaitable[R]]]:
# f might be None if the callback isn't implemented by the module. In this
# case we don't want to register a callback at all so we return None.

View File

@ -504,7 +504,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
def _encode_state_group_delta(
state_group_delta: Dict[Tuple[int, int], StateMap[str]]
state_group_delta: Dict[Tuple[int, int], StateMap[str]],
) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
if not state_group_delta:
return []
@ -517,7 +517,7 @@ def _encode_state_group_delta(
def _decode_state_group_delta(
input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
input: List[Tuple[int, int, List[Tuple[str, str, str]]]],
) -> Dict[Tuple[int, int], StateMap[str]]:
if not input:
return {}
@ -544,7 +544,7 @@ def _encode_state_dict(
def _decode_state_dict(
input: Optional[List[Tuple[str, str, str]]]
input: Optional[List[Tuple[str, str, str]]],
) -> Optional[StateMap[str]]:
"""Decodes a state dict encoded using `_encode_state_dict` above"""
if input is None:

View File

@ -19,5 +19,4 @@
#
#
""" This package includes all the federation specific logic.
"""
"""This package includes all the federation specific logic."""

View File

@ -859,7 +859,6 @@ class FederationMediaThumbnailServlet(BaseFederationServerServlet):
request: SynapseRequest,
media_id: str,
) -> None:
width = parse_integer(request, "width", required=True)
height = parse_integer(request, "height", required=True)
method = parse_string(request, "method", "scale")

View File

@ -118,11 +118,11 @@ class AccountHandler:
}
if self._use_account_validity_in_account_status:
status["org.matrix.expired"] = (
await self._account_validity_handler.is_user_expired(
status[
"org.matrix.expired"
] = await self._account_validity_handler.is_user_expired(
user_id.to_string()
)
)
return status

View File

@ -197,15 +197,17 @@ class AdminHandler:
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
events, _ = (
await self._store.paginate_room_events_by_topological_ordering(
(
events,
_,
_,
) = await self._store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_key,
to_key=to_key,
limit=100,
direction=Direction.FORWARDS,
)
)
if not events:
break

View File

@ -166,8 +166,7 @@ def login_id_phone_to_thirdparty(identifier: JsonDict) -> Dict[str, str]:
if "country" not in identifier or (
# The specification requires a "phone" field, while Synapse used to require a "number"
# field. Accept both for backwards compatibility.
"phone" not in identifier
and "number" not in identifier
"phone" not in identifier and "number" not in identifier
):
raise SynapseError(
400, "Invalid phone-type identifier", errcode=Codes.INVALID_PARAM

View File

@ -270,9 +270,9 @@ class DirectoryHandler:
async def get_association(self, room_alias: RoomAlias) -> JsonDict:
room_id = None
if self.hs.is_mine(room_alias):
result: Optional[RoomAliasMapping] = (
await self.get_association_from_room_alias(room_alias)
)
result: Optional[
RoomAliasMapping
] = await self.get_association_from_room_alias(room_alias)
if result:
room_id = result.room_id
@ -517,13 +517,11 @@ class DirectoryHandler:
raise SynapseError(403, "Not allowed to publish room")
# Check if publishing is blocked by a third party module
allowed_by_third_party_rules = (
await (
allowed_by_third_party_rules = await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
)
)
if not allowed_by_third_party_rules:
raise SynapseError(403, "Not allowed to publish room")

View File

@ -1001,12 +1001,12 @@ class FederationHandler:
)
if include_auth_user_id:
event_content[EventContentFields.AUTHORISING_USER] = (
await self._event_auth_handler.get_user_which_could_invite(
event_content[
EventContentFields.AUTHORISING_USER
] = await self._event_auth_handler.get_user_which_could_invite(
room_id,
state_ids,
)
)
builder = self.event_builder_factory.for_room_version(
room_version,

View File

@ -21,6 +21,7 @@
#
"""Utilities for interacting with Identity Servers"""
import logging
import urllib.parse
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple

View File

@ -1225,10 +1225,9 @@ class EventCreationHandler:
)
if prev_event_ids is not None:
assert (
len(prev_event_ids) <= 10
), "Attempting to create an event with %i prev_events" % (
len(prev_event_ids),
assert len(prev_event_ids) <= 10, (
"Attempting to create an event with %i prev_events"
% (len(prev_event_ids),)
)
else:
prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)

View File

@ -507,8 +507,11 @@ class PaginationHandler:
# Initially fetch the events from the database. With any luck, we can return
# these without blocking on backfill (handled below).
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
@ -516,7 +519,6 @@ class PaginationHandler:
limit=pagin_config.limit,
event_filter=event_filter,
)
)
if pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
@ -584,8 +586,11 @@ class PaginationHandler:
# If we did backfill something, refetch the events from the database to
# catch anything new that might have been added since we last fetched.
if did_backfill:
events, next_key = (
await self.store.paginate_room_events_by_topological_ordering(
(
events,
next_key,
_,
) = await self.store.paginate_room_events_by_topological_ordering(
room_id=room_id,
from_key=from_token.room_key,
to_key=to_room_key,
@ -593,7 +598,6 @@ class PaginationHandler:
limit=pagin_config.limit,
event_filter=event_filter,
)
)
else:
# Otherwise, we can backfill in the background for eventual
# consistency's sake but we don't need to block the client waiting

View File

@ -71,6 +71,7 @@ user state; this device follows the normal timeout logic (see above) and will
automatically be replaced with any information from currently available devices.
"""
import abc
import contextlib
import itertools
@ -493,9 +494,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
# The number of ongoing syncs on this process, by (user ID, device ID).
# Empty if _presence_enabled is false.
self._user_device_to_num_current_syncs: Dict[Tuple[str, Optional[str]], int] = (
{}
)
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}
self.notifier = hs.get_notifier()
self.instance_id = hs.get_instance_id()
@ -818,9 +819,9 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
self._user_device_to_num_current_syncs: Dict[Tuple[str, Optional[str]], int] = (
{}
)
self._user_device_to_num_current_syncs: Dict[
Tuple[str, Optional[str]], int
] = {}
# Keeps track of the number of *ongoing* syncs on other processes.
#

View File

@ -351,9 +351,9 @@ class ProfileHandler:
server_name = host
if self._is_mine_server_name(server_name):
media_info: Optional[Union[LocalMedia, RemoteMedia]] = (
await self.store.get_local_media(media_id)
)
media_info: Optional[
Union[LocalMedia, RemoteMedia]
] = await self.store.get_local_media(media_id)
else:
media_info = await self.store.get_cached_remote_media(server_name, media_id)

View File

@ -188,14 +188,14 @@ class RelationsHandler:
if include_original_event:
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
return_value["original_event"] = (
await self._event_serializer.serialize_event(
return_value[
"original_event"
] = await self._event_serializer.serialize_event(
event,
now,
bundle_aggregations=None,
config=serialize_options,
)
)
if next_token:
return_value["next_batch"] = await next_token.to_string(self._main_store)

View File

@ -20,6 +20,7 @@
#
"""Contains functions for performing actions on rooms."""
import itertools
import logging
import math
@ -912,13 +913,11 @@ class RoomCreationHandler:
)
# Check whether this visibility value is blocked by a third party module
allowed_by_third_party_rules = (
await (
allowed_by_third_party_rules = await (
self._third_party_event_rules.check_visibility_can_be_modified(
room_id, visibility
)
)
)
if not allowed_by_third_party_rules:
raise SynapseError(403, "Room visibility value not allowed.")
@ -1766,7 +1765,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
)
events = list(room_events)
events.extend(e for evs, _ in room_to_events.values() for e in evs)
events.extend(e for evs, _, _ in room_to_events.values() for e in evs)
# We know stream_ordering must be not None here, as its been
# persisted, but mypy doesn't know that

View File

@ -1282,12 +1282,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# If this is going to be a local join, additional information must
# be included in the event content in order to efficiently validate
# the event.
content[EventContentFields.AUTHORISING_USER] = (
await self.event_auth_handler.get_user_which_could_invite(
content[
EventContentFields.AUTHORISING_USER
] = await self.event_auth_handler.get_user_which_could_invite(
room_id,
state_before_join,
)
)
return False, []
@ -1395,9 +1395,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
if requester is not None:
sender = UserID.from_string(event.sender)
assert (
sender == requester.user
), "Sender (%s) must be same as requester (%s)" % (sender, requester.user)
assert sender == requester.user, (
"Sender (%s) must be same as requester (%s)" % (sender, requester.user)
)
assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
else:
requester = types.create_requester(target_user)

View File

@ -423,9 +423,9 @@ class SearchHandler:
}
if search_result.room_groups and "room_id" in group_keys:
rooms_cat_res.setdefault("groups", {})[
"room_id"
] = search_result.room_groups
rooms_cat_res.setdefault("groups", {})["room_id"] = (
search_result.room_groups
)
if sender_group and "sender" in group_keys:
rooms_cat_res.setdefault("groups", {})["sender"] = sender_group

View File

@ -25,8 +25,8 @@ from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler
from synapse.handlers.sliding_sync.room_lists import (
RoomsForUserType,
SlidingSyncRoomLists,
_RoomMembershipForUser,
)
from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore
from synapse.logging.opentracing import (
@ -39,17 +39,20 @@ from synapse.logging.opentracing import (
)
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import PaginateFunction
from synapse.storage.roommember import MemberSummary
from synapse.storage.roommember import (
MemberSummary,
)
from synapse.types import (
JsonDict,
MutableStateMap,
PersistedEventPosition,
Requester,
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
StreamKeyType,
StreamToken,
)
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
MutablePerConnectionState,
@ -76,18 +79,6 @@ sync_processing_time = Histogram(
)
# The event types that clients should consider as new activity.
DEFAULT_BUMP_EVENT_TYPES = {
EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
EventTypes.CallInvite,
EventTypes.PollStart,
EventTypes.LiveLocationShareStart,
}
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@ -266,6 +257,8 @@ class SlidingSyncHandler:
],
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)
# Filter out empty room results during incremental sync
@ -363,7 +356,7 @@ class SlidingSyncHandler:
async def get_current_state_ids_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[str]:
@ -428,7 +421,7 @@ class SlidingSyncHandler:
async def get_current_state_at(
self,
room_id: str,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
state_filter: StateFilter,
to_token: StreamToken,
) -> StateMap[EventBase]:
@ -460,6 +453,7 @@ class SlidingSyncHandler:
return state_map
@trace
async def get_room_sync_data(
self,
sync_config: SlidingSyncConfig,
@ -467,9 +461,11 @@ class SlidingSyncHandler:
new_connection_state: "MutablePerConnectionState",
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
room_membership_for_user_at_to_token: RoomsForUserType,
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Fetch room data for the sync response.
@ -485,6 +481,8 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
@ -529,7 +527,7 @@ class SlidingSyncHandler:
from_bound = None
initial = True
ignore_timeline_bound = False
if from_token and not room_membership_for_user_at_to_token.newly_joined:
if from_token and not newly_joined:
room_status = previous_connection_state.rooms.have_sent_room(room_id)
if room_status.status == HaveSentRoomFlag.LIVE:
from_bound = from_token.stream_token.room_key
@ -598,9 +596,7 @@ class SlidingSyncHandler:
Membership.LEAVE,
Membership.BAN,
):
to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
to_bound = room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
timeline_from_bound = from_bound
if ignore_timeline_bound:
@ -635,7 +631,7 @@ class SlidingSyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
timeline_events, new_room_key = await pagination_method(
timeline_events, new_room_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -643,28 +639,13 @@ class SlidingSyncHandler:
from_key=to_bound,
to_key=timeline_from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=room_sync_config.timeline_limit + 1,
limit=room_sync_config.timeline_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
timeline_events.reverse()
# Determine our `limited` status based on the timeline. We do this before
# filtering the events so we can accurately determine if there is more to
# paginate even if we filter out some/all events.
if len(timeline_events) > room_sync_config.timeline_limit:
limited = True
# Get rid of that extra "+ 1" event because we only used it to determine
# if we hit the limit or not
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
assert timeline_events[0].internal_metadata.stream_ordering
new_room_key = RoomStreamToken(
stream=timeline_events[0].internal_metadata.stream_ordering - 1
)
# Make sure we don't expose any events that the client shouldn't see
timeline_events = await filter_events_for_client(
self.storage_controllers,
@ -757,6 +738,17 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
# Get the changes to current state in the token range from the
# `current_state_delta_stream` table.
#
# For incremental syncs, we can do this first to determine if something relevant
# has changed and strategically avoid fetching other costly things.
room_state_delta_id_map: MutableStateMap[str] = {}
name_event_id: Optional[str] = None
membership_changed = False
name_changed = False
avatar_changed = False
if initial:
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
@ -765,9 +757,50 @@ class SlidingSyncHandler:
to_token=to_token,
)
name_event_id = name_state_ids.get((EventTypes.Name, ""))
else:
assert from_bound is not None
room_membership_summary: Mapping[str, MemberSummary]
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
for delta in deltas:
# TODO: Handle state resets where event_id is None
if delta.event_id is not None:
room_state_delta_id_map[(delta.event_type, delta.state_key)] = (
delta.event_id
)
if delta.event_type == EventTypes.Member:
membership_changed = True
elif delta.event_type == EventTypes.Name and delta.state_key == "":
name_changed = True
elif (
delta.event_type == EventTypes.RoomAvatar and delta.state_key == ""
):
avatar_changed = True
room_membership_summary: Optional[Mapping[str, MemberSummary]] = None
empty_membership_summary = MemberSummary([], 0)
# We need the room summary for:
# - Always for initial syncs (or the first time we send down the room)
# - When the room has no name, we need `heroes`
# - When the membership has changed so we need to give updated `heroes` and
# `joined_count`/`invited_count`.
#
# Ideally, instead of just looking at `name_changed`, we'd check if the room
# name is not set but this is a good enough approximation that saves us from
# having to pull out the full event. This just means, we're generating the
# summary whenever the room name changes instead of only when it changes to
# `None`.
if initial or name_changed or membership_changed:
# We can't trace the function directly because it's cached and the `@cached`
# decorator doesn't mix with `@trace` yet.
with start_active_span("get_room_summary"):
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
@ -790,7 +823,12 @@ class SlidingSyncHandler:
# TODO: Should we also check for `EventTypes.CanonicalAlias`
# (`m.room.canonical_alias`) as a fallback for the room name? see
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
if name_event_id is None:
#
# We need to fetch the `heroes` if the room name is not set. But we only need to
# get them on initial syncs (or the first time we send down the room) or if the
# membership has changed which may change the heroes.
if name_event_id is None and (initial or (not initial and membership_changed)):
assert room_membership_summary is not None
hero_user_ids = extract_heroes_from_room_summary(
room_membership_summary, me=user.to_string()
)
@ -852,13 +890,13 @@ class SlidingSyncHandler:
required_state_filter = StateFilter.all()
else:
required_state_types: List[Tuple[str, Optional[str]]] = []
num_wild_state_keys = 0
lazy_load_room_members = False
num_others = 0
for (
state_type,
state_key_set,
) in room_sync_config.required_state_map.items():
num_wild_state_keys = 0
lazy_load_room_members = False
num_others = 0
for state_key in state_key_set:
if state_key == StateValues.WILDCARD:
num_wild_state_keys += 1
@ -908,9 +946,15 @@ class SlidingSyncHandler:
# We need this base set of info for the response so let's just fetch it along
# with the `required_state` for the room
meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
hero_room_state = [
(EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
]
meta_room_state = list(hero_room_state)
if initial or name_changed:
meta_room_state.append((EventTypes.Name, ""))
if initial or avatar_changed:
meta_room_state.append((EventTypes.RoomAvatar, ""))
state_filter = StateFilter.all()
if required_state_filter != StateFilter.all():
state_filter = StateFilter(
@ -933,21 +977,22 @@ class SlidingSyncHandler:
else:
assert from_bound is not None
# TODO: Limit the number of state events we're about to send down
# the room, if its too many we should change this to an
# `initial=True`?
deltas = await self.store.get_current_state_deltas_for_room(
room_id=room_id,
from_token=from_bound,
to_token=to_token.room_key,
)
# TODO: Filter room state before fetching events
# TODO: Handle state resets where event_id is None
events = await self.store.get_events(
[d.event_id for d in deltas if d.event_id]
state_filter.filter_state(room_state_delta_id_map).values()
)
room_state = {(s.type, s.state_key): s for s in events.values()}
# If the membership changed and we have to get heroes, get the remaining
# heroes from the state
if hero_user_ids:
hero_membership_state = await self.get_current_state_at(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
state_filter=StateFilter.from_types(hero_room_state),
to_token=to_token,
)
room_state.update(hero_membership_state)
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
required_room_state = required_state_filter.filter_state(room_state)
@ -980,24 +1025,19 @@ class SlidingSyncHandler:
)
# Figure out the last bump event in the room
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
)
)
# By default, just choose the membership event position
#
# By default, just choose the membership event position for any non-join membership
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
bump_stamp = new_bump_event_pos.stream
# If we're joined to the room, we need to find the last bump event before the
# `to_token`
if room_membership_for_user_at_to_token.membership == Membership.JOIN:
# Try and get a bump stamp, if not we just fall back to the
# membership token.
new_bump_stamp = await self._get_bump_stamp(
room_id, to_token, timeline_events
)
if new_bump_stamp is not None:
bump_stamp = new_bump_stamp
unstable_expanded_timeline = False
prev_room_sync_config = previous_connection_state.room_configs.get(room_id)
@ -1050,11 +1090,25 @@ class SlidingSyncHandler:
set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
joined_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
joined_count = room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count
invited_count: Optional[int] = None
if initial or membership_changed:
assert room_membership_summary is not None
invited_count = room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
heroes=heroes,
is_dm=room_membership_for_user_at_to_token.is_dm,
is_dm=is_dm,
initial=initial,
required_state=list(required_room_state.values()),
timeline_events=timeline_events,
@ -1065,15 +1119,100 @@ class SlidingSyncHandler:
unstable_expanded_timeline=unstable_expanded_timeline,
num_live=num_live,
bump_stamp=bump_stamp,
joined_count=room_membership_summary.get(
Membership.JOIN, empty_membership_summary
).count,
invited_count=room_membership_summary.get(
Membership.INVITE, empty_membership_summary
).count,
joined_count=joined_count,
invited_count=invited_count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
notification_count=0,
highlight_count=0,
)
@trace
async def _get_bump_stamp(
self, room_id: str, to_token: StreamToken, timeline: List[EventBase]
) -> Optional[int]:
"""Get a bump stamp for the room, if we have a bump event
Args:
room_id
to_token: The upper bound of token to return
timeline: The list of events we have fetched.
"""
# First check the timeline events we're returning to see if one of
# those matches. We iterate backwards and take the stream ordering
# of the first event that matches the bump event types.
for timeline_event in reversed(timeline):
if timeline_event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
new_bump_stamp = timeline_event.internal_metadata.stream_ordering
# All persisted events have a stream ordering
assert new_bump_stamp is not None
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_stamp > 0:
return new_bump_stamp
# We can quickly query for the latest bump event in the room using the
# sliding sync tables.
latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room(
room_id
)
min_to_token_position = to_token.room_key.stream
# If we can rely on the new sliding sync tables and the `bump_stamp` is
# `None`, just fallback to the membership event position. This can happen
# when we've just joined a remote room and all the events are backfilled.
if (
# FIXME: The background job check can be removed once we bump
# `SCHEMA_COMPAT_VERSION` and run the foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots`
# (tracked by https://github.com/element-hq/synapse/issues/17623)
await self.store.have_finished_sliding_sync_background_jobs()
and latest_room_bump_stamp is None
):
return None
# The `bump_stamp` stored in the database might be ahead of our token. Since
# `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure
# that's before the `to_token` in all scenarios. The only scenario we can be
# sure of is if the `bump_stamp` is totally before the minimum position from
# the token.
#
# We don't need to check if the background update has finished, as if the
# returned bump stamp is not None then it must be up to date.
elif (
latest_room_bump_stamp is not None
and latest_room_bump_stamp < min_to_token_position
):
if latest_room_bump_stamp > 0:
return latest_room_bump_stamp
else:
return None
# Otherwise, if it's within or after the `to_token`, we need to find the
# last bump event before the `to_token`.
else:
last_bump_event_result = (
await self.store.get_last_event_pos_in_room_before_stream_ordering(
room_id,
to_token.room_key,
event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES,
)
)
if last_bump_event_result is not None:
_, new_bump_event_pos = last_bump_event_result
# If we've just joined a remote room, then the last bump event may
# have been backfilled (and so have a negative stream ordering).
# These negative stream orderings can't sensibly be compared, so
# instead we use the membership event position.
if new_bump_event_pos.stream > 0:
return new_bump_event_pos.stream
return None

View File

@ -386,9 +386,9 @@ class SlidingSyncExtensionHandler:
if have_push_rules_changed:
global_account_data_map = dict(global_account_data_map)
# TODO: This should take into account the `from_token` and `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data_map[
AccountDataTypes.PUSH_RULES
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
else:
# TODO: This should take into account the `to_token`
all_global_account_data = await self.store.get_global_account_data_for_user(
@ -397,9 +397,9 @@ class SlidingSyncExtensionHandler:
global_account_data_map = dict(all_global_account_data)
# TODO: This should take into account the `to_token`
global_account_data_map[AccountDataTypes.PUSH_RULES] = (
await self.push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data_map[
AccountDataTypes.PUSH_RULES
] = await self.push_rules_handler.push_rules_for_user(sync_config.user)
# Fetch room account data
account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}

File diff suppressed because it is too large Load Diff

View File

@ -183,10 +183,7 @@ class JoinedSyncResult:
to tell if room needs to be part of the sync result.
"""
return bool(
self.timeline
or self.state
or self.ephemeral
or self.account_data
self.timeline or self.state or self.ephemeral or self.account_data
# nb the notification count does not, er, count: if there's nothing
# else in the result, we don't need to send it.
)
@ -575,11 +572,11 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: Union[SyncResult, E2eeSyncResult] = (
await self.current_sync_for_user(
result: Union[
SyncResult, E2eeSyncResult
] = await self.current_sync_for_user(
sync_config, sync_version, since_token, full_state=full_state
)
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
@ -673,11 +670,11 @@ class SyncHandler:
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
sync_result: Union[SyncResult, E2eeSyncResult] = (
await self.generate_sync_result(
sync_result: Union[
SyncResult, E2eeSyncResult
] = await self.generate_sync_result(
sync_config, since_token, full_state
)
)
# Go through the MSC3575 Sliding Sync `/sync/e2ee` path
elif sync_version == SyncVersion.E2EE_SYNC:
sync_result = await self.generate_e2ee_sync_result(
@ -909,7 +906,7 @@ class SyncHandler:
# Use `stream_ordering` for updates
else paginate_room_events_by_stream_ordering
)
events, end_key = await pagination_method(
events, end_key, limited = await pagination_method(
room_id=room_id,
# The bounds are reversed so we can paginate backwards
# (from newer to older events) starting at to_bound.
@ -917,9 +914,7 @@ class SyncHandler:
from_key=end_key,
to_key=since_key,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
limit=load_limit + 1,
limit=load_limit,
)
# We want to return the events in ascending order (the last event is the
# most recent).
@ -974,9 +969,6 @@ class SyncHandler:
loaded_recents.extend(recents)
recents = loaded_recents
if len(events) <= load_limit:
limited = False
break
max_repeat -= 1
if len(recents) > timeline_limit:
@ -1487,14 +1479,17 @@ class SyncHandler:
# timeline here. The caller will then dedupe any redundant
# ones.
state_ids = await self._state_storage_controller.get_state_ids_for_event(
state_ids = (
await self._state_storage_controller.get_state_ids_for_event(
batch.events[0].event_id,
# we only want members!
state_filter=StateFilter.from_types(
(EventTypes.Member, member) for member in members_to_fetch
(EventTypes.Member, member)
for member in members_to_fetch
),
await_full_state=False,
)
)
return state_ids
if batch:
@ -2165,18 +2160,18 @@ class SyncHandler:
if push_rules_changed:
global_account_data = dict(global_account_data)
global_account_data[AccountDataTypes.PUSH_RULES] = (
await self._push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data[
AccountDataTypes.PUSH_RULES
] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
else:
all_global_account_data = await self.store.get_global_account_data_for_user(
user_id
)
global_account_data = dict(all_global_account_data)
global_account_data[AccountDataTypes.PUSH_RULES] = (
await self._push_rules_handler.push_rules_for_user(sync_config.user)
)
global_account_data[
AccountDataTypes.PUSH_RULES
] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
account_data_for_user = (
await sync_config.filter_collection.filter_global_account_data(
@ -2607,7 +2602,7 @@ class SyncHandler:
newly_joined = room_id in newly_joined_rooms
if room_entry:
events, start_key = room_entry
events, start_key, _ = room_entry
# We want to return the events in ascending order (the last event is the
# most recent).
events.reverse()

View File

@ -183,7 +183,7 @@ class WorkerLocksHandler:
return
def _wake_all_locks(
locks: Collection[Union[WaitingLock, WaitingMultiLock]]
locks: Collection[Union[WaitingLock, WaitingMultiLock]],
) -> None:
for lock in locks:
deferred = lock.deferred

View File

@ -1313,6 +1313,5 @@ def is_unknown_endpoint(
)
) or (
# Older Synapses returned a 400 error.
e.code == 400
and synapse_error.errcode == Codes.UNRECOGNIZED
e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
)

View File

@ -233,7 +233,7 @@ def return_html_error(
def wrap_async_request_handler(
h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]],
) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
"""Wraps an async request handler so that it calls request.processing.

View File

@ -22,6 +22,7 @@
"""
Log formatters that output terse JSON.
"""
import json
import logging

View File

@ -29,6 +29,7 @@ them.
See doc/log_contexts.rst for details on how this works.
"""
import logging
import threading
import typing
@ -751,7 +752,7 @@ def preserve_fn(
f: Union[
Callable[P, R],
Callable[P, Awaitable[R]],
]
],
) -> Callable[P, "defer.Deferred[R]"]:
"""Function decorator which wraps the function with run_in_background"""

View File

@ -169,6 +169,7 @@ Gotchas
than one caller? Will all of those calling functions have be in a context
with an active span?
"""
import contextlib
import enum
import inspect
@ -414,7 +415,7 @@ def ensure_active_span(
"""
def ensure_active_span_inner_1(
func: Callable[P, R]
func: Callable[P, R],
) -> Callable[P, Union[Optional[T], R]]:
@wraps(func)
def ensure_active_span_inner_2(
@ -700,7 +701,7 @@ def set_operation_name(operation_name: str) -> None:
@only_if_tracing
def force_tracing(
span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel
span: Union["opentracing.Span", _Sentinel] = _Sentinel.sentinel,
) -> None:
"""Force sampling for the active/given span and its children.
@ -1093,9 +1094,10 @@ def trace_servlet(
# Mypy seems to think that start_context.tag below can be Optional[str], but
# that doesn't appear to be correct and works in practice.
request_tags[
SynapseTags.REQUEST_TAG
] = request.request_metrics.start_context.tag # type: ignore[assignment]
request_tags[SynapseTags.REQUEST_TAG] = (
request.request_metrics.start_context.tag # type: ignore[assignment]
)
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)

View File

@ -293,7 +293,7 @@ def wrap_as_background_process(
"""
def wrap_as_background_process_inner(
func: Callable[P, Awaitable[Optional[R]]]
func: Callable[P, Awaitable[Optional[R]]],
) -> Callable[P, "defer.Deferred[Optional[R]]"]:
@wraps(func)
def wrap_as_background_process_inner_2(

View File

@ -304,9 +304,9 @@ class BulkPushRuleEvaluator:
if relation_type == "m.thread" and event.content.get(
"m.relates_to", {}
).get("is_falling_back", False):
related_events["m.in_reply_to"][
"im.vector.is_falling_back"
] = ""
related_events["m.in_reply_to"]["im.vector.is_falling_back"] = (
""
)
return related_events
@ -372,7 +372,8 @@ class BulkPushRuleEvaluator:
gather_results(
(
run_in_background( # type: ignore[call-arg]
self.store.get_number_joined_users_in_room, event.room_id # type: ignore[arg-type]
self.store.get_number_joined_users_in_room,
event.room_id, # type: ignore[arg-type]
),
run_in_background(
self._get_power_levels_and_sender_level,

View File

@ -119,7 +119,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
return payload
async def _handle_request(self, request: Request, content: JsonDict) -> Tuple[int, JsonDict]: # type: ignore[override]
async def _handle_request( # type: ignore[override]
self, request: Request, content: JsonDict
) -> Tuple[int, JsonDict]:
with Measure(self.clock, "repl_fed_send_events_parse"):
room_id = content["room_id"]
backfilled = content["backfilled"]

View File

@ -98,7 +98,9 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
self._store = hs.get_datastores().main
@staticmethod
async def _serialize_payload(user_id: str, old_room_id: str, new_room_id: str) -> JsonDict: # type: ignore[override]
async def _serialize_payload( # type: ignore[override]
user_id: str, old_room_id: str, new_room_id: str
) -> JsonDict:
return {}
async def _handle_request( # type: ignore[override]
@ -109,7 +111,6 @@ class ReplicationCopyPusherRestServlet(ReplicationEndpoint):
old_room_id: str,
new_room_id: str,
) -> Tuple[int, JsonDict]:
await self._store.copy_push_rules_from_room_to_room_for_user(
old_room_id, new_room_id, user_id
)

View File

@ -18,8 +18,8 @@
# [This file includes modifications made by New Vector Limited]
#
#
"""A replication client for use by synapse workers.
"""
"""A replication client for use by synapse workers."""
import logging
from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple

View File

@ -23,6 +23,7 @@
The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
allowed to be sent by which side.
"""
import abc
import logging
from typing import List, Optional, Tuple, Type, TypeVar

View File

@ -857,7 +857,7 @@ UpdateRow = TypeVar("UpdateRow")
def _batch_updates(
updates: Iterable[Tuple[UpdateToken, UpdateRow]]
updates: Iterable[Tuple[UpdateToken, UpdateRow]],
) -> Iterator[Tuple[UpdateToken, List[UpdateRow]]]:
"""Collect stream updates with the same token together

View File

@ -23,6 +23,7 @@ protocols.
An explanation of this protocol is available in docs/tcp_replication.md
"""
import fcntl
import logging
import struct

View File

@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#
"""The server side of the replication stream.
"""
"""The server side of the replication stream."""
import logging
import random
@ -307,7 +306,7 @@ class ReplicationStreamer:
def _batch_updates(
updates: List[Tuple[Token, StreamRow]]
updates: List[Tuple[Token, StreamRow]],
) -> List[Tuple[Optional[Token], StreamRow]]:
"""Takes a list of updates of form [(token, row)] and sets the token to
None for all rows where the next row has the same token. This is used to

View File

@ -247,7 +247,7 @@ class _StreamFromIdGen(Stream):
def current_token_without_instance(
current_token: Callable[[], int]
current_token: Callable[[], int],
) -> Callable[[str], int]:
"""Takes a current token callback function for a single writer stream
that doesn't take an instance name parameter and wraps it in a function that

View File

@ -181,8 +181,7 @@ class NewRegistrationTokenRestServlet(RestServlet):
uses_allowed = body.get("uses_allowed", None)
if not (
uses_allowed is None
or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
uses_allowed is None or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
):
raise SynapseError(
HTTPStatus.BAD_REQUEST,

View File

@ -19,8 +19,8 @@
#
#
"""This module contains base REST classes for constructing client v1 servlets.
"""
"""This module contains base REST classes for constructing client v1 servlets."""
import logging
import re
from typing import Any, Awaitable, Callable, Iterable, Pattern, Tuple, TypeVar, cast

View File

@ -108,9 +108,9 @@ class AccountDataServlet(RestServlet):
# Push rules are stored in a separate table and must be queried separately.
if account_data_type == AccountDataTypes.PUSH_RULES:
account_data: Optional[JsonMapping] = (
await self._push_rules_handler.push_rules_for_user(requester.user)
)
account_data: Optional[
JsonMapping
] = await self._push_rules_handler.push_rules_for_user(requester.user)
else:
account_data = await self.store.get_global_account_data_by_type_for_user(
user_id, account_data_type

View File

@ -48,9 +48,7 @@ class AccountValidityRenewServlet(RestServlet):
self.account_renewed_template = (
hs.config.account_validity.account_validity_account_renewed_template
)
self.account_previously_renewed_template = (
hs.config.account_validity.account_validity_account_previously_renewed_template
)
self.account_previously_renewed_template = hs.config.account_validity.account_validity_account_previously_renewed_template
self.invalid_token_template = (
hs.config.account_validity.account_validity_invalid_token_template
)

View File

@ -20,6 +20,7 @@
#
"""This module contains REST servlets to do with event streaming, /events."""
import logging
from typing import TYPE_CHECKING, Dict, List, Tuple, Union

View File

@ -53,7 +53,6 @@ class KnockRoomAliasServlet(RestServlet):
super().__init__()
self.room_member_handler = hs.get_room_member_handler()
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
async def on_POST(
self,
@ -72,16 +71,12 @@ class KnockRoomAliasServlet(RestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args, "server_name", required=False
)
if self._support_via:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
)
elif RoomAlias.is_valid(room_identifier):
handler = self.room_member_handler
room_alias = RoomAlias.from_string(room_identifier)

View File

@ -19,8 +19,8 @@
#
#
""" This module contains REST servlets to do with presence: /presence/<paths>
"""
"""This module contains REST servlets to do with presence: /presence/<paths>"""
import logging
from typing import TYPE_CHECKING, Tuple

View File

@ -640,14 +640,12 @@ class RegisterRestServlet(RestServlet):
if not password_hash:
raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
desired_username = (
await (
desired_username = await (
self.password_auth_provider.get_username_for_registration(
auth_result,
params,
)
)
)
if desired_username is None:
desired_username = params.get("username", None)
@ -696,13 +694,11 @@ class RegisterRestServlet(RestServlet):
session_id
)
display_name = (
await (
display_name = await (
self.password_auth_provider.get_displayname_for_registration(
auth_result, params
)
)
)
registered_user_id = await self.registration_handler.register_user(
localpart=desired_username,

View File

@ -20,6 +20,7 @@
#
"""This module contains REST servlets to do with rooms: /rooms/<paths>"""
import logging
import re
from enum import Enum
@ -422,7 +423,6 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
super().__init__(hs)
super(ResolveRoomIdMixin, self).__init__(hs) # ensure the Mixin is set up
self.auth = hs.get_auth()
self._support_via = hs.config.experimental.msc4156_enabled
def register(self, http_server: HttpServer) -> None:
# /join/$room_identifier[/$txn_id]
@ -440,13 +440,11 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: Dict[bytes, List[bytes]] = request.args # type: ignore
remote_room_hosts = parse_strings_from_args(args, "server_name", required=False)
if self._support_via:
# Prefer via over server_name (deprecated with MSC4156)
remote_room_hosts = parse_strings_from_args(args, "via", required=False)
if remote_room_hosts is None:
remote_room_hosts = parse_strings_from_args(
args,
"org.matrix.msc4156.via",
default=remote_room_hosts,
required=False,
args, "server_name", required=False
)
room_id, remote_room_hosts = await self.resolve_room_id(
room_identifier,

View File

@ -1011,12 +1011,16 @@ class SlidingSyncRestServlet(RestServlet):
for room_id, room_result in rooms.items():
serialized_rooms[room_id] = {
"bump_stamp": room_result.bump_stamp,
"joined_count": room_result.joined_count,
"invited_count": room_result.invited_count,
"notification_count": room_result.notification_count,
"highlight_count": room_result.highlight_count,
}
if room_result.joined_count is not None:
serialized_rooms[room_id]["joined_count"] = room_result.joined_count
if room_result.invited_count is not None:
serialized_rooms[room_id]["invited_count"] = room_result.invited_count
if room_result.name:
serialized_rooms[room_id]["name"] = room_result.name
@ -1045,9 +1049,9 @@ class SlidingSyncRestServlet(RestServlet):
serialized_rooms[room_id]["initial"] = room_result.initial
if room_result.unstable_expanded_timeline:
serialized_rooms[room_id][
"unstable_expanded_timeline"
] = room_result.unstable_expanded_timeline
serialized_rooms[room_id]["unstable_expanded_timeline"] = (
room_result.unstable_expanded_timeline
)
# This will be omitted for invite/knock rooms with `stripped_state`
if (
@ -1082,9 +1086,9 @@ class SlidingSyncRestServlet(RestServlet):
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.prev_batch is not None:
serialized_rooms[room_id]["prev_batch"] = (
await room_result.prev_batch.to_string(self.store)
)
serialized_rooms[room_id][
"prev_batch"
] = await room_result.prev_batch.to_string(self.store)
# This will be omitted for invite/knock rooms with `stripped_state`
if room_result.num_live is not None:

View File

@ -21,6 +21,7 @@
"""This module contains logic for storing HTTP PUT transactions. This is used
to ensure idempotency when performing PUTs using the REST API."""
import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, Hashable, Tuple

View File

@ -191,11 +191,11 @@ class RemoteKey(RestServlet):
server_keys: Dict[Tuple[str, str], Optional[FetchKeyResultForRemote]] = {}
for server_name, key_ids in query.items():
if key_ids:
results: Mapping[str, Optional[FetchKeyResultForRemote]] = (
await self.store.get_server_keys_json_for_remote(
results: Mapping[
str, Optional[FetchKeyResultForRemote]
] = await self.store.get_server_keys_json_for_remote(
server_name, key_ids
)
)
else:
results = await self.store.get_all_server_keys_json_for_remote(
server_name

View File

@ -65,9 +65,9 @@ class WellKnownBuilder:
}
account_management_url = await auth.account_management_url()
if account_management_url is not None:
result["org.matrix.msc2965.authentication"][
"account"
] = account_management_url
result["org.matrix.msc2965.authentication"]["account"] = (
account_management_url
)
if self._config.server.extra_well_known_client_content:
for (

View File

@ -119,7 +119,9 @@ class ResourceLimitsServerNotices:
elif not currently_blocked and limit_msg:
# Room is not notifying of a block, when it ought to be.
await self._apply_limit_block_notification(
user_id, limit_msg, limit_type # type: ignore
user_id,
limit_msg,
limit_type, # type: ignore
)
except SynapseError as e:
logger.error("Error sending resource limits server notice: %s", e)

View File

@ -126,6 +126,9 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (user_id,)
)
# Purge other caches based on room state.
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
@ -160,6 +163,7 @@ class SQLBaseStore(metaclass=ABCMeta):
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
self._attempt_to_invalidate_cache("get_room_type", (room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
def _attempt_to_invalidate_cache(
self, cache_name: str, key: Optional[Collection[Any]]

View File

@ -44,7 +44,7 @@ from synapse._pydantic_compat import HAS_PYDANTIC_V2
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection, Cursor
from synapse.types import JsonDict
from synapse.types import JsonDict, StrCollection
from synapse.util import Clock, json_encoder
from . import engines
@ -487,6 +487,25 @@ class BackgroundUpdater:
return not update_exists
async def have_completed_background_updates(
self, update_names: StrCollection
) -> bool:
"""Return the name of background updates that have not yet been
completed"""
if self._all_done:
return True
rows = await self.db_pool.simple_select_many_batch(
table="background_updates",
column="update_name",
iterable=update_names,
retcols=("update_name",),
desc="get_uncompleted_background_updates",
)
# If we find any rows then we've not completed the update.
return not bool(rows)
async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update

View File

@ -416,7 +416,7 @@ class EventsPersistenceStorageController:
set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
async def enqueue(
item: Tuple[str, List[Tuple[EventBase, EventContext]]]
item: Tuple[str, List[Tuple[EventBase, EventContext]]],
) -> Dict[str, str]:
room_id, evs_ctxs = item
return await self._event_persist_queue.add_to_queue(
@ -502,8 +502,15 @@ class EventsPersistenceStorageController:
"""
state = await self._calculate_current_state(room_id)
delta = await self._calculate_state_delta(room_id, state)
sliding_sync_table_changes = (
await self.persist_events_store._calculate_sliding_sync_table_changes(
room_id, [], delta
)
)
await self.persist_events_store.update_current_state(room_id, delta)
await self.persist_events_store.update_current_state(
room_id, delta, sliding_sync_table_changes
)
async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
"""Calculate the current state of a room, based on the forward extremities
@ -785,9 +792,9 @@ class EventsPersistenceStorageController:
)
# Remove any events which are prev_events of any existing events.
existing_prevs: Collection[str] = (
await self.persist_events_store._get_events_which_are_prevs(result)
)
existing_prevs: Collection[
str
] = await self.persist_events_store._get_events_which_are_prevs(result)
result.difference_update(existing_prevs)
# Finally handle the case where the new events have soft-failed prev

View File

@ -35,6 +35,7 @@ from typing import (
Iterable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Tuple,
@ -1297,9 +1298,9 @@ class DatabasePool:
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
) -> bool:
"""
@ -1342,9 +1343,9 @@ class DatabasePool:
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
@ -1365,7 +1366,7 @@ class DatabasePool:
if lock:
# We need to lock the table :(
self.engine.lock_table(txn, table)
txn.database_engine.lock_table(txn, table)
def _getwhere(key: str) -> str:
# If the value we're passing in is None (aka NULL), we need to use
@ -1419,13 +1420,13 @@ class DatabasePool:
# successfully inserted
return True
@staticmethod
def simple_upsert_txn_native_upsert(
self,
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
keyvalues: Mapping[str, Any],
values: Mapping[str, Any],
insertion_values: Optional[Mapping[str, Any]] = None,
where_clause: Optional[str] = None,
) -> bool:
"""
@ -1578,8 +1579,8 @@ class DatabasePool:
self.simple_upsert_txn_emulated(txn, table, _keys, _vals, lock=False)
@staticmethod
def simple_upsert_many_txn_native_upsert(
self,
txn: LoggingTransaction,
table: str,
key_names: Collection[str],
@ -2009,8 +2010,8 @@ class DatabasePool:
def simple_update_txn(
txn: LoggingTransaction,
table: str,
keyvalues: Dict[str, Any],
updatevalues: Dict[str, Any],
keyvalues: Mapping[str, Any],
updatevalues: Mapping[str, Any],
) -> int:
"""
Update rows in the given database table.

View File

@ -346,6 +346,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", (state_key,)
)
self._attempt_to_invalidate_cache(
"did_forget",
@ -417,6 +420,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"_get_rooms_for_local_user_where_membership_is_inner", None
)
self._attempt_to_invalidate_cache("get_sliding_sync_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_references_for_event", None)

View File

@ -238,9 +238,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
INNER JOIN user_ips USING (user_id, access_token, ip)
GROUP BY user_id, access_token, ip
HAVING count(*) > 1
""".format(
clause
),
""".format(clause),
args,
)
res = cast(
@ -373,9 +371,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
LIMIT ?
) c
INNER JOIN user_ips AS u USING (user_id, device_id, last_seen)
""" % {
"where_clause": where_clause
}
""" % {"where_clause": where_clause}
txn.execute(sql, where_args + [batch_size])
rows = cast(List[Tuple[int, str, str, str, str]], txn.fetchall())

View File

@ -1116,7 +1116,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
txn.execute(sql, (start, stop))
destinations = {d for d, in txn}
destinations = {d for (d,) in txn}
to_remove = set()
for d in destinations:
try:

View File

@ -670,9 +670,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
result["keys"] = keys
device_display_name = None
if (
self.hs.config.federation.allow_device_name_lookup_over_federation
):
if self.hs.config.federation.allow_device_name_lookup_over_federation:
device_display_name = device.display_name
if device_display_name:
result["device_display_name"] = device_display_name
@ -917,7 +915,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
from_key,
to_key,
)
return {u for u, in rows}
return {u for (u,) in rows}
@cancellable
async def get_users_whose_devices_changed(
@ -968,7 +966,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
txn.database_engine, "user_id", chunk
)
txn.execute(sql % (clause,), [from_key, to_key] + args)
changes.update(user_id for user_id, in txn)
changes.update(user_id for (user_id,) in txn)
return changes
@ -1520,7 +1518,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
args: List[Any],
) -> Set[str]:
txn.execute(sql.format(clause=clause), args)
return {user_id for user_id, in txn}
return {user_id for (user_id,) in txn}
changes = set()
for chunk in batch_iter(changed_room_ids, 1000):
@ -1560,7 +1558,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
txn: LoggingTransaction,
) -> Set[str]:
txn.execute(sql, (from_id, to_id))
return {room_id for room_id, in txn}
return {room_id for (room_id,) in txn}
return await self.db_pool.runInteraction(
"get_all_device_list_changes",

View File

@ -387,9 +387,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore):
is_verified, session_data
FROM e2e_room_keys
WHERE user_id = ? AND version = ? AND (%s)
""" % (
" OR ".join(where_clauses)
)
""" % (" OR ".join(where_clauses))
txn.execute(sql, params)

View File

@ -472,9 +472,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
signature_sql = """
SELECT user_id, key_id, target_device_id, signature
FROM e2e_cross_signing_signatures WHERE %s
""" % (
" OR ".join("(" + q + ")" for q in signature_query_clauses)
)
""" % (" OR ".join("(" + q + ")" for q in signature_query_clauses))
txn.execute(signature_sql, signature_query_params)
return cast(
@ -917,9 +915,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
FROM e2e_cross_signing_keys
WHERE %(clause)s
ORDER BY user_id, keytype, stream_id DESC
""" % {
"clause": clause
}
""" % {"clause": clause}
else:
# SQLite has special handling for bare columns when using
# MIN/MAX with a `GROUP BY` clause where it picks the value from
@ -929,9 +925,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
FROM e2e_cross_signing_keys
WHERE %(clause)s
GROUP BY user_id, keytype
""" % {
"clause": clause
}
""" % {"clause": clause}
txn.execute(sql, params)

View File

@ -326,7 +326,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"""
rows = txn.execute_values(sql, chains.items())
results.update(r for r, in rows)
results.update(r for (r,) in rows)
else:
# For SQLite we just fall back to doing a noddy for loop.
sql = """
@ -335,7 +335,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"""
for chain_id, max_no in chains.items():
txn.execute(sql, (chain_id, max_no))
results.update(r for r, in txn)
results.update(r for (r,) in txn)
return results
@ -645,7 +645,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
]
rows = txn.execute_values(sql, args)
result.update(r for r, in rows)
result.update(r for (r,) in rows)
else:
# For SQLite we just fall back to doing a noddy for loop.
sql = """
@ -654,7 +654,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
"""
for chain_id, (min_no, max_no) in chain_to_gap.items():
txn.execute(sql, (chain_id, min_no, max_no))
result.update(r for r, in txn)
result.update(r for (r,) in txn)
return result
@ -1220,13 +1220,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
HAVING count(*) > ?
ORDER BY count(*) DESC
LIMIT ?
""" % (
where_clause,
)
""" % (where_clause,)
query_args = list(itertools.chain(room_id_filter, [min_count, limit]))
txn.execute(sql, query_args)
return [room_id for room_id, in txn]
return [room_id for (room_id,) in txn]
return await self.db_pool.runInteraction(
"get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
@ -1358,7 +1356,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
def get_forward_extremeties_for_room_txn(txn: LoggingTransaction) -> List[str]:
txn.execute(sql, (stream_ordering, room_id))
return [event_id for event_id, in txn]
return [event_id for (event_id,) in txn]
event_ids = await self.db_pool.runInteraction(
"get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn

View File

@ -1860,9 +1860,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND epa.notif = 1
ORDER BY epa.stream_ordering DESC
LIMIT ?
""" % (
before_clause,
)
""" % (before_clause,)
txn.execute(sql, args)
return cast(
List[Tuple[str, str, int, int, str, bool, str, int]], txn.fetchall()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -98,6 +98,26 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
class DatabaseCorruptionError(RuntimeError):
"""We found an event in the DB that has a persisted event ID that doesn't
match its computed event ID."""
def __init__(
self, room_id: str, persisted_event_id: str, computed_event_id: str
) -> None:
self.room_id = room_id
self.persisted_event_id = persisted_event_id
self.computed_event_id = computed_event_id
message = (
f"Database corruption: Event {persisted_event_id} in room {room_id} "
f"from the database appears to have been modified (calculated "
f"event id {computed_event_id})"
)
super().__init__(message)
# These values are used in the `enqueue_event` and `_fetch_loop` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
@ -457,6 +477,8 @@ class EventsWorkerStore(SQLBaseStore):
) -> Optional[EventBase]:
"""Get an event from the database by event_id.
Events for unknown room versions will also be filtered out.
Args:
event_id: The event_id of the event to fetch
@ -511,6 +533,10 @@ class EventsWorkerStore(SQLBaseStore):
) -> Dict[str, EventBase]:
"""Get events from the database
Unknown events will be omitted from the response.
Events for unknown room versions will also be filtered out.
Args:
event_ids: The event_ids of the events to fetch
@ -553,6 +579,8 @@ class EventsWorkerStore(SQLBaseStore):
Unknown events will be omitted from the response.
Events for unknown room versions will also be filtered out.
Args:
event_ids: The event_ids of the events to fetch
@ -1356,10 +1384,8 @@ class EventsWorkerStore(SQLBaseStore):
if original_ev.event_id != event_id:
# it's difficult to see what to do here. Pretty much all bets are off
# if Synapse cannot rely on the consistency of its database.
raise RuntimeError(
f"Database corruption: Event {event_id} in room {d['room_id']} "
f"from the database appears to have been modified (calculated "
f"event id {original_ev.event_id})"
raise DatabaseCorruptionError(
d["room_id"], event_id, original_ev.event_id
)
event_map[event_id] = original_ev
@ -1639,7 +1665,7 @@ class EventsWorkerStore(SQLBaseStore):
txn.database_engine, "e.event_id", event_ids
)
txn.execute(sql + clause, args)
found_events = {eid for eid, in txn}
found_events = {eid for (eid,) in txn}
# ... and then we can update the results for each key
return {eid: (eid in found_events) for eid in event_ids}
@ -1838,9 +1864,9 @@ class EventsWorkerStore(SQLBaseStore):
" LIMIT ?"
)
txn.execute(sql, (-last_id, -current_id, instance_name, limit))
new_event_updates: List[Tuple[int, Tuple[str, str, str, str, str, str]]] = (
[]
)
new_event_updates: List[
Tuple[int, Tuple[str, str, str, str, str, str]]
] = []
row: Tuple[int, str, str, str, str, str, str]
# Type safety: iterating over `txn` yields `Tuple`, i.e.
# `Tuple[Any, ...]` of arbitrary length. Mypy detects assigning a

View File

@ -201,7 +201,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
txn.execute_batch(
"INSERT INTO event_backward_extremities (room_id, event_id)"
" VALUES (?, ?)",
[(room_id, event_id) for event_id, in new_backwards_extrems],
[(room_id, event_id) for (event_id,) in new_backwards_extrems],
)
logger.info("[purge] finding state groups referenced by deleted events")
@ -215,7 +215,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"""
)
referenced_state_groups = {sg for sg, in txn}
referenced_state_groups = {sg for (sg,) in txn}
logger.info(
"[purge] found %i referenced state groups", len(referenced_state_groups)
)
@ -454,6 +454,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# so must be deleted first.
"local_current_membership",
"room_memberships",
# Note: the sliding_sync_ tables have foreign keys to the `events` table
# so must be deleted first.
"sliding_sync_joined_rooms",
"sliding_sync_membership_snapshots",
"events",
"federation_inbound_events_staging",
"receipts_graph",

View File

@ -762,7 +762,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
txn.execute(sql, args)
return [room_id for room_id, in txn]
return [room_id for (room_id,) in txn]
results: List[str] = []
for batch in batch_iter(room_ids, 1000):
@ -1030,9 +1030,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
SELECT event_id WHERE room_id = ? AND stream_ordering IN (
SELECT max(stream_ordering) WHERE %s
)
""" % (
clause,
)
""" % (clause,)
txn.execute(sql, [room_id] + list(args))
rows = txn.fetchall()

View File

@ -1250,9 +1250,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
SELECT address, session_id, medium, client_secret,
last_send_attempt, validated_at
FROM threepid_validation_session WHERE %s
""" % (
" AND ".join("%s = ?" % k for k in keyvalues.keys()),
)
""" % (" AND ".join("%s = ?" % k for k in keyvalues.keys()),)
if validated is not None:
sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")

View File

@ -1382,6 +1382,30 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
partial_state_rooms = {row[0] for row in rows}
return {room_id: room_id in partial_state_rooms for room_id in room_ids}
@cached(max_entries=10000, iterable=True)
async def get_partial_rooms(self) -> AbstractSet[str]:
"""Get any "partial-state" rooms which the user is in.
This is fast as the set of partially stated rooms at any point across
the whole server is small, and so such a query is fast. This is also
faster than looking up whether a set of room ID's are partially stated
via `is_partial_state_room_batched(...)` because of the sheer amount of
CPU time looking all the rooms up in the cache.
"""
def _get_partial_rooms_for_user_txn(
txn: LoggingTransaction,
) -> AbstractSet[str]:
sql = """
SELECT room_id FROM partial_state_rooms
"""
txn.execute(sql)
return {room_id for (room_id,) in txn}
return await self.db_pool.runInteraction(
"get_partial_rooms_for_user", _get_partial_rooms_for_user_txn
)
async def get_join_event_id_and_device_lists_stream_id_for_partial_state(
self, room_id: str
) -> Tuple[str, int]:
@ -1608,9 +1632,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
FROM event_reports AS er
JOIN room_stats_state ON room_stats_state.room_id = er.room_id
{}
""".format(
where_clause
)
""".format(where_clause)
txn.execute(sql, args)
count = cast(Tuple[int], txn.fetchone())[0]
@ -2343,6 +2365,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._invalidate_cache_and_stream(
txn, self._get_partial_state_servers_at_join, (room_id,)
)
self._invalidate_all_cache_and_stream(txn, self.get_partial_rooms)
async def write_partial_state_rooms_join_event_id(
self,
@ -2564,6 +2587,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
self._invalidate_cache_and_stream(
txn, self._get_partial_state_servers_at_join, (room_id,)
)
self._invalidate_all_cache_and_stream(txn, self.get_partial_rooms)
DatabasePool.simple_insert_txn(
txn,

View File

@ -19,6 +19,7 @@
#
#
import logging
from http import HTTPStatus
from typing import (
TYPE_CHECKING,
AbstractSet,
@ -39,6 +40,7 @@ from typing import (
import attr
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import Codes, SynapseError
from synapse.logging.opentracing import trace
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import wrap_as_background_process
@ -49,9 +51,15 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_bg_updates import _BackgroundUpdates
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.storage.roommember import (
MemberSummary,
ProfileInfo,
RoomsForUser,
RoomsForUserSlidingSync,
)
from synapse.types import (
JsonDict,
PersistedEventPosition,
@ -225,9 +233,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
AND m.room_id = c.room_id
AND m.user_id = c.state_key
WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? AND %s
""" % (
clause,
)
""" % (clause,)
txn.execute(sql, (room_id, Membership.JOIN, *ids))
return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn}
@ -524,9 +530,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
WHERE
user_id = ?
AND %s
""" % (
clause,
)
""" % (clause,)
txn.execute(sql, (user_id, *args))
results = [
@ -631,10 +635,8 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"""
# Paranoia check.
if not self.hs.is_mine_id(user_id):
raise Exception(
"Cannot call 'get_local_current_membership_for_user_in_room' on "
"non-local user %s" % (user_id,),
)
message = f"Provided user_id {user_id} is a non-local user"
raise SynapseError(HTTPStatus.BAD_REQUEST, message, errcode=Codes.BAD_JSON)
results = cast(
Optional[Tuple[str, str]],
@ -808,7 +810,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"""
txn.execute(sql, (user_id, *args))
return {u: True for u, in txn}
return {u: True for (u,) in txn}
to_return = {}
for batch_user_ids in batch_iter(other_user_ids, 1000):
@ -1026,7 +1028,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
AND room_id = ?
"""
txn.execute(sql, (room_id,))
return {d for d, in txn}
return {d for (d,) in txn}
return await self.db_pool.runInteraction(
"get_current_hosts_in_room", get_current_hosts_in_room_txn
@ -1094,7 +1096,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
"""
txn.execute(sql, (room_id,))
# `server_domain` will be `NULL` for malformed MXIDs with no colons.
return tuple(d for d, in txn if d is not None)
return tuple(d for (d,) in txn if d is not None)
return await self.db_pool.runInteraction(
"get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn
@ -1311,9 +1313,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
room_id = ? AND membership = ?
AND NOT (%s)
LIMIT 1
""" % (
clause,
)
""" % (clause,)
def _is_local_host_in_room_ignoring_users_txn(
txn: LoggingTransaction,
@ -1337,6 +1337,12 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_membership_snapshots",
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
self._invalidate_cache_and_stream(txn, self.did_forget, (user_id, room_id))
self._invalidate_cache_and_stream(
@ -1371,6 +1377,65 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
desc="room_forgetter_stream_pos",
)
@cached(iterable=True, max_entries=10000)
async def get_sliding_sync_rooms_for_user(
self,
user_id: str,
) -> Mapping[str, RoomsForUserSlidingSync]:
"""Get all the rooms for a user to handle a sliding sync request.
Ignores forgotten rooms and rooms that the user has been kicked from.
Returns:
Map from room ID to membership info
"""
def get_sliding_sync_rooms_for_user_txn(
txn: LoggingTransaction,
) -> Dict[str, RoomsForUserSlidingSync]:
sql = """
SELECT m.room_id, m.sender, m.membership, m.membership_event_id,
r.room_version,
m.event_instance_name, m.event_stream_ordering,
COALESCE(j.room_type, m.room_type),
COALESCE(j.is_encrypted, m.is_encrypted)
FROM sliding_sync_membership_snapshots AS m
INNER JOIN rooms AS r USING (room_id)
LEFT JOIN sliding_sync_joined_rooms AS j ON (j.room_id = m.room_id AND m.membership = 'join')
WHERE user_id = ?
AND m.forgotten = 0
"""
txn.execute(sql, (user_id,))
return {
row[0]: RoomsForUserSlidingSync(
room_id=row[0],
sender=row[1],
membership=row[2],
event_id=row[3],
room_version_id=row[4],
event_pos=PersistedEventPosition(row[5], row[6]),
room_type=row[7],
is_encrypted=row[8],
)
for row in txn
}
return await self.db_pool.runInteraction(
"get_sliding_sync_rooms_for_user",
get_sliding_sync_rooms_for_user_txn,
)
async def have_finished_sliding_sync_background_jobs(self) -> bool:
"""Return if it's safe to use the sliding sync membership tables."""
return await self.db_pool.updates.have_completed_background_updates(
(
_BackgroundUpdates.SLIDING_SYNC_PREFILL_JOINED_ROOMS_TO_RECALCULATE_TABLE_BG_UPDATE,
_BackgroundUpdates.SLIDING_SYNC_JOINED_ROOMS_BG_UPDATE,
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
)
)
class RoomMemberBackgroundUpdateStore(SQLBaseStore):
def __init__(
@ -1405,10 +1470,12 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore):
self, progress: JsonDict, batch_size: int
) -> int:
target_min_stream_id = progress.get(
"target_min_stream_id_inclusive", self._min_stream_order_on_start # type: ignore[attr-defined]
"target_min_stream_id_inclusive",
self._min_stream_order_on_start, # type: ignore[attr-defined]
)
max_stream_id = progress.get(
"max_stream_id_exclusive", self._stream_order_on_start + 1 # type: ignore[attr-defined]
"max_stream_id_exclusive",
self._stream_order_on_start + 1, # type: ignore[attr-defined]
)
def add_membership_profile_txn(txn: LoggingTransaction) -> int:

View File

@ -177,9 +177,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
AND (%s)
ORDER BY stream_ordering DESC
LIMIT ?
""" % (
" OR ".join("type = '%s'" % (t,) for t in TYPES),
)
""" % (" OR ".join("type = '%s'" % (t,) for t in TYPES),)
txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))

View File

@ -41,6 +41,46 @@ logger = logging.getLogger(__name__)
class SlidingSyncStore(SQLBaseStore):
async def get_latest_bump_stamp_for_room(
self,
room_id: str,
) -> Optional[int]:
"""
Get the `bump_stamp` for the room.
The `bump_stamp` is the `stream_ordering` of the last event according to the
`bump_event_types`. This helps clients sort more readily without them needing to
pull in a bunch of the timeline to determine the last activity.
`bump_event_types` is a thing because for example, we don't want display name
changes to mark the room as unread and bump it to the top. For encrypted rooms,
we just have to consider any activity as a bump because we can't see the content
and the client has to figure it out for themselves.
This should only be called where the server is participating
in the room (someone local is joined).
Returns:
The `bump_stamp` for the room (which can be `None`).
"""
return cast(
Optional[int],
await self.db_pool.simple_select_one_onecol(
table="sliding_sync_joined_rooms",
keyvalues={"room_id": room_id},
retcol="bump_stamp",
# FIXME: This should be `False` once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked
# by https://github.com/element-hq/synapse/issues/17623)
#
# The should be `allow_none=False` in the future because event though
# `bump_stamp` itself can be `None`, we should have a row in the
# `sliding_sync_joined_rooms` table for any joined room.
allow_none=True,
),
)
async def persist_per_connection_state(
self,
user_id: str,

Some files were not shown because too many files have changed in this diff Show More