Merge pull request #2050 from matrix-org/rav/federation_backoff

push federation retry limiter down to matrixfederationclient
This commit is contained in:
Richard van der Hoff 2017-03-23 22:27:01 +00:00 committed by GitHub
commit 9397edb28b
12 changed files with 359 additions and 319 deletions

View File

@ -15,7 +15,6 @@
from synapse.crypto.keyclient import fetch_server_key from synapse.crypto.keyclient import fetch_server_key
from synapse.api.errors import SynapseError, Codes from synapse.api.errors import SynapseError, Codes
from synapse.util.retryutils import get_retry_limiter
from synapse.util import unwrapFirstError from synapse.util import unwrapFirstError
from synapse.util.async import ObservableDeferred from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import ( from synapse.util.logcontext import (
@ -382,30 +381,24 @@ class Keyring(object):
def get_keys_from_server(self, server_name_and_key_ids): def get_keys_from_server(self, server_name_and_key_ids):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_key(server_name, key_ids): def get_key(server_name, key_ids):
limiter = yield get_retry_limiter( keys = None
server_name, try:
self.clock, keys = yield self.get_server_verify_key_v2_direct(
self.store, server_name, key_ids
) )
with limiter: except Exception as e:
keys = None logger.info(
try: "Unable to get key %r for %r directly: %s %s",
keys = yield self.get_server_verify_key_v2_direct( key_ids, server_name,
server_name, key_ids type(e).__name__, str(e.message),
) )
except Exception as e:
logger.info(
"Unable to get key %r for %r directly: %s %s",
key_ids, server_name,
type(e).__name__, str(e.message),
)
if not keys: if not keys:
keys = yield self.get_server_verify_key_v1_direct( keys = yield self.get_server_verify_key_v1_direct(
server_name, key_ids server_name, key_ids
) )
keys = {server_name: keys} keys = {server_name: keys}
defer.returnValue(keys) defer.returnValue(keys)

View File

@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.events import FrozenEvent, builder from synapse.events import FrozenEvent, builder
import synapse.metrics import synapse.metrics
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
import copy import copy
import itertools import itertools
@ -88,7 +88,7 @@ class FederationClient(FederationBase):
@log_function @log_function
def make_query(self, destination, query_type, args, def make_query(self, destination, query_type, args,
retry_on_dns_fail=False): retry_on_dns_fail=False, ignore_backoff=False):
"""Sends a federation Query to a remote homeserver of the given type """Sends a federation Query to a remote homeserver of the given type
and arguments. and arguments.
@ -98,6 +98,8 @@ class FederationClient(FederationBase):
handler name used in register_query_handler(). handler name used in register_query_handler().
args (dict): Mapping of strings to strings containing the details args (dict): Mapping of strings to strings containing the details
of the query request. of the query request.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
Returns: Returns:
a Deferred which will eventually yield a JSON object from the a Deferred which will eventually yield a JSON object from the
@ -106,7 +108,8 @@ class FederationClient(FederationBase):
sent_queries_counter.inc(query_type) sent_queries_counter.inc(query_type)
return self.transport_layer.make_query( return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
) )
@log_function @log_function
@ -234,31 +237,24 @@ class FederationClient(FederationBase):
continue continue
try: try:
limiter = yield get_retry_limiter( transaction_data = yield self.transport_layer.get_event(
destination, destination, event_id, timeout=timeout,
self._clock,
self.store,
) )
with limiter: logger.debug("transaction_data %r", transaction_data)
transaction_data = yield self.transport_layer.get_event(
destination, event_id, timeout=timeout,
)
logger.debug("transaction_data %r", transaction_data) pdu_list = [
self.event_from_pdu_json(p, outlier=outlier)
for p in transaction_data["pdus"]
]
pdu_list = [ if pdu_list and pdu_list[0]:
self.event_from_pdu_json(p, outlier=outlier) pdu = pdu_list[0]
for p in transaction_data["pdus"]
]
if pdu_list and pdu_list[0]: # Check signatures are correct.
pdu = pdu_list[0] signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
# Check signatures are correct. break
signed_pdu = yield self._check_sigs_and_hashes([pdu])[0]
break
pdu_attempts[destination] = now pdu_attempts[destination] = now

View File

@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import datetime
from twisted.internet import defer from twisted.internet import defer
@ -22,9 +22,7 @@ from .units import Transaction, Edu
from synapse.api.errors import HttpResponseException from synapse.api.errors import HttpResponseException
from synapse.util.async import run_on_reactor from synapse.util.async import run_on_reactor
from synapse.util.logcontext import preserve_context_over_fn from synapse.util.logcontext import preserve_context_over_fn
from synapse.util.retryutils import ( from synapse.util.retryutils import NotRetryingDestination
get_retry_limiter, NotRetryingDestination,
)
from synapse.util.metrics import measure_func from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.handlers.presence import format_user_presence_state from synapse.handlers.presence import format_user_presence_state
@ -312,13 +310,6 @@ class TransactionQueue(object):
yield run_on_reactor() yield run_on_reactor()
while True: while True:
limiter = yield get_retry_limiter(
destination,
self.clock,
self.store,
backoff_on_404=True, # If we get a 404 the other side has gone
)
device_message_edus, device_stream_id, dev_list_id = ( device_message_edus, device_stream_id, dev_list_id = (
yield self._get_new_device_messages(destination) yield self._get_new_device_messages(destination)
) )
@ -374,7 +365,6 @@ class TransactionQueue(object):
success = yield self._send_new_transaction( success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, pending_failures, destination, pending_pdus, pending_edus, pending_failures,
limiter=limiter,
) )
if success: if success:
# Remove the acknowledged device messages from the database # Remove the acknowledged device messages from the database
@ -392,12 +382,24 @@ class TransactionQueue(object):
self.last_device_list_stream_id_by_dest[destination] = dev_list_id self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else: else:
break break
except NotRetryingDestination: except NotRetryingDestination as e:
logger.debug( logger.debug(
"TX [%s] not ready for retry yet - " "TX [%s] not ready for retry yet (next retry at %s) - "
"dropping transaction for now", "dropping transaction for now",
destination, destination,
datetime.datetime.fromtimestamp(
(e.retry_last_ts + e.retry_interval) / 1000.0
),
) )
except Exception as e:
logger.warn(
"TX [%s] Failed to send transaction: %s",
destination,
e,
)
for p in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally: finally:
# We want to be *very* sure we delete this after we stop processing # We want to be *very* sure we delete this after we stop processing
self.pending_transactions.pop(destination, None) self.pending_transactions.pop(destination, None)
@ -437,7 +439,7 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction") @measure_func("_send_new_transaction")
@defer.inlineCallbacks @defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus, def _send_new_transaction(self, destination, pending_pdus, pending_edus,
pending_failures, limiter): pending_failures):
# Sort based on the order field # Sort based on the order field
pending_pdus.sort(key=lambda t: t[1]) pending_pdus.sort(key=lambda t: t[1])
@ -447,132 +449,104 @@ class TransactionQueue(object):
success = True success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pdus),
len(edus),
len(failures)
)
logger.debug("TX [%s] Persisting transaction...", destination)
transaction = Transaction.create_new(
origin_server_ts=int(self.clock.time_msec()),
transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try: try:
logger.debug("TX [%s] _attempt_new_transaction", destination) response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
txn_id = str(self._next_txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
destination, txn_id,
len(pdus),
len(edus),
len(failures)
) )
code = 200
logger.debug("TX [%s] Persisting transaction...", destination) if response:
for e_id, r in response.get("pdus", {}).items():
transaction = Transaction.create_new( if "error" in r:
origin_server_ts=int(self.clock.time_msec()), logger.warn(
transaction_id=txn_id, "Transaction returned error for %s: %s",
origin=self.server_name, e_id, r,
destination=destination,
pdus=pdus,
edus=edus,
pdu_failures=failures,
)
self._next_txn_id += 1
yield self.transaction_actions.prepare_to_send(transaction)
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
" (PDUs: %d, EDUs: %d, failures: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
len(failures),
)
with limiter:
# Actually send the transaction
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
if "pdus" in data:
for p in data["pdus"]:
if "age_ts" in p:
unsigned = p.setdefault("unsigned", {})
unsigned["age"] = now - int(p["age_ts"])
del p["age_ts"]
return data
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
)
code = 200
if response:
for e_id, r in response.get("pdus", {}).items():
if "error" in r:
logger.warn(
"Transaction returned error for %s: %s",
e_id, r,
)
except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
destination, txn_id, code
) )
raise e except HttpResponseException as e:
code = e.code
response = e.response
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info( logger.info(
"TX [%s] {%s} got %d response", "TX [%s] {%s} got %d response",
destination, txn_id, code destination, txn_id, code
) )
raise e
logger.debug("TX [%s] Sent transaction", destination) logger.info(
logger.debug("TX [%s] Marking as delivered...", destination) "TX [%s] {%s} got %d response",
destination, txn_id, code
)
yield self.transaction_actions.delivered( logger.debug("TX [%s] Sent transaction", destination)
transaction, code, response logger.debug("TX [%s] Marking as delivered...", destination)
)
logger.debug("TX [%s] Marked as delivered", destination) yield self.transaction_actions.delivered(
transaction, code, response
)
if code != 200: logger.debug("TX [%s] Marked as delivered", destination)
for p in pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, destination
)
success = False
except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
success = False
if code != 200:
for p in pdus: for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination) logger.info(
except Exception as e: "Failed to send event %s to %s", p.event_id, destination
# We capture this here as there as nothing actually listens )
# for this finishing functions deferred.
logger.warn(
"TX [%s] Problem in _attempt_transaction: %s",
destination,
e,
)
success = False success = False
for p in pdus:
logger.info("Failed to send event %s to %s", p.event_id, destination)
defer.returnValue(success) defer.returnValue(success)

View File

@ -163,6 +163,7 @@ class TransportLayerClient(object):
data=json_data, data=json_data,
json_data_callback=json_data_callback, json_data_callback=json_data_callback,
long_retries=True, long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
) )
logger.debug( logger.debug(
@ -174,7 +175,8 @@ class TransportLayerClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
@log_function @log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail): def make_query(self, destination, query_type, args, retry_on_dns_fail,
ignore_backoff=False):
path = PREFIX + "/query/%s" % query_type path = PREFIX + "/query/%s" % query_type
content = yield self.client.get_json( content = yield self.client.get_json(
@ -183,6 +185,7 @@ class TransportLayerClient(object):
args=args, args=args,
retry_on_dns_fail=retry_on_dns_fail, retry_on_dns_fail=retry_on_dns_fail,
timeout=10000, timeout=10000,
ignore_backoff=ignore_backoff,
) )
defer.returnValue(content) defer.returnValue(content)
@ -242,6 +245,7 @@ class TransportLayerClient(object):
destination=destination, destination=destination,
path=path, path=path,
data=content, data=content,
ignore_backoff=True,
) )
defer.returnValue(response) defer.returnValue(response)
@ -269,6 +273,7 @@ class TransportLayerClient(object):
destination=remote_server, destination=remote_server,
path=path, path=path,
args=args, args=args,
ignore_backoff=True,
) )
defer.returnValue(response) defer.returnValue(response)

View File

@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
"room_alias": room_alias.to_string(), "room_alias": room_alias.to_string(),
}, },
retry_on_dns_fail=False, retry_on_dns_fail=False,
ignore_backoff=True,
) )
except CodeMessageException as e: except CodeMessageException as e:
logging.warn("Error retrieving alias") logging.warn("Error retrieving alias")

View File

@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException from synapse.api.errors import SynapseError, CodeMessageException
from synapse.types import get_domain_from_id from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -121,15 +121,11 @@ class E2eKeysHandler(object):
def do_remote_query(destination): def do_remote_query(destination):
destination_query = remote_queries_not_in_cache[destination] destination_query = remote_queries_not_in_cache[destination]
try: try:
limiter = yield get_retry_limiter( remote_result = yield self.federation.query_client_keys(
destination, self.clock, self.store destination,
{"device_keys": destination_query},
timeout=timeout
) )
with limiter:
remote_result = yield self.federation.query_client_keys(
destination,
{"device_keys": destination_query},
timeout=timeout
)
for user_id, keys in remote_result["device_keys"].items(): for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query: if user_id in destination_query:
@ -239,18 +235,14 @@ class E2eKeysHandler(object):
def claim_client_keys(destination): def claim_client_keys(destination):
device_keys = remote_queries[destination] device_keys = remote_queries[destination]
try: try:
limiter = yield get_retry_limiter( remote_result = yield self.federation.claim_client_keys(
destination, self.clock, self.store destination,
{"one_time_keys": device_keys},
timeout=timeout
) )
with limiter: for user_id, keys in remote_result["one_time_keys"].items():
remote_result = yield self.federation.claim_client_keys( if user_id in device_keys:
destination, json_result[user_id] = keys
{"one_time_keys": device_keys},
timeout=timeout
)
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
except CodeMessageException as e: except CodeMessageException as e:
failures[destination] = { failures[destination] = {
"status": e.code, "message": e.message "status": e.code, "message": e.message

View File

@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
args={ args={
"user_id": target_user.to_string(), "user_id": target_user.to_string(),
"field": "displayname", "field": "displayname",
} },
ignore_backoff=True,
) )
except CodeMessageException as e: except CodeMessageException as e:
if e.code != 404: if e.code != 404:
@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
args={ args={
"user_id": target_user.to_string(), "user_id": target_user.to_string(),
"field": "avatar_url", "field": "avatar_url",
} },
ignore_backoff=True,
) )
except CodeMessageException as e: except CodeMessageException as e:
if e.code != 404: if e.code != 404:

View File

@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import synapse.util.retryutils
from twisted.internet import defer, reactor, protocol from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.client import readBody, HTTPConnectionPool, Agent
@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
reactor, MatrixFederationEndpointFactory(hs), pool=pool reactor, MatrixFederationEndpointFactory(hs), pool=pool
) )
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string self.version_string = hs.version_string
self._next_id = 1 self._next_id = 1
@ -103,129 +103,151 @@ class MatrixFederationHttpClient(object):
) )
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def _request(self, destination, method, path,
body_callback, headers_dict={}, param_bytes=b"", body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True, query_bytes=b"", retry_on_dns_fail=True,
timeout=None, long_retries=False): timeout=None, long_retries=False,
""" Creates and sends a request to the given url ignore_backoff=False,
backoff_on_404=False):
""" Creates and sends a request to the given server
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404
Returns: Returns:
Deferred: resolves with the http response object on success. Deferred: resolves with the http response object on success.
Fails with ``HTTPRequestException``: if we get an HTTP response Fails with ``HTTPRequestException``: if we get an HTTP response
code >= 300. code >= 300.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
""" """
headers_dict[b"User-Agent"] = [self.version_string] limiter = yield synapse.util.retryutils.get_retry_limiter(
headers_dict[b"Host"] = [destination] destination,
self.clock,
url_bytes = self._create_url( self._store,
destination, path_bytes, param_bytes, query_bytes backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
) )
txn_id = "%s-O-%s" % (method, self._next_id) destination = destination.encode("ascii")
self._next_id = (self._next_id + 1) % (sys.maxint - 1) path_bytes = path.encode("ascii")
with limiter:
headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination]
outbound_logger.info( url_bytes = self._create_url(
"{%s} [%s] Sending request: %s %s", destination, path_bytes, param_bytes, query_bytes
txn_id, destination, method, url_bytes )
)
# XXX: Would be much nicer to retry only at the transaction-layer txn_id = "%s-O-%s" % (method, self._next_id)
# (once we have reliable transactions in place) self._next_id = (self._next_id + 1) % (sys.maxint - 1)
if long_retries:
retries_left = MAX_LONG_RETRIES
else:
retries_left = MAX_SHORT_RETRIES
http_url_bytes = urlparse.urlunparse( outbound_logger.info(
("", "", path_bytes, param_bytes, query_bytes, "") "{%s} [%s] Sending request: %s %s",
) txn_id, destination, method, url_bytes
)
log_result = None # XXX: Would be much nicer to retry only at the transaction-layer
try: # (once we have reliable transactions in place)
while True: if long_retries:
producer = None retries_left = MAX_LONG_RETRIES
if body_callback: else:
producer = body_callback(method, http_url_bytes, headers_dict) retries_left = MAX_SHORT_RETRIES
try: http_url_bytes = urlparse.urlunparse(
def send_request(): ("", "", path_bytes, param_bytes, query_bytes, "")
request_deferred = preserve_context_over_fn( )
self.agent.request,
log_result = None
try:
while True:
producer = None
if body_callback:
producer = body_callback(method, http_url_bytes, headers_dict)
try:
def send_request():
request_deferred = preserve_context_over_fn(
self.agent.request,
method,
url_bytes,
Headers(headers_dict),
producer
)
return self.clock.time_bound_deferred(
request_deferred,
time_out=timeout / 1000. if timeout else 60,
)
response = yield preserve_context_over_fn(send_request)
log_result = "%d %s" % (response.code, response.phrase,)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.warn(
"{%s} Sending request failed to %s: %s %s: %s - %s",
txn_id,
destination,
method, method,
url_bytes, url_bytes,
Headers(headers_dict), type(e).__name__,
producer _flatten_response_never_received(e),
) )
return self.clock.time_bound_deferred( log_result = "%s - %s" % (
request_deferred, type(e).__name__, _flatten_response_never_received(e),
time_out=timeout / 1000. if timeout else 60,
) )
response = yield preserve_context_over_fn(send_request) if retries_left and not timeout:
if long_retries:
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
delay = min(delay, 60)
delay *= random.uniform(0.8, 1.4)
else:
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
log_result = "%d %s" % (response.code, response.phrase,) yield sleep(delay)
break retries_left -= 1
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.warn(
"{%s} Sending request failed to %s: %s %s: %s - %s",
txn_id,
destination,
method,
url_bytes,
type(e).__name__,
_flatten_response_never_received(e),
)
log_result = "%s - %s" % (
type(e).__name__, _flatten_response_never_received(e),
)
if retries_left and not timeout:
if long_retries:
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
delay = min(delay, 60)
delay *= random.uniform(0.8, 1.4)
else: else:
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) raise
delay = min(delay, 2) finally:
delay *= random.uniform(0.8, 1.4) outbound_logger.info(
"{%s} [%s] Result: %s",
txn_id,
destination,
log_result,
)
yield sleep(delay) if 200 <= response.code < 300:
retries_left -= 1 pass
else: else:
raise # :'(
finally: # Update transactions table?
outbound_logger.info( body = yield preserve_context_over_fn(readBody, response)
"{%s} [%s] Result: %s", raise HttpResponseException(
txn_id, response.code, response.phrase, body
destination, )
log_result,
)
if 200 <= response.code < 300: defer.returnValue(response)
pass
else:
# :'(
# Update transactions table?
body = yield preserve_context_over_fn(readBody, response)
raise HttpResponseException(
response.code, response.phrase, body
)
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict, def sign_request(self, destination, method, url_bytes, headers_dict,
content=None): content=None):
@ -254,7 +276,9 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None, def put_json(self, destination, path, data={}, json_data_callback=None,
long_retries=False, timeout=None): long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
""" Sends the specifed json data using PUT """ Sends the specifed json data using PUT
Args: Args:
@ -269,11 +293,19 @@ class MatrixFederationHttpClient(object):
retry for a short or long time. retry for a short or long time.
timeout(int): How long to try (in ms) the destination for before timeout(int): How long to try (in ms) the destination for before
giving up. None indicates no timeout. giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): True if we should count a 404 response as
a failure of the server (and should therefore back off future
requests)
Returns: Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised. CodeMessageException is raised.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
""" """
if not json_data_callback: if not json_data_callback:
@ -288,14 +320,16 @@ class MatrixFederationHttpClient(object):
producer = _JsonProducer(json_data) producer = _JsonProducer(json_data)
return producer return producer
response = yield self._create_request( response = yield self._request(
destination.encode("ascii"), destination,
"PUT", "PUT",
path.encode("ascii"), path,
body_callback=body_callback, body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]}, headers_dict={"Content-Type": ["application/json"]},
long_retries=long_retries, long_retries=long_retries,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
) )
if 200 <= response.code < 300: if 200 <= response.code < 300:
@ -307,7 +341,7 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=False, def post_json(self, destination, path, data={}, long_retries=False,
timeout=None): timeout=None, ignore_backoff=False):
""" Sends the specifed json data using POST """ Sends the specifed json data using POST
Args: Args:
@ -320,11 +354,15 @@ class MatrixFederationHttpClient(object):
retry for a short or long time. retry for a short or long time.
timeout(int): How long to try (in ms) the destination for before timeout(int): How long to try (in ms) the destination for before
giving up. None indicates no timeout. giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
Returns: Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised. CodeMessageException is raised.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
""" """
def body_callback(method, url_bytes, headers_dict): def body_callback(method, url_bytes, headers_dict):
@ -333,14 +371,15 @@ class MatrixFederationHttpClient(object):
) )
return _JsonProducer(data) return _JsonProducer(data)
response = yield self._create_request( response = yield self._request(
destination.encode("ascii"), destination,
"POST", "POST",
path.encode("ascii"), path,
body_callback=body_callback, body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]}, headers_dict={"Content-Type": ["application/json"]},
long_retries=long_retries, long_retries=long_retries,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff,
) )
if 200 <= response.code < 300: if 200 <= response.code < 300:
@ -353,7 +392,7 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True, def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
timeout=None): timeout=None, ignore_backoff=False):
""" GETs some json from the given host homeserver and path """ GETs some json from the given host homeserver and path
Args: Args:
@ -365,11 +404,16 @@ class MatrixFederationHttpClient(object):
timeout (int): How long to try (in ms) the destination for before timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout and that the request will giving up. None indicates no timeout and that the request will
be retried. be retried.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
Returns: Returns:
Deferred: Succeeds when we get *any* HTTP response. Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`, The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body. where `response` is a dict representing the decoded JSON body.
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
""" """
logger.debug("get_json args: %s", args) logger.debug("get_json args: %s", args)
@ -386,14 +430,15 @@ class MatrixFederationHttpClient(object):
self.sign_request(destination, method, url_bytes, headers_dict) self.sign_request(destination, method, url_bytes, headers_dict)
return None return None
response = yield self._create_request( response = yield self._request(
destination.encode("ascii"), destination,
"GET", "GET",
path.encode("ascii"), path,
query_bytes=query_bytes, query_bytes=query_bytes,
body_callback=body_callback, body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail, retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff,
) )
if 200 <= response.code < 300: if 200 <= response.code < 300:
@ -406,19 +451,25 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_file(self, destination, path, output_stream, args={}, def get_file(self, destination, path, output_stream, args={},
retry_on_dns_fail=True, max_size=None): retry_on_dns_fail=True, max_size=None,
ignore_backoff=False):
"""GETs a file from a given homeserver """GETs a file from a given homeserver
Args: Args:
destination (str): The remote server to send the HTTP request to. destination (str): The remote server to send the HTTP request to.
path (str): The HTTP path to GET. path (str): The HTTP path to GET.
output_stream (file): File to write the response body to. output_stream (file): File to write the response body to.
args (dict): Optional dictionary used to create the query string. args (dict): Optional dictionary used to create the query string.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
Returns: Returns:
Deferred: resolves with an (int,dict) tuple of the file length and Deferred: resolves with an (int,dict) tuple of the file length and
a dict of the response headers. a dict of the response headers.
Fails with ``HTTPRequestException`` if we get an HTTP response code Fails with ``HTTPRequestException`` if we get an HTTP response code
>= 300 >= 300
Fails with ``NotRetryingDestination`` if we are not yet ready
to retry this server.
""" """
encoded_args = {} encoded_args = {}
@ -434,13 +485,14 @@ class MatrixFederationHttpClient(object):
self.sign_request(destination, method, url_bytes, headers_dict) self.sign_request(destination, method, url_bytes, headers_dict)
return None return None
response = yield self._create_request( response = yield self._request(
destination.encode("ascii"), destination,
"GET", "GET",
path.encode("ascii"), path,
query_bytes=query_bytes, query_bytes=query_bytes,
body_callback=body_callback, body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
) )
headers = dict(response.headers.getAllRawHeaders()) headers = dict(response.headers.getAllRawHeaders())

View File

@ -35,7 +35,8 @@ class NotRetryingDestination(Exception):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_retry_limiter(destination, clock, store, **kwargs): def get_retry_limiter(destination, clock, store, ignore_backoff=False,
**kwargs):
"""For a given destination check if we have previously failed to """For a given destination check if we have previously failed to
send a request there and are waiting before retrying the destination. send a request there and are waiting before retrying the destination.
If we are not ready to retry the destination, this will raise a If we are not ready to retry the destination, this will raise a
@ -43,6 +44,14 @@ def get_retry_limiter(destination, clock, store, **kwargs):
that will mark the destination as down if an exception is thrown (excluding that will mark the destination as down if an exception is thrown (excluding
CodeMessageException with code < 500) CodeMessageException with code < 500)
Args:
destination (str): name of homeserver
clock (synapse.util.clock): timing source
store (synapse.storage.transactions.TransactionStore): datastore
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway. We will still update the next
retry_interval on success/failure.
Example usage: Example usage:
try: try:
@ -66,7 +75,7 @@ def get_retry_limiter(destination, clock, store, **kwargs):
now = int(clock.time_msec()) now = int(clock.time_msec())
if retry_last_ts + retry_interval > now: if not ignore_backoff and retry_last_ts + retry_interval > now:
raise NotRetryingDestination( raise NotRetryingDestination(
retry_last_ts=retry_last_ts, retry_last_ts=retry_last_ts,
retry_interval=retry_interval, retry_interval=retry_interval,
@ -124,7 +133,13 @@ class RetryDestinationLimiter(object):
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
valid_err_code = False valid_err_code = False
if exc_type is not None and issubclass(exc_type, CodeMessageException): if exc_type is None:
valid_err_code = True
elif not issubclass(exc_type, Exception):
# avoid treating exceptions which don't derive from Exception as
# failures; this is mostly so as not to catch defer._DefGen.
valid_err_code = True
elif issubclass(exc_type, CodeMessageException):
# Some error codes are perfectly fine for some APIs, whereas other # Some error codes are perfectly fine for some APIs, whereas other
# APIs may expect to never received e.g. a 404. It's important to # APIs may expect to never received e.g. a 404. It's important to
# handle 404 as some remote servers will return a 404 when the HS # handle 404 as some remote servers will return a 404 when the HS
@ -142,11 +157,13 @@ class RetryDestinationLimiter(object):
else: else:
valid_err_code = False valid_err_code = False
if exc_type is None or valid_err_code: if valid_err_code:
# We connected successfully. # We connected successfully.
if not self.retry_interval: if not self.retry_interval:
return return
logger.debug("Connection to %s was successful; clearing backoff",
self.destination)
retry_last_ts = 0 retry_last_ts = 0
self.retry_interval = 0 self.retry_interval = 0
else: else:
@ -160,6 +177,10 @@ class RetryDestinationLimiter(object):
else: else:
self.retry_interval = self.min_retry_interval self.retry_interval = self.min_retry_interval
logger.debug(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
retry_last_ts = int(self.clock.time_msec()) retry_last_ts = int(self.clock.time_msec())
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -93,6 +93,7 @@ class DirectoryTestCase(unittest.TestCase):
"room_alias": "#another:remote", "room_alias": "#another:remote",
}, },
retry_on_dns_fail=False, retry_on_dns_fail=False,
ignore_backoff=True,
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -119,7 +119,8 @@ class ProfileTestCase(unittest.TestCase):
self.mock_federation.make_query.assert_called_with( self.mock_federation.make_query.assert_called_with(
destination="remote", destination="remote",
query_type="profile", query_type="profile",
args={"user_id": "@alice:remote", "field": "displayname"} args={"user_id": "@alice:remote", "field": "displayname"},
ignore_backoff=True,
) )
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -192,6 +192,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
), ),
json_data_callback=ANY, json_data_callback=ANY,
long_retries=True, long_retries=True,
backoff_on_404=True,
), ),
defer.succeed((200, "OK")) defer.succeed((200, "OK"))
) )
@ -263,6 +264,7 @@ class TypingNotificationsTestCase(unittest.TestCase):
), ),
json_data_callback=ANY, json_data_callback=ANY,
long_retries=True, long_retries=True,
backoff_on_404=True,
), ),
defer.succeed((200, "OK")) defer.succeed((200, "OK"))
) )