From 9371019133bf16cec163d58fe69aa701c8ca5305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 18:13:34 +0000 Subject: [PATCH] Try to only back off if we think we failed to connect to the remote --- synapse/crypto/keyring.py | 100 ++++++++++++------------ synapse/federation/transaction_queue.py | 76 +++++++++--------- synapse/util/retryutils.py | 10 ++- 3 files changed, 96 insertions(+), 90 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ea00c830c..828aced44 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -101,63 +101,63 @@ class Keyring(object): server_name, self.hs.tls_context_factory ) - # Check the response. + # Check the response. - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, tls_certificate - ) + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, tls_certificate + ) - if ("signatures" not in response - or server_name not in response["signatures"]): - raise ValueError("Key response not signed by remote server") + if ("signatures" not in response + or server_name not in response["signatures"]): + raise ValueError("Key response not signed by remote server") - if "tls_certificate" not in response: - raise ValueError("Key response missing TLS certificate") + if "tls_certificate" not in response: + raise ValueError("Key response missing TLS certificate") - tls_certificate_b64 = response["tls_certificate"] + tls_certificate_b64 = response["tls_certificate"] - if encode_base64(x509_certificate_bytes) != tls_certificate_b64: - raise ValueError("TLS certificate doesn't match") + if encode_base64(x509_certificate_bytes) != tls_certificate_b64: + raise ValueError("TLS certificate doesn't match") - verify_keys = {} - for key_id, key_base64 in response["verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key + verify_keys = {} + for key_id, key_base64 in response["verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = verify_key - for key_id in response["signatures"][server_name]: - if key_id not in response["verify_keys"]: - raise ValueError( - "Key response must include verification keys for all" - " signatures" - ) - if key_id in verify_keys: - verify_signed_json( - response, - server_name, - verify_keys[key_id] - ) - - # Cache the result in the datastore. - - time_now_ms = self.clock.time_msec() - - yield self.store.store_server_certificate( - server_name, - server_name, - time_now_ms, - tls_certificate, - ) - - for key_id, key in verify_keys.items(): - yield self.store.store_server_verify_key( - server_name, server_name, time_now_ms, key + for key_id in response["signatures"][server_name]: + if key_id not in response["verify_keys"]: + raise ValueError( + "Key response must include verification keys for all" + " signatures" + ) + if key_id in verify_keys: + verify_signed_json( + response, + server_name, + verify_keys[key_id] ) - for key_id in key_ids: - if key_id in verify_keys: - defer.returnValue(verify_keys[key_id]) - return + # Cache the result in the datastore. - raise ValueError("No verification key found for given key ids") + time_now_ms = self.clock.time_msec() + + yield self.store.store_server_certificate( + server_name, + server_name, + time_now_ms, + tls_certificate, + ) + + for key_id, key in verify_keys.items(): + yield self.store.store_server_verify_key( + server_name, server_name, time_now_ms, key + ) + + for key_id in key_ids: + if key_id in verify_keys: + defer.returnValue(verify_keys[key_id]) + return + + raise ValueError("No verification key found for given key ids") diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d02afe16..dc0f30cb6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -167,15 +167,6 @@ class TransactionQueue(object): logger.info("TX [%s] Nothing to send", destination) return - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -194,32 +185,41 @@ class TransactionQueue(object): self.store, ) + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + + self.pending_transactions[destination] = 1 + + logger.debug("TX [%s] Persisting transaction...", destination) + + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 + + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) + with limiter: - self.pending_transactions[destination] = 1 - - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) - - self._next_txn_id += 1 - - yield self.transaction_actions.prepare_to_send(transaction) - - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) - # Actually send the transaction # FIXME (erikj): This is a bit of a hack to make the Pdu age @@ -249,11 +249,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( - transaction, code, response - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 888b7ef2e..d190100c8 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.errors import CodeMessageException + import logging @@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, - multiplier_retry_interval=2): + multiplier_retry_interval=2,): self.clock = clock self.store = store self.destination = destination @@ -87,7 +89,11 @@ class RetryDestinationLimiter(object): failure.value ) - if exc_type is None and exc_val is None and exc_tb is None: + valid_err_code = False + if exc_type is CodeMessageException: + valid_err_code = 0 <= exc_val.code < 500 + + if exc_type is None or valid_err_code: # We connected successfully. if not self.retry_interval: return