Make MatrixFederationClient use MatrixFederationAgent

... instead of the matrix_federation_endpoint
This commit is contained in:
Richard van der Hoff 2019-01-21 23:29:47 +00:00
parent 44be7513bf
commit 7871146667
2 changed files with 106 additions and 27 deletions

View File

@ -32,7 +32,7 @@ from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool from twisted.web.client import FileBodyProducer
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
import synapse.metrics import synapse.metrics
@ -44,7 +44,7 @@ from synapse.api.errors import (
RequestSendFailed, RequestSendFailed,
SynapseError, SynapseError,
) )
from synapse.http.endpoint import matrix_federation_endpoint from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.util.async_helpers import timeout_deferred from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
@ -66,20 +66,6 @@ else:
MAXINT = sys.maxint MAXINT = sys.maxint
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
self.reactor, destination, timeout=10,
tls_client_options_factory=self.tls_client_options_factory
)
_next_id = 1 _next_id = 1
@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object):
self.signing_key = hs.config.signing_key[0] self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname self.server_name = hs.hostname
reactor = hs.get_reactor() reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
pool.retryAutomatically = False self.agent = MatrixFederationAgent(
pool.maxPersistentPerHost = 5 hs.get_reactor(),
pool.cachedConnectionTimeout = 2 * 60 hs.tls_client_options_factory,
self.agent = Agent.usingEndpointFactory(
reactor, MatrixFederationEndpointFactory(hs), pool=pool
) )
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._store = hs.get_datastore() self._store = hs.get_datastore()
@ -316,9 +300,9 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers headers_dict[b"Authorization"] = auth_headers
logger.info( logger.info(
"{%s} [%s] Sending request: %s %s", "{%s} [%s] Sending request: %s %s; timeout %fs",
request.txn_id, request.destination, request.method, request.txn_id, request.destination, request.method,
url_str, url_str, _sec_timeout,
) )
try: try:
@ -338,12 +322,11 @@ class MatrixFederationHttpClient(object):
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
) )
response = yield make_deferred_yieldable( response = yield request_deferred
request_deferred,
)
except DNSLookupError as e: except DNSLookupError as e:
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
except Exception as e: except Exception as e:
logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e) raise_from(RequestSendFailed(e, can_retry=True), e)
logger.info( logger.info(

View File

@ -15,6 +15,7 @@
from mock import Mock from mock import Mock
from twisted.internet import defer
from twisted.internet.defer import TimeoutError from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.test.proto_helpers import StringTransport from twisted.test.proto_helpers import StringTransport
@ -26,11 +27,20 @@ from synapse.http.matrixfederationclient import (
MatrixFederationHttpClient, MatrixFederationHttpClient,
MatrixFederationRequest, MatrixFederationRequest,
) )
from synapse.util.logcontext import LoggingContext
from tests.server import FakeTransport from tests.server import FakeTransport
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase
def check_logcontext(context):
current = LoggingContext.current_context()
if current is not context:
raise AssertionError(
"Expected logcontext %s but was %s" % (context, current),
)
class FederationClientTests(HomeserverTestCase): class FederationClientTests(HomeserverTestCase):
def make_homeserver(self, reactor, clock): def make_homeserver(self, reactor, clock):
@ -43,6 +53,70 @@ class FederationClientTests(HomeserverTestCase):
self.cl = MatrixFederationHttpClient(self.hs) self.cl = MatrixFederationHttpClient(self.hs)
self.reactor.lookups["testserv"] = "1.2.3.4" self.reactor.lookups["testserv"] = "1.2.3.4"
def test_client_get(self):
"""
happy-path test of a GET request
"""
@defer.inlineCallbacks
def do_request():
with LoggingContext("one") as context:
fetch_d = self.cl.get_json("testserv:8008", "foo/bar")
# Nothing happened yet
self.assertNoResult(fetch_d)
# should have reset logcontext to the sentinel
check_logcontext(LoggingContext.sentinel)
try:
fetch_res = yield fetch_d
defer.returnValue(fetch_res)
finally:
check_logcontext(context)
test_d = do_request()
self.pump()
# Nothing happened yet
self.assertNoResult(test_d)
# Make sure treq is trying to connect
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, '1.2.3.4')
self.assertEqual(port, 8008)
# complete the connection and wire it up to a fake transport
protocol = factory.buildProtocol(None)
transport = StringTransport()
protocol.makeConnection(transport)
# that should have made it send the request to the transport
self.assertRegex(transport.value(), b"^GET /foo/bar")
# Deferred is still without a result
self.assertNoResult(test_d)
# Send it the HTTP response
res_json = '{ "a": 1 }'.encode('ascii')
protocol.dataReceived(
b"HTTP/1.1 200 OK\r\n"
b"Server: Fake\r\n"
b"Content-Type: application/json\r\n"
b"Content-Length: %i\r\n"
b"\r\n"
b"%s" % (len(res_json), res_json)
)
self.pump()
res = self.successResultOf(test_d)
# check the response is as expected
self.assertEqual(res, {"a": 1})
def test_dns_error(self): def test_dns_error(self):
""" """
If the DNS lookup returns an error, it will bubble up. If the DNS lookup returns an error, it will bubble up.
@ -54,6 +128,28 @@ class FederationClientTests(HomeserverTestCase):
self.assertIsInstance(f.value, RequestSendFailed) self.assertIsInstance(f.value, RequestSendFailed)
self.assertIsInstance(f.value.inner_exception, DNSLookupError) self.assertIsInstance(f.value.inner_exception, DNSLookupError)
def test_client_connection_refused(self):
d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump()
# Nothing happened yet
self.assertNoResult(d)
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, factory, _timeout, _bindAddress) = clients[0]
self.assertEqual(host, '1.2.3.4')
self.assertEqual(port, 8008)
e = Exception("go away")
factory.clientConnectionFailed(None, e)
self.pump(0.5)
f = self.failureResultOf(d)
self.assertIsInstance(f.value, RequestSendFailed)
self.assertIs(f.value.inner_exception, e)
def test_client_never_connect(self): def test_client_never_connect(self):
""" """
If the HTTP request is not connected and is timed out, it'll give a If the HTTP request is not connected and is timed out, it'll give a