mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-01-25 12:46:41 -05:00
949c2c5435
Add a timeout parameter for controlling how long synapse will wait for responses from remote servers. For servers that fail, include how they failed to make it easier to debug. Fetch keys from different servers in parallel rather than in series. Set the default timeout to 10s.
528 lines
18 KiB
Python
528 lines
18 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright 2014-2016 OpenMarket Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
from twisted.internet import defer, reactor, protocol
|
|
from twisted.internet.error import DNSLookupError
|
|
from twisted.web.client import readBody, HTTPConnectionPool, Agent
|
|
from twisted.web.http_headers import Headers
|
|
from twisted.web._newclient import ResponseDone
|
|
|
|
from synapse.http.endpoint import matrix_federation_endpoint
|
|
from synapse.util.async import sleep
|
|
from synapse.util.logcontext import preserve_context_over_fn
|
|
import synapse.metrics
|
|
|
|
from canonicaljson import encode_canonical_json
|
|
|
|
from synapse.api.errors import (
|
|
SynapseError, Codes, HttpResponseException,
|
|
)
|
|
|
|
from signedjson.sign import sign_json
|
|
|
|
import simplejson as json
|
|
import logging
|
|
import random
|
|
import sys
|
|
import urllib
|
|
import urlparse
|
|
|
|
|
|
# General module logger, plus a dedicated logger that receives one line when
# an outbound request is sent and one line with its final result.
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")

# Metrics registry scoped to this module.
metrics = synapse.metrics.get_metrics_for(__name__)

outgoing_requests_counter = metrics.register_counter(
    "requests", labels=["method"],
)
incoming_responses_counter = metrics.register_counter(
    "responses", labels=["method", "code"],
)

# Number of retries allowed for a request, depending on whether the caller
# asked for "long" or "short" retrying.
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
|
|
|
|
|
|
class MatrixFederationEndpointFactory(object):
    """Endpoint factory given to the twisted ``Agent`` for federation requests.

    Endpoints are built with ``matrix_federation_endpoint`` using the
    homeserver's TLS context factory.
    """

    def __init__(self, hs):
        # Passed as ``ssl_context_factory`` for every endpoint we create.
        self.tls_server_context_factory = hs.tls_server_context_factory

    def endpointForURI(self, uri):
        """Build the endpoint used to connect to ``uri.netloc``.

        Args:
            uri: The URI being requested; only its ``netloc`` is used.

        Returns:
            Whatever ``matrix_federation_endpoint`` returns for that
            destination, created with a fixed timeout of 10.
        """
        context_factory = self.tls_server_context_factory
        return matrix_federation_endpoint(
            reactor,
            uri.netloc,
            timeout=10,
            ssl_context_factory=context_factory,
        )
|
|
|
|
|
|
class MatrixFederationHttpClient(object):
    """HTTP client used to talk to other homeservers over the federation
    protocol. Sends client certificates and signs requests.

    Attributes:
        agent (twisted.web.client.Agent): The twisted Agent used to send the
            requests.
    """

    def __init__(self, hs):
        self.hs = hs
        self.signing_key = hs.config.signing_key[0]
        self.server_name = hs.hostname
        pool = HTTPConnectionPool(reactor)
        pool.maxPersistentPerHost = 10
        self.agent = Agent.usingEndpointFactory(
            reactor, MatrixFederationEndpointFactory(hs), pool=pool
        )
        self.clock = hs.get_clock()
        self.version_string = hs.version_string
        # Counter used to build the per-request id that appears in log lines.
        self._next_id = 1

    def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
        """Build the ``matrix://`` URL for a request to ``destination``."""
        return urlparse.urlunparse(
            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
        )

    @staticmethod
    def _encode_query_args(args):
        """Encode a dict of query arguments into a query string.

        Args:
            args (dict|None): Maps each key to either a single string or an
                iterable of strings. ``None`` is treated as no arguments.

        Returns:
            str: The url-encoded query string.
        """
        encoded_args = {}
        for k, vs in (args or {}).items():
            if isinstance(vs, basestring):
                vs = [vs]
            encoded_args[k] = [v.encode("UTF-8") for v in vs]

        return urllib.urlencode(encoded_args, True)

    @staticmethod
    def _check_content_type_is_json(response):
        """Check that a response declares an ``application/json`` body.

        Args:
            response: The twisted response whose headers are inspected.

        Raises:
            RuntimeError: if there is no Content-Type header, or none of its
                raw values is exactly ``application/json``.
        """
        c_type = response.headers.getRawHeaders("Content-Type")

        # getRawHeaders returns None when the header is absent; treat that
        # the same as a wrong content type instead of failing with TypeError.
        if c_type is None or "application/json" not in c_type:
            raise RuntimeError(
                "Content-Type not application/json"
            )

    @defer.inlineCallbacks
    def _create_request(self, destination, method, path_bytes,
                        body_callback, headers_dict=None, param_bytes=b"",
                        query_bytes=b"", retry_on_dns_fail=True,
                        timeout=None, long_retries=False):
        """ Creates and sends a request to the given url

        Args:
            destination (str): The remote server to send the request to.
            method (str): The HTTP method.
            path_bytes (str): The HTTP path.
            body_callback (callable|None): Called before every attempt as
                ``body_callback(method, url_bytes, headers_dict)``; its
                return value is used as the body producer.
            headers_dict (dict|None): Request headers. Updated in place with
                the User-Agent and Host headers; a fresh dict is used when
                omitted.
            param_bytes (str): URL params component.
            query_bytes (str): URL query component.
            retry_on_dns_fail (bool): If False, a DNSLookupError is re-raised
                immediately instead of being retried.
            timeout (int|None): Per-attempt timeout in ms. If unset each
                attempt is bounded to 60s. Failed requests are only retried
                when no timeout is given.
            long_retries (bool): Selects MAX_LONG_RETRIES rather than
                MAX_SHORT_RETRIES, and the longer back-off schedule.

        Returns:
            Deferred: Resolves to the twisted response when a 2xx response
            is received. Fails with HttpResponseException on any other
            response code, or with the underlying error once retries are
            exhausted.
        """
        # A mutable default would be shared (and mutated) across calls.
        if headers_dict is None:
            headers_dict = {}

        headers_dict[b"User-Agent"] = [self.version_string]
        headers_dict[b"Host"] = [destination]

        url_bytes = self._create_url(
            destination, path_bytes, param_bytes, query_bytes
        )

        txn_id = "%s-O-%s" % (method, self._next_id)
        self._next_id = (self._next_id + 1) % (sys.maxint - 1)

        outbound_logger.info(
            "{%s} [%s] Sending request: %s %s",
            txn_id, destination, method, url_bytes
        )

        # XXX: Would be much nicer to retry only at the transaction-layer
        # (once we have reliable transactions in place)
        if long_retries:
            retries_left = MAX_LONG_RETRIES
        else:
            retries_left = MAX_SHORT_RETRIES

        # The scheme-less, host-less form of the URL is what gets handed to
        # body_callback (and therefore what callers sign).
        http_url_bytes = urlparse.urlunparse(
            ("", "", path_bytes, param_bytes, query_bytes, "")
        )

        log_result = None
        try:
            while True:
                # Build a fresh producer (and re-sign) for every attempt.
                producer = None
                if body_callback:
                    producer = body_callback(
                        method, http_url_bytes, headers_dict
                    )

                try:
                    def send_request():
                        request_deferred = preserve_context_over_fn(
                            self.agent.request,
                            method,
                            url_bytes,
                            Headers(headers_dict),
                            producer
                        )

                        return self.clock.time_bound_deferred(
                            request_deferred,
                            time_out=timeout / 1000. if timeout else 60,
                        )

                    response = yield preserve_context_over_fn(send_request)

                    log_result = "%d %s" % (response.code, response.phrase,)
                    break
                except Exception as e:
                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
                        logger.warn(
                            "DNS Lookup failed to %s with %s",
                            destination,
                            e
                        )
                        log_result = "DNS Lookup failed to %s with %s" % (
                            destination, e
                        )
                        raise

                    logger.warn(
                        "{%s} Sending request failed to %s: %s %s: %s - %s",
                        txn_id,
                        destination,
                        method,
                        url_bytes,
                        type(e).__name__,
                        _flatten_response_never_received(e),
                    )

                    log_result = "%s - %s" % (
                        type(e).__name__, _flatten_response_never_received(e),
                    )

                    if retries_left and not timeout:
                        # Exponential back-off, capped, with random jitter.
                        if long_retries:
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            delay = 0.5 * 2 ** (
                                MAX_SHORT_RETRIES - retries_left
                            )
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        yield sleep(delay)
                        retries_left -= 1
                    else:
                        raise
        finally:
            # Always log the final outcome, whether success or failure.
            outbound_logger.info(
                "{%s} [%s] Result: %s",
                txn_id,
                destination,
                log_result,
            )

        if 200 <= response.code < 300:
            pass
        else:
            # :'(
            # Update transactions table?
            body = yield preserve_context_over_fn(readBody, response)
            raise HttpResponseException(
                response.code, response.phrase, body
            )

        defer.returnValue(response)

    def sign_request(self, destination, method, url_bytes, headers_dict,
                     content=None):
        """Sign a request and add the signatures as Authorization headers.

        Args:
            destination (str): The server the request is being sent to.
            method (str): The HTTP method.
            url_bytes (str): The URI to include in the signed object.
            headers_dict (dict): Request headers; updated in place with an
                ``Authorization`` entry holding one ``X-Matrix`` value per
                signature.
            content: The request body to include in the signed object.
                Omitted from the signed object when None.
        """
        request = {
            "method": method,
            "uri": url_bytes,
            "origin": self.server_name,
            "destination": destination,
        }

        if content is not None:
            request["content"] = content

        request = sign_json(request, self.server_name, self.signing_key)

        auth_headers = []

        for key, sig in request["signatures"][self.server_name].items():
            auth_headers.append(bytes(
                "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                    self.server_name, key, sig,
                )
            ))

        headers_dict[b"Authorization"] = auth_headers

    @defer.inlineCallbacks
    def put_json(self, destination, path, data={}, json_data_callback=None,
                 long_retries=False, timeout=None):
        """ Sends the specified json data using PUT

        Args:
            destination (str): The remote server to send the HTTP request
                to.
            path (str): The HTTP path.
            data (dict): A dict containing the data that will be used as
                the request body. This will be encoded as JSON. It is not
                modified by this method.
            json_data_callback (callable): A callable returning the dict to
                use as the request body. Takes precedence over `data`.
            long_retries (bool): A boolean that indicates whether we should
                retry for a short or long time.
            timeout(int): How long to try (in ms) the destination for before
                giving up. None indicates no timeout.

        Returns:
            Deferred: Succeeds when we get a 2xx HTTP response. The result
            will be the decoded JSON body. On any other response code an
            HttpResponseException is raised.
        """

        if not json_data_callback:
            def json_data_callback():
                return data

        def body_callback(method, url_bytes, headers_dict):
            json_data = json_data_callback()
            self.sign_request(
                destination, method, url_bytes, headers_dict, json_data
            )
            producer = _JsonProducer(json_data)
            return producer

        response = yield self._create_request(
            destination.encode("ascii"),
            "PUT",
            path.encode("ascii"),
            body_callback=body_callback,
            headers_dict={"Content-Type": ["application/json"]},
            long_retries=long_retries,
            timeout=timeout,
        )

        if 200 <= response.code < 300:
            # We need to update the transactions table to say it was sent?
            self._check_content_type_is_json(response)

        body = yield preserve_context_over_fn(readBody, response)
        defer.returnValue(json.loads(body))

    @defer.inlineCallbacks
    def post_json(self, destination, path, data={}, long_retries=True,
                  timeout=None):
        """ Sends the specified json data using POST

        Args:
            destination (str): The remote server to send the HTTP request
                to.
            path (str): The HTTP path.
            data (dict): A dict containing the data that will be used as
                the request body. This will be encoded as JSON. It is not
                modified by this method.
            long_retries (bool): A boolean that indicates whether we should
                retry for a short or long time.
            timeout(int): How long to try (in ms) the destination for before
                giving up. None indicates no timeout.

        Returns:
            Deferred: Succeeds when we get a 2xx HTTP response. The result
            will be the decoded JSON body. On any other response code an
            HttpResponseException is raised.
        """

        def body_callback(method, url_bytes, headers_dict):
            self.sign_request(
                destination, method, url_bytes, headers_dict, data
            )
            return _JsonProducer(data)

        response = yield self._create_request(
            destination.encode("ascii"),
            "POST",
            path.encode("ascii"),
            body_callback=body_callback,
            headers_dict={"Content-Type": ["application/json"]},
            # Honour the caller's choice rather than always retrying long.
            long_retries=long_retries,
            timeout=timeout,
        )

        if 200 <= response.code < 300:
            # We need to update the transactions table to say it was sent?
            self._check_content_type_is_json(response)

        body = yield preserve_context_over_fn(readBody, response)

        defer.returnValue(json.loads(body))

    @defer.inlineCallbacks
    def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
                 timeout=None):
        """ GETs some json from the given host homeserver and path

        Args:
            destination (str): The remote server to send the HTTP request
                to.
            path (str): The HTTP path.
            args (dict): A dictionary used to create query strings, defaults to
                None.
            retry_on_dns_fail (bool): If False, a DNS lookup failure is
                raised immediately rather than retried.
            timeout (int): How long to try (in ms) the destination for before
                giving up. None indicates no timeout and that the request will
                be retried.
        Returns:
            Deferred: Succeeds when we get a 2xx HTTP response. The result
            will be the decoded JSON body. On any other response code an
            HttpResponseException is raised.
        """
        logger.debug("get_json args: %s", args)

        query_bytes = self._encode_query_args(args)
        logger.debug(
            "Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail
        )

        def body_callback(method, url_bytes, headers_dict):
            # GET requests have no body; we only need to sign the request.
            self.sign_request(destination, method, url_bytes, headers_dict)
            return None

        response = yield self._create_request(
            destination.encode("ascii"),
            "GET",
            path.encode("ascii"),
            query_bytes=query_bytes,
            body_callback=body_callback,
            retry_on_dns_fail=retry_on_dns_fail,
            timeout=timeout,
        )

        if 200 <= response.code < 300:
            # We need to update the transactions table to say it was sent?
            self._check_content_type_is_json(response)

        body = yield preserve_context_over_fn(readBody, response)

        defer.returnValue(json.loads(body))

    @defer.inlineCallbacks
    def get_file(self, destination, path, output_stream, args=None,
                 retry_on_dns_fail=True, max_size=None):
        """GETs a file from a given homeserver
        Args:
            destination (str): The remote server to send the HTTP request to.
            path (str): The HTTP path to GET.
            output_stream (file): File to write the response body to.
            args (dict): Optional dictionary used to create the query string.
            retry_on_dns_fail (bool): If False, a DNS lookup failure is
                raised immediately rather than retried.
            max_size (int|None): Size limit passed on to the body reader;
                None means no limit.
        Returns:
            Deferred: Resolves to a (int,dict) tuple of the file length and
            a dict of the response headers.
        """

        query_bytes = self._encode_query_args(args)
        logger.debug(
            "Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail
        )

        def body_callback(method, url_bytes, headers_dict):
            # GET requests have no body; we only need to sign the request.
            self.sign_request(destination, method, url_bytes, headers_dict)
            return None

        response = yield self._create_request(
            destination.encode("ascii"),
            "GET",
            path.encode("ascii"),
            query_bytes=query_bytes,
            body_callback=body_callback,
            retry_on_dns_fail=retry_on_dns_fail
        )

        headers = dict(response.headers.getAllRawHeaders())

        try:
            length = yield preserve_context_over_fn(
                _readBodyToFile,
                response, output_stream, max_size
            )
        except Exception:
            # Log for diagnosis, then let the caller see the failure.
            logger.exception("Failed to download body")
            raise

        defer.returnValue((length, headers))
|
|
|
|
|
|
class _ReadBodyToFileProtocol(protocol.Protocol):
    """Protocol that writes a response body to a stream as it arrives.

    The supplied deferred is fired with the number of bytes written when the
    body completes, or errbacked if the connection is lost for another
    reason or the body reaches ``max_size``.
    """

    def __init__(self, stream, deferred, max_size):
        self.stream = stream
        self.deferred = deferred
        self.length = 0
        self.max_size = max_size

    def dataReceived(self, data):
        # The chunk is written out before the size limit is checked.
        self.stream.write(data)
        self.length += len(data)

        limit = self.max_size
        if limit is None or self.length < limit:
            return

        too_large = SynapseError(
            502,
            "Requested file is too large > %r bytes" % (self.max_size,),
            Codes.TOO_LARGE,
        )
        self.deferred.errback(too_large)
        # Swap in a fresh deferred so that later callbacks on this protocol
        # do not fire the caller's (already failed) deferred again.
        self.deferred = defer.Deferred()
        self.transport.loseConnection()

    def connectionLost(self, reason):
        if not reason.check(ResponseDone):
            self.deferred.errback(reason)
            return

        # Body fully received: report how many bytes were written.
        self.deferred.callback(self.length)
|
|
|
|
|
|
def _readBodyToFile(response, stream, max_size):
    """Stream the body of ``response`` into ``stream``.

    Args:
        response: The response whose body should be delivered.
        stream: Object with a ``write`` method that receives the body.
        max_size (int|None): Size limit handed to the reading protocol.

    Returns:
        Deferred: Fired by ``_ReadBodyToFileProtocol`` with the number of
        bytes written, or errbacked on failure.
    """
    finished = defer.Deferred()
    body_protocol = _ReadBodyToFileProtocol(stream, finished, max_size)
    response.deliverBody(body_protocol)
    return finished
|
|
|
|
|
|
class _JsonProducer(object):
    """ Used by the twisted http client to create the HTTP body from json

    The JSON is encoded once (as canonical JSON) and written to the consumer
    in a single call.
    """

    def __init__(self, jsn):
        self.reset(jsn)

    def reset(self, jsn):
        """Replace the body with the canonical JSON encoding of ``jsn``."""
        encoded = encode_canonical_json(jsn)
        self.body = encoded
        self.length = len(encoded)

    def startProducing(self, consumer):
        """Write the whole body to ``consumer`` and return a fired deferred."""
        consumer.write(self.body)
        return defer.succeed(None)

    def pauseProducing(self):
        """No-op: the body is written in one go by startProducing."""

    def stopProducing(self):
        """No-op: the body is written in one go by startProducing."""

    def resumeProducing(self):
        """No-op: the body is written in one go by startProducing."""
|
|
|
|
|
|
def _flatten_response_never_received(e):
|
|
if hasattr(e, "reasons"):
|
|
return ", ".join(
|
|
_flatten_response_never_received(f.value)
|
|
for f in e.reasons
|
|
)
|
|
else:
|
|
return "%s: %s" % (type(e).__name__, e.message,)
|