Merge pull request #3700 from matrix-org/rav/wait_for_producers

Refactor request logging code
This commit is contained in:
Richard van der Hoff 2018-08-17 14:57:45 +01:00 committed by GitHub
commit 3f8709ffe4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 180 additions and 92 deletions

1
changelog.d/3700.bugfix Normal file
View File

@ -0,0 +1 @@
Improve HTTP request logging to include all requests

View File

@ -25,7 +25,7 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso
from twisted.internet import defer from twisted.internet import defer
from twisted.python import failure from twisted.python import failure
from twisted.web import resource, server from twisted.web import resource
from twisted.web.server import NOT_DONE_YET from twisted.web.server import NOT_DONE_YET
from twisted.web.util import redirectTo from twisted.web.util import redirectTo
@ -37,10 +37,8 @@ from synapse.api.errors import (
SynapseError, SynapseError,
UnrecognizedRequestError, UnrecognizedRequestError,
) )
from synapse.http.request_metrics import requests_counter
from synapse.util.caches import intern_dict from synapse.util.caches import intern_dict
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -60,11 +58,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
def wrap_json_request_handler(h): def wrap_json_request_handler(h):
"""Wraps a request handler method with exception handling. """Wraps a request handler method with exception handling.
Also adds logging as per wrap_request_handler_with_logging. Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)", The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a where "request" must be a SynapseRequest.
SynapseRequest).
The handler must return a deferred. If the deferred succeeds we assume that The handler must return a deferred. If the deferred succeeds we assume that
a response has been sent. If the deferred fails with a SynapseError we use a response has been sent. If the deferred fails with a SynapseError we use
@ -108,24 +105,23 @@ def wrap_json_request_handler(h):
pretty_print=_request_user_agent_is_curl(request), pretty_print=_request_user_agent_is_curl(request),
) )
return wrap_request_handler_with_logging(wrapped_request_handler) return wrap_async_request_handler(wrapped_request_handler)
def wrap_html_request_handler(h): def wrap_html_request_handler(h):
"""Wraps a request handler method with exception handling. """Wraps a request handler method with exception handling.
Also adds logging as per wrap_request_handler_with_logging. Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)", The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a where "request" must be a SynapseRequest.
SynapseRequest).
""" """
def wrapped_request_handler(self, request): def wrapped_request_handler(self, request):
d = defer.maybeDeferred(h, self, request) d = defer.maybeDeferred(h, self, request)
d.addErrback(_return_html_error, request) d.addErrback(_return_html_error, request)
return d return d
return wrap_request_handler_with_logging(wrapped_request_handler) return wrap_async_request_handler(wrapped_request_handler)
def _return_html_error(f, request): def _return_html_error(f, request):
@ -170,46 +166,26 @@ def _return_html_error(f, request):
finish_request(request) finish_request(request)
def wrap_request_handler_with_logging(h): def wrap_async_request_handler(h):
"""Wraps a request handler to provide logging and metrics """Wraps an async request handler so that it calls request.processing.
This helps ensure that work done by the request handler after the request is completed
is correctly recorded against the request metrics/logs.
The handler method must have a signature of "handle_foo(self, request)", The handler method must have a signature of "handle_foo(self, request)",
where "self" must have a "clock" attribute (and "request" must be a where "request" must be a SynapseRequest.
SynapseRequest).
As well as calling `request.processing` (which will log the response and The handler may return a deferred, in which case the completion of the request isn't
duration for this request), the wrapped request handler will insert the logged until the deferred completes.
request id into the logging context.
""" """
@defer.inlineCallbacks @defer.inlineCallbacks
def wrapped_request_handler(self, request): def wrapped_async_request_handler(self, request):
""" with request.processing():
Args: yield h(self, request)
self:
request (synapse.http.site.SynapseRequest):
"""
request_id = request.get_request_id() # we need to preserve_fn here, because the synchronous render method won't yield for
with LoggingContext(request_id) as request_context: # us (obviously)
request_context.request = request_id return preserve_fn(wrapped_async_request_handler)
with Measure(self.clock, "wrapped_request_handler"):
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = self.__class__.__name__
with request.processing(servlet_name):
with PreserveLoggingContext(request_context):
d = defer.maybeDeferred(h, self, request)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(request.method,
request.request_metrics.name).inc()
yield d
return wrapped_request_handler
class HttpServer(object): class HttpServer(object):
@ -272,7 +248,7 @@ class JsonResource(HttpServer, resource.Resource):
""" This gets called by twisted every time someone sends us a request. """ This gets called by twisted every time someone sends us a request.
""" """
self._async_render(request) self._async_render(request)
return server.NOT_DONE_YET return NOT_DONE_YET
@wrap_json_request_handler @wrap_json_request_handler
@defer.inlineCallbacks @defer.inlineCallbacks

View File

@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import contextlib import contextlib
import logging import logging
import time import time
@ -19,8 +18,8 @@ import time
from twisted.web.server import Request, Site from twisted.web.server import Request, Site
from synapse.http import redact_uri from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.util.logcontext import ContextResourceUsage, LoggingContext from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -34,25 +33,43 @@ class SynapseRequest(Request):
It extends twisted's twisted.web.server.Request, and adds: It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID * Unique request ID
* A log context associated with the request
* Redaction of access_token query-params in __repr__ * Redaction of access_token query-params in __repr__
* Logging at start and end * Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint. * Metrics to record CPU, wallclock and DB time by endpoint.
It provides a method `processing` which should be called by the Resource It also provides a method `processing`, which returns a context manager. If this
which is handling the request, and returns a context manager. method is called, the request won't be logged until the context manager is closed;
this is useful for asynchronous request handlers which may go on processing the
request even after the client has disconnected.
Attributes:
logcontext(LoggingContext) : the log context for this request
""" """
def __init__(self, site, channel, *args, **kw): def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw) Request.__init__(self, channel, *args, **kw)
self.site = site self.site = site
self._channel = channel self._channel = channel # this is used by the tests
self.authenticated_entity = None self.authenticated_entity = None
self.start_time = 0 self.start_time = 0
# we can't yet create the logcontext, as we don't know the method.
self.logcontext = None
global _next_request_seq global _next_request_seq
self.request_seq = _next_request_seq self.request_seq = _next_request_seq
_next_request_seq += 1 _next_request_seq += 1
# whether an asynchronous request handler has called processing()
self._is_processing = False
# the time when the asynchronous request handler completed its processing
self._processing_finished_time = None
# what time we finished sending the response to the client (or the connection
# dropped)
self.finish_time = None
def __repr__(self): def __repr__(self):
# We overwrite this so that we don't log ``access_token`` # We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
@ -74,11 +91,116 @@ class SynapseRequest(Request):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def render(self, resrc): def render(self, resrc):
# this is called once a Resource has been found to serve the request; in our
# case the Resource in question will normally be a JsonResource.
# create a LogContext for this request
request_id = self.get_request_id()
logcontext = self.logcontext = LoggingContext(request_id)
logcontext.request = request_id
# override the Server header which is set by twisted # override the Server header which is set by twisted
self.setHeader("Server", self.site.server_version_string) self.setHeader("Server", self.site.server_version_string)
return Request.render(self, resrc)
with PreserveLoggingContext(self.logcontext):
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = resrc.__class__.__name__
self._started_processing(servlet_name)
Request.render(self, resrc)
# record the arrival of the request *after*
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.method,
self.request_metrics.name).inc()
@contextlib.contextmanager
def processing(self):
"""Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
Once the context manager is closed, the completion of the request will be logged,
and the various metrics will be updated.
"""
if self._is_processing:
raise RuntimeError("Request is already processing")
self._is_processing = True
try:
yield
except Exception:
# this should already have been caught, and sent back to the client as a 500.
logger.exception("Asynchronous messge handler raised an uncaught exception")
finally:
# the request handler has finished its work and either sent the whole response
# back, or handed over responsibility to a Producer.
self._processing_finished_time = time.time()
self._is_processing = False
# if we've already sent the response, log it now; otherwise, we wait for the
# response to be sent.
if self.finish_time is not None:
self._finished_processing()
def finish(self):
"""Called when all response data has been written to this Request.
Overrides twisted.web.server.Request.finish to record the finish time and do
logging.
"""
self.finish_time = time.time()
Request.finish(self)
if not self._is_processing:
with PreserveLoggingContext(self.logcontext):
self._finished_processing()
def connectionLost(self, reason):
"""Called when the client connection is closed before the response is written.
Overrides twisted.web.server.Request.connectionLost to record the finish time and
do logging.
"""
self.finish_time = time.time()
Request.connectionLost(self, reason)
# we only get here if the connection to the client drops before we send
# the response.
#
# It's useful to log it here so that we can get an idea of when
# the client disconnects.
with PreserveLoggingContext(self.logcontext):
logger.warn(
"Error processing request: %s %s", reason.type, reason.value,
)
if not self._is_processing:
self._finished_processing()
def _started_processing(self, servlet_name): def _started_processing(self, servlet_name):
"""Record the fact that we are processing this request.
This will log the request's arrival. Once the request completes,
be sure to call finished_processing.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.name.
"""
self.start_time = time.time() self.start_time = time.time()
self.request_metrics = RequestMetrics() self.request_metrics = RequestMetrics()
self.request_metrics.start( self.request_metrics.start(
@ -94,13 +216,21 @@ class SynapseRequest(Request):
) )
def _finished_processing(self): def _finished_processing(self):
try: """Log the completion of this request and update the metrics
context = LoggingContext.current_context() """
usage = context.get_resource_usage()
except Exception:
usage = ContextResourceUsage()
end_time = time.time() usage = self.logcontext.get_resource_usage()
if self._processing_finished_time is None:
# we completed the request without anything calling processing()
self._processing_finished_time = time.time()
# the time between receiving the request and the request handler finishing
processing_time = self._processing_finished_time - self.start_time
# the time between the request handler finishing and the response being sent
# to the client (nb may be negative)
response_send_time = self.finish_time - self._processing_finished_time
# need to decode as it could be raw utf-8 bytes # need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header # from a IDN servname in an auth header
@ -116,22 +246,31 @@ class SynapseRequest(Request):
user_agent = self.get_user_agent() user_agent = self.get_user_agent()
if user_agent is not None: if user_agent is not None:
user_agent = user_agent.decode("utf-8", "replace") user_agent = user_agent.decode("utf-8", "replace")
else:
user_agent = "-"
code = str(self.code)
if not self.finished:
# we didn't send the full response before we gave up (presumably because
# the connection dropped)
code += "!"
self.site.access_logger.info( self.site.access_logger.info(
"%s - %s - {%s}" "%s - %s - {%s}"
" Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(), self.getClientIP(),
self.site.site_tag, self.site.site_tag,
authenticated_entity, authenticated_entity,
end_time - self.start_time, processing_time,
response_send_time,
usage.ru_utime, usage.ru_utime,
usage.ru_stime, usage.ru_stime,
usage.db_sched_duration_sec, usage.db_sched_duration_sec,
usage.db_txn_duration_sec, usage.db_txn_duration_sec,
int(usage.db_txn_count), int(usage.db_txn_count),
self.sentLength, self.sentLength,
self.code, code,
self.method, self.method,
self.get_redacted_uri(), self.get_redacted_uri(),
self.clientproto, self.clientproto,
@ -140,38 +279,10 @@ class SynapseRequest(Request):
) )
try: try:
self.request_metrics.stop(end_time, self) self.request_metrics.stop(self.finish_time, self)
except Exception as e: except Exception as e:
logger.warn("Failed to stop metrics: %r", e) logger.warn("Failed to stop metrics: %r", e)
@contextlib.contextmanager
def processing(self, servlet_name):
"""Record the fact that we are processing this request.
Returns a context manager; the correct way to use this is:
@defer.inlineCallbacks
def handle_request(request):
with request.processing("FooServlet"):
yield really_handle_the_request()
This will log the request's arrival. Once the context manager is
closed, the completion of the request will be logged, and the various
metrics will be updated.
Args:
servlet_name (str): the name of the servlet which will be
processing this request. This is used in the metrics.
It is possible to update this afterwards by updating
self.request_metrics.servlet_name.
"""
# TODO: we should probably just move this into render() and finish(),
# to save having to call a separate method.
self._started_processing(servlet_name)
yield
self._finished_processing()
class XForwardedForRequest(SynapseRequest): class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw): def __init__(self, *args, **kw):