Update to use new timeout function everywhere.

The existing deferred timeout helper function (and the one into twisted)
suffer from a bug when a deferred's canceller throws an exception, #3842.

The new helper function doesn't suffer from this problem.
This commit is contained in:
Erik Johnston 2018-09-19 10:39:40 +01:00
parent 05b9937cd7
commit a334e1cace
4 changed files with 43 additions and 72 deletions

View File

@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request( request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers method, uri, agent=self.agent, data=data, headers=headers
) )
add_timeout_to_deferred( request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(), request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error, cancelled_to_request_timed_out_error,
) )

View File

@ -44,7 +44,7 @@ from synapse.api.errors import (
SynapseError, SynapseError,
) )
from synapse.http.endpoint import matrix_federation_endpoint from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async_helpers import timeout_no_seriously from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
""" """
try: try:
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
d = treq.json_content(response) d = treq.json_content(response)
d.addTimeout(timeout_sec, reactor) d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)
body = yield make_deferred_yieldable(d) body = yield make_deferred_yieldable(d)
except Exception as e: except Exception as e:
logger.warn( logger.warn(
@ -321,15 +327,10 @@ class MatrixFederationHttpClient(object):
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
unbuffered=True unbuffered=True
) )
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
# Sometimes the timeout above doesn't work, so lets hack yet request_deferred = timeout_deferred(
# another layer of timeouts in in the vain hope that at some
# point the world made sense and this really really really
# should work.
request_deferred = timeout_no_seriously(
request_deferred, request_deferred,
timeout=_sec_timeout * 2, timeout=_sec_timeout,
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
) )
@ -388,7 +389,11 @@ class MatrixFederationHttpClient(object):
# :'( # :'(
# Update transactions table? # Update transactions table?
d = treq.content(response) d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor()) d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d) body = yield make_deferred_yieldable(d)
raise HttpResponseException( raise HttpResponseException(
response.code, response.phrase, body response.code, response.phrase, body

View File

@ -28,7 +28,7 @@ from synapse.types import StreamToken
from synapse.util.async_helpers import ( from synapse.util.async_helpers import (
DeferredTimeoutError, DeferredTimeoutError,
ObservableDeferred, ObservableDeferred,
add_timeout_to_deferred, timeout_deferred,
) )
from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
@ -337,7 +337,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there # Now we wait for the _NotifierUserStream to be told there
# is a new token. # is a new token.
listener = user_stream.new_listener(prev_token) listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred( listener.deferred = timeout_deferred(
listener.deferred, listener.deferred,
(end_time - now) / 1000., (end_time - now) / 1000.,
self.hs.get_reactor(), self.hs.get_reactor(),
@ -559,11 +559,12 @@ class Notifier(object):
if end_time <= now: if end_time <= now:
break break
add_timeout_to_deferred( listener.deferred = timeout_deferred(
listener.deferred.addTimeout, listener.deferred,
(end_time - now) / 1000., timeout=(end_time - now) / 1000.,
self.hs.get_reactor(), reactor=self.hs.get_reactor(),
) )
try: try:
with PreserveLoggingContext(): with PreserveLoggingContext():
yield listener.deferred yield listener.deferred

View File

@ -380,23 +380,25 @@ class DeferredTimeoutError(Exception):
""" """
def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): def _cancelled_to_timed_out_error(value, timeout):
""" if isinstance(value, failure.Failure):
Add a timeout to a deferred by scheduling it to be cancelled after value.trap(CancelledError)
timeout seconds. raise DeferredTimeoutError(timeout, "Deferred")
return value
This is essentially a backport of deferred.addTimeout, which was introduced
in twisted 16.5.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError, def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
unless a cancelable function was passed to its initialization or unless """The in built twisted `Deferred.addTimeout` fails to time out deferreds
a different on_timeout_cancel callable is provided. that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
the case where the given deferred's canceller throws.
NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args: Args:
deferred (defer.Deferred): deferred to be timed out deferred (Deferred)
timeout (Number): seconds to time out after timeout (float): Timeout in seconds
reactor (twisted.internet.reactor): the Twisted reactor to use reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout. otherwise cancelled before the timeout.
@ -407,47 +409,9 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
The default callable (if none is provided) will translate a The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError. CancelledError Failure into a DeferredTimeoutError.
"""
timed_out = [False]
def time_it_out(): Returns:
timed_out[0] = True Deferred
deferred.cancel()
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
def timeout_no_seriously(deferred, timeout, reactor):
"""The in build twisted deferred addTimeout (and the method above)
completely fail to time things out under some unknown circumstances.
Lets try a different way of timing things out and maybe that will make
things work?!
TODO: Kill this with fire.
""" """
new_d = defer.Deferred() new_d = defer.Deferred()
@ -466,7 +430,8 @@ def timeout_no_seriously(deferred, timeout, reactor):
def convert_cancelled(value): def convert_cancelled(value):
if timed_out[0]: if timed_out[0]:
return _cancelled_to_timed_out_error(value, timeout) to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value return value
deferred.addBoth(convert_cancelled) deferred.addBoth(convert_cancelled)