From 2310933d7b49c414af3fe343d92fad2a86c6dc34 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 Sep 2021 16:33:31 +0100 Subject: [PATCH 1/5] Fix chunk_seq to work on str/bytes --- synapse/util/iterutils.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 8ac3eab2f54f..4938ddf70321 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -21,13 +21,28 @@ Iterable, Iterator, Mapping, - Sequence, Set, + Sized, Tuple, TypeVar, ) +from typing_extensions import Protocol + T = TypeVar("T") +S = TypeVar("S", bound="_SelfSlice") + + +class _SelfSlice(Sized, Protocol): + """A helper protocol that matches types where taking a slice results in the + same type being returned. + + This is more specific than `Sequence`, which allows another `Sequence` to be + returned. + """ + + def __getitem__(self: S, i: slice) -> S: + ... def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: return iter(lambda: tuple(islice(sourceiter, size)), ()) -def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]: +def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: """Split the given sequence into chunks of the given size The last chunk may be shorter than the given size. From 02a50ee35554f4428cb3149ced8a31ee25e58214 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 17 Sep 2021 13:48:18 +0100 Subject: [PATCH 2/5] Add a `_write_json_to_request_in_thread` --- synapse/http/server.py | 72 +++++++++++++++++++++++++++++-------- synapse/push/emailpusher.py | 2 +- 2 files changed, 58 insertions(+), 16 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index e28b56abb945..0930f645fd12 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -21,7 +21,6 @@ import urllib from http import HTTPStatus from inspect import isawaitable -from io import BytesIO from typing import ( Any, Awaitable, @@ -37,7 +36,7 @@ ) import jinja2 -from canonicaljson import iterencode_canonical_json +from canonicaljson import encode_canonical_json from typing_extensions import Protocol from zope.interface import implementer @@ -45,7 +44,7 @@ from twisted.python import failure from twisted.web import resource from twisted.web.server import NOT_DONE_YET, Request -from twisted.web.static import File, NoRangeStaticProducer +from twisted.web.static import File from twisted.web.util import redirectTo from synapse.api.errors import ( @@ -56,10 +55,11 @@ UnrecognizedRequestError, ) from synapse.http.site import SynapseRequest -from synapse.logging.context import preserve_fn +from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict +from synapse.util.iterutils import chunk_seq logger = logging.getLogger(__name__) @@ -620,12 +620,11 @@ def stopProducing(self) -> None: self._request = None -def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: +def _encode_json_bytes(json_object: Any) -> bytes: """ Encode an object into JSON. Returns an iterator of bytes. """ - for chunk in json_encoder.iterencode(json_object): - yield chunk.encode("utf-8") + return json_encoder.encode(json_object).encode("utf-8") def respond_with_json( @@ -659,7 +658,7 @@ def respond_with_json( return None if canonical_json: - encoder = iterencode_canonical_json + encoder = encode_canonical_json else: encoder = _encode_json_bytes @@ -670,7 +669,9 @@ def respond_with_json( if send_cors: set_cors_headers(request) - _ByteProducer(request, encoder(json_object)) + run_in_background( + _async_write_json_to_request_in_thread, request, encoder, json_object + ) return NOT_DONE_YET @@ -706,15 +707,35 @@ def respond_with_json_bytes( if send_cors: set_cors_headers(request) - # note that this is zero-copy (the bytesio shares a copy-on-write buffer with - # the original `bytes`). - bytes_io = BytesIO(json_bytes) - - producer = NoRangeStaticProducer(request, bytes_io) - producer.start() + _write_json_bytes_to_request(request, json_bytes) return NOT_DONE_YET +def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None: + """Writes the JSON bytes to the request using an appropriate producer. + + Note: This should be used instead of `Request.write` to correctly handle + large response bodies. + """ + + # The problem with dumping all of the json response into the `Request` + # object at once (via `Request.write`) is that doing so starts the timeout + # for the next request to be received: so if it takes longer than 60s to + # stream back the response to the client, the client never gets it. + # + # The correct solution is to use a Producer; then the timeout is only + # started once all of the content is sent over the TCP connection. + + # To make sure we don't write the whole of the json at once we split it up + # into chunks. + chunk_size = 4096 + bytes_generator = chunk_seq(json_bytes, chunk_size) + + # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the + # unit tests can't cope with being given a pull producer. + _ByteProducer(request, bytes_generator) + + def set_cors_headers(request: Request): """Set the CORS headers so that javascript running in a web browsers can use this API @@ -809,3 +830,24 @@ def finish_request(request: Request): request.finish() except RuntimeError as e: logger.info("Connection disconnected before response was written: %r", e) + + +async def _async_write_json_to_request_in_thread( + request: SynapseRequest, + json_encoder: Callable[[Any], bytes], + json_object: Any, +): + """Encodes the given JSON object on a thread and then writes it to the + request. + + This is done so that encoding large JSON objects doesn't block the reactor + thread. + + Note: We don't use JsonEncoder.iterencode here as that falls back to the + Python implementation (rather than the C backend), which is *much* more + expensive. + """ + + json_str = await defer_to_thread(request.reactor, json_encoder, json_object) + + _write_json_bytes_to_request(request, json_str) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e08e125cb8a5..cf5abdfbda49 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -184,7 +184,7 @@ async def _unsafe_process(self) -> None: should_notify_at = max(notif_ready_at, room_ready_at) - if should_notify_at < self.clock.time_msec(): + if should_notify_at <= self.clock.time_msec(): # one of our notifications is ready for sending, so we send # *one* email updating the user on their notifications, # we then consider all previously outstanding notifications From b0cee6dce17df0fbf72a5d5a7422b206617bf38f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 24 Sep 2021 11:03:52 +0100 Subject: [PATCH 3/5] Newsfile --- changelog.d/10905.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/10905.feature diff --git a/changelog.d/10905.feature b/changelog.d/10905.feature new file mode 100644 index 000000000000..07e7b2c6a75e --- /dev/null +++ b/changelog.d/10905.feature @@ -0,0 +1 @@ +Speed up responding with large JSON objects to requests. From d071f7637df824f7be6e4eb1290a76a1a7225da9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Sep 2021 10:00:40 +0100 Subject: [PATCH 4/5] s/_write_json_bytes_to_request/_write_bytes_to_request --- synapse/http/server.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index 0930f645fd12..c442aa7ed445 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -707,29 +707,29 @@ def respond_with_json_bytes( if send_cors: set_cors_headers(request) - _write_json_bytes_to_request(request, json_bytes) + _write_bytes_to_request(request, json_bytes) return NOT_DONE_YET -def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None: - """Writes the JSON bytes to the request using an appropriate producer. +def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: + """Writes the bytes to the request using an appropriate producer. Note: This should be used instead of `Request.write` to correctly handle large response bodies. """ - # The problem with dumping all of the json response into the `Request` - # object at once (via `Request.write`) is that doing so starts the timeout - # for the next request to be received: so if it takes longer than 60s to - # stream back the response to the client, the client never gets it. + # The problem with dumping all of the response into the `Request` object at + # once (via `Request.write`) is that doing so starts the timeout for the + # next request to be received: so if it takes longer than 60s to stream back + # the response to the client, the client never gets it. # # The correct solution is to use a Producer; then the timeout is only # started once all of the content is sent over the TCP connection. - # To make sure we don't write the whole of the json at once we split it up - # into chunks. + # To make sure we don't write all of the bytes at once we split it up into + # chunks. chunk_size = 4096 - bytes_generator = chunk_seq(json_bytes, chunk_size) + bytes_generator = chunk_seq(bytes_to_write, chunk_size) # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the # unit tests can't cope with being given a pull producer. @@ -850,4 +850,4 @@ async def _async_write_json_to_request_in_thread( json_str = await defer_to_thread(request.reactor, json_encoder, json_object) - _write_json_bytes_to_request(request, json_str) + _write_bytes_to_request(request, json_str) From 5d1f9cb15d9319f943cbb270e325971c56b52fd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 28 Sep 2021 10:14:03 +0100 Subject: [PATCH 5/5] Shuffle '_async_write_json_to_request_in_thread' up the file --- synapse/http/server.py | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/synapse/http/server.py b/synapse/http/server.py index c442aa7ed445..1a50305dcfdc 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -711,6 +711,27 @@ def respond_with_json_bytes( return NOT_DONE_YET +async def _async_write_json_to_request_in_thread( + request: SynapseRequest, + json_encoder: Callable[[Any], bytes], + json_object: Any, +): + """Encodes the given JSON object on a thread and then writes it to the + request. + + This is done so that encoding large JSON objects doesn't block the reactor + thread. + + Note: We don't use JsonEncoder.iterencode here as that falls back to the + Python implementation (rather than the C backend), which is *much* more + expensive. + """ + + json_str = await defer_to_thread(request.reactor, json_encoder, json_object) + + _write_bytes_to_request(request, json_str) + + def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: """Writes the bytes to the request using an appropriate producer. @@ -830,24 +851,3 @@ def finish_request(request: Request): request.finish() except RuntimeError as e: logger.info("Connection disconnected before response was written: %r", e) - - -async def _async_write_json_to_request_in_thread( - request: SynapseRequest, - json_encoder: Callable[[Any], bytes], - json_object: Any, -): - """Encodes the given JSON object on a thread and then writes it to the - request. - - This is done so that encoding large JSON objects doesn't block the reactor - thread. - - Note: We don't use JsonEncoder.iterencode here as that falls back to the - Python implementation (rather than the C backend), which is *much* more - expensive. - """ - - json_str = await defer_to_thread(request.reactor, json_encoder, json_object) - - _write_bytes_to_request(request, json_str)