Refactor matrixfederationclient to fix logging (#3906)

We want to wait until we have read the response body before we log the request
as complete, otherwise a confusing thing happens where the request appears to
have completed, but we later fail it.

To do this, we factor the salient details of a request out to a separate
object, which can then keep track of the txn_id, so that it can be logged.
This commit is contained in:
Richard van der Hoff 2018-09-18 18:17:15 +01:00 committed by GitHub
parent c600886d47
commit 31c15dcb80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 359 additions and 186 deletions

1
changelog.d/3906.misc Normal file
View File

@ -0,0 +1 @@
Improve logging of outbound federation requests

View File

@ -17,10 +17,12 @@ import cgi
import logging import logging
import random import random
import sys import sys
from io import BytesIO
from six import PY3, string_types from six import PY3, string_types
from six.moves import urllib from six.moves import urllib
import attr
import treq import treq
from canonicaljson import encode_canonical_json from canonicaljson import encode_canonical_json
from prometheus_client import Counter from prometheus_client import Counter
@ -28,8 +30,9 @@ from signedjson.sign import sign_json
from twisted.internet import defer, protocol from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers from twisted.web.http_headers import Headers
import synapse.metrics import synapse.metrics
@ -41,13 +44,11 @@ from synapse.api.errors import (
SynapseError, SynapseError,
) )
from synapse.http.endpoint import matrix_federation_endpoint from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async_helpers import timeout_no_seriously from synapse.util.async_helpers import timeout_no_seriously
from synapse.util.logcontext import make_deferred_yieldable from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"]) "", ["method"])
@ -78,6 +79,93 @@ class MatrixFederationEndpointFactory(object):
) )
_next_id = 1
@attr.s
class MatrixFederationRequest(object):
method = attr.ib()
"""HTTP method
:type: str
"""
path = attr.ib()
"""HTTP path
:type: str
"""
destination = attr.ib()
"""The remote server to send the HTTP request to.
:type: str"""
json = attr.ib(default=None)
"""JSON to send in the body.
:type: dict|None
"""
json_callback = attr.ib(default=None)
"""A callback to generate the JSON.
:type: func|None
"""
query = attr.ib(default=None)
"""Query arguments.
:type: dict|None
"""
txn_id = attr.ib(default=None)
"""Unique ID for this request (for logging)
:type: str|None
"""
def __attrs_post_init__(self):
global _next_id
self.txn_id = "%s-O-%s" % (self.method, _next_id)
_next_id = (_next_id + 1) % (MAXINT - 1)
def get_json(self):
if self.json_callback:
return self.json_callback()
return self.json
@defer.inlineCallbacks
def _handle_json_response(reactor, timeout_sec, request, response):
"""
Reads the JSON body of a response, with a timeout
Args:
reactor (IReactor): twisted reactor, for the timeout
timeout_sec (float): number of seconds to wait for response to complete
request (MatrixFederationRequest): the request that triggered the response
response (IResponse): response to the request
Returns:
dict: parsed JSON response
"""
try:
check_content_type_is_json(response.headers)
d = treq.json_content(response)
d.addTimeout(timeout_sec, reactor)
body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
"{%s} [%d] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise
logger.info(
"{%s} [%d] Completed: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
defer.returnValue(body)
class MatrixFederationHttpClient(object): class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation """HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests. protocol. Send client certificates and signs requests.
@ -102,34 +190,35 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock() self.clock = hs.get_clock()
self._store = hs.get_datastore() self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii') self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
self.default_timeout = 60 self.default_timeout = 60
def _create_url(self, destination, path_bytes, param_bytes, query_bytes): def schedule(x):
return urllib.parse.urlunparse( reactor.callLater(_EPSILON, x)
(b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
) self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks @defer.inlineCallbacks
def _request(self, destination, method, path, def _send_request(
json=None, json_callback=None, self,
param_bytes=b"", request,
query=None, retry_on_dns_fail=True, retry_on_dns_fail=True,
timeout=None, long_retries=False, timeout=None,
ignore_backoff=False, long_retries=False,
backoff_on_404=False): ignore_backoff=False,
backoff_on_404=False
):
""" """
Creates and sends a request to the given server. Sends a request to the given server.
Args: Args:
destination (str): The remote server to send the HTTP request to. request (MatrixFederationRequest): details of request to be sent
method (str): HTTP method
path (str): The HTTP path timeout (int|None): number of milliseconds to wait for the response headers
json (dict or None): JSON to send in the body. (including connecting to the server). 60s by default.
json_callback (func or None): A callback to generate the JSON.
query (dict or None): Query arguments.
ignore_backoff (bool): true to ignore the historical backoff data ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway. and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404 backoff_on_404 (bool): Back off if we get a 404
Returns: Returns:
@ -154,38 +243,32 @@ class MatrixFederationHttpClient(object):
if ( if (
self.hs.config.federation_domain_whitelist is not None and self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist request.destination not in self.hs.config.federation_domain_whitelist
): ):
raise FederationDeniedError(destination) raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter( limiter = yield synapse.util.retryutils.get_retry_limiter(
destination, request.destination,
self.clock, self.clock,
self._store, self._store,
backoff_on_404=backoff_on_404, backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
headers_dict = {} method = request.method
path_bytes = path.encode("ascii") destination = request.destination
if query: path_bytes = request.path.encode("ascii")
query_bytes = encode_query_args(query) if request.query:
query_bytes = encode_query_args(request.query)
else: else:
query_bytes = b"" query_bytes = b""
headers_dict = { headers_dict = {
"User-Agent": [self.version_string], "User-Agent": [self.version_string],
"Host": [destination], "Host": [request.destination],
} }
with limiter: with limiter:
url = self._create_url(
destination.encode("ascii"), path_bytes, param_bytes, query_bytes
).decode('ascii')
txn_id = "%s-O-%s" % (method, self._next_id)
self._next_id = (self._next_id + 1) % (MAXINT - 1)
# XXX: Would be much nicer to retry only at the transaction-layer # XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place) # (once we have reliable transactions in place)
if long_retries: if long_retries:
@ -193,16 +276,19 @@ class MatrixFederationHttpClient(object):
else: else:
retries_left = MAX_SHORT_RETRIES retries_left = MAX_SHORT_RETRIES
http_url = urllib.parse.urlunparse( url = urllib.parse.urlunparse((
(b"", b"", path_bytes, param_bytes, query_bytes, b"") b"matrix", destination.encode("ascii"),
).decode('ascii') path_bytes, None, query_bytes, b"",
)).decode('ascii')
http_url = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
)).decode('ascii')
log_result = None
while True: while True:
try: try:
if json_callback: json = request.get_json()
json = json_callback()
if json: if json:
data = encode_canonical_json(json) data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"] headers_dict["Content-Type"] = ["application/json"]
@ -213,16 +299,24 @@ class MatrixFederationHttpClient(object):
data = None data = None
self.sign_request(destination, method, http_url, headers_dict) self.sign_request(destination, method, http_url, headers_dict)
outbound_logger.info( logger.info(
"{%s} [%s] Sending request: %s %s", "{%s} [%s] Sending request: %s %s",
txn_id, destination, method, url request.txn_id, destination, method, url
) )
if data:
producer = FileBodyProducer(
BytesIO(data),
cooperator=self._cooperator
)
else:
producer = None
request_deferred = treq.request( request_deferred = treq.request(
method, method,
url, url,
headers=Headers(headers_dict), headers=Headers(headers_dict),
data=data, data=producer,
agent=self.agent, agent=self.agent,
reactor=self.hs.get_reactor(), reactor=self.hs.get_reactor(),
unbuffered=True unbuffered=True
@ -244,33 +338,19 @@ class MatrixFederationHttpClient(object):
request_deferred, request_deferred,
) )
log_result = "%d %s" % (
response.code,
response.phrase.decode('ascii', errors='replace'),
)
break break
except Exception as e: except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
logger.warn( logger.warn(
"{%s} Sending request failed to %s: %s %s: %s", "{%s} [%s] Request failed: %s %s: %s",
txn_id, request.txn_id,
destination, destination,
method, method,
url, url,
_flatten_response_never_received(e), _flatten_response_never_received(e),
) )
log_result = _flatten_response_never_received(e) if not retry_on_dns_fail and isinstance(e, DNSLookupError):
raise
if retries_left and not timeout: if retries_left and not timeout:
if long_retries: if long_retries:
@ -283,33 +363,33 @@ class MatrixFederationHttpClient(object):
delay *= random.uniform(0.8, 1.4) delay *= random.uniform(0.8, 1.4)
logger.debug( logger.debug(
"{%s} Waiting %s before sending to %s...", "{%s} [%s] Waiting %ss before re-sending...",
txn_id, request.txn_id,
destination,
delay, delay,
destination
) )
yield self.clock.sleep(delay) yield self.clock.sleep(delay)
retries_left -= 1 retries_left -= 1
else: else:
raise raise
finally:
outbound_logger.info( logger.info(
"{%s} [%s] Result: %s", "{%s} [%s] Got response headers: %d %s",
txn_id, request.txn_id,
destination, destination,
log_result, response.code,
) response.phrase.decode('ascii', errors='replace'),
)
if 200 <= response.code < 300: if 200 <= response.code < 300:
pass pass
else: else:
# :'( # :'(
# Update transactions table? # Update transactions table?
with logcontext.PreserveLoggingContext(): d = treq.content(response)
d = treq.content(response) d.addTimeout(_sec_timeout, self.hs.get_reactor())
d.addTimeout(_sec_timeout, self.hs.get_reactor()) body = yield make_deferred_yieldable(d)
body = yield make_deferred_yieldable(d)
raise HttpResponseException( raise HttpResponseException(
response.code, response.phrase, body response.code, response.phrase, body
) )
@ -403,29 +483,26 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist is not on our federation whitelist
""" """
if not json_data_callback: request = MatrixFederationRequest(
json_data_callback = lambda: data method="PUT",
destination=destination,
response = yield self._request( path=path,
destination,
"PUT",
path,
json_callback=json_data_callback,
query=args, query=args,
json_callback=json_data_callback,
json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries, long_retries=long_retries,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404, backoff_on_404=backoff_on_404,
) )
if 200 <= response.code < 300: body = yield _handle_json_response(
# We need to update the transactions table to say it was sent? self.hs.get_reactor(), self.default_timeout, request, response,
check_content_type_is_json(response.headers) )
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body) defer.returnValue(body)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -459,31 +536,30 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist is not on our federation whitelist
""" """
response = yield self._request(
destination, request = MatrixFederationRequest(
"POST", method="POST",
path, destination=destination,
path=path,
query=args, query=args,
json=data, json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries, long_retries=long_retries,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
if 200 <= response.code < 300: if timeout:
# We need to update the transactions table to say it was sent? _sec_timeout = timeout / 1000
check_content_type_is_json(response.headers) else:
_sec_timeout = self.default_timeout
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
d.addTimeout(_sec_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
body = yield _handle_json_response(
self.hs.get_reactor(), _sec_timeout, request, response,
)
defer.returnValue(body) defer.returnValue(body)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -519,25 +595,23 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
response = yield self._request( request = MatrixFederationRequest(
destination, method="GET",
"GET", destination=destination,
path, path=path,
query=args, query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail, retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
if 200 <= response.code < 300: body = yield _handle_json_response(
# We need to update the transactions table to say it was sent? self.hs.get_reactor(), self.default_timeout, request, response,
check_content_type_is_json(response.headers) )
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body) defer.returnValue(body)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -568,25 +642,23 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist is not on our federation whitelist
""" """
response = yield self._request( request = MatrixFederationRequest(
destination, method="DELETE",
"DELETE", destination=destination,
path, path=path,
query=args, query=args,
)
response = yield self._send_request(
request,
long_retries=long_retries, long_retries=long_retries,
timeout=timeout, timeout=timeout,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
if 200 <= response.code < 300: body = yield _handle_json_response(
# We need to update the transactions table to say it was sent? self.hs.get_reactor(), self.default_timeout, request, response,
check_content_type_is_json(response.headers) )
with logcontext.PreserveLoggingContext():
d = treq.json_content(response)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
body = yield make_deferred_yieldable(d)
defer.returnValue(body) defer.returnValue(body)
@defer.inlineCallbacks @defer.inlineCallbacks
@ -614,11 +686,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist is not on our federation whitelist
""" """
response = yield self._request( request = MatrixFederationRequest(
destination, method="GET",
"GET", destination=destination,
path, path=path,
query=args, query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail, retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff, ignore_backoff=ignore_backoff,
) )
@ -626,14 +702,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders()) headers = dict(response.headers.getAllRawHeaders())
try: try:
with logcontext.PreserveLoggingContext(): d = _readBodyToFile(response, output_stream, max_size)
d = _readBodyToFile(response, output_stream, max_size) d.addTimeout(self.default_timeout, self.hs.get_reactor())
d.addTimeout(self.default_timeout, self.hs.get_reactor()) length = yield make_deferred_yieldable(d)
length = yield make_deferred_yieldable(d) except Exception as e:
except Exception: logger.warn(
logger.exception("Failed to download body") "{%s} [%d] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise raise
logger.info(
"{%s} [%d] Completed: %d %s [%d bytes]",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
length,
)
defer.returnValue((length, headers)) defer.returnValue((length, headers))

View File

@ -18,9 +18,14 @@ from mock import Mock
from twisted.internet.defer import TimeoutError from twisted.internet.defer import TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.web.client import ResponseNeverReceived from twisted.web.client import ResponseNeverReceived
from twisted.web.http import HTTPChannel
from synapse.http.matrixfederationclient import MatrixFederationHttpClient from synapse.http.matrixfederationclient import (
MatrixFederationHttpClient,
MatrixFederationRequest,
)
from tests.server import FakeTransport
from tests.unittest import HomeserverTestCase from tests.unittest import HomeserverTestCase
@ -40,7 +45,7 @@ class FederationClientTests(HomeserverTestCase):
""" """
If the DNS raising returns an error, it will bubble up. If the DNS raising returns an error, it will bubble up.
""" """
d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
self.pump() self.pump()
f = self.failureResultOf(d) f = self.failureResultOf(d)
@ -51,7 +56,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is not connected and is timed out, it'll give a If the HTTP request is not connected and is timed out, it'll give a
ConnectingCancelledError. ConnectingCancelledError.
""" """
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump() self.pump()
@ -78,7 +83,7 @@ class FederationClientTests(HomeserverTestCase):
If the HTTP request is connected, but gets no response before being If the HTTP request is connected, but gets no response before being
timed out, it'll give a ResponseNeverReceived. timed out, it'll give a ResponseNeverReceived.
""" """
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
self.pump() self.pump()
@ -108,7 +113,12 @@ class FederationClientTests(HomeserverTestCase):
""" """
Once the client gets the headers, _request returns successfully. Once the client gets the headers, _request returns successfully.
""" """
d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) request = MatrixFederationRequest(
method="GET",
destination="testserv:8008",
path="foo/bar",
)
d = self.cl._send_request(request, timeout=10000)
self.pump() self.pump()
@ -155,3 +165,26 @@ class FederationClientTests(HomeserverTestCase):
f = self.failureResultOf(d) f = self.failureResultOf(d)
self.assertIsInstance(f.value, TimeoutError) self.assertIsInstance(f.value, TimeoutError)
def test_client_sends_body(self):
self.cl.post_json(
"testserv:8008", "foo/bar", timeout=10000,
data={"a": "b"}
)
self.pump()
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
client = clients[0][2].buildProtocol(None)
server = HTTPChannel()
client.makeConnection(FakeTransport(server, self.reactor))
server.makeConnection(FakeTransport(client, self.reactor))
self.pump(0.1)
self.assertEqual(len(server.requests), 1)
request = server.requests[0]
content = request.content.read()
self.assertEqual(content, b'{"a":"b"}')

View File

@ -15,8 +15,6 @@
from mock import Mock, NonCallableMock from mock import Mock, NonCallableMock
import attr
from synapse.replication.tcp.client import ( from synapse.replication.tcp.client import (
ReplicationClientFactory, ReplicationClientFactory,
ReplicationClientHandler, ReplicationClientHandler,
@ -24,6 +22,7 @@ from synapse.replication.tcp.client import (
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from tests import unittest from tests import unittest
from tests.server import FakeTransport
class BaseSlavedStoreTestCase(unittest.HomeserverTestCase): class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
@ -56,36 +55,8 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
server = server_factory.buildProtocol(None) server = server_factory.buildProtocol(None)
client = client_factory.buildProtocol(None) client = client_factory.buildProtocol(None)
@attr.s client.makeConnection(FakeTransport(server, reactor))
class FakeTransport(object): server.makeConnection(FakeTransport(client, reactor))
other = attr.ib()
disconnecting = False
buffer = attr.ib(default=b'')
def registerProducer(self, producer, streaming):
self.producer = producer
def _produce():
self.producer.resumeProducing()
reactor.callLater(0.1, _produce)
reactor.callLater(0.0, _produce)
def write(self, byt):
self.buffer = self.buffer + byt
if getattr(self.other, "transport") is not None:
self.other.dataReceived(self.buffer)
self.buffer = b""
def writeSequence(self, seq):
for x in seq:
self.write(x)
client.makeConnection(FakeTransport(server))
server.makeConnection(FakeTransport(client))
def replicate(self): def replicate(self):
"""Tell the master side of replication that something has happened, and then """Tell the master side of replication that something has happened, and then

View File

@ -280,3 +280,84 @@ def get_clock():
clock = ThreadedMemoryReactorClock() clock = ThreadedMemoryReactorClock()
hs_clock = Clock(clock) hs_clock = Clock(clock)
return (clock, hs_clock) return (clock, hs_clock)
@attr.s
class FakeTransport(object):
"""
A twisted.internet.interfaces.ITransport implementation which sends all its data
straight into an IProtocol object: it exists to connect two IProtocols together.
To use it, instantiate it with the receiving IProtocol, and then pass it to the
sending IProtocol's makeConnection method:
server = HTTPChannel()
client.makeConnection(FakeTransport(server, self.reactor))
If you want bidirectional communication, you'll need two instances.
"""
other = attr.ib()
"""The Protocol object which will receive any data written to this transport.
:type: twisted.internet.interfaces.IProtocol
"""
_reactor = attr.ib()
"""Test reactor
:type: twisted.internet.interfaces.IReactorTime
"""
disconnecting = False
buffer = attr.ib(default=b'')
producer = attr.ib(default=None)
def getPeer(self):
return None
def getHost(self):
return None
def loseConnection(self):
self.disconnecting = True
def abortConnection(self):
self.disconnecting = True
def pauseProducing(self):
self.producer.pauseProducing()
def unregisterProducer(self):
if not self.producer:
return
self.producer = None
def registerProducer(self, producer, streaming):
self.producer = producer
self.producerStreaming = streaming
def _produce():
d = self.producer.resumeProducing()
d.addCallback(lambda x: self._reactor.callLater(0.1, _produce))
if not streaming:
self._reactor.callLater(0.0, _produce)
def write(self, byt):
self.buffer = self.buffer + byt
def _write():
if getattr(self.other, "transport") is not None:
self.other.dataReceived(self.buffer)
self.buffer = b""
return
self._reactor.callLater(0.0, _write)
_write()
def writeSequence(self, seq):
for x in seq:
self.write(x)