Merge pull request #20 from matrix-org/http_client_refactor

Http client refactor
This commit is contained in:
Mark Haines 2014-11-20 17:54:40 +00:00
commit 8f8c484bc6
8 changed files with 374 additions and 356 deletions

View File

@ -26,7 +26,7 @@ from twisted.web.server import Site
from synapse.http.server import JsonResource, RootRedirect from synapse.http.server import JsonResource, RootRedirect
from synapse.http.content_repository import ContentRepoResource from synapse.http.content_repository import ContentRepoResource
from synapse.http.server_key_resource import LocalKey from synapse.http.server_key_resource import LocalKey
from synapse.http.client import MatrixHttpClient from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.api.urls import ( from synapse.api.urls import (
CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, CLIENT_PREFIX, FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
SERVER_KEY_PREFIX, SERVER_KEY_PREFIX,
@ -51,7 +51,7 @@ logger = logging.getLogger(__name__)
class SynapseHomeServer(HomeServer): class SynapseHomeServer(HomeServer):
def build_http_client(self): def build_http_client(self):
return MatrixHttpClient(self) return MatrixFederationHttpClient(self)
def build_resource_for_client(self): def build_resource_for_client(self):
return JsonResource() return JsonResource()

View File

@ -17,7 +17,7 @@
from twisted.web.http import HTTPClient from twisted.web.http import HTTPClient
from twisted.internet.protocol import Factory from twisted.internet.protocol import Factory
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from synapse.http.endpoint import matrix_endpoint from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logcontext import PreserveLoggingContext
import json import json
import logging import logging
@ -31,7 +31,7 @@ def fetch_server_key(server_name, ssl_context_factory):
"""Fetch the keys for a remote server.""" """Fetch the keys for a remote server."""
factory = SynapseKeyClientFactory() factory = SynapseKeyClientFactory()
endpoint = matrix_endpoint( endpoint = matrix_federation_endpoint(
reactor, server_name, ssl_context_factory, timeout=30 reactor, server_name, ssl_context_factory, timeout=30
) )
@ -48,7 +48,7 @@ def fetch_server_key(server_name, ssl_context_factory):
class SynapseKeyClientError(Exception): class SynapseKeyClientError(Exception):
"""The key wasn't retireved from the remote server.""" """The key wasn't retrieved from the remote server."""
pass pass

View File

@ -17,7 +17,7 @@ from twisted.internet import defer
from ._base import BaseHandler from ._base import BaseHandler
from synapse.api.errors import LoginError, Codes from synapse.api.errors import LoginError, Codes
from synapse.http.client import IdentityServerHttpClient from synapse.http.client import SimpleHttpClient
from synapse.util.emailutils import EmailException from synapse.util.emailutils import EmailException
import synapse.util.emailutils as emailutils import synapse.util.emailutils as emailutils
@ -97,10 +97,14 @@ class LoginHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _query_email(self, email): def _query_email(self, email):
httpCli = IdentityServerHttpClient(self.hs) httpCli = SimpleHttpClient(self.hs)
data = yield httpCli.get_json( data = yield httpCli.get_json(
'matrix.org:8090', # TODO FIXME This should be configurable. # TODO FIXME This should be configurable.
"/_matrix/identity/api/v1/lookup?medium=email&address=" + # XXX: ID servers need to use HTTPS
"%s" % urllib.quote(email) "http://%s%s" % ("matrix.org:8090", "/_matrix/identity/api/v1/lookup"),
{
'medium': 'email',
'address': email
}
) )
defer.returnValue(data) defer.returnValue(data)

View File

@ -22,7 +22,7 @@ from synapse.api.errors import (
) )
from ._base import BaseHandler from ._base import BaseHandler
import synapse.util.stringutils as stringutils import synapse.util.stringutils as stringutils
from synapse.http.client import IdentityServerHttpClient from synapse.http.client import SimpleHttpClient
from synapse.http.client import CaptchaServerHttpClient from synapse.http.client import CaptchaServerHttpClient
import base64 import base64
@ -133,7 +133,7 @@ class RegistrationHandler(BaseHandler):
if not threepid: if not threepid:
raise RegistrationError(400, "Couldn't validate 3pid") raise RegistrationError(400, "Couldn't validate 3pid")
logger.info("got threepid medium %s address %s", logger.info("got threepid with medium '%s' and address '%s'",
threepid['medium'], threepid['address']) threepid['medium'], threepid['address'])
@defer.inlineCallbacks @defer.inlineCallbacks
@ -159,7 +159,7 @@ class RegistrationHandler(BaseHandler):
def _threepid_from_creds(self, creds): def _threepid_from_creds(self, creds):
# TODO: get this from the homeserver rather than creating a new one for # TODO: get this from the homeserver rather than creating a new one for
# each request # each request
httpCli = IdentityServerHttpClient(self.hs) httpCli = SimpleHttpClient(self.hs)
# XXX: make this configurable! # XXX: make this configurable!
trustedIdServers = ['matrix.org:8090'] trustedIdServers = ['matrix.org:8090']
if not creds['idServer'] in trustedIdServers: if not creds['idServer'] in trustedIdServers:
@ -167,8 +167,8 @@ class RegistrationHandler(BaseHandler):
'credentials', creds['idServer']) 'credentials', creds['idServer'])
defer.returnValue(None) defer.returnValue(None)
data = yield httpCli.get_json( data = yield httpCli.get_json(
creds['idServer'], # XXX: This should be HTTPS
"/_matrix/identity/api/v1/3pid/getValidated3pid", "http://%s%s" % (creds['idServer'], "/_matrix/identity/api/v1/3pid/getValidated3pid"),
{'sid': creds['sid'], 'clientSecret': creds['clientSecret']} {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
) )
@ -178,16 +178,19 @@ class RegistrationHandler(BaseHandler):
@defer.inlineCallbacks @defer.inlineCallbacks
def _bind_threepid(self, creds, mxid): def _bind_threepid(self, creds, mxid):
httpCli = IdentityServerHttpClient(self.hs) yield
logger.debug("binding threepid")
httpCli = SimpleHttpClient(self.hs)
data = yield httpCli.post_urlencoded_get_json( data = yield httpCli.post_urlencoded_get_json(
creds['idServer'], # XXX: Change when ID servers are all HTTPS
"/_matrix/identity/api/v1/3pid/bind", "http://%s%s" % (creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"),
{ {
'sid': creds['sid'], 'sid': creds['sid'],
'clientSecret': creds['clientSecret'], 'clientSecret': creds['clientSecret'],
'mxid': mxid, 'mxid': mxid,
} }
) )
logger.debug("bound threepid")
defer.returnValue(data) defer.returnValue(data)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -215,10 +218,7 @@ class RegistrationHandler(BaseHandler):
# each request # each request
client = CaptchaServerHttpClient(self.hs) client = CaptchaServerHttpClient(self.hs)
data = yield client.post_urlencoded_get_raw( data = yield client.post_urlencoded_get_raw(
"www.google.com:80", "http://www.google.com:80/recaptcha/api/verify",
"/recaptcha/api/verify",
# twisted dislikes google's response, no content length.
accept_partial=True,
args={ args={
'privatekey': private_key, 'privatekey': private_key,
'remoteip': ip_addr, 'remoteip': ip_addr,

View File

@ -15,308 +15,44 @@
from twisted.internet import defer, reactor from twisted.internet import defer, reactor
from twisted.internet.error import DNSLookupError
from twisted.web.client import ( from twisted.web.client import (
_AgentBase, _URI, readBody, FileBodyProducer, PartialDownloadError Agent, readBody, FileBodyProducer, PartialDownloadError
) )
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
from synapse.http.endpoint import matrix_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
from syutil.jsonutil import encode_canonical_json
from synapse.api.errors import CodeMessageException, SynapseError
from syutil.crypto.jsonsign import sign_json
from StringIO import StringIO from StringIO import StringIO
import json import json
import logging import logging
import urllib import urllib
import urlparse
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class MatrixHttpAgent(_AgentBase): class SimpleHttpClient(object):
"""
def __init__(self, reactor, pool=None): A simple, no-frills HTTP client with methods that wrap up common ways of using HTTP in Matrix
_AgentBase.__init__(self, reactor, pool)
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
class BaseHttpClient(object):
"""Base class for HTTP clients using twisted.
""" """
def __init__(self, hs): def __init__(self, hs):
self.agent = MatrixHttpAgent(reactor)
self.hs = hs self.hs = hs
# The default context factory in Twisted 14.0.0 (which we require) is
# BrowserLikePolicyForHTTPS which will do regular cert validation 'like a browser'
self.agent = Agent(reactor)
@defer.inlineCallbacks @defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes, def post_urlencoded_get_json(self, uri, args={}):
body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [b"Synapse"]
headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "",)
)
logger.debug("Sending request to %s: %s %s",
destination, method, url_bytes)
logger.debug(
"Types: %s",
[
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
with PreserveLoggingContext():
response = yield self.agent.request(
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn("DNS Lookup failed to %s with %s", destination,
e)
raise SynapseError(400, "Domain specified not found.")
logger.exception("Got error in _create_request")
_print_ex(e)
if retries_left:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
logger.error(
"Got response %d %s", response.code, response.phrase
)
raise CodeMessageException(
response.code, response.phrase
)
defer.returnValue(response)
class MatrixHttpClient(BaseHttpClient):
""" Wrapper around the twisted HTTP client api. Implements
Attributes:
agent (twisted.web.client.Agent): The twisted Agent used to send the
requests.
"""
RETRY_DNS_LOOKUP_FAILURES = "__retry_dns"
def __init__(self, hs):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
BaseHttpClient.__init__(self, hs)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if content is not None:
request["content"] = content
request = sign_json(request, self.server_name, self.signing_key)
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
auth_headers.append(bytes(
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
)
))
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None):
""" Sends the specifed json data using PUT
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
use as the request body.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
"""
if not json_data_callback:
def json_data_callback():
return data
def body_callback(method, url_bytes, headers_dict):
json_data = json_data_callback()
self.sign_request(
destination, method, url_bytes, headers_dict, json_data
)
producer = _JsonProducer(json_data)
return producer
response = yield self._create_request(
destination.encode("ascii"),
"PUT",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
defer.returnValue((response.code, body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
""" Get's some json from the given host homeserver and path
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
and *not* a string.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, basestring):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
query_bytes = urllib.urlencode(encoded_args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
def body_callback(method, url_bytes, headers_dict):
self.sign_request(destination, method, url_bytes, headers_dict)
return None
response = yield self._create_request(
destination.encode("ascii"),
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
def _getEndpoint(self, reactor, destination):
return matrix_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
class IdentityServerHttpClient(BaseHttpClient):
"""Separate HTTP client for talking to the Identity servers since they
don't use SRV records and talk x-www-form-urlencoded rather than JSON.
"""
def _getEndpoint(self, reactor, destination):
#TODO: This should be talking TLS
return matrix_endpoint(reactor, destination, timeout=10)
@defer.inlineCallbacks
def post_urlencoded_get_json(self, destination, path, args={}):
logger.debug("post_urlencoded_get_json args: %s", args) logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
def body_callback(method, url_bytes, headers_dict): response = yield self.agent.request(
return FileBodyProducer(StringIO(query_bytes))
response = yield self._create_request(
destination.encode("ascii"),
"POST", "POST",
path.encode("ascii"), uri.encode("ascii"),
body_callback=body_callback, headers=Headers({
headers_dict={
"Content-Type": ["application/x-www-form-urlencoded"] "Content-Type": ["application/x-www-form-urlencoded"]
} }),
bodyProducer=FileBodyProducer(StringIO(query_bytes))
) )
body = yield readBody(response) body = yield readBody(response)
@ -324,13 +60,11 @@ class IdentityServerHttpClient(BaseHttpClient):
defer.returnValue(json.loads(body)) defer.returnValue(json.loads(body))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True): def get_json(self, uri, args={}):
""" Get's some json from the given host homeserver and path """ Get's some json from the given host and path
Args: Args:
destination (str): The remote server to send the HTTP request uri (str): The URI to request, not including query parameters
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to args (dict): A dictionary used to create query strings, defaults to
None. None.
**Note**: The value of each key is assumed to be an iterable **Note**: The value of each key is assumed to be an iterable
@ -342,18 +76,15 @@ class IdentityServerHttpClient(BaseHttpClient):
The result of the deferred is a tuple of `(code, response)`, The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body. where `response` is a dict representing the decoded JSON body.
""" """
logger.debug("get_json args: %s", args)
query_bytes = urllib.urlencode(args, True) yield
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) if len(args):
query_bytes = urllib.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
response = yield self._create_request( response = yield self.agent.request(
destination.encode("ascii"),
"GET", "GET",
path.encode("ascii"), uri.encode("ascii"),
query_bytes=query_bytes,
retry_on_dns_fail=retry_on_dns_fail,
body_callback=None
) )
body = yield readBody(response) body = yield readBody(response)
@ -361,38 +92,31 @@ class IdentityServerHttpClient(BaseHttpClient):
defer.returnValue(json.loads(body)) defer.returnValue(json.loads(body))
class CaptchaServerHttpClient(MatrixHttpClient): class CaptchaServerHttpClient(SimpleHttpClient):
"""Separate HTTP client for talking to google's captcha servers""" """
Separate HTTP client for talking to google's captcha servers
def _getEndpoint(self, reactor, destination): Only slightly special because accepts partial download responses
return matrix_endpoint(reactor, destination, timeout=10) """
@defer.inlineCallbacks @defer.inlineCallbacks
def post_urlencoded_get_raw(self, destination, path, accept_partial=False, def post_urlencoded_get_raw(self, url, args={}):
args={}):
query_bytes = urllib.urlencode(args, True) query_bytes = urllib.urlencode(args, True)
def body_callback(method, url_bytes, headers_dict): response = yield self.agent.request(
return FileBodyProducer(StringIO(query_bytes))
response = yield self._create_request(
destination.encode("ascii"),
"POST", "POST",
path.encode("ascii"), url.encode("ascii"),
body_callback=body_callback, bodyProducer=FileBodyProducer(StringIO(query_bytes)),
headers_dict={ headers=Headers({
"Content-Type": ["application/x-www-form-urlencoded"] "Content-Type": ["application/x-www-form-urlencoded"]
} })
) )
try: try:
body = yield readBody(response) body = yield readBody(response)
defer.returnValue(body) defer.returnValue(body)
except PartialDownloadError as e: except PartialDownloadError as e:
if accept_partial: # twisted dislikes google's response, no content length.
defer.returnValue(e.response) defer.returnValue(e.response)
else:
raise e
def _print_ex(e): def _print_ex(e):
@ -401,24 +125,3 @@ def _print_ex(e):
_print_ex(ex) _print_ex(ex)
else: else:
logger.exception(e) logger.exception(e)
class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
def __init__(self, jsn):
self.reset(jsn)
def reset(self, jsn):
self.body = encode_canonical_json(jsn)
self.length = len(self.body)
def startProducing(self, consumer):
consumer.write(self.body)
return defer.succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass

View File

@ -27,7 +27,7 @@ import random
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def matrix_endpoint(reactor, destination, ssl_context_factory=None, def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
timeout=None): timeout=None):
"""Construct an endpoint for the given matrix destination. """Construct an endpoint for the given matrix destination.

View File

@ -0,0 +1,308 @@
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer, reactor
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, _AgentBase, _URI
from twisted.web.http_headers import Headers
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
from syutil.jsonutil import encode_canonical_json
from synapse.api.errors import CodeMessageException, SynapseError
from syutil.crypto.jsonsign import sign_json
import json
import logging
import urllib
import urlparse
logger = logging.getLogger(__name__)
class MatrixFederationHttpAgent(_AgentBase):
def __init__(self, reactor, pool=None):
_AgentBase.__init__(self, reactor, pool)
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
host = b""
port = 0
fragment = b""
parsed_URI = _URI(b"http", destination, host, port, path, params,
query, fragment)
# Set the connection pool key to be the destination.
key = destination
return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
headers, body_producer,
parsed_URI.originForm)
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation protocol.
Send client certificates and signs requests.
Attributes:
agent (twisted.web.client.Agent): The twisted Agent used to send the
requests.
"""
def __init__(self, hs):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.agent = MatrixFederationHttpAgent(reactor)
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [b"Synapse"]
headers_dict[b"Host"] = [destination]
url_bytes = urlparse.urlunparse(
("", "", path_bytes, param_bytes, query_bytes, "",)
)
logger.debug("Sending request to %s: %s %s",
destination, method, url_bytes)
logger.debug(
"Types: %s",
[
type(destination), type(method), type(path_bytes),
type(param_bytes),
type(query_bytes)
]
)
retries_left = 5
endpoint = self._getEndpoint(reactor, destination)
while True:
producer = None
if body_callback:
producer = body_callback(method, url_bytes, headers_dict)
try:
with PreserveLoggingContext():
response = yield self.agent.request(
destination,
endpoint,
method,
path_bytes,
param_bytes,
query_bytes,
Headers(headers_dict),
producer
)
logger.debug("Got response to %s", method)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn("DNS Lookup failed to %s with %s", destination,
e)
raise SynapseError(400, "Domain specified not found.")
logger.exception("Got error in _create_request")
_print_ex(e)
if retries_left:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
raise
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
pass
else:
# :'(
# Update transactions table?
logger.error(
"Got response %d %s", response.code, response.phrase
)
raise CodeMessageException(
response.code, response.phrase
)
defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
request = {
"method": method,
"uri": url_bytes,
"origin": self.server_name,
"destination": destination,
}
if content is not None:
request["content"] = content
request = sign_json(request, self.server_name, self.signing_key)
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
auth_headers.append(bytes(
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
)
))
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None):
""" Sends the specifed json data using PUT
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
use as the request body.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
"""
if not json_data_callback:
def json_data_callback():
return data
def body_callback(method, url_bytes, headers_dict):
json_data = json_data_callback()
self.sign_request(
destination, method, url_bytes, headers_dict, json_data
)
producer = _JsonProducer(json_data)
return producer
response = yield self._create_request(
destination.encode("ascii"),
"PUT",
path.encode("ascii"),
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
)
logger.debug("Getting resp body")
body = yield readBody(response)
logger.debug("Got resp body")
defer.returnValue((response.code, body))
@defer.inlineCallbacks
def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
""" Get's some json from the given host homeserver and path
Args:
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
**Note**: The value of each key is assumed to be an iterable
and *not* a string.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
"""
logger.debug("get_json args: %s", args)
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, basestring):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
query_bytes = urllib.urlencode(encoded_args, True)
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
def body_callback(method, url_bytes, headers_dict):
self.sign_request(destination, method, url_bytes, headers_dict)
return None
response = yield self._create_request(
destination.encode("ascii"),
"GET",
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
retry_on_dns_fail=retry_on_dns_fail
)
body = yield readBody(response)
defer.returnValue(json.loads(body))
def _getEndpoint(self, reactor, destination):
return matrix_federation_endpoint(
reactor, destination, timeout=10,
ssl_context_factory=self.hs.tls_context_factory
)
def _print_ex(e):
if hasattr(e, "reasons") and e.reasons:
for ex in e.reasons:
_print_ex(ex)
else:
logger.exception(e)
class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
def __init__(self, jsn):
self.reset(jsn)
def reset(self, jsn):
self.body = encode_canonical_json(jsn)
self.length = len(self.body)
def startProducing(self, consumer):
consumer.write(self.body)
return defer.succeed(None)
def pauseProducing(self):
pass
def stopProducing(self):
pass

View File

@ -222,6 +222,7 @@ class RegisterRestServlet(RestServlet):
threepidCreds = register_json['threepidCreds'] threepidCreds = register_json['threepidCreds']
handler = self.handlers.registration_handler handler = self.handlers.registration_handler
logger.debug("Registering email. threepidcreds: %s" % (threepidCreds))
yield handler.register_email(threepidCreds) yield handler.register_email(threepidCreds)
session["threepidCreds"] = threepidCreds # store creds for next stage session["threepidCreds"] = threepidCreds # store creds for next stage
session[LoginType.EMAIL_IDENTITY] = True # mark email as done session[LoginType.EMAIL_IDENTITY] = True # mark email as done
@ -232,6 +233,7 @@ class RegisterRestServlet(RestServlet):
@defer.inlineCallbacks @defer.inlineCallbacks
def _do_password(self, request, register_json, session): def _do_password(self, request, register_json, session):
yield
if (self.hs.config.enable_registration_captcha and if (self.hs.config.enable_registration_captcha and
not session[LoginType.RECAPTCHA]): not session[LoginType.RECAPTCHA]):
# captcha should've been done by this stage! # captcha should've been done by this stage!
@ -259,6 +261,7 @@ class RegisterRestServlet(RestServlet):
) )
if session[LoginType.EMAIL_IDENTITY]: if session[LoginType.EMAIL_IDENTITY]:
logger.debug("Binding emails %s to %s" % (session["threepidCreds"], user_id))
yield handler.bind_emails(user_id, session["threepidCreds"]) yield handler.bind_emails(user_id, session["threepidCreds"])
result = { result = {