Merge pull request from matrix-org/erikj/update_timeout

Update to use new timeout function everywhere.
This commit is contained in:
Erik Johnston 2018-09-19 11:45:11 +01:00 committed by GitHub
commit cb016baa37
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 52 additions and 87 deletions

1
changelog.d/3910.bugfix Normal file
View File

@ -0,0 +1 @@
Fix bug where things occaisonally were not being timed out correctly.

View File

@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
@ -99,7 +99,7 @@ class SimpleHttpClient(object):
request_deferred = treq.request( request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers method, uri, agent=self.agent, data=data, headers=headers
) )
add_timeout_to_deferred( request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(), request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error, cancelled_to_request_timed_out_error,
) )

View File

@ -44,7 +44,7 @@ from synapse.api.errors import (
SynapseError, SynapseError,
) )
from synapse.http.endpoint import matrix_federation_endpoint from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async_helpers import timeout_no_seriously from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response):
""" """
try: try:
check_content_type_is_json(response.headers) check_content_type_is_json(response.headers)
d = treq.json_content(response) d = treq.json_content(response)
d.addTimeout(timeout_sec, reactor) d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)
body = yield make_deferred_yieldable(d) body = yield make_deferred_yieldable(d)
except Exception as e: except Exception as e:
logger.warn( logger.warn(
@ -321,15 +327,10 @@ class MatrixFederationHttpClient(object):
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
unbuffered=True unbuffered=True
) )
request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
# Sometimes the timeout above doesn't work, so lets hack yet request_deferred = timeout_deferred(
# another layer of timeouts in in the vain hope that at some
# point the world made sense and this really really really
# should work.
request_deferred = timeout_no_seriously(
request_deferred, request_deferred,
timeout=_sec_timeout * 2, timeout=_sec_timeout,
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
) )
@ -388,7 +389,11 @@ class MatrixFederationHttpClient(object):
# :'( # :'(
# Update transactions table? # Update transactions table?
d = treq.content(response) d = treq.content(response)
d.addTimeout(_sec_timeout, self.hs.get_reactor()) d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d) body = yield make_deferred_yieldable(d)
raise HttpResponseException( raise HttpResponseException(
response.code, response.phrase, body response.code, response.phrase, body

View File

@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge from synapse.metrics import LaterGauge
from synapse.types import StreamToken from synapse.types import StreamToken
from synapse.util.async_helpers import ( from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,
)
from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function from synapse.util.logutils import log_function
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there # Now we wait for the _NotifierUserStream to be told there
# is a new token. # is a new token.
listener = user_stream.new_listener(prev_token) listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred( listener.deferred = timeout_deferred(
listener.deferred, listener.deferred,
(end_time - now) / 1000., (end_time - now) / 1000.,
self.hs.get_reactor(), self.hs.get_reactor(),
@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing # Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token # has happened between the old prev_token and the current_token
prev_token = current_token prev_token = current_token
except DeferredTimeoutError: except defer.TimeoutError:
break break
except defer.CancelledError: except defer.CancelledError:
break break
@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now: if end_time <= now:
break break
add_timeout_to_deferred( listener.deferred = timeout_deferred(
listener.deferred.addTimeout, listener.deferred,
(end_time - now) / 1000., timeout=(end_time - now) / 1000.,
self.hs.get_reactor(), reactor=self.hs.get_reactor(),
) )
try: try:
with PreserveLoggingContext(): with PreserveLoggingContext():
yield listener.deferred yield listener.deferred
except DeferredTimeoutError: except defer.TimeoutError:
break break
except defer.CancelledError: except defer.CancelledError:
break break

View File

@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager()) defer.returnValue(_ctx_manager())
class DeferredTimeoutError(Exception): def _cancelled_to_timed_out_error(value, timeout):
""" if isinstance(value, failure.Failure):
This error is raised by default when a L{Deferred} times out. value.trap(CancelledError)
""" raise defer.TimeoutError(timeout, "Deferred")
return value
def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
""" """The in built twisted `Deferred.addTimeout` fails to time out deferreds
Add a timeout to a deferred by scheduling it to be cancelled after that have a canceller that throws exceptions. This method creates a new
timeout seconds. deferred that wraps and times out the given deferred, correctly handling
the case where the given deferred's canceller throws.
This is essentially a backport of deferred.addTimeout, which was introduced NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
in twisted 16.5.
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
unless a cancelable function was passed to its initialization or unless
a different on_timeout_cancel callable is provided.
Args: Args:
deferred (defer.Deferred): deferred to be timed out deferred (Deferred)
timeout (Number): seconds to time out after timeout (float): Timeout in seconds
reactor (twisted.internet.reactor): the Twisted reactor to use reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout. otherwise cancelled before the timeout.
@ -406,48 +402,10 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout. the timeout.
The default callable (if none is provided) will translate a The default callable (if none is provided) will translate a
CancelledError Failure into a DeferredTimeoutError. CancelledError Failure into a defer.TimeoutError.
"""
timed_out = [False]
def time_it_out(): Returns:
timed_out[0] = True Deferred
deferred.cancel()
delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value):
if timed_out[0]:
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value
deferred.addBoth(convert_cancelled)
def cancel_timeout(result):
# stop the pending call to cancel the deferred if it's been fired
if delayed_call.active():
delayed_call.cancel()
return result
deferred.addBoth(cancel_timeout)
def _cancelled_to_timed_out_error(value, timeout):
if isinstance(value, failure.Failure):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
def timeout_no_seriously(deferred, timeout, reactor):
"""The in build twisted deferred addTimeout (and the method above)
completely fail to time things out under some unknown circumstances.
Lets try a different way of timing things out and maybe that will make
things work?!
TODO: Kill this with fire.
""" """
new_d = defer.Deferred() new_d = defer.Deferred()
@ -457,16 +415,20 @@ def timeout_no_seriously(deferred, timeout, reactor):
def time_it_out(): def time_it_out():
timed_out[0] = True timed_out[0] = True
if not new_d.called: try:
new_d.errback(DeferredTimeoutError(timeout, "Deferred")) deferred.cancel()
except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
deferred.cancel() if not new_d.called:
new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out) delayed_call = reactor.callLater(timeout, time_it_out)
def convert_cancelled(value): def convert_cancelled(value):
if timed_out[0]: if timed_out[0]:
return _cancelled_to_timed_out_error(value, timeout) to_call = on_timeout_cancel or _cancelled_to_timed_out_error
return to_call(value, timeout)
return value return value
deferred.addBoth(convert_cancelled) deferred.addBoth(convert_cancelled)