Encode JSON responses on a thread in C, mk2 (#10905)

Currently we use `JsonEncoder.iterencode` to write JSON responses, which ensures that we don't block the main reactor thread when encoding huge objects. The downside to this is that `iterencode` falls back to using a pure Python encoder that is *much* less efficient and can easily burn a lot of CPU for huge responses. To fix this, while still ensuring we don't block the reactor loop, we encode the JSON on a threadpool using the standard `JsonEncoder.encode` function, which is backed by a C library.

Doing so, however, requires `respond_with_json` to have access to the reactor, which it previously didn't. There are two ways of doing this:

1. threading through the reactor object, which is a bit fiddly as e.g. `DirectServeJsonResource` doesn't currently take a reactor, but is exposed to modules and so is a PITA to change; or
2. expose the reactor in `SynapseRequest`, which requires updating a bunch of servlet types.

I went with the latter as that is just a mechanical change, and I think makes sense as a request already has a reactor associated with it (via its http channel).
This commit is contained in:
Erik Johnston 2021-09-28 10:37:58 +01:00 committed by GitHub
parent d37841787a
commit 707d5e4e48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 76 additions and 18 deletions

View File

@ -0,0 +1 @@
Speed up responding with large JSON objects to requests.

View File

@ -21,7 +21,6 @@ import types
import urllib import urllib
from http import HTTPStatus from http import HTTPStatus
from inspect import isawaitable from inspect import isawaitable
from io import BytesIO
from typing import ( from typing import (
Any, Any,
Awaitable, Awaitable,
@ -37,7 +36,7 @@ from typing import (
) )
import jinja2 import jinja2
from canonicaljson import iterencode_canonical_json from canonicaljson import encode_canonical_json
from typing_extensions import Protocol from typing_extensions import Protocol
from zope.interface import implementer from zope.interface import implementer
@ -45,7 +44,7 @@ from twisted.internet import defer, interfaces
from twisted.python import failure from twisted.python import failure
from twisted.web import resource from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request from twisted.web.server import NOT_DONE_YET, Request
from twisted.web.static import File, NoRangeStaticProducer from twisted.web.static import File
from twisted.web.util import redirectTo from twisted.web.util import redirectTo
from synapse.api.errors import ( from synapse.api.errors import (
@ -56,10 +55,11 @@ from synapse.api.errors import (
UnrecognizedRequestError, UnrecognizedRequestError,
) )
from synapse.http.site import SynapseRequest from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import trace_servlet from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder from synapse.util import json_encoder
from synapse.util.caches import intern_dict from synapse.util.caches import intern_dict
from synapse.util.iterutils import chunk_seq
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -620,12 +620,11 @@ class _ByteProducer:
self._request = None self._request = None
def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: def _encode_json_bytes(json_object: Any) -> bytes:
""" """
Encode an object into JSON. Returns an iterator of bytes. Encode an object into JSON. Returns an iterator of bytes.
""" """
for chunk in json_encoder.iterencode(json_object): return json_encoder.encode(json_object).encode("utf-8")
yield chunk.encode("utf-8")
def respond_with_json( def respond_with_json(
@ -659,7 +658,7 @@ def respond_with_json(
return None return None
if canonical_json: if canonical_json:
encoder = iterencode_canonical_json encoder = encode_canonical_json
else: else:
encoder = _encode_json_bytes encoder = _encode_json_bytes
@ -670,7 +669,9 @@ def respond_with_json(
if send_cors: if send_cors:
set_cors_headers(request) set_cors_headers(request)
_ByteProducer(request, encoder(json_object)) run_in_background(
_async_write_json_to_request_in_thread, request, encoder, json_object
)
return NOT_DONE_YET return NOT_DONE_YET
@ -706,15 +707,56 @@ def respond_with_json_bytes(
if send_cors: if send_cors:
set_cors_headers(request) set_cors_headers(request)
# note that this is zero-copy (the bytesio shares a copy-on-write buffer with _write_bytes_to_request(request, json_bytes)
# the original `bytes`).
bytes_io = BytesIO(json_bytes)
producer = NoRangeStaticProducer(request, bytes_io)
producer.start()
return NOT_DONE_YET return NOT_DONE_YET
async def _async_write_json_to_request_in_thread(
    request: SynapseRequest,
    json_encoder: Callable[[Any], bytes],
    json_object: Any,
):
    """Serialize `json_object` on a worker thread, then stream the result
    back on the request.

    Offloading the serialization keeps the reactor thread responsive while
    still using `JsonEncoder.encode`: the C-backed encoder is fast but
    blocking for very large payloads, whereas `JsonEncoder.iterencode`
    would avoid blocking only by falling back to the far more expensive
    pure-Python implementation.
    """
    encoded = await defer_to_thread(request.reactor, json_encoder, json_object)
    _write_bytes_to_request(request, encoded)
def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
    """Stream `bytes_to_write` to `request` via a producer.

    Prefer this over calling `Request.write` directly for large response
    bodies. Handing the whole payload to `Request.write` at once starts the
    timeout for the *next* request immediately, so a client that needs more
    than 60s to download the response never receives it. With a producer the
    timeout only begins once the full body has been sent over the TCP
    connection.
    """
    # Feed the producer fixed-size chunks rather than one huge bytestring so
    # we never write everything in a single call.
    chunks = chunk_seq(bytes_to_write, 4096)

    # A `_ByteProducer` (push producer) is used here rather than
    # `NoRangeStaticProducer` because the unit tests can't cope with being
    # given a pull producer.
    _ByteProducer(request, chunks)
def set_cors_headers(request: Request): def set_cors_headers(request: Request):
"""Set the CORS headers so that javascript running in a web browsers can """Set the CORS headers so that javascript running in a web browsers can
use this API use this API

View File

@ -184,7 +184,7 @@ class EmailPusher(Pusher):
should_notify_at = max(notif_ready_at, room_ready_at) should_notify_at = max(notif_ready_at, room_ready_at)
if should_notify_at < self.clock.time_msec(): if should_notify_at <= self.clock.time_msec():
# one of our notifications is ready for sending, so we send # one of our notifications is ready for sending, so we send
# *one* email updating the user on their notifications, # *one* email updating the user on their notifications,
# we then consider all previously outstanding notifications # we then consider all previously outstanding notifications

View File

@ -21,13 +21,28 @@ from typing import (
Iterable, Iterable,
Iterator, Iterator,
Mapping, Mapping,
Sequence,
Set, Set,
Sized,
Tuple, Tuple,
TypeVar, TypeVar,
) )
from typing_extensions import Protocol
T = TypeVar("T") T = TypeVar("T")
S = TypeVar("S", bound="_SelfSlice")
class _SelfSlice(Sized, Protocol):
    """A helper protocol that matches types where taking a slice results in the
    same type being returned.

    This is more specific than `Sequence`, which allows another `Sequence` to be
    returned.
    """

    # Slicing an instance yields the same concrete type (e.g. bytes[i:j] is
    # bytes), which `Sequence.__getitem__` alone cannot express.
    def __getitem__(self: S, i: slice) -> S:
        ...
def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
return iter(lambda: tuple(islice(sourceiter, size)), ()) return iter(lambda: tuple(islice(sourceiter, size)), ())
def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]: def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size """Split the given sequence into chunks of the given size
The last chunk may be shorter than the given size. The last chunk may be shorter than the given size.