Synapse 1.33.0rc1 (2021-04-28)

==============================
 
 Features
 --------
 
 - Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
 - Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
 - Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))
 
 Bugfixes
 --------
 
 - Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
 - Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
 - Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
 - Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
 - Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
 - Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))
 
 Improved Documentation
 ----------------------
 
 - Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))
 
 Internal Changes
 ----------------
 
 - Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
 - Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
 - Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
 - Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
 - Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
 - Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
 - Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
 - Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
 - Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
 - Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
 - Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
 - Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
 - Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
 - Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
 - Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
 - Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
 - Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
 - Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
 - Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
 - Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
 -----BEGIN PGP SIGNATURE-----
 
 iQJHBAABCgAxFiEEgQG31Z317NrSMt0QiISIDS7+X/QFAmCJQmgTHGFuZHJld0Bh
 bW9yZ2FuLnh5egAKCRCIhIgNLv5f9EKWD/9MZG4ngjECrk3QMBlYqaipGl/l7wcl
 8vBOTxiEqIgVJxHLCRFBbtFOItdxB4YkrorIo2fqshkk+lv4CRtTD9jEKHRvS4T6
 7p1icRdRTv2K94OkJ8R9jMFlmywZFU87oHPfI2xUYg6hvOKrR+RwPvIjA7c24UZt
 6MJqDhgGDlZD7/hQdQof9O4oOJzzIgLJPk6o8E42y6c0bLlPPKgH3sh0vlenDLfE
 15thRCOeiP237YYvXSdbr7G3PI66Efhq4BwQowSrgFg+B0BR68l4747iSIeWmLQJ
 Ow6QLLFPCOhgAPC4amp9PdaaV/9NYiBcvNxlOvyQAVl/+ioEiATNHnzdNtDouCQo
 nDdSHw0Mt9D3i+rxpu5Pf0gZN9dXRGiczqnq5QKL8+EvT/4FLYeluEeduuy3rE+G
 o5OTCd3EajxynzjftuopeysNAw6zeDpbulZoTCeCumxBL2+wAod2PsyY5Ei9Gapn
 iJvExNOJX4OlkFc67jO2CK8o3sUTNEDeIDWCQ0fVKAwIt7T45ebTA/UzDNWg7YzN
 EyUp+3NZcUZBskgMB5hpcijPJoXNYzWZPews73vMPV7AfQRxDzU5xrM9AdbadBDd
 Idv6wT9ssDUA+M0aKnafvoSJZ+qE85mi2x0rRsueZNd9uO9/QHIXKIb+4PvVpj3C
 BJnZf34m568AGQ==
 =smAk
 -----END PGP SIGNATURE-----

Merge tag 'v1.33.0rc1' into develop

Synapse 1.33.0rc1 (2021-04-28)
==============================

Features
--------

- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))

Bugfixes
--------

- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))

Improved Documentation
----------------------

- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))

Internal Changes
----------------

- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
This commit is contained in:
Andrew Morgan 2021-04-28 12:12:29 +01:00
commit fa6679e794
41 changed files with 145 additions and 171 deletions

View File

@ -1,3 +1,56 @@
Synapse 1.33.0rc1 (2021-04-28)
==============================
Features
--------
- Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership. ([\#9800](https://github.com/matrix-org/synapse/issues/9800), [\#9814](https://github.com/matrix-org/synapse/issues/9814))
- Add experimental support for handling presence on a worker. ([\#9819](https://github.com/matrix-org/synapse/issues/9819), [\#9820](https://github.com/matrix-org/synapse/issues/9820), [\#9828](https://github.com/matrix-org/synapse/issues/9828), [\#9850](https://github.com/matrix-org/synapse/issues/9850))
- Return a new template when an user attempts to renew their account multiple times with the same token, stating that their account is set to expire. This replaces the invalid token template that would previously be shown in this case. This change concerns the optional account validity feature. ([\#9832](https://github.com/matrix-org/synapse/issues/9832))
Bugfixes
--------
- Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path. ([\#9726](https://github.com/matrix-org/synapse/issues/9726))
- Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg. ([\#9788](https://github.com/matrix-org/synapse/issues/9788))
- Add some sanity checks to identity server passed to 3PID bind/unbind endpoints. ([\#9802](https://github.com/matrix-org/synapse/issues/9802))
- Limit the size of HTTP responses read over federation. ([\#9833](https://github.com/matrix-org/synapse/issues/9833))
- Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. ([\#9867](https://github.com/matrix-org/synapse/issues/9867))
- Fix a long-standing bug where errors from federation did not propagate to the client. ([\#9868](https://github.com/matrix-org/synapse/issues/9868))
Improved Documentation
----------------------
- Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms. ([\#9801](https://github.com/matrix-org/synapse/issues/9801))
Internal Changes
----------------
- Add a dockerfile for running Synapse in worker-mode under Complement. ([\#9162](https://github.com/matrix-org/synapse/issues/9162))
- Apply `pyupgrade` across the codebase. ([\#9786](https://github.com/matrix-org/synapse/issues/9786))
- Move some replication processing out of `generic_worker`. ([\#9796](https://github.com/matrix-org/synapse/issues/9796))
- Replace `HomeServer.get_config()` with inline references. ([\#9815](https://github.com/matrix-org/synapse/issues/9815))
- Rename some handlers and config modules to not duplicate the top-level module. ([\#9816](https://github.com/matrix-org/synapse/issues/9816))
- Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced. ([\#9817](https://github.com/matrix-org/synapse/issues/9817))
- Reduce CPU usage of the user directory by reusing existing calculated room membership. ([\#9821](https://github.com/matrix-org/synapse/issues/9821))
- Small speed up for joining large remote rooms. ([\#9825](https://github.com/matrix-org/synapse/issues/9825))
- Introduce flake8-bugbear to the test suite and fix some of its lint violations. ([\#9838](https://github.com/matrix-org/synapse/issues/9838))
- Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. ([\#9845](https://github.com/matrix-org/synapse/issues/9845))
- Limit length of accepted email addresses. ([\#9855](https://github.com/matrix-org/synapse/issues/9855))
- Remove redundant `synapse.types.Collection` type definition. ([\#9856](https://github.com/matrix-org/synapse/issues/9856))
- Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts. ([\#9858](https://github.com/matrix-org/synapse/issues/9858))
- Disable invite rate-limiting by default when running the unit tests. ([\#9871](https://github.com/matrix-org/synapse/issues/9871))
- Pass a reactor into `SynapseSite` to make testing easier. ([\#9874](https://github.com/matrix-org/synapse/issues/9874))
- Make `DomainSpecificString` an `attrs` class. ([\#9875](https://github.com/matrix-org/synapse/issues/9875))
- Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules. ([\#9876](https://github.com/matrix-org/synapse/issues/9876))
- Remove redundant `_PushHTTPChannel` test class. ([\#9878](https://github.com/matrix-org/synapse/issues/9878))
- Remove backwards-compatibility code for Python versions < 3.6. ([\#9879](https://github.com/matrix-org/synapse/issues/9879))
- Small performance improvement around handling new local presence updates. ([\#9887](https://github.com/matrix-org/synapse/issues/9887))
Synapse 1.32.2 (2021-04-22) Synapse 1.32.2 (2021-04-22)
=========================== ===========================

View File

@ -1 +0,0 @@
Add a dockerfile for running Synapse in worker-mode under Complement.

View File

@ -1 +0,0 @@
Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan.

View File

@ -1 +0,0 @@
Fixes the OIDC SSO flow when using a `public_baseurl` value including a non-root URL path.

View File

@ -1 +0,0 @@
Apply `pyupgrade` across the codebase.

View File

@ -1 +0,0 @@
Fix thumbnail generation for some sites with non-standard content types. Contributed by @rkfg.

View File

@ -1 +0,0 @@
Move some replication processing out of `generic_worker`.

View File

@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@ -1 +0,0 @@
Add a note to the docker docs mentioning that we mirror upstream's supported Docker platforms.

View File

@ -1 +0,0 @@
Add some sanity checks to identity server passed to 3PID bind/unbind endpoints.

View File

@ -1 +0,0 @@
Update experimental support for [MSC3083](https://github.com/matrix-org/matrix-doc/pull/3083): restricting room access via group membership.

View File

@ -1 +0,0 @@
Replace `HomeServer.get_config()` with inline references.

View File

@ -1 +0,0 @@
Rename some handlers and config modules to not duplicate the top-level module.

View File

@ -1 +0,0 @@
Fix a long-standing bug which caused `max_upload_size` to not be correctly enforced.

View File

@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@ -1 +0,0 @@
Reduce CPU usage of the user directory by reusing existing calculated room membership.

View File

@ -1 +0,0 @@
Small speed up for joining large remote rooms.

View File

@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@ -1 +0,0 @@
Don't return an error when a user attempts to renew their account multiple times with the same token. Instead, state when their account is set to expire. This change concerns the optional account validity feature.

View File

@ -1 +0,0 @@
Limit the size of HTTP responses read over federation.

View File

@ -1 +0,0 @@
Introduce flake8-bugbear to the test suite and fix some of its lint violations.

View File

@ -1 +0,0 @@
Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores.

View File

@ -1 +0,0 @@
Add experimental support for handling presence on a worker.

View File

@ -1 +0,0 @@
Limit length of accepted email addresses.

View File

@ -1 +0,0 @@
Remove redundant `synapse.types.Collection` type definition.

View File

@ -1 +0,0 @@
Handle recently added rate limits correctly when using `--no-rate-limit` with the demo scripts.

View File

@ -1 +0,0 @@
Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists.

View File

@ -1 +0,0 @@
Fix a long-standing bug where errors from federation did not propagate to the client.

View File

@ -1 +0,0 @@
Disable invite rate-limiting by default when running the unit tests.

View File

@ -1 +0,0 @@
Pass a reactor into `SynapseSite` to make testing easier.

View File

@ -1 +0,0 @@
Make `DomainSpecificString` an `attrs` class.

View File

@ -1 +0,0 @@
Add type hints to `synapse.api.auth` and `synapse.api.auth_blocking` modules.

View File

@ -1 +0,0 @@
Remove redundant `_PushHTTPChannel` test class.

View File

@ -1 +0,0 @@
Remove backwards-compatibility code for Python versions < 3.6.

View File

@ -1 +0,0 @@
Small performance improvement around handling new local presence updates.

View File

@ -224,8 +224,7 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name) destinations = yield self.get_servers_for_context(room_name)
try: try:
yield self.replication_layer.send_pdus( yield self.replication_layer.send_pdu(
[
Pdu.create_new( Pdu.create_new(
context=room_name, context=room_name,
pdu_type="sy.room.message", pdu_type="sy.room.message",
@ -233,7 +232,6 @@ class HomeServer(ReplicationHandler):
origin=self.server_name, origin=self.server_name,
destinations=destinations, destinations=destinations,
) )
]
) )
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@ -255,7 +253,7 @@ class HomeServer(ReplicationHandler):
origin=self.server_name, origin=self.server_name,
destinations=destinations, destinations=destinations,
) )
yield self.replication_layer.send_pdus([pdu]) yield self.replication_layer.send_pdu(pdu)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)
@ -267,8 +265,7 @@ class HomeServer(ReplicationHandler):
destinations = yield self.get_servers_for_context(room_name) destinations = yield self.get_servers_for_context(room_name)
try: try:
yield self.replication_layer.send_pdus( yield self.replication_layer.send_pdu(
[
Pdu.create_new( Pdu.create_new(
context=room_name, context=room_name,
is_state=True, is_state=True,
@ -278,7 +275,6 @@ class HomeServer(ReplicationHandler):
origin=self.server_name, origin=self.server_name,
destinations=destinations, destinations=destinations,
) )
]
) )
except Exception as e: except Exception as e:
logger.exception(e) logger.exception(e)

View File

@ -47,7 +47,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.32.2" __version__ = "1.33.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View File

@ -14,26 +14,19 @@
import abc import abc
import logging import logging
from typing import ( from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
TYPE_CHECKING,
Collection,
Dict,
Hashable,
Iterable,
List,
Optional,
Set,
Tuple,
)
from prometheus_client import Counter from prometheus_client import Counter
from twisted.internet import defer
import synapse.metrics import synapse.metrics
from synapse.api.presence import UserPresenceState from synapse.api.presence import UserPresenceState
from synapse.events import EventBase from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu from synapse.federation.units import Edu
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import ( from synapse.metrics import (
LaterGauge, LaterGauge,
event_processing_loop_counter, event_processing_loop_counter,
@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
if not events and next_token >= self._last_poked_id: if not events and next_token >= self._last_poked_id:
break break
async def get_destinations_for_event( async def handle_event(event: EventBase) -> None:
event: EventBase,
) -> Collection[str]:
"""Computes the destinations to which this event must be sent.
This returns an empty tuple when there are no destinations to send to,
or if this event is not from this homeserver and it is not sending
it on behalf of another server.
Will also filter out destinations which this sender is not responsible for,
if multiple federation senders exist.
"""
# Only send events for this server. # Only send events for this server.
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
is_mine = self.is_mine_id(event.sender) is_mine = self.is_mine_id(event.sender)
if not is_mine and send_on_behalf_of is None: if not is_mine and send_on_behalf_of is None:
return () return
if not event.internal_metadata.should_proactively_send(): if not event.internal_metadata.should_proactively_send():
return () return
destinations = None # type: Optional[Set[str]] destinations = None # type: Optional[Set[str]]
if not event.prev_event_ids(): if not event.prev_event_ids():
@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
"Failed to calculate hosts in room for event: %s", "Failed to calculate hosts in room for event: %s",
event.event_id, event.event_id,
) )
return () return
destinations = { destinations = {
d d
@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
) )
} }
destinations.discard(self.server_name)
if send_on_behalf_of is not None: if send_on_behalf_of is not None:
# If we are sending the event on behalf of another server # If we are sending the event on behalf of another server
# then it already has the event and there is no reason to # then it already has the event and there is no reason to
# send the event to it. # send the event to it.
destinations.discard(send_on_behalf_of) destinations.discard(send_on_behalf_of)
logger.debug("Sending %s to %r", event, destinations)
if destinations: if destinations:
await self._send_pdu(event, destinations)
now = self.clock.time_msec() now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id) ts = await self.store.get_received_ts(event.event_id)
@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
"federation_sender" "federation_sender"
).observe((now - ts) / 1000) ).observe((now - ts) / 1000)
return destinations async def handle_room_events(events: Iterable[EventBase]) -> None:
return () with Measure(self.clock, "handle_room_events"):
for event in events:
await handle_event(event)
async def get_federatable_events_and_destinations( events_by_room = {} # type: Dict[str, List[EventBase]]
events: Iterable[EventBase], for event in events:
) -> List[Tuple[EventBase, Collection[str]]]: events_by_room.setdefault(event.room_id, []).append(event)
with Measure(self.clock, "get_destinations_for_events"):
# Fetch federation destinations per event,
# skip if get_destinations_for_event returns an empty collection,
# return list of event->destinations pairs.
return [
(event, dests)
for (event, dests) in [
(event, await get_destinations_for_event(event))
for event in events
]
if dests
]
events_and_dests = await get_federatable_events_and_destinations(events) await make_deferred_yieldable(
defer.gatherResults(
# Send corresponding events to each destination queue [
await self._distribute_events(events_and_dests) run_in_background(handle_room_events, evs)
for evs in events_by_room.values()
],
consumeErrors=True,
)
)
await self.store.update_federation_out_pos("events", next_token) await self.store.update_federation_out_pos("events", next_token)
@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
events_processed_counter.inc(len(events)) events_processed_counter.inc(len(events))
event_processing_loop_room_count.labels("federation_sender").inc( event_processing_loop_room_count.labels("federation_sender").inc(
len({event.room_id for event in events}) len(events_by_room)
) )
event_processing_loop_counter.labels("federation_sender").inc() event_processing_loop_counter.labels("federation_sender").inc()
@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
finally: finally:
self._is_processing = False self._is_processing = False
async def _distribute_events( async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
self, # We loop through all destinations to see whether we already have
events_and_dests: Iterable[Tuple[EventBase, Collection[str]]], # a transaction in progress. If we do, stick it in the pending_pdus
) -> None: # table and we'll get back to it later.
"""Distribute events to the respective per_destination queues.
Also persists last-seen per-room stream_ordering to 'destination_rooms'. destinations = set(destinations)
destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
Args: if not destinations:
events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]). return
Every event is paired with its intended destinations (in federation).
"""
# Tuples of room_id + destination to their max-seen stream_ordering
room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
# List of events to send to each destination
events_by_dest = {} # type: Dict[str, List[EventBase]]
# For each event-destinations pair...
for event, destinations in events_and_dests:
# (we got this from the database, it's filled)
assert event.internal_metadata.stream_ordering
sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_total.inc(len(destinations))
sent_pdus_destination_dist_count.inc() sent_pdus_destination_dist_count.inc()
# ...iterate over those destinations.. assert pdu.internal_metadata.stream_ordering
# track the fact that we have a PDU for these destinations,
# to allow us to perform catch-up later on if the remote is unreachable
# for a while.
await self.store.store_destination_rooms_entries(
destinations,
pdu.room_id,
pdu.internal_metadata.stream_ordering,
)
for destination in destinations: for destination in destinations:
# ...update their stream-ordering... self._get_per_destination_queue(destination).send_pdu(pdu)
room_with_dest_stream_ordering[(event.room_id, destination)] = max(
event.internal_metadata.stream_ordering,
room_with_dest_stream_ordering.get((event.room_id, destination), 0),
)
# ...and add the event to each destination queue.
events_by_dest.setdefault(destination, []).append(event)
# Bulk-store destination_rooms stream_ids
await self.store.bulk_store_destination_rooms_entries(
room_with_dest_stream_ordering
)
for destination, pdus in events_by_dest.items():
logger.debug("Sending %d pdus to %s", len(pdus), destination)
self._get_per_destination_queue(destination).send_pdus(pdus)
async def send_read_receipt(self, receipt: ReadReceipt) -> None: async def send_read_receipt(self, receipt: ReadReceipt) -> None:
"""Send a RR to any other servers in the room """Send a RR to any other servers in the room

View File

@ -154,22 +154,19 @@ class PerDestinationQueue:
+ len(self._pending_edus_keyed) + len(self._pending_edus_keyed)
) )
def send_pdus(self, pdus: Iterable[EventBase]) -> None: def send_pdu(self, pdu: EventBase) -> None:
"""Add PDUs to the queue, and start the transmission loop if necessary """Add a PDU to the queue, and start the transmission loop if necessary
Args: Args:
pdus: pdus to send pdu: pdu to send
""" """
if not self._catching_up or self._last_successful_stream_ordering is None: if not self._catching_up or self._last_successful_stream_ordering is None:
# only enqueue the PDU if we are not catching up (False) or do not # only enqueue the PDU if we are not catching up (False) or do not
# yet know if we have anything to catch up (None) # yet know if we have anything to catch up (None)
self._pending_pdus.extend(pdus) self._pending_pdus.append(pdu)
else: else:
self._catchup_last_skipped = max( assert pdu.internal_metadata.stream_ordering
pdu.internal_metadata.stream_ordering self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
for pdu in pdus
if pdu.internal_metadata.stream_ordering is not None
)
self.attempt_new_transaction() self.attempt_new_transaction()

View File

@ -14,7 +14,7 @@
import logging import logging
from collections import namedtuple from collections import namedtuple
from typing import Dict, List, Optional, Tuple from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
@ -295,33 +295,37 @@ class TransactionStore(TransactionWorkerStore):
}, },
) )
async def bulk_store_destination_rooms_entries( async def store_destination_rooms_entries(
self, room_and_destination_to_ordering: Dict[Tuple[str, str], int] self,
): destinations: Iterable[str],
room_id: str,
stream_ordering: int,
) -> None:
""" """
Updates or creates `destination_rooms` entries for a number of events. Updates or creates `destination_rooms` entries in batch for a single event.
Args: Args:
room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id destinations: list of destinations
room_id: the room_id of the event
stream_ordering: the stream_ordering of the event
""" """
await self.db_pool.simple_upsert_many( await self.db_pool.simple_upsert_many(
table="destinations", table="destinations",
key_names=("destination",), key_names=("destination",),
key_values={(d,) for _, d in room_and_destination_to_ordering.keys()}, key_values=[(d,) for d in destinations],
value_names=[], value_names=[],
value_values=[], value_values=[],
desc="store_destination_rooms_entries_dests", desc="store_destination_rooms_entries_dests",
) )
rows = [(destination, room_id) for destination in destinations]
await self.db_pool.simple_upsert_many( await self.db_pool.simple_upsert_many(
table="destination_rooms", table="destination_rooms",
key_names=("room_id", "destination"), key_names=("destination", "room_id"),
key_values=list(room_and_destination_to_ordering.keys()), key_values=rows,
value_names=["stream_ordering"], value_names=["stream_ordering"],
value_values=[ value_values=[(stream_ordering,)] * len(rows),
(stream_id,) for stream_id in room_and_destination_to_ordering.values()
],
desc="store_destination_rooms_entries_rooms", desc="store_destination_rooms_entries_rooms",
) )