Separate out the matrix http client completely because just about all of its code it now separate from the simple case we need for standard HTTP(S)

This commit is contained in:
David Baker 2014-11-20 17:41:56 +00:00
parent 20326054da
commit e377d33652
8 changed files with 362 additions and 338 deletions

View File

@ -26,7 +26,7 @@ from twisted.web.server import Site
from synapse.http.server import JsonResource, RootRedirect
from synapse.http.content_repository import ContentRepoResource
from synapse.http.server_key_resource import LocalKey
from synapse.http.client import MatrixFederationHttpClient
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.api.urls import (
CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
SERVER_KEY_PREFIX,

View File

@ -17,7 +17,7 @@
from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_endpoint
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.logcontext import PreserveLoggingContext
import json
import logging
@ -31,7 +31,7 @@ def fetch_server_key(server_name, ssl_context_factory):
"""Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory()
endpoint = matrix_endpoint(
endpoint = matrix_federation_endpoint(
reactor, server_name, ssl_context_factory, timeout=30
)
@ -48,7 +48,7 @@ def fetch_server_key(server_name, ssl_context_factory):
class SynapseKeyClientError(Exception):
"""The key wasn't retireved from the remote server."""
"""The key wasn't retrieved from the remote server."""
pass

View File

@ -99,8 +99,12 @@ class LoginHandler(BaseHandler):
def _query_email(self, email):
httpCli = SimpleHttpClient(self.hs)
data = yield httpCli.get_json(
'matrix.org:8090', # TODO FIXME This should be configurable.
"/_matrix/identity/api/v1/lookup?medium=email&address=" +
"%s" % urllib.quote(email)
# TODO FIXME This should be configurable.
# XXX: ID servers need to use HTTPS
"http://%s%s" % ("matrix.org:8090", "/_matrix/identity/api/v1/lookup"),
{
'medium': 'email',
'address': email
}
)
defer.returnValue(data)

View File

@ -133,7 +133,7 @@ class RegistrationHandler(BaseHandler):
if not threepid:
raise RegistrationError(400, "Couldn't validate 3pid")
logger.info("got threepid medium %s address %s",
logger.info("got threepid with medium '%s' and address '%s'",
threepid['medium'], threepid['address'])
@defer.inlineCallbacks
@ -167,8 +167,8 @@ class RegistrationHandler(BaseHandler):
'credentials', creds['idServer'])
defer.returnValue(None)
data = yield httpCli.get_json(
creds['idServer'],
"/_matrix/identity/api/v1/3pid/getValidated3pid",
# XXX: This should be HTTPS
"http://%s%s" % (creds['idServer'], "/_matrix/identity/api/v1/3pid/getValidated3pid"),
{'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
)
@ -178,16 +178,19 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks
def _bind_threepid(self, creds, mxid):
yield
logger.debug("binding threepid")
httpCli = SimpleHttpClient(self.hs)
data = yield httpCli.post_urlencoded_get_json(
creds['idServer'],
"/_matrix/identity/api/v1/3pid/bind",
# XXX: Change when ID servers are all HTTPS
"http://%s%s" % (creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"),
{
'sid': creds['sid'],
'clientSecret': creds['clientSecret'],
'mxid': mxid,
}
)
logger.debug("bound threepid")
defer.returnValue(data)
@defer.inlineCallbacks
@ -215,8 +218,7 @@ class RegistrationHandler(BaseHandler):
# each request
client = CaptchaServerHttpClient(self.hs)
data = yield client.post_urlencoded_get_raw(
"www.google.com:80",
"/recaptcha/api/verify",
"http://www.google.com:80/recaptcha/api/verify",
args={
'privatekey': private_key,
'remoteip': ip_addr,

View File

@ -15,168 +15,44 @@
from twisted.internet import defer, reactor
from twisted.internet.error import DNSLookupError
from twisted.web.client import (
_AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError
Agent, readBody, FileBodyProducer, PartialDownloadError
)
from twisted.web.http_headers import Headers
from synapse.http.endpoint import matrix_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
from syutil.jsonutil import encode_canonical_json
from synapse.api.errors import CodeMessageException, SynapseError
from syutil.crypto.jsonsign import sign_json
from StringIO import StringIO
import json
import logging
import urllib
import urlparse
logger = logging.getLogger(__name__)
class MatrixHttpAgent(_AgentBase):
def __init__(self, reactor, pool=None):
_AgentBase.__init__(self, reactor, pool)
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
class BaseHttpClient(object):
"""Base class for HTTP clients using twisted.
"""
def __init__(self, hs):
self.agent = MatrixHttpAgent(reactor)
self.hs = hs
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [b"Synapse"]
headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "",)
)
logger.debug("Sending request to %s: %s %s",
destination, method, url_bytes)
logger.debug(
"Types: %s",
[
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
with PreserveLoggingContext():
response = yield self.agent.request(
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn("DNS Lookup failed to %s with %s", destination,
e)
raise SynapseError(400, "Domain specified not found.")
logger.exception("Got error in _create_request")
_print_ex(e)
if retries_left:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
logger.error(
"Got response %d %s", response.code, response.phrase
)
raise CodeMessageException(
response.code, response.phrase
)
defer.returnValue(response)
class SimpleHttpClient(BaseHttpClient):
class SimpleHttpClient(object):
"""
A simple, no-frills HTTP client with methods that wrap up common ways of using HTTP in Matrix
"""
def _getEndpoint(self, reactor, destination):
return matrix_endpoint(reactor, destination, timeout=10)
def __init__(self, hs):
self.hs = hs
# The default context factory in Twisted 14.0.0 (which we require) is
# BrowserLikePolicyForHTTPS which will do regular cert validation 'like a browser'
self.agent = Agent(reactor)
@defer.inlineCallbacks
def post_urlencoded_get_json(self, destination, path, args={}):
def post_urlencoded_get_json(self, uri, args={}):
logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
def body_callback(method, url_bytes, headers_dict):
return FileBodyProducer(StringIO(query_bytes))
response = yield self._create_request(
destination.encode("ascii"),
response = yield self.agent.request(
"POST",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={
uri.encode("ascii"),
headers=Headers({
"Content-Type": ["application/x-www-form-urlencoded"]
}
}),
bodyProducer=FileBodyProducer(StringIO(query_bytes))
)
body = yield readBody(response)
@ -184,12 +60,11 @@ class SimpleHttpClient(BaseHttpClient):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
def get_json(self, uri, args={}):
""" Get's some json from the given host and path
Args:
destination (str): The remote server to send the HTTP request to.
path (str): The HTTP path.
uri (str): The URI to request, not including query parameters
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
@ -201,18 +76,15 @@ class SimpleHttpClient(BaseHttpClient):
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
yield
if len(args):
query_bytes = urllib.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
response = yield self._create_request(
destination.encode("ascii"),
response = yield self.agent.request(
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
retry_on_dns_fail=retry_on_dns_fail,
body_callback=None
uri.encode("ascii"),
)
body = yield readBody(response)
@ -220,174 +92,30 @@ class SimpleHttpClient(BaseHttpClient):
defer.returnValue(json.loads(body))
class MatrixFederationHttpClient(BaseHttpClient):
"""HTTP client used to talk to other homeservers over the federation protocol.
Send client certificates and signs requests.
Attributes:
agent (twisted.web.client.Agent): The twisted Agent used to send the
requests.
"""
def __init__(self, hs):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
BaseHttpClient.__init__(self, hs)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if content is not None:
request["content"] = content
request = sign_json(request, self.server_name, self.signing_key)
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
auth_headers.append(bytes(
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
)
))
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None):
""" Sends the specifed json data using PUT
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
use as the request body.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
"""
if not json_data_callback:
def json_data_callback():
return data
def body_callback(method, url_bytes, headers_dict):
json_data = json_data_callback()
self.sign_request(
destination, method, url_bytes, headers_dict, json_data
)
producer = _JsonProducer(json_data)
return producer
response = yield self._create_request(
destination.encode("ascii"),
"PUT",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
defer.returnValue((response.code, body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
""" Get's some json from the given host homeserver and path
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
and *not* a string.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, basestring):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
query_bytes = urllib.urlencode(encoded_args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
def body_callback(method, url_bytes, headers_dict):
self.sign_request(destination, method, url_bytes, headers_dict)
return None
response = yield self._create_request(
destination.encode("ascii"),
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
def _getEndpoint(self, reactor, destination):
return matrix_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
class CaptchaServerHttpClient(BaseHttpClient):
class CaptchaServerHttpClient(SimpleHttpClient):
"""
Separate HTTP client for talking to google's captcha servers
Only slightly special because accepts partial download responses
"""
def _getEndpoint(self, reactor, destination):
return matrix_endpoint(reactor, destination, timeout=10)
@defer.inlineCallbacks
def post_urlencoded_get_raw(self, destination, path, args={}):
def post_urlencoded_get_raw(self, url, args={}):
query_bytes = urllib.urlencode(args, True)
def body_callback(method, url_bytes, headers_dict):
return FileBodyProducer(StringIO(query_bytes))
response = yield self._create_request(
destination.encode("ascii"),
response = yield self.agent.request(
"POST",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={
url.encode("ascii"),
bodyProducer=FileBodyProducer(StringIO(query_bytes)),
headers=Headers({
"Content-Type": ["application/x-www-form-urlencoded"]
}
})
)
try:
body = yield readBody(response)
defer.returnValue(body)
except PartialDownloadError as e:
# twisted dislikes google's response, no content length.
defer.returnValue(e.response)
@ -397,24 +125,3 @@ def _print_ex(e):
_print_ex(ex)
else:
logger.exception(e)
class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
def __init__(self, jsn):
self.reset(jsn)
def reset(self, jsn):
self.body = encode_canonical_json(jsn)
self.length = len(self.body)
def startProducing(self, consumer):
consumer.write(self.body)
return defer.succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass

View File

@ -27,7 +27,7 @@ import random
logger = logging.getLogger(__name__)
def matrix_endpoint(reactor, destination, ssl_context_factory=None,
def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
timeout=None):
"""Construct an endpoint for the given matrix destination.

View File

@ -0,0 +1,308 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, _AgentBase, _URI
from twisted.web.http_headers import Headers
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
from syutil.jsonutil import encode_canonical_json
from synapse.api.errors import CodeMessageException, SynapseError
from syutil.crypto.jsonsign import sign_json
import json
import logging
import urllib
import urlparse
logger = logging.getLogger(__name__)
class MatrixFederationHttpAgent(_AgentBase):
def __init__(self, reactor, pool=None):
_AgentBase.__init__(self, reactor, pool)
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation protocol.
Send client certificates and signs requests.
Attributes:
agent (twisted.web.client.Agent): The twisted Agent used to send the
requests.
"""
def __init__(self, hs):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.agent = MatrixFederationHttpAgent(reactor)
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [b"Synapse"]
headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "",)
)
logger.debug("Sending request to %s: %s %s",
destination, method, url_bytes)
logger.debug(
"Types: %s",
[
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
with PreserveLoggingContext():
response = yield self.agent.request(
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn("DNS Lookup failed to %s with %s", destination,
e)
raise SynapseError(400, "Domain specified not found.")
logger.exception("Got error in _create_request")
_print_ex(e)
if retries_left:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
logger.error(
"Got response %d %s", response.code, response.phrase
)
raise CodeMessageException(
response.code, response.phrase
)
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if content is not None:
request["content"] = content
request = sign_json(request, self.server_name, self.signing_key)
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
auth_headers.append(bytes(
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
)
))
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None):
""" Sends the specifed json data using PUT
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
use as the request body.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
"""
if not json_data_callback:
def json_data_callback():
return data
def body_callback(method, url_bytes, headers_dict):
json_data = json_data_callback()
self.sign_request(
destination, method, url_bytes, headers_dict, json_data
)
producer = _JsonProducer(json_data)
return producer
response = yield self._create_request(
destination.encode("ascii"),
"PUT",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
defer.returnValue((response.code, body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
""" Get's some json from the given host homeserver and path
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
and *not* a string.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, basestring):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
query_bytes = urllib.urlencode(encoded_args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
def body_callback(method, url_bytes, headers_dict):
self.sign_request(destination, method, url_bytes, headers_dict)
return None
response = yield self._create_request(
destination.encode("ascii"),
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
def _getEndpoint(self, reactor, destination):
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
def _print_ex(e):
if hasattr(e, "reasons") and e.reasons:
for ex in e.reasons:
_print_ex(ex)
else:
logger.exception(e)
class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
def __init__(self, jsn):
self.reset(jsn)
def reset(self, jsn):
self.body = encode_canonical_json(jsn)
self.length = len(self.body)
def startProducing(self, consumer):
consumer.write(self.body)
return defer.succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass

View File

@ -222,6 +222,7 @@ class RegisterRestServlet(RestServlet):
threepidCreds = register_json['threepidCreds']
handler = self.handlers.registration_handler
logger.debug("Registering email. threepidcreds: %s" % (threepidCreds))
yield handler.register_email(threepidCreds)
session["threepidCreds"] = threepidCreds # store creds for next stage
session[LoginType.EMAIL_IDENTITY] = True # mark email as done
@ -232,6 +233,7 @@ class RegisterRestServlet(RestServlet):
@defer.inlineCallbacks
def _do_password(self, request, register_json, session):
yield
if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage!
@ -259,6 +261,7 @@ class RegisterRestServlet(RestServlet):
)
if session[LoginType.EMAIL_IDENTITY]:
logger.debug("Binding emails %s to %s" % (session["threepidCreds"], user_id))
yield handler.bind_emails(user_id, session["threepidCreds"])
result = {