diff --git a/CHANGES.md b/CHANGES.md index 1fac16580..7e6f478d4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,19 @@ +Synapse 1.35.0rc3 (2021-05-28) +============================== + +Bugfixes +-------- + +- Fixed a bug causing replication requests to fail when receiving a lot of events via federation. Introduced in v1.33.0. ([\#10082](https://github.com/matrix-org/synapse/issues/10082)) +- Fix HTTP response size limit to allow joining very large rooms over federation. Introduced in v1.33.0. ([\#10093](https://github.com/matrix-org/synapse/issues/10093)) + + +Internal Changes +---------------- + +- Log method and path when dropping request due to size limit. ([\#10091](https://github.com/matrix-org/synapse/issues/10091)) + + Synapse 1.35.0rc2 (2021-05-27) ============================== diff --git a/synapse/__init__.py b/synapse/__init__.py index 6faf31dbb..4591246bd 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -47,7 +47,7 @@ try: except ImportError: pass -__version__ = "1.35.0rc2" +__version__ = "1.35.0rc3" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e93ab83f7..5b4f5d17f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -35,6 +35,11 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) +# Send join responses can be huge, so we set a separate limit here. The response +# is parsed in a streaming manner, which helps alleviate the issue of memory +# usage a bit. +MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024 + class TransportLayerClient: """Sends federation HTTP requests to other servers""" @@ -261,6 +266,7 @@ class TransportLayerClient: path=path, data=content, parser=SendJoinParser(room_version, v1_api=True), + max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) return response @@ -276,6 +282,7 @@ class TransportLayerClient: path=path, data=content, parser=SendJoinParser(room_version, v1_api=False), + max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN, ) return response diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 471010bd8..6527a82a3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -91,6 +91,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server @@ -3053,13 +3054,15 @@ class FederationHandler(BaseHandler): """ instance = self.config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: - result = await self._send_events( - instance_name=instance, - store=self.store, - room_id=room_id, - event_and_contexts=event_and_contexts, - backfilled=backfilled, - ) + # Limit the number of events sent over federation. + for batch in batch_iter(event_and_contexts, 1000): + result = await self._send_events( + instance_name=instance, + store=self.store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) return result["max_stream_id"] else: assert self.storage.persistence diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f5503b394..1998990a1 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -205,6 +205,7 @@ async def _handle_response( response: IResponse, start_ms: int, parser: ByteParser[T], + max_response_size: Optional[int] = None, ) -> T: """ Reads the body of a response with a timeout and sends it to a parser @@ -216,15 +217,20 @@ async def _handle_response( response: response to the request start_ms: Timestamp when request was made parser: The parser for the response + max_response_size: The maximum size to read from the response, if None + uses the default. Returns: The parsed response """ + if max_response_size is None: + max_response_size = MAX_RESPONSE_SIZE + try: check_content_type_is(response.headers, parser.CONTENT_TYPE) - d = read_body_with_max_size(response, parser, MAX_RESPONSE_SIZE) + d = read_body_with_max_size(response, parser, max_response_size) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) length = await make_deferred_yieldable(d) @@ -735,6 +741,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Literal[None] = None, + max_response_size: Optional[int] = None, ) -> Union[JsonDict, list]: ... @@ -752,6 +759,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser[T]] = None, + max_response_size: Optional[int] = None, ) -> T: ... @@ -768,6 +776,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser] = None, + max_response_size: Optional[int] = None, ): """Sends the specified json data using PUT @@ -803,6 +812,8 @@ class MatrixFederationHttpClient: enabled. parser: The parser to use to decode the response. Defaults to parsing as JSON. + max_response_size: The maximum size to read from the response, if None + uses the default. Returns: Succeeds when we get a 2xx HTTP response. The @@ -853,6 +864,7 @@ class MatrixFederationHttpClient: response, start_ms, parser=parser, + max_response_size=max_response_size, ) return body diff --git a/synapse/http/site.py b/synapse/http/site.py index 671fd3fbc..40754b7be 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -105,8 +105,10 @@ class SynapseRequest(Request): assert self.content, "handleContentChunk() called before gotLength()" if self.content.tell() + len(data) > self._max_request_body_size: logger.warning( - "Aborting connection from %s because the request exceeds maximum size", + "Aborting connection from %s because the request exceeds maximum size: %s %s", self.client, + self.get_method(), + self.get_redacted_uri(), ) self.transport.abortConnection() return