Merge pull request #3845 from matrix-org/erikj/timeout_reads

Timeout reading body for outbound HTTP requests
This commit is contained in:
Amber Brown 2018-09-12 20:16:08 +10:00 committed by GitHub
commit 4073f73edc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 7 deletions

1
changelog.d/3845.bugfix Normal file
View File

@ -0,0 +1 @@
Fix outbound requests occasionally wedging, which can result in federation breaking between servers.

View File

@ -280,7 +280,10 @@ class MatrixFederationHttpClient(object):
# :'( # :'(
# Update transactions table? # Update transactions table?
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
body = yield treq.content(response) body = yield self._timeout_deferred(
treq.content(response),
timeout,
)
raise HttpResponseException( raise HttpResponseException(
response.code, response.phrase, body response.code, response.phrase, body
) )
@ -394,7 +397,10 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response) body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
defer.returnValue(body) defer.returnValue(body)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -444,7 +450,10 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response) body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
defer.returnValue(body) defer.returnValue(body)
@ -496,7 +505,10 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response) body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
defer.returnValue(body) defer.returnValue(body)
@ -543,7 +555,10 @@ class MatrixFederationHttpClient(object):
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response) body = yield self._timeout_deferred(
treq.json_content(response),
timeout,
)
defer.returnValue(body) defer.returnValue(body)
@ -585,8 +600,10 @@ class MatrixFederationHttpClient(object):
try: try:
with logcontext.PreserveLoggingContext(): with logcontext.PreserveLoggingContext():
length = yield _readBodyToFile( length = yield self._timeout_deferred(
_readBodyToFile(
response, output_stream, max_size response, output_stream, max_size
),
) )
except Exception: except Exception:
logger.exception("Failed to download body") logger.exception("Failed to download body")
@ -594,6 +611,27 @@ class MatrixFederationHttpClient(object):
defer.returnValue((length, headers)) defer.returnValue((length, headers))
def _timeout_deferred(self, deferred, timeout_ms=None):
"""Times the deferred out after `timeout_ms` ms
Args:
deferred (Deferred)
timeout_ms (int|None): Timeout in milliseconds. If None defaults
to 60 seconds.
Returns:
Deferred
"""
add_timeout_to_deferred(
deferred,
timeout_ms / 1000. if timeout_ms else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
return deferred
class _ReadBodyToFileProtocol(protocol.Protocol): class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size): def __init__(self, stream, deferred, max_size):