Merge remote-tracking branch 'upstream/release-v1.35'

This commit is contained in:
Tulir Asokan 2021-05-28 19:21:46 +03:00
commit dd212cf4c0
6 changed files with 50 additions and 10 deletions

View File

@ -1,3 +1,19 @@
Synapse 1.35.0rc3 (2021-05-28)
==============================
Bugfixes
--------
- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. Introduced in v1.33.0. ([\#10082](https://github.com/matrix-org/synapse/issues/10082))
- Fix HTTP response size limit to allow joining very large rooms over federation. Introduced in v1.33.0. ([\#10093](https://github.com/matrix-org/synapse/issues/10093))
Internal Changes
----------------
- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091))
Synapse 1.35.0rc2 (2021-05-27) Synapse 1.35.0rc2 (2021-05-27)
============================== ==============================

View File

@ -47,7 +47,7 @@ try:
except ImportError: except ImportError:
pass pass
__version__ = "1.35.0rc2" __version__ = "1.35.0rc3"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when # We import here so that we don't have to install a bunch of deps when

View File

@ -35,6 +35,11 @@ from synapse.types import JsonDict
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Send join responses can be huge, so we set a separate limit here. The response
# is parsed in a streaming manner, which helps alleviate the issue of memory
# usage a bit.
MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024
class TransportLayerClient: class TransportLayerClient:
"""Sends federation HTTP requests to other servers""" """Sends federation HTTP requests to other servers"""
@ -261,6 +266,7 @@ class TransportLayerClient:
path=path, path=path,
data=content, data=content,
parser=SendJoinParser(room_version, v1_api=True), parser=SendJoinParser(room_version, v1_api=True),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
) )
return response return response
@ -276,6 +282,7 @@ class TransportLayerClient:
path=path, path=path,
data=content, data=content,
parser=SendJoinParser(room_version, v1_api=False), parser=SendJoinParser(room_version, v1_api=False),
max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
) )
return response return response

View File

@ -91,6 +91,7 @@ from synapse.types import (
get_domain_from_id, get_domain_from_id,
) )
from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server from synapse.visibility import filter_events_for_server
@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler):
""" """
instance = self.config.worker.events_shard_config.get_instance(room_id) instance = self.config.worker.events_shard_config.get_instance(room_id)
if instance != self._instance_name: if instance != self._instance_name:
result = await self._send_events( # Limit the number of events sent over federation.
instance_name=instance, for batch in batch_iter(event_and_contexts, 1000):
store=self.store, result = await self._send_events(
room_id=room_id, instance_name=instance,
event_and_contexts=event_and_contexts, store=self.store,
backfilled=backfilled, room_id=room_id,
) event_and_contexts=batch,
backfilled=backfilled,
)
return result["max_stream_id"] return result["max_stream_id"]
else: else:
assert self.storage.persistence assert self.storage.persistence

View File

@ -205,6 +205,7 @@ async def _handle_response(
response: IResponse, response: IResponse,
start_ms: int, start_ms: int,
parser: ByteParser[T], parser: ByteParser[T],
max_response_size: Optional[int] = None,
) -> T: ) -> T:
""" """
Reads the body of a response with a timeout and sends it to a parser Reads the body of a response with a timeout and sends it to a parser
@ -216,15 +217,20 @@ async def _handle_response(
response: response to the request response: response to the request
start_ms: Timestamp when request was made start_ms: Timestamp when request was made
parser: The parser for the response parser: The parser for the response
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns: Returns:
The parsed response The parsed response
""" """
if max_response_size is None:
max_response_size = MAX_RESPONSE_SIZE
try: try:
check_content_type_is(response.headers, parser.CONTENT_TYPE) check_content_type_is(response.headers, parser.CONTENT_TYPE)
d = read_body_with_max_size(response, parser, MAX_RESPONSE_SIZE) d = read_body_with_max_size(response, parser, max_response_size)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
length = await make_deferred_yieldable(d) length = await make_deferred_yieldable(d)
@ -735,6 +741,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False, backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False, try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None, parser: Literal[None] = None,
max_response_size: Optional[int] = None,
) -> Union[JsonDict, list]: ) -> Union[JsonDict, list]:
... ...
@ -752,6 +759,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False, backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False, try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None, parser: Optional[ByteParser[T]] = None,
max_response_size: Optional[int] = None,
) -> T: ) -> T:
... ...
@ -768,6 +776,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False, backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False, try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser] = None, parser: Optional[ByteParser] = None,
max_response_size: Optional[int] = None,
): ):
"""Sends the specified json data using PUT """Sends the specified json data using PUT
@ -803,6 +812,8 @@ class MatrixFederationHttpClient:
enabled. enabled.
parser: The parser to use to decode the response. Defaults to parser: The parser to use to decode the response. Defaults to
parsing as JSON. parsing as JSON.
max_response_size: The maximum size to read from the response, if None
uses the default.
Returns: Returns:
Succeeds when we get a 2xx HTTP response. The Succeeds when we get a 2xx HTTP response. The
@ -853,6 +864,7 @@ class MatrixFederationHttpClient:
response, response,
start_ms, start_ms,
parser=parser, parser=parser,
max_response_size=max_response_size,
) )
return body return body

View File

@ -105,8 +105,10 @@ class SynapseRequest(Request):
assert self.content, "handleContentChunk() called before gotLength()" assert self.content, "handleContentChunk() called before gotLength()"
if self.content.tell() + len(data) > self._max_request_body_size: if self.content.tell() + len(data) > self._max_request_body_size:
logger.warning( logger.warning(
"Aborting connection from %s because the request exceeds maximum size", "Aborting connection from %s because the request exceeds maximum size: %s %s",
self.client, self.client,
self.get_method(),
self.get_redacted_uri(),
) )
self.transport.abortConnection() self.transport.abortConnection()
return return