mirror of
https://git.anonymousland.org/anonymousland/synapse.git
synced 2025-08-02 19:16:12 -04:00
Merge branch 'develop' into markjh/twisted-15
Conflicts: synapse/http/matrixfederationclient.py
This commit is contained in:
commit
998a72d4d9
96 changed files with 4528 additions and 1677 deletions
|
@ -61,21 +61,31 @@ class SimpleHttpClient(object):
|
|||
self.agent = Agent(reactor, pool=pool)
|
||||
self.version_string = hs.version_string
|
||||
|
||||
def request(self, method, *args, **kwargs):
|
||||
def request(self, method, uri, *args, **kwargs):
|
||||
# A small wrapper around self.agent.request() so we can easily attach
|
||||
# counters to it
|
||||
outgoing_requests_counter.inc(method)
|
||||
d = preserve_context_over_fn(
|
||||
self.agent.request,
|
||||
method, *args, **kwargs
|
||||
method, uri, *args, **kwargs
|
||||
)
|
||||
|
||||
logger.info("Sending request %s %s", method, uri)
|
||||
|
||||
def _cb(response):
|
||||
incoming_responses_counter.inc(method, response.code)
|
||||
logger.info(
|
||||
"Received response to %s %s: %s",
|
||||
method, uri, response.code
|
||||
)
|
||||
return response
|
||||
|
||||
def _eb(failure):
|
||||
incoming_responses_counter.inc(method, "ERR")
|
||||
logger.info(
|
||||
"Error sending request to %s %s: %s %s",
|
||||
method, uri, failure.type, failure.getErrorMessage()
|
||||
)
|
||||
return failure
|
||||
|
||||
d.addCallbacks(_cb, _eb)
|
||||
|
@ -84,7 +94,9 @@ class SimpleHttpClient(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def post_urlencoded_get_json(self, uri, args={}):
|
||||
# TODO: Do we ever want to log message contents?
|
||||
logger.debug("post_urlencoded_get_json args: %s", args)
|
||||
|
||||
query_bytes = urllib.urlencode(args, True)
|
||||
|
||||
response = yield self.request(
|
||||
|
@ -97,7 +109,7 @@ class SimpleHttpClient(object):
|
|||
bodyProducer=FileBodyProducer(StringIO(query_bytes))
|
||||
)
|
||||
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
|
||||
|
@ -105,7 +117,7 @@ class SimpleHttpClient(object):
|
|||
def post_json_get_json(self, uri, post_json):
|
||||
json_str = encode_canonical_json(post_json)
|
||||
|
||||
logger.info("HTTP POST %s -> %s", json_str, uri)
|
||||
logger.debug("HTTP POST %s -> %s", json_str, uri)
|
||||
|
||||
response = yield self.request(
|
||||
"POST",
|
||||
|
@ -116,7 +128,7 @@ class SimpleHttpClient(object):
|
|||
bodyProducer=FileBodyProducer(StringIO(json_str))
|
||||
)
|
||||
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
|
||||
|
@ -149,7 +161,7 @@ class SimpleHttpClient(object):
|
|||
})
|
||||
)
|
||||
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
defer.returnValue(json.loads(body))
|
||||
|
@ -192,7 +204,7 @@ class SimpleHttpClient(object):
|
|||
bodyProducer=FileBodyProducer(StringIO(json_str))
|
||||
)
|
||||
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
defer.returnValue(json.loads(body))
|
||||
|
@ -226,7 +238,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
|
|||
)
|
||||
|
||||
try:
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
defer.returnValue(body)
|
||||
except PartialDownloadError as e:
|
||||
# twisted dislikes google's response, no content length.
|
||||
|
|
|
@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json
|
|||
|
||||
import simplejson as json
|
||||
import logging
|
||||
import sys
|
||||
import urllib
|
||||
import urlparse
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
outbound_logger = logging.getLogger("synapse.http.outbound")
|
||||
|
||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
||||
|
||||
|
@ -86,6 +88,7 @@ class MatrixFederationHttpClient(object):
|
|||
)
|
||||
self.clock = hs.get_clock()
|
||||
self.version_string = hs.version_string
|
||||
self._next_id = 1
|
||||
|
||||
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
|
||||
return urlparse.urlunparse(
|
||||
|
@ -106,16 +109,12 @@ class MatrixFederationHttpClient(object):
|
|||
destination, path_bytes, param_bytes, query_bytes
|
||||
)
|
||||
|
||||
logger.info("Sending request to %s: %s %s",
|
||||
destination, method, url_bytes)
|
||||
txn_id = "%s-O-%s" % (method, self._next_id)
|
||||
self._next_id = (self._next_id + 1) % (sys.maxint - 1)
|
||||
|
||||
logger.debug(
|
||||
"Types: %s",
|
||||
[
|
||||
type(destination), type(method), type(path_bytes),
|
||||
type(param_bytes),
|
||||
type(query_bytes)
|
||||
]
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url_bytes
|
||||
)
|
||||
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
|
@ -126,66 +125,80 @@ class MatrixFederationHttpClient(object):
|
|||
("", "", path_bytes, param_bytes, query_bytes, "")
|
||||
)
|
||||
|
||||
while True:
|
||||
producer = None
|
||||
if body_callback:
|
||||
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||
log_result = None
|
||||
try:
|
||||
while True:
|
||||
producer = None
|
||||
if body_callback:
|
||||
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||
|
||||
try:
|
||||
request_deferred = preserve_context_over_fn(
|
||||
self.agent.request,
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
try:
|
||||
def send_request():
|
||||
request_deferred = preserve_context_over_fn(
|
||||
self.agent.request,
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
)
|
||||
|
||||
response = yield self.clock.time_bound_deferred(
|
||||
request_deferred,
|
||||
time_out=timeout/1000. if timeout else 60,
|
||||
)
|
||||
|
||||
logger.debug("Got response to %s", method)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
e
|
||||
return self.clock.time_bound_deferred(
|
||||
request_deferred,
|
||||
time_out=timeout/1000. if timeout else 60,
|
||||
)
|
||||
|
||||
response = yield preserve_context_over_fn(
|
||||
send_request,
|
||||
)
|
||||
raise
|
||||
|
||||
logger.warn(
|
||||
"Sending request failed to %s: %s %s: %s - %s",
|
||||
destination,
|
||||
method,
|
||||
url_bytes,
|
||||
type(e).__name__,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
log_result = "%d %s" % (response.code, response.phrase,)
|
||||
break
|
||||
except Exception as e:
|
||||
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
||||
logger.warn(
|
||||
"DNS Lookup failed to %s with %s",
|
||||
destination,
|
||||
e
|
||||
)
|
||||
log_result = "DNS Lookup failed to %s with %s" % (
|
||||
destination, e
|
||||
)
|
||||
raise
|
||||
|
||||
if retries_left and not timeout:
|
||||
yield sleep(2 ** (5 - retries_left))
|
||||
retries_left -= 1
|
||||
else:
|
||||
raise
|
||||
logger.warn(
|
||||
"{%s} Sending request failed to %s: %s %s: %s - %s",
|
||||
txn_id,
|
||||
destination,
|
||||
method,
|
||||
url_bytes,
|
||||
type(e).__name__,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Received response %d %s for %s: %s %s",
|
||||
response.code,
|
||||
response.phrase,
|
||||
destination,
|
||||
method,
|
||||
url_bytes
|
||||
)
|
||||
log_result = "%s - %s" % (
|
||||
type(e).__name__, _flatten_response_never_received(e),
|
||||
)
|
||||
|
||||
if retries_left and not timeout:
|
||||
yield sleep(2 ** (5 - retries_left))
|
||||
retries_left -= 1
|
||||
else:
|
||||
raise
|
||||
finally:
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Result: %s",
|
||||
txn_id,
|
||||
destination,
|
||||
log_result,
|
||||
)
|
||||
|
||||
if 200 <= response.code < 300:
|
||||
pass
|
||||
else:
|
||||
# :'(
|
||||
# Update transactions table?
|
||||
body = yield readBody(response)
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
raise HttpResponseException(
|
||||
response.code, response.phrase, body
|
||||
)
|
||||
|
@ -265,10 +278,7 @@ class MatrixFederationHttpClient(object):
|
|||
"Content-Type not application/json"
|
||||
)
|
||||
|
||||
logger.debug("Getting resp body")
|
||||
body = yield readBody(response)
|
||||
logger.debug("Got resp body")
|
||||
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
defer.returnValue(json.loads(body))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -311,9 +321,7 @@ class MatrixFederationHttpClient(object):
|
|||
"Content-Type not application/json"
|
||||
)
|
||||
|
||||
logger.debug("Getting resp body")
|
||||
body = yield readBody(response)
|
||||
logger.debug("Got resp body")
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
|
||||
|
@ -371,9 +379,7 @@ class MatrixFederationHttpClient(object):
|
|||
"Content-Type not application/json"
|
||||
)
|
||||
|
||||
logger.debug("Getting resp body")
|
||||
body = yield readBody(response)
|
||||
logger.debug("Got resp body")
|
||||
body = yield preserve_context_over_fn(readBody, response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
|
||||
|
@ -416,7 +422,10 @@ class MatrixFederationHttpClient(object):
|
|||
headers = dict(response.headers.getAllRawHeaders())
|
||||
|
||||
try:
|
||||
length = yield _readBodyToFile(response, output_stream, max_size)
|
||||
length = yield preserve_context_over_fn(
|
||||
_readBodyToFile,
|
||||
response, output_stream, max_size
|
||||
)
|
||||
except:
|
||||
logger.exception("Failed to download body")
|
||||
raise
|
||||
|
|
|
@ -79,53 +79,39 @@ def request_handler(request_handler):
|
|||
_next_request_id += 1
|
||||
with LoggingContext(request_id) as request_context:
|
||||
request_context.request = request_id
|
||||
code = None
|
||||
start = self.clock.time_msec()
|
||||
try:
|
||||
logger.info(
|
||||
"Received request: %s %s",
|
||||
request.method, request.path
|
||||
)
|
||||
d = request_handler(self, request)
|
||||
with PreserveLoggingContext():
|
||||
yield d
|
||||
code = request.code
|
||||
except CodeMessageException as e:
|
||||
code = e.code
|
||||
if isinstance(e, SynapseError):
|
||||
logger.info(
|
||||
"%s SynapseError: %s - %s", request, code, e.msg
|
||||
with request.processing():
|
||||
try:
|
||||
d = request_handler(self, request)
|
||||
with PreserveLoggingContext():
|
||||
yield d
|
||||
except CodeMessageException as e:
|
||||
code = e.code
|
||||
if isinstance(e, SynapseError):
|
||||
logger.info(
|
||||
"%s SynapseError: %s - %s", request, code, e.msg
|
||||
)
|
||||
else:
|
||||
logger.exception(e)
|
||||
outgoing_responses_counter.inc(request.method, str(code))
|
||||
respond_with_json(
|
||||
request, code, cs_exception(e), send_cors=True,
|
||||
pretty_print=_request_user_agent_is_curl(request),
|
||||
version_string=self.version_string,
|
||||
)
|
||||
except:
|
||||
logger.exception(
|
||||
"Failed handle request %s.%s on %r: %r",
|
||||
request_handler.__module__,
|
||||
request_handler.__name__,
|
||||
self,
|
||||
request
|
||||
)
|
||||
respond_with_json(
|
||||
request,
|
||||
500,
|
||||
{"error": "Internal server error"},
|
||||
send_cors=True
|
||||
)
|
||||
else:
|
||||
logger.exception(e)
|
||||
outgoing_responses_counter.inc(request.method, str(code))
|
||||
respond_with_json(
|
||||
request, code, cs_exception(e), send_cors=True,
|
||||
pretty_print=_request_user_agent_is_curl(request),
|
||||
version_string=self.version_string,
|
||||
)
|
||||
except:
|
||||
code = 500
|
||||
logger.exception(
|
||||
"Failed handle request %s.%s on %r: %r",
|
||||
request_handler.__module__,
|
||||
request_handler.__name__,
|
||||
self,
|
||||
request
|
||||
)
|
||||
respond_with_json(
|
||||
request,
|
||||
500,
|
||||
{"error": "Internal server error"},
|
||||
send_cors=True
|
||||
)
|
||||
finally:
|
||||
code = str(code) if code else "-"
|
||||
end = self.clock.time_msec()
|
||||
logger.info(
|
||||
"Processed request: %dms %s %s %s",
|
||||
end-start, code, request.method, request.path
|
||||
)
|
||||
return wrapped_request_handler
|
||||
|
||||
|
||||
|
@ -221,7 +207,7 @@ class JsonResource(HttpServer, resource.Resource):
|
|||
incoming_requests_counter.inc(request.method, servlet_classname)
|
||||
|
||||
args = [
|
||||
urllib.unquote(u).decode("UTF-8") for u in m.groups()
|
||||
urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
|
||||
]
|
||||
|
||||
callback_return = yield callback(request, *args)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue