mirror of
https://git.anonymousland.org/anonymousland/synapse-product.git
synced 2025-05-02 12:56:08 -04:00
Port http/ to Python 3 (#3771)
This commit is contained in:
parent
a6cf7d9d9a
commit
2d2828dcbc
8 changed files with 134 additions and 186 deletions
|
@ -17,19 +17,19 @@ import cgi
|
|||
import logging
|
||||
import random
|
||||
import sys
|
||||
import urllib
|
||||
|
||||
from six import string_types
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from six import PY3, string_types
|
||||
from six.moves import urllib
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
import treq
|
||||
from canonicaljson import encode_canonical_json
|
||||
from prometheus_client import Counter
|
||||
from signedjson.sign import sign_json
|
||||
|
||||
from twisted.internet import defer, protocol, reactor
|
||||
from twisted.internet.error import DNSLookupError
|
||||
from twisted.web._newclient import ResponseDone
|
||||
from twisted.web.client import Agent, HTTPConnectionPool, readBody
|
||||
from twisted.web.client import Agent, HTTPConnectionPool
|
||||
from twisted.web.http_headers import Headers
|
||||
|
||||
import synapse.metrics
|
||||
|
@ -58,13 +58,18 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
|
|||
MAX_LONG_RETRIES = 10
|
||||
MAX_SHORT_RETRIES = 3
|
||||
|
||||
if PY3:
|
||||
MAXINT = sys.maxsize
|
||||
else:
|
||||
MAXINT = sys.maxint
|
||||
|
||||
|
||||
class MatrixFederationEndpointFactory(object):
|
||||
def __init__(self, hs):
|
||||
self.tls_client_options_factory = hs.tls_client_options_factory
|
||||
|
||||
def endpointForURI(self, uri):
|
||||
destination = uri.netloc
|
||||
destination = uri.netloc.decode('ascii')
|
||||
|
||||
return matrix_federation_endpoint(
|
||||
reactor, destination, timeout=10,
|
||||
|
@ -93,26 +98,32 @@ class MatrixFederationHttpClient(object):
|
|||
)
|
||||
self.clock = hs.get_clock()
|
||||
self._store = hs.get_datastore()
|
||||
self.version_string = hs.version_string
|
||||
self.version_string = hs.version_string.encode('ascii')
|
||||
self._next_id = 1
|
||||
|
||||
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
|
||||
return urlparse.urlunparse(
|
||||
("matrix", destination, path_bytes, param_bytes, query_bytes, "")
|
||||
return urllib.parse.urlunparse(
|
||||
(b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
|
||||
)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _request(self, destination, method, path,
|
||||
body_callback, headers_dict={}, param_bytes=b"",
|
||||
query_bytes=b"", retry_on_dns_fail=True,
|
||||
json=None, json_callback=None,
|
||||
param_bytes=b"",
|
||||
query=None, retry_on_dns_fail=True,
|
||||
timeout=None, long_retries=False,
|
||||
ignore_backoff=False,
|
||||
backoff_on_404=False):
|
||||
""" Creates and sends a request to the given server
|
||||
"""
|
||||
Creates and sends a request to the given server.
|
||||
|
||||
Args:
|
||||
destination (str): The remote server to send the HTTP request to.
|
||||
method (str): HTTP method
|
||||
path (str): The HTTP path
|
||||
json (dict or None): JSON to send in the body.
|
||||
json_callback (func or None): A callback to generate the JSON.
|
||||
query (dict or None): Query arguments.
|
||||
ignore_backoff (bool): true to ignore the historical backoff data
|
||||
and try the request anyway.
|
||||
backoff_on_404 (bool): Back off if we get a 404
|
||||
|
@ -146,22 +157,29 @@ class MatrixFederationHttpClient(object):
|
|||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
||||
destination = destination.encode("ascii")
|
||||
headers_dict = {}
|
||||
path_bytes = path.encode("ascii")
|
||||
with limiter:
|
||||
headers_dict[b"User-Agent"] = [self.version_string]
|
||||
headers_dict[b"Host"] = [destination]
|
||||
if query:
|
||||
query_bytes = encode_query_args(query)
|
||||
else:
|
||||
query_bytes = b""
|
||||
|
||||
url_bytes = self._create_url(
|
||||
destination, path_bytes, param_bytes, query_bytes
|
||||
)
|
||||
headers_dict = {
|
||||
"User-Agent": [self.version_string],
|
||||
"Host": [destination],
|
||||
}
|
||||
|
||||
with limiter:
|
||||
url = self._create_url(
|
||||
destination.encode("ascii"), path_bytes, param_bytes, query_bytes
|
||||
).decode('ascii')
|
||||
|
||||
txn_id = "%s-O-%s" % (method, self._next_id)
|
||||
self._next_id = (self._next_id + 1) % (sys.maxint - 1)
|
||||
self._next_id = (self._next_id + 1) % (MAXINT - 1)
|
||||
|
||||
outbound_logger.info(
|
||||
"{%s} [%s] Sending request: %s %s",
|
||||
txn_id, destination, method, url_bytes
|
||||
txn_id, destination, method, url
|
||||
)
|
||||
|
||||
# XXX: Would be much nicer to retry only at the transaction-layer
|
||||
|
@ -171,23 +189,33 @@ class MatrixFederationHttpClient(object):
|
|||
else:
|
||||
retries_left = MAX_SHORT_RETRIES
|
||||
|
||||
http_url_bytes = urlparse.urlunparse(
|
||||
("", "", path_bytes, param_bytes, query_bytes, "")
|
||||
)
|
||||
http_url = urllib.parse.urlunparse(
|
||||
(b"", b"", path_bytes, param_bytes, query_bytes, b"")
|
||||
).decode('ascii')
|
||||
|
||||
log_result = None
|
||||
try:
|
||||
while True:
|
||||
producer = None
|
||||
if body_callback:
|
||||
producer = body_callback(method, http_url_bytes, headers_dict)
|
||||
|
||||
try:
|
||||
request_deferred = self.agent.request(
|
||||
if json_callback:
|
||||
json = json_callback()
|
||||
|
||||
if json:
|
||||
data = encode_canonical_json(json)
|
||||
headers_dict["Content-Type"] = ["application/json"]
|
||||
self.sign_request(
|
||||
destination, method, http_url, headers_dict, json
|
||||
)
|
||||
else:
|
||||
data = None
|
||||
self.sign_request(destination, method, http_url, headers_dict)
|
||||
|
||||
request_deferred = treq.request(
|
||||
method,
|
||||
url_bytes,
|
||||
Headers(headers_dict),
|
||||
producer
|
||||
url,
|
||||
headers=Headers(headers_dict),
|
||||
data=data,
|
||||
agent=self.agent,
|
||||
)
|
||||
add_timeout_to_deferred(
|
||||
request_deferred,
|
||||
|
@ -218,7 +246,7 @@ class MatrixFederationHttpClient(object):
|
|||
txn_id,
|
||||
destination,
|
||||
method,
|
||||
url_bytes,
|
||||
url,
|
||||
_flatten_response_never_received(e),
|
||||
)
|
||||
|
||||
|
@ -252,7 +280,7 @@ class MatrixFederationHttpClient(object):
|
|||
# :'(
|
||||
# Update transactions table?
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
body = yield treq.content(response)
|
||||
raise HttpResponseException(
|
||||
response.code, response.phrase, body
|
||||
)
|
||||
|
@ -297,11 +325,11 @@ class MatrixFederationHttpClient(object):
|
|||
auth_headers = []
|
||||
|
||||
for key, sig in request["signatures"][self.server_name].items():
|
||||
auth_headers.append(bytes(
|
||||
auth_headers.append((
|
||||
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
|
||||
self.server_name, key, sig,
|
||||
)
|
||||
))
|
||||
)).encode('ascii')
|
||||
)
|
||||
|
||||
headers_dict[b"Authorization"] = auth_headers
|
||||
|
||||
|
@ -347,24 +375,14 @@ class MatrixFederationHttpClient(object):
|
|||
"""
|
||||
|
||||
if not json_data_callback:
|
||||
def json_data_callback():
|
||||
return data
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
json_data = json_data_callback()
|
||||
self.sign_request(
|
||||
destination, method, url_bytes, headers_dict, json_data
|
||||
)
|
||||
producer = _JsonProducer(json_data)
|
||||
return producer
|
||||
json_data_callback = lambda: data
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"PUT",
|
||||
path,
|
||||
body_callback=body_callback,
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query_bytes=encode_query_args(args),
|
||||
json_callback=json_data_callback,
|
||||
query=args,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
|
@ -376,8 +394,8 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
defer.returnValue(json.loads(body))
|
||||
body = yield treq.json_content(response)
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def post_json(self, destination, path, data={}, long_retries=False,
|
||||
|
@ -410,20 +428,12 @@ class MatrixFederationHttpClient(object):
|
|||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(
|
||||
destination, method, url_bytes, headers_dict, data
|
||||
)
|
||||
return _JsonProducer(data)
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"POST",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
body_callback=body_callback,
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query=args,
|
||||
json=data,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
|
@ -434,9 +444,9 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
body = yield treq.json_content(response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
|
||||
|
@ -471,16 +481,11 @@ class MatrixFederationHttpClient(object):
|
|||
|
||||
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(destination, method, url_bytes, headers_dict)
|
||||
return None
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"GET",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
body_callback=body_callback,
|
||||
query=args,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
|
@ -491,9 +496,9 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
body = yield treq.json_content(response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def delete_json(self, destination, path, long_retries=False,
|
||||
|
@ -523,13 +528,11 @@ class MatrixFederationHttpClient(object):
|
|||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"DELETE",
|
||||
path,
|
||||
query_bytes=encode_query_args(args),
|
||||
headers_dict={"Content-Type": ["application/json"]},
|
||||
query=args,
|
||||
long_retries=long_retries,
|
||||
timeout=timeout,
|
||||
ignore_backoff=ignore_backoff,
|
||||
|
@ -540,9 +543,9 @@ class MatrixFederationHttpClient(object):
|
|||
check_content_type_is_json(response.headers)
|
||||
|
||||
with logcontext.PreserveLoggingContext():
|
||||
body = yield readBody(response)
|
||||
body = yield treq.json_content(response)
|
||||
|
||||
defer.returnValue(json.loads(body))
|
||||
defer.returnValue(body)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_file(self, destination, path, output_stream, args={},
|
||||
|
@ -569,26 +572,11 @@ class MatrixFederationHttpClient(object):
|
|||
Fails with ``FederationDeniedError`` if this destination
|
||||
is not on our federation whitelist
|
||||
"""
|
||||
|
||||
encoded_args = {}
|
||||
for k, vs in args.items():
|
||||
if isinstance(vs, string_types):
|
||||
vs = [vs]
|
||||
encoded_args[k] = [v.encode("UTF-8") for v in vs]
|
||||
|
||||
query_bytes = urllib.urlencode(encoded_args, True)
|
||||
logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
|
||||
|
||||
def body_callback(method, url_bytes, headers_dict):
|
||||
self.sign_request(destination, method, url_bytes, headers_dict)
|
||||
return None
|
||||
|
||||
response = yield self._request(
|
||||
destination,
|
||||
"GET",
|
||||
path,
|
||||
query_bytes=query_bytes,
|
||||
body_callback=body_callback,
|
||||
query=args,
|
||||
retry_on_dns_fail=retry_on_dns_fail,
|
||||
ignore_backoff=ignore_backoff,
|
||||
)
|
||||
|
@ -639,30 +627,6 @@ def _readBodyToFile(response, stream, max_size):
|
|||
return d
|
||||
|
||||
|
||||
class _JsonProducer(object):
|
||||
""" Used by the twisted http client to create the HTTP body from json
|
||||
"""
|
||||
def __init__(self, jsn):
|
||||
self.reset(jsn)
|
||||
|
||||
def reset(self, jsn):
|
||||
self.body = encode_canonical_json(jsn)
|
||||
self.length = len(self.body)
|
||||
|
||||
def startProducing(self, consumer):
|
||||
consumer.write(self.body)
|
||||
return defer.succeed(None)
|
||||
|
||||
def pauseProducing(self):
|
||||
pass
|
||||
|
||||
def stopProducing(self):
|
||||
pass
|
||||
|
||||
def resumeProducing(self):
|
||||
pass
|
||||
|
||||
|
||||
def _flatten_response_never_received(e):
|
||||
if hasattr(e, "reasons"):
|
||||
reasons = ", ".join(
|
||||
|
@ -693,7 +657,7 @@ def check_content_type_is_json(headers):
|
|||
"No Content-Type header"
|
||||
)
|
||||
|
||||
c_type = c_type[0] # only the first header
|
||||
c_type = c_type[0].decode('ascii') # only the first header
|
||||
val, options = cgi.parse_header(c_type)
|
||||
if val != "application/json":
|
||||
raise RuntimeError(
|
||||
|
@ -711,6 +675,6 @@ def encode_query_args(args):
|
|||
vs = [vs]
|
||||
encoded_args[k] = [v.encode("UTF-8") for v in vs]
|
||||
|
||||
query_bytes = urllib.urlencode(encoded_args, True)
|
||||
query_bytes = urllib.parse.urlencode(encoded_args, True)
|
||||
|
||||
return query_bytes
|
||||
return query_bytes.encode('utf8')
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue