mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-12-27 14:19:21 -05:00
Merge pull request #3127 from matrix-org/rav/deferred_timeout
Use deferred.addTimeout instead of time_bound_deferred
This commit is contained in:
commit
9558236728
@ -1,5 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -12,3 +13,24 @@
|
|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
from twisted.internet.defer import CancelledError
|
||||||
|
from twisted.python import failure
|
||||||
|
|
||||||
|
from synapse.api.errors import SynapseError
|
||||||
|
|
||||||
|
|
||||||
|
class RequestTimedOutError(SynapseError):
|
||||||
|
"""Exception representing timeout of an outbound request"""
|
||||||
|
def __init__(self):
|
||||||
|
super(RequestTimedOutError, self).__init__(504, "Timed out")
|
||||||
|
|
||||||
|
|
||||||
|
def cancelled_to_request_timed_out_error(value):
|
||||||
|
"""Turns CancelledErrors into RequestTimedOutErrors.
|
||||||
|
|
||||||
|
For use with async.add_timeout_to_deferred
|
||||||
|
"""
|
||||||
|
if isinstance(value, failure.Failure):
|
||||||
|
value.trap(CancelledError)
|
||||||
|
raise RequestTimedOutError()
|
||||||
|
return value
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -18,9 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
|
|||||||
from synapse.api.errors import (
|
from synapse.api.errors import (
|
||||||
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
|
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
|
||||||
)
|
)
|
||||||
|
from synapse.http import cancelled_to_request_timed_out_error
|
||||||
|
from synapse.util.async import add_timeout_to_deferred
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
from synapse.util.caches import CACHE_SIZE_FACTOR
|
||||||
from synapse.util.logcontext import make_deferred_yieldable
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
from synapse.util import logcontext
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
from synapse.http.endpoint import SpiderEndpoint
|
from synapse.http.endpoint import SpiderEndpoint
|
||||||
|
|
||||||
@ -95,21 +97,17 @@ class SimpleHttpClient(object):
|
|||||||
# counters to it
|
# counters to it
|
||||||
outgoing_requests_counter.inc(method)
|
outgoing_requests_counter.inc(method)
|
||||||
|
|
||||||
def send_request():
|
|
||||||
request_deferred = self.agent.request(
|
|
||||||
method, uri, *args, **kwargs
|
|
||||||
)
|
|
||||||
|
|
||||||
return self.clock.time_bound_deferred(
|
|
||||||
request_deferred,
|
|
||||||
time_out=60,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info("Sending request %s %s", method, uri)
|
logger.info("Sending request %s %s", method, uri)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
with logcontext.PreserveLoggingContext():
|
request_deferred = self.agent.request(
|
||||||
response = yield send_request()
|
method, uri, *args, **kwargs
|
||||||
|
)
|
||||||
|
add_timeout_to_deferred(
|
||||||
|
request_deferred,
|
||||||
|
60, cancelled_to_request_timed_out_error,
|
||||||
|
)
|
||||||
|
response = yield make_deferred_yieldable(request_deferred)
|
||||||
|
|
||||||
incoming_responses_counter.inc(method, response.code)
|
incoming_responses_counter.inc(method, response.code)
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2014-2016 OpenMarket Ltd
|
# Copyright 2014-2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
@ -12,17 +13,19 @@
|
|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import synapse.util.retryutils
|
|
||||||
from twisted.internet import defer, reactor, protocol
|
from twisted.internet import defer, reactor, protocol
|
||||||
from twisted.internet.error import DNSLookupError
|
from twisted.internet.error import DNSLookupError
|
||||||
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
||||||
from twisted.web.http_headers import Headers
|
from twisted.web.http_headers import Headers
|
||||||
from twisted.web._newclient import ResponseDone
|
from twisted.web._newclient import ResponseDone
|
||||||
|
|
||||||
|
from synapse.http import cancelled_to_request_timed_out_error
|
||||||
from synapse.http.endpoint import matrix_federation_endpoint
|
from synapse.http.endpoint import matrix_federation_endpoint
|
||||||
from synapse.util.async import sleep
|
|
||||||
from synapse.util import logcontext
|
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
from synapse.util.async import sleep, add_timeout_to_deferred
|
||||||
|
from synapse.util import logcontext
|
||||||
|
from synapse.util.logcontext import make_deferred_yieldable
|
||||||
|
import synapse.util.retryutils
|
||||||
|
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
@ -184,21 +187,20 @@ class MatrixFederationHttpClient(object):
|
|||||||
producer = body_callback(method, http_url_bytes, headers_dict)
|
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
def send_request():
|
|
||||||
request_deferred = self.agent.request(
|
request_deferred = self.agent.request(
|
||||||
method,
|
method,
|
||||||
url_bytes,
|
url_bytes,
|
||||||
Headers(headers_dict),
|
Headers(headers_dict),
|
||||||
producer
|
producer
|
||||||
)
|
)
|
||||||
|
add_timeout_to_deferred(
|
||||||
return self.clock.time_bound_deferred(
|
request_deferred,
|
||||||
|
timeout / 1000. if timeout else 60,
|
||||||
|
cancelled_to_request_timed_out_error,
|
||||||
|
)
|
||||||
|
response = yield make_deferred_yieldable(
|
||||||
request_deferred,
|
request_deferred,
|
||||||
time_out=timeout / 1000. if timeout else 60,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
with logcontext.PreserveLoggingContext():
|
|
||||||
response = yield send_request()
|
|
||||||
|
|
||||||
log_result = "%d %s" % (response.code, response.phrase,)
|
log_result = "%d %s" % (response.code, response.phrase,)
|
||||||
break
|
break
|
||||||
|
@ -14,13 +14,16 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership
|
||||||
from synapse.api.errors import AuthError
|
from synapse.api.errors import AuthError
|
||||||
from synapse.handlers.presence import format_user_presence_state
|
from synapse.handlers.presence import format_user_presence_state
|
||||||
|
|
||||||
from synapse.util import DeferredTimedOutError
|
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
from synapse.util.async import ObservableDeferred
|
from synapse.util.async import (
|
||||||
|
ObservableDeferred, add_timeout_to_deferred,
|
||||||
|
DeferredTimeoutError,
|
||||||
|
)
|
||||||
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
|
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken
|
||||||
@ -336,11 +339,12 @@ class Notifier(object):
|
|||||||
# Now we wait for the _NotifierUserStream to be told there
|
# Now we wait for the _NotifierUserStream to be told there
|
||||||
# is a new token.
|
# is a new token.
|
||||||
listener = user_stream.new_listener(prev_token)
|
listener = user_stream.new_listener(prev_token)
|
||||||
with PreserveLoggingContext():
|
add_timeout_to_deferred(
|
||||||
yield self.clock.time_bound_deferred(
|
|
||||||
listener.deferred,
|
listener.deferred,
|
||||||
time_out=(end_time - now) / 1000.
|
(end_time - now) / 1000.,
|
||||||
)
|
)
|
||||||
|
with PreserveLoggingContext():
|
||||||
|
yield listener.deferred
|
||||||
|
|
||||||
current_token = user_stream.current_token
|
current_token = user_stream.current_token
|
||||||
|
|
||||||
@ -351,7 +355,7 @@ class Notifier(object):
|
|||||||
# Update the prev_token to the current_token since nothing
|
# Update the prev_token to the current_token since nothing
|
||||||
# has happened between the old prev_token and the current_token
|
# has happened between the old prev_token and the current_token
|
||||||
prev_token = current_token
|
prev_token = current_token
|
||||||
except DeferredTimedOutError:
|
except DeferredTimeoutError:
|
||||||
break
|
break
|
||||||
except defer.CancelledError:
|
except defer.CancelledError:
|
||||||
break
|
break
|
||||||
@ -556,13 +560,14 @@ class Notifier(object):
|
|||||||
if end_time <= now:
|
if end_time <= now:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
add_timeout_to_deferred(
|
||||||
|
listener.deferred.addTimeout,
|
||||||
|
(end_time - now) / 1000.,
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
with PreserveLoggingContext():
|
with PreserveLoggingContext():
|
||||||
yield self.clock.time_bound_deferred(
|
yield listener.deferred
|
||||||
listener.deferred,
|
except DeferredTimeoutError:
|
||||||
time_out=(end_time - now) / 1000.
|
|
||||||
)
|
|
||||||
except DeferredTimedOutError:
|
|
||||||
break
|
break
|
||||||
except defer.CancelledError:
|
except defer.CancelledError:
|
||||||
break
|
break
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
|
||||||
from synapse.util.logcontext import PreserveLoggingContext
|
from synapse.util.logcontext import PreserveLoggingContext
|
||||||
|
|
||||||
from twisted.internet import defer, reactor, task
|
from twisted.internet import defer, reactor, task
|
||||||
@ -24,11 +23,6 @@ import logging
|
|||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class DeferredTimedOutError(SynapseError):
|
|
||||||
def __init__(self):
|
|
||||||
super(DeferredTimedOutError, self).__init__(504, "Timed out")
|
|
||||||
|
|
||||||
|
|
||||||
def unwrapFirstError(failure):
|
def unwrapFirstError(failure):
|
||||||
# defer.gatherResults and DeferredLists wrap failures.
|
# defer.gatherResults and DeferredLists wrap failures.
|
||||||
failure.trap(defer.FirstError)
|
failure.trap(defer.FirstError)
|
||||||
@ -85,53 +79,3 @@ class Clock(object):
|
|||||||
except Exception:
|
except Exception:
|
||||||
if not ignore_errs:
|
if not ignore_errs:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def time_bound_deferred(self, given_deferred, time_out):
|
|
||||||
if given_deferred.called:
|
|
||||||
return given_deferred
|
|
||||||
|
|
||||||
ret_deferred = defer.Deferred()
|
|
||||||
|
|
||||||
def timed_out_fn():
|
|
||||||
e = DeferredTimedOutError()
|
|
||||||
|
|
||||||
try:
|
|
||||||
ret_deferred.errback(e)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
try:
|
|
||||||
given_deferred.cancel()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
timer = None
|
|
||||||
|
|
||||||
def cancel(res):
|
|
||||||
try:
|
|
||||||
self.cancel_call_later(timer)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return res
|
|
||||||
|
|
||||||
ret_deferred.addBoth(cancel)
|
|
||||||
|
|
||||||
def success(res):
|
|
||||||
try:
|
|
||||||
ret_deferred.callback(res)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return res
|
|
||||||
|
|
||||||
def err(res):
|
|
||||||
try:
|
|
||||||
ret_deferred.errback(res)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
given_deferred.addCallbacks(callback=success, errback=err)
|
|
||||||
|
|
||||||
timer = self.call_later(time_out, timed_out_fn)
|
|
||||||
|
|
||||||
return ret_deferred
|
|
||||||
|
@ -15,6 +15,8 @@
|
|||||||
|
|
||||||
|
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
|
from twisted.internet.defer import CancelledError
|
||||||
|
from twisted.python import failure
|
||||||
|
|
||||||
from .logcontext import (
|
from .logcontext import (
|
||||||
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
|
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
|
||||||
@ -392,3 +394,68 @@ class ReadWriteLock(object):
|
|||||||
self.key_to_current_writer.pop(key)
|
self.key_to_current_writer.pop(key)
|
||||||
|
|
||||||
defer.returnValue(_ctx_manager())
|
defer.returnValue(_ctx_manager())
|
||||||
|
|
||||||
|
|
||||||
|
class DeferredTimeoutError(Exception):
|
||||||
|
"""
|
||||||
|
This error is raised by default when a L{Deferred} times out.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
|
||||||
|
"""
|
||||||
|
Add a timeout to a deferred by scheduling it to be cancelled after
|
||||||
|
timeout seconds.
|
||||||
|
|
||||||
|
This is essentially a backport of deferred.addTimeout, which was introduced
|
||||||
|
in twisted 16.5.
|
||||||
|
|
||||||
|
If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
|
||||||
|
unless a cancelable function was passed to its initialization or unless
|
||||||
|
a different on_timeout_cancel callable is provided.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
deferred (defer.Deferred): deferred to be timed out
|
||||||
|
timeout (Number): seconds to time out after
|
||||||
|
|
||||||
|
on_timeout_cancel (callable): A callable which is called immediately
|
||||||
|
after the deferred times out, and not if this deferred is
|
||||||
|
otherwise cancelled before the timeout.
|
||||||
|
|
||||||
|
It takes an arbitrary value, which is the value of the deferred at
|
||||||
|
that exact point in time (probably a CancelledError Failure), and
|
||||||
|
the timeout.
|
||||||
|
|
||||||
|
The default callable (if none is provided) will translate a
|
||||||
|
CancelledError Failure into a DeferredTimeoutError.
|
||||||
|
"""
|
||||||
|
timed_out = [False]
|
||||||
|
|
||||||
|
def time_it_out():
|
||||||
|
timed_out[0] = True
|
||||||
|
deferred.cancel()
|
||||||
|
|
||||||
|
delayed_call = reactor.callLater(timeout, time_it_out)
|
||||||
|
|
||||||
|
def convert_cancelled(value):
|
||||||
|
if timed_out[0]:
|
||||||
|
to_call = on_timeout_cancel or _cancelled_to_timed_out_error
|
||||||
|
return to_call(value, timeout)
|
||||||
|
return value
|
||||||
|
|
||||||
|
deferred.addBoth(convert_cancelled)
|
||||||
|
|
||||||
|
def cancel_timeout(result):
|
||||||
|
# stop the pending call to cancel the deferred if it's been fired
|
||||||
|
if delayed_call.active():
|
||||||
|
delayed_call.cancel()
|
||||||
|
return result
|
||||||
|
|
||||||
|
deferred.addBoth(cancel_timeout)
|
||||||
|
|
||||||
|
|
||||||
|
def _cancelled_to_timed_out_error(value, timeout):
|
||||||
|
if isinstance(value, failure.Failure):
|
||||||
|
value.trap(CancelledError)
|
||||||
|
raise DeferredTimeoutError(timeout, "Deferred")
|
||||||
|
return value
|
||||||
|
@ -1,33 +0,0 @@
|
|||||||
# -*- coding: utf-8 -*-
|
|
||||||
# Copyright 2017 Vector Creations Ltd
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
from synapse import util
|
|
||||||
from twisted.internet import defer
|
|
||||||
from tests import unittest
|
|
||||||
|
|
||||||
|
|
||||||
class ClockTestCase(unittest.TestCase):
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def test_time_bound_deferred(self):
|
|
||||||
# just a deferred which never resolves
|
|
||||||
slow_deferred = defer.Deferred()
|
|
||||||
|
|
||||||
clock = util.Clock()
|
|
||||||
time_bound = clock.time_bound_deferred(slow_deferred, 0.001)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield time_bound
|
|
||||||
self.fail("Expected timedout error, but got nothing")
|
|
||||||
except util.DeferredTimedOutError:
|
|
||||||
pass
|
|
Loading…
Reference in New Issue
Block a user