mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2024-10-01 11:49:51 -04:00
Add IDs to outbound transactions
This commit is contained in:
parent
fb7def3344
commit
9d112f4440
@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
|
|||||||
|
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
import logging
|
import logging
|
||||||
|
import sys
|
||||||
import urllib
|
import urllib
|
||||||
import urlparse
|
import urlparse
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
outbound_logger = logging.getLogger("synapse.http.outbound")
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||||
|
|
||||||
@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object):
|
|||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.version_string = hs.version_string
|
self.version_string = hs.version_string
|
||||||
|
|
||||||
|
self._next_id = 1
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _create_request(self, destination, method, path_bytes,
|
def _create_request(self, destination, method, path_bytes,
|
||||||
body_callback, headers_dict={}, param_bytes=b"",
|
body_callback, headers_dict={}, param_bytes=b"",
|
||||||
@ -123,8 +127,13 @@ class MatrixFederationHttpClient(object):
|
|||||||
("", "", path_bytes, param_bytes, query_bytes, "",)
|
("", "", path_bytes, param_bytes, query_bytes, "",)
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info("Sending request to %s: %s %s",
|
txn_id = "%s-%s" % (method, self._next_id)
|
||||||
destination, method, url_bytes)
|
self._next_id = (self._next_id + 1) % (sys.maxint - 1)
|
||||||
|
|
||||||
|
outbound_logger.info(
|
||||||
|
"{%s} [%s] Sending request: %s %s",
|
||||||
|
txn_id, destination, method, url_bytes
|
||||||
|
)
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Types: %s",
|
"Types: %s",
|
||||||
@ -141,63 +150,72 @@ class MatrixFederationHttpClient(object):
|
|||||||
|
|
||||||
endpoint = self._getEndpoint(reactor, destination)
|
endpoint = self._getEndpoint(reactor, destination)
|
||||||
|
|
||||||
while True:
|
log_result = None
|
||||||
producer = None
|
try:
|
||||||
if body_callback:
|
while True:
|
||||||
producer = body_callback(method, url_bytes, headers_dict)
|
producer = None
|
||||||
|
if body_callback:
|
||||||
|
producer = body_callback(method, url_bytes, headers_dict)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
request_deferred = preserve_context_over_fn(
|
request_deferred = preserve_context_over_fn(
|
||||||
self.agent.request,
|
self.agent.request,
|
||||||
destination,
|
|
||||||
endpoint,
|
|
||||||
method,
|
|
||||||
path_bytes,
|
|
||||||
param_bytes,
|
|
||||||
query_bytes,
|
|
||||||
Headers(headers_dict),
|
|
||||||
producer
|
|
||||||
)
|
|
||||||
|
|
||||||
response = yield self.clock.time_bound_deferred(
|
|
||||||
request_deferred,
|
|
||||||
time_out=timeout/1000. if timeout else 60,
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.debug("Got response to %s", method)
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
|
||||||
logger.warn(
|
|
||||||
"DNS Lookup failed to %s with %s",
|
|
||||||
destination,
|
destination,
|
||||||
e
|
endpoint,
|
||||||
|
method,
|
||||||
|
path_bytes,
|
||||||
|
param_bytes,
|
||||||
|
query_bytes,
|
||||||
|
Headers(headers_dict),
|
||||||
|
producer
|
||||||
)
|
)
|
||||||
raise
|
|
||||||
|
|
||||||
logger.warn(
|
response = yield self.clock.time_bound_deferred(
|
||||||
"Sending request failed to %s: %s %s: %s - %s",
|
request_deferred,
|
||||||
destination,
|
time_out=timeout/1000. if timeout else 60,
|
||||||
method,
|
)
|
||||||
url_bytes,
|
|
||||||
type(e).__name__,
|
|
||||||
_flatten_response_never_received(e),
|
|
||||||
)
|
|
||||||
|
|
||||||
if retries_left and not timeout:
|
logger.debug("{%s} Got response to %s", txn_id, method)
|
||||||
yield sleep(2 ** (5 - retries_left))
|
log_result = "%d %s" % (response.code, response.phrase,)
|
||||||
retries_left -= 1
|
break
|
||||||
else:
|
except Exception as e:
|
||||||
raise
|
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||||
|
logger.warn(
|
||||||
|
"DNS Lookup failed to %s with %s",
|
||||||
|
destination,
|
||||||
|
e
|
||||||
|
)
|
||||||
|
log_result = "DNS Lookup failed to %s with %s" % (
|
||||||
|
destination, e
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
logger.info(
|
logger.warn(
|
||||||
"Received response %d %s for %s: %s %s",
|
"{%s} Sending request failed to %s: %s %s: %s - %s",
|
||||||
response.code,
|
txn_id,
|
||||||
response.phrase,
|
destination,
|
||||||
destination,
|
method,
|
||||||
method,
|
url_bytes,
|
||||||
url_bytes
|
type(e).__name__,
|
||||||
)
|
_flatten_response_never_received(e),
|
||||||
|
)
|
||||||
|
|
||||||
|
log_result = "%s - %s" % (
|
||||||
|
type(e).__name__, _flatten_response_never_received(e),
|
||||||
|
)
|
||||||
|
|
||||||
|
if retries_left and not timeout:
|
||||||
|
yield sleep(2 ** (5 - retries_left))
|
||||||
|
retries_left -= 1
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
outbound_logger.info(
|
||||||
|
"{%s} [%s] Result: %s",
|
||||||
|
txn_id,
|
||||||
|
destination,
|
||||||
|
log_result,
|
||||||
|
)
|
||||||
|
|
||||||
if 200 <= response.code < 300:
|
if 200 <= response.code < 300:
|
||||||
pass
|
pass
|
||||||
|
Loading…
Reference in New Issue
Block a user