From 16ec8c3272ba014620c95285cc7c6539ee4e9ed2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 1 Mar 2021 12:45:00 -0500 Subject: [PATCH] (Hopefully) stop leaking file descriptors in media repo. (#9497) By consuming the response if the headers imply that the content is too large. --- changelog.d/9497.bugfix | 1 + synapse/http/client.py | 31 ++++++++++++- tests/http/test_client.py | 91 +++++++++++++++++++++++---------------- 3 files changed, 85 insertions(+), 38 deletions(-) create mode 100644 changelog.d/9497.bugfix diff --git a/changelog.d/9497.bugfix b/changelog.d/9497.bugfix new file mode 100644 index 000000000..598bcaab6 --- /dev/null +++ b/changelog.d/9497.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where the media repository could leak file descriptors while previewing media. diff --git a/synapse/http/client.py b/synapse/http/client.py index a910548f1..72901e3f9 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -748,7 +748,32 @@ class BodyExceededMaxSize(Exception): """The maximum allowed size of the HTTP body was exceeded.""" +class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol): + """A protocol which immediately errors upon receiving data.""" + + def __init__(self, deferred: defer.Deferred): + self.deferred = deferred + + def _maybe_fail(self): + """ + Report a max size exceed error and disconnect the first time this is called. + """ + if not self.deferred.called: + self.deferred.errback(BodyExceededMaxSize()) + # Close the connection (forcefully) since all the data will get + # discarded anyway. + self.transport.abortConnection() + + def dataReceived(self, data: bytes) -> None: + self._maybe_fail() + + def connectionLost(self, reason: Failure) -> None: + self._maybe_fail() + + class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): + """A protocol which reads body to a stream, erroring if the body exceeds a maximum size.""" + def __init__( self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] ): @@ -805,13 +830,15 @@ def read_body_with_max_size( Returns: A Deferred which resolves to the length of the read body. """ + d = defer.Deferred() + # If the Content-Length header gives a size larger than the maximum allowed # size, do not bother downloading the body. if max_size is not None and response.length != UNKNOWN_LENGTH: if response.length > max_size: - return defer.fail(BodyExceededMaxSize()) + response.deliverBody(_DiscardBodyWithMaxSizeProtocol(d)) + return d - d = defer.Deferred() response.deliverBody(_ReadBodyWithMaxSizeProtocol(stream, d, max_size)) return d diff --git a/tests/http/test_client.py b/tests/http/test_client.py index 2d9b733be..21ecb81c9 100644 --- a/tests/http/test_client.py +++ b/tests/http/test_client.py @@ -26,77 +26,96 @@ from tests.unittest import TestCase class ReadBodyWithMaxSizeTests(TestCase): - def setUp(self): + def _build_response(self, length=UNKNOWN_LENGTH): """Start reading the body, returns the response, result and proto""" - response = Mock(length=UNKNOWN_LENGTH) - self.result = BytesIO() - self.deferred = read_body_with_max_size(response, self.result, 6) + response = Mock(length=length) + result = BytesIO() + deferred = read_body_with_max_size(response, result, 6) # Fish the protocol out of the response. - self.protocol = response.deliverBody.call_args[0][0] - self.protocol.transport = Mock() + protocol = response.deliverBody.call_args[0][0] + protocol.transport = Mock() - def _cleanup_error(self): + return result, deferred, protocol + + def _assert_error(self, deferred, protocol): + """Ensure that the expected error is received.""" + self.assertIsInstance(deferred.result, Failure) + self.assertIsInstance(deferred.result.value, BodyExceededMaxSize) + protocol.transport.abortConnection.assert_called_once() + + def _cleanup_error(self, deferred): """Ensure that the error in the Deferred is handled gracefully.""" called = [False] def errback(f): called[0] = True - self.deferred.addErrback(errback) + deferred.addErrback(errback) self.assertTrue(called[0]) def test_no_error(self): """A response that is NOT too large.""" + result, deferred, protocol = self._build_response() # Start sending data. - self.protocol.dataReceived(b"12345") + protocol.dataReceived(b"12345") # Close the connection. - self.protocol.connectionLost(Failure(ResponseDone())) + protocol.connectionLost(Failure(ResponseDone())) - self.assertEqual(self.result.getvalue(), b"12345") - self.assertEqual(self.deferred.result, 5) + self.assertEqual(result.getvalue(), b"12345") + self.assertEqual(deferred.result, 5) def test_too_large(self): """A response which is too large raises an exception.""" + result, deferred, protocol = self._build_response() # Start sending data. - self.protocol.dataReceived(b"1234567890") - # Close the connection. - self.protocol.connectionLost(Failure(ResponseDone())) + protocol.dataReceived(b"1234567890") - self.assertEqual(self.result.getvalue(), b"1234567890") - self.assertIsInstance(self.deferred.result, Failure) - self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize) - self._cleanup_error() + self.assertEqual(result.getvalue(), b"1234567890") + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) def test_multiple_packets(self): - """Data should be accummulated through mutliple packets.""" + """Data should be accumulated through mutliple packets.""" + result, deferred, protocol = self._build_response() # Start sending data. - self.protocol.dataReceived(b"12") - self.protocol.dataReceived(b"34") + protocol.dataReceived(b"12") + protocol.dataReceived(b"34") # Close the connection. - self.protocol.connectionLost(Failure(ResponseDone())) + protocol.connectionLost(Failure(ResponseDone())) - self.assertEqual(self.result.getvalue(), b"1234") - self.assertEqual(self.deferred.result, 4) + self.assertEqual(result.getvalue(), b"1234") + self.assertEqual(deferred.result, 4) def test_additional_data(self): """A connection can receive data after being closed.""" + result, deferred, protocol = self._build_response() # Start sending data. - self.protocol.dataReceived(b"1234567890") - self.assertIsInstance(self.deferred.result, Failure) - self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize) - self.protocol.transport.abortConnection.assert_called_once() + protocol.dataReceived(b"1234567890") + self._assert_error(deferred, protocol) # More data might have come in. - self.protocol.dataReceived(b"1234567890") - # Close the connection. - self.protocol.connectionLost(Failure(ResponseDone())) + protocol.dataReceived(b"1234567890") - self.assertEqual(self.result.getvalue(), b"1234567890") - self.assertIsInstance(self.deferred.result, Failure) - self.assertIsInstance(self.deferred.result.value, BodyExceededMaxSize) - self._cleanup_error() + self.assertEqual(result.getvalue(), b"1234567890") + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) + + def test_content_length(self): + """The body shouldn't be read (at all) if the Content-Length header is too large.""" + result, deferred, protocol = self._build_response(length=10) + + # Deferred shouldn't be called yet. + self.assertFalse(deferred.called) + + # Start sending data. + protocol.dataReceived(b"12345") + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) + + # The data is never consumed. + self.assertEqual(result.getvalue(), b"")