diff --git a/changelog.d/10844.feature b/changelog.d/10844.feature new file mode 100644 index 000000000000..07e7b2c6a75e --- /dev/null +++ b/changelog.d/10844.feature @@ -0,0 +1 @@ +Speed up responding with large JSON objects to requests. diff --git a/synapse/http/server.py b/synapse/http/server.py index b79fa722e931..0930f645fd12 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -21,7 +21,6 @@ import urllib from http import HTTPStatus from inspect import isawaitable -from io import BytesIO from typing import ( Any, Awaitable, @@ -37,7 +36,7 @@ ) import jinja2 -from canonicaljson import iterencode_canonical_json +from canonicaljson import encode_canonical_json from typing_extensions import Protocol from zope.interface import implementer @@ -45,7 +44,7 @@ from twisted.python import failure from twisted.web import resource from twisted.web.server import NOT_DONE_YET, Request -from twisted.web.static import File, NoRangeStaticProducer +from twisted.web.static import File from twisted.web.util import redirectTo from synapse.api.errors import ( @@ -56,10 +55,11 @@ UnrecognizedRequestError, ) from synapse.http.site import SynapseRequest -from synapse.logging.context import preserve_fn +from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict +from synapse.util.iterutils import chunk_seq logger = logging.getLogger(__name__) @@ -320,7 +320,7 @@ def __init__(self, canonical_json=False, extract_context=False): def _send_response( self, - request: Request, + request: SynapseRequest, code: int, response_object: Any, ): @@ -620,16 +620,15 @@ def stopProducing(self) -> None: self._request = None -def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: +def _encode_json_bytes(json_object: Any) -> bytes: """ Encode an object into JSON. Returns an iterator of bytes. """ - for chunk in json_encoder.iterencode(json_object): - yield chunk.encode("utf-8") + return json_encoder.encode(json_object).encode("utf-8") def respond_with_json( - request: Request, + request: SynapseRequest, code: int, json_object: Any, send_cors: bool = False, @@ -659,7 +658,7 @@ def respond_with_json( return None if canonical_json: - encoder = iterencode_canonical_json + encoder = encode_canonical_json else: encoder = _encode_json_bytes @@ -670,7 +669,9 @@ def respond_with_json( if send_cors: set_cors_headers(request) - _ByteProducer(request, encoder(json_object)) + run_in_background( + _async_write_json_to_request_in_thread, request, encoder, json_object + ) return NOT_DONE_YET @@ -706,15 +707,35 @@ def respond_with_json_bytes( if send_cors: set_cors_headers(request) - # note that this is zero-copy (the bytesio shares a copy-on-write buffer with - # the original `bytes`). - bytes_io = BytesIO(json_bytes) - - producer = NoRangeStaticProducer(request, bytes_io) - producer.start() + _write_json_bytes_to_request(request, json_bytes) return NOT_DONE_YET +def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None: + """Writes the JSON bytes to the request using an appropriate producer. + + Note: This should be used instead of `Request.write` to correctly handle + large response bodies. + """ + + # The problem with dumping all of the json response into the `Request` + # object at once (via `Request.write`) is that doing so starts the timeout + # for the next request to be received: so if it takes longer than 60s to + # stream back the response to the client, the client never gets it. + # + # The correct solution is to use a Producer; then the timeout is only + # started once all of the content is sent over the TCP connection. + + # To make sure we don't write the whole of the json at once we split it up + # into chunks. + chunk_size = 4096 + bytes_generator = chunk_seq(json_bytes, chunk_size) + + # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the + # unit tests can't cope with being given a pull producer. + _ByteProducer(request, bytes_generator) + + def set_cors_headers(request: Request): """Set the CORS headers so that javascript running in a web browsers can use this API @@ -809,3 +830,24 @@ def finish_request(request: Request): request.finish() except RuntimeError as e: logger.info("Connection disconnected before response was written: %r", e) + + +async def _async_write_json_to_request_in_thread( + request: SynapseRequest, + json_encoder: Callable[[Any], bytes], + json_object: Any, +): + """Encodes the given JSON object on a thread and then writes it to the + request. + + This is done so that encoding large JSON objects doesn't block the reactor + thread. + + Note: We don't use JsonEncoder.iterencode here as that falls back to the + Python implementation (rather than the C backend), which is *much* more + expensive. + """ + + json_str = await defer_to_thread(request.reactor, json_encoder, json_object) + + _write_json_bytes_to_request(request, json_str) diff --git a/synapse/http/site.py b/synapse/http/site.py index c665a9d5db86..a22cd0a0b79a 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -14,14 +14,15 @@ import contextlib import logging import time -from typing import Optional, Tuple, Union +from typing import Generator, Optional, Tuple, Union import attr from zope.interface import implementer from twisted.internet.interfaces import IAddress, IReactorTime from twisted.python.failure import Failure -from twisted.web.resource import IResource +from twisted.web.http import HTTPChannel +from twisted.web.resource import IResource, Resource from twisted.web.server import Request, Site from synapse.config.server import ListenerConfig @@ -61,10 +62,18 @@ class SynapseRequest(Request): logcontext: the log context for this request """ - def __init__(self, channel, *args, max_request_body_size=1024, **kw): - Request.__init__(self, channel, *args, **kw) + def __init__( + self, + channel: HTTPChannel, + site: "SynapseSite", + *args, + max_request_body_size: int = 1024, + **kw, + ): + super().__init__(channel, *args, **kw) self._max_request_body_size = max_request_body_size - self.site: SynapseSite = channel.site + self.synapse_site = site + self.reactor = site.reactor self._channel = channel # this is used by the tests self.start_time = 0.0 @@ -83,13 +92,13 @@ def __init__(self, channel, *args, max_request_body_size=1024, **kw): self._is_processing = False # the time when the asynchronous request handler completed its processing - self._processing_finished_time = None + self._processing_finished_time: Optional[float] = None # what time we finished sending the response to the client (or the connection # dropped) - self.finish_time = None + self.finish_time: Optional[float] = None - def __repr__(self): + def __repr__(self) -> str: # We overwrite this so that we don't log ``access_token`` return "<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>" % ( self.__class__.__name__, @@ -97,10 +106,10 @@ def __repr__(self): self.get_method(), self.get_redacted_uri(), self.clientproto.decode("ascii", errors="replace"), - self.site.site_tag, + self.synapse_site.site_tag, ) - def handleContentChunk(self, data): + def handleContentChunk(self, data: bytes) -> None: # we should have a `content` by now. assert self.content, "handleContentChunk() called before gotLength()" if self.content.tell() + len(data) > self._max_request_body_size: @@ -139,7 +148,7 @@ def requester(self, value: Union[Requester, str]) -> None: # If there's no authenticated entity, it was the requester. self.logcontext.request.authenticated_entity = authenticated_entity or requester - def get_request_id(self): + def get_request_id(self) -> str: return "%s-%i" % (self.get_method(), self.request_seq) def get_redacted_uri(self) -> str: @@ -205,7 +214,7 @@ def get_authenticated_entity(self) -> Tuple[Optional[str], Optional[str]]: return None, None - def render(self, resrc): + def render(self, resrc: Resource) -> None: # this is called once a Resource has been found to serve the request; in our # case the Resource in question will normally be a JsonResource. @@ -216,7 +225,7 @@ def render(self, resrc): request=ContextRequest( request_id=request_id, ip_address=self.getClientIP(), - site_tag=self.site.site_tag, + site_tag=self.synapse_site.site_tag, # The requester is going to be unknown at this point. requester=None, authenticated_entity=None, @@ -228,7 +237,7 @@ def render(self, resrc): ) # override the Server header which is set by twisted - self.setHeader("Server", self.site.server_version_string) + self.setHeader("Server", self.synapse_site.server_version_string) with PreserveLoggingContext(self.logcontext): # we start the request metrics timer here with an initial stab @@ -247,7 +256,7 @@ def render(self, resrc): requests_counter.labels(self.get_method(), self.request_metrics.name).inc() @contextlib.contextmanager - def processing(self): + def processing(self) -> Generator[None, None, None]: """Record the fact that we are processing this request. Returns a context manager; the correct way to use this is: @@ -282,7 +291,7 @@ async def handle_request(request): if self.finish_time is not None: self._finished_processing() - def finish(self): + def finish(self) -> None: """Called when all response data has been written to this Request. Overrides twisted.web.server.Request.finish to record the finish time and do @@ -295,7 +304,7 @@ def finish(self): with PreserveLoggingContext(self.logcontext): self._finished_processing() - def connectionLost(self, reason): + def connectionLost(self, reason: Union[Failure, Exception]) -> None: """Called when the client connection is closed before the response is written. Overrides twisted.web.server.Request.connectionLost to record the finish time and @@ -327,7 +336,7 @@ def connectionLost(self, reason): if not self._is_processing: self._finished_processing() - def _started_processing(self, servlet_name): + def _started_processing(self, servlet_name: str) -> None: """Record the fact that we are processing this request. This will log the request's arrival. Once the request completes, @@ -346,17 +355,19 @@ def _started_processing(self, servlet_name): self.start_time, name=servlet_name, method=self.get_method() ) - self.site.access_logger.debug( + self.synapse_site.access_logger.debug( "%s - %s - Received request: %s %s", self.getClientIP(), - self.site.site_tag, + self.synapse_site.site_tag, self.get_method(), self.get_redacted_uri(), ) - def _finished_processing(self): + def _finished_processing(self) -> None: """Log the completion of this request and update the metrics""" assert self.logcontext is not None + assert self.finish_time is not None + usage = self.logcontext.get_resource_usage() if self._processing_finished_time is None: @@ -386,13 +397,13 @@ def _finished_processing(self): if authenticated_entity: requester = f"{authenticated_entity}|{requester}" - self.site.access_logger.log( + self.synapse_site.access_logger.log( log_level, "%s - %s - {%s}" " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" ' %sB %s "%s %s %s" "%s" [%d dbevts]', self.getClientIP(), - self.site.site_tag, + self.synapse_site.site_tag, requester, processing_time, response_send_time, @@ -437,7 +448,7 @@ class XForwardedForRequest(SynapseRequest): _forwarded_for: "Optional[_XForwardedForAddress]" = None _forwarded_https: bool = False - def requestReceived(self, command, path, version): + def requestReceived(self, command: bytes, path: bytes, version: bytes) -> None: # this method is called by the Channel once the full request has been # received, to dispatch the request to a resource. # We can use it to set the IP address and protocol according to the @@ -445,7 +456,7 @@ def requestReceived(self, command, path, version): self._process_forwarded_headers() return super().requestReceived(command, path, version) - def _process_forwarded_headers(self): + def _process_forwarded_headers(self) -> None: headers = self.requestHeaders.getRawHeaders(b"x-forwarded-for") if not headers: return @@ -470,7 +481,7 @@ def _process_forwarded_headers(self): ) self._forwarded_https = True - def isSecure(self): + def isSecure(self) -> bool: if self._forwarded_https: return True return super().isSecure() @@ -520,7 +531,7 @@ def __init__( site_tag: str, config: ListenerConfig, resource: IResource, - server_version_string, + server_version_string: str, max_request_body_size: int, reactor: IReactorTime, ): @@ -540,19 +551,23 @@ def __init__( Site.__init__(self, resource, reactor=reactor) self.site_tag = site_tag + self.reactor = reactor assert config.http_options is not None proxied = config.http_options.x_forwarded request_class = XForwardedForRequest if proxied else SynapseRequest - def request_factory(channel, queued) -> Request: + def request_factory(channel: HTTPChannel, queued: bool) -> Request: return request_class( - channel, max_request_body_size=max_request_body_size, queued=queued + channel, + self, + max_request_body_size=max_request_body_size, + queued=queued, ) self.requestFactory = request_factory # type: ignore self.access_logger = logging.getLogger(logger_name) self.server_version_string = server_version_string.encode("ascii") - def log(self, request): + def log(self, request: SynapseRequest) -> None: pass diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e08e125cb8a5..cf5abdfbda49 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -184,7 +184,7 @@ async def _unsafe_process(self) -> None: should_notify_at = max(notif_ready_at, room_ready_at) - if should_notify_at < self.clock.time_msec(): + if should_notify_at <= self.clock.time_msec(): # one of our notifications is ready for sending, so we send # *one* email updating the user on their notifications, # we then consider all previously outstanding notifications diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index d8fd7938a468..057ddc23dbfe 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -17,12 +17,11 @@ from signedjson.sign import sign_json -from twisted.web.server import Request - from synapse.api.errors import Codes, SynapseError from synapse.crypto.keyring import ServerKeyFetcher from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.servlet import parse_integer, parse_json_object_from_request +from synapse.http.site import SynapseRequest from synapse.types import JsonDict from synapse.util import json_decoder from synapse.util.async_helpers import yieldable_gather_results @@ -100,7 +99,7 @@ def __init__(self, hs: "HomeServer"): self.federation_domain_whitelist = hs.config.federation_domain_whitelist self.config = hs.config - async def _async_render_GET(self, request: Request) -> None: + async def _async_render_GET(self, request: SynapseRequest) -> None: assert request.postpath is not None if len(request.postpath) == 1: (server,) = request.postpath @@ -117,7 +116,7 @@ async def _async_render_GET(self, request: Request) -> None: await self.query_keys(request, query, query_remote_on_cache_miss=True) - async def _async_render_POST(self, request: Request) -> None: + async def _async_render_POST(self, request: SynapseRequest) -> None: content = parse_json_object_from_request(request) query = content["server_keys"] @@ -126,7 +125,7 @@ async def _async_render_POST(self, request: Request) -> None: async def query_keys( self, - request: Request, + request: SynapseRequest, query: JsonDict, query_remote_on_cache_miss: bool = False, ) -> None: diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 7c881f2bdb18..014fa893d6c8 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -27,6 +27,7 @@ from synapse.api.errors import Codes, SynapseError, cs_error from synapse.http.server import finish_request, respond_with_json +from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable from synapse.util.stringutils import is_ascii @@ -74,7 +75,7 @@ def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: ) -def respond_404(request: Request) -> None: +def respond_404(request: SynapseRequest) -> None: respond_with_json( request, 404, @@ -84,7 +85,7 @@ def respond_404(request: Request) -> None: async def respond_with_file( - request: Request, + request: SynapseRequest, media_type: str, file_path: str, file_size: Optional[int] = None, @@ -221,7 +222,7 @@ def _can_encode_filename_as_token(x: str) -> bool: async def respond_with_responder( - request: Request, + request: SynapseRequest, responder: "Optional[Responder]", media_type: str, file_size: Optional[int], diff --git a/synapse/rest/media/v1/config_resource.py b/synapse/rest/media/v1/config_resource.py index a1d36e5cf189..712d4e83681a 100644 --- a/synapse/rest/media/v1/config_resource.py +++ b/synapse/rest/media/v1/config_resource.py @@ -16,8 +16,6 @@ from typing import TYPE_CHECKING -from twisted.web.server import Request - from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.site import SynapseRequest @@ -39,5 +37,5 @@ async def _async_render_GET(self, request: SynapseRequest) -> None: await self.auth.get_user_by_req(request) respond_with_json(request, 200, self.limits_dict, send_cors=True) - async def _async_render_OPTIONS(self, request: Request) -> None: + async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py index d6d938953e44..6180fa575ebc 100644 --- a/synapse/rest/media/v1/download_resource.py +++ b/synapse/rest/media/v1/download_resource.py @@ -15,10 +15,9 @@ import logging from typing import TYPE_CHECKING -from twisted.web.server import Request - from synapse.http.server import DirectServeJsonResource, set_cors_headers from synapse.http.servlet import parse_boolean +from synapse.http.site import SynapseRequest from ._base import parse_media_id, respond_404 @@ -37,7 +36,7 @@ def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): self.media_repo = media_repo self.server_name = hs.hostname - async def _async_render_GET(self, request: Request) -> None: + async def _async_render_GET(self, request: SynapseRequest) -> None: set_cors_headers(request) request.setHeader( b"Content-Security-Policy", diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 50e4c9e29f26..28fa3c72e4d9 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -23,7 +23,6 @@ import twisted.web.http from twisted.internet.defer import Deferred from twisted.web.resource import Resource -from twisted.web.server import Request from synapse.api.errors import ( FederationDeniedError, @@ -34,6 +33,7 @@ ) from synapse.config._base import ConfigError from synapse.config.repository import ThumbnailRequirement +from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID @@ -187,7 +187,7 @@ async def create_content( return "mxc://%s/%s" % (self.server_name, media_id) async def get_local_media( - self, request: Request, media_id: str, name: Optional[str] + self, request: SynapseRequest, media_id: str, name: Optional[str] ) -> None: """Responds to requests for local media, if exists, or returns 404. @@ -221,7 +221,11 @@ async def get_local_media( ) async def get_remote_media( - self, request: Request, server_name: str, media_id: str, name: Optional[str] + self, + request: SynapseRequest, + server_name: str, + media_id: str, + name: Optional[str], ) -> None: """Respond to requests for remote media. diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index fe0627d9b0e6..16f16b482092 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -29,7 +29,6 @@ from twisted.internet.defer import Deferred from twisted.internet.error import DNSLookupError -from twisted.web.server import Request from synapse.api.errors import Codes, SynapseError from synapse.http.client import SimpleHttpClient @@ -167,7 +166,7 @@ def __init__( self._start_expire_url_cache_data, 10 * 1000 ) - async def _async_render_OPTIONS(self, request: Request) -> None: + async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: request.setHeader(b"Allow", b"OPTIONS, GET") respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/rest/media/v1/thumbnail_resource.py b/synapse/rest/media/v1/thumbnail_resource.py index 22f43d85310b..cb2f88676ec1 100644 --- a/synapse/rest/media/v1/thumbnail_resource.py +++ b/synapse/rest/media/v1/thumbnail_resource.py @@ -17,11 +17,10 @@ import logging from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple -from twisted.web.server import Request - from synapse.api.errors import SynapseError from synapse.http.server import DirectServeJsonResource, set_cors_headers from synapse.http.servlet import parse_integer, parse_string +from synapse.http.site import SynapseRequest from synapse.rest.media.v1.media_storage import MediaStorage from ._base import ( @@ -57,7 +56,7 @@ def __init__( self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.server_name = hs.hostname - async def _async_render_GET(self, request: Request) -> None: + async def _async_render_GET(self, request: SynapseRequest) -> None: set_cors_headers(request) server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width", required=True) @@ -88,7 +87,7 @@ async def _async_render_GET(self, request: Request) -> None: async def _respond_local_thumbnail( self, - request: Request, + request: SynapseRequest, media_id: str, width: int, height: int, @@ -121,7 +120,7 @@ async def _respond_local_thumbnail( async def _select_or_generate_local_thumbnail( self, - request: Request, + request: SynapseRequest, media_id: str, desired_width: int, desired_height: int, @@ -186,7 +185,7 @@ async def _select_or_generate_local_thumbnail( async def _select_or_generate_remote_thumbnail( self, - request: Request, + request: SynapseRequest, server_name: str, media_id: str, desired_width: int, @@ -249,7 +248,7 @@ async def _select_or_generate_remote_thumbnail( async def _respond_remote_thumbnail( self, - request: Request, + request: SynapseRequest, server_name: str, media_id: str, width: int, @@ -280,7 +279,7 @@ async def _respond_remote_thumbnail( async def _select_and_respond_with_thumbnail( self, - request: Request, + request: SynapseRequest, desired_width: int, desired_height: int, desired_method: str, diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py index 146adca8f1bf..39b29318bbe8 100644 --- a/synapse/rest/media/v1/upload_resource.py +++ b/synapse/rest/media/v1/upload_resource.py @@ -16,8 +16,6 @@ import logging from typing import IO, TYPE_CHECKING, Dict, List, Optional -from twisted.web.server import Request - from synapse.api.errors import Codes, SynapseError from synapse.http.server import DirectServeJsonResource, respond_with_json from synapse.http.servlet import parse_bytes_from_args @@ -46,7 +44,7 @@ def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): self.max_upload_size = hs.config.max_upload_size self.clock = hs.get_clock() - async def _async_render_OPTIONS(self, request: Request) -> None: + async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: respond_with_json(request, 200, {}, send_cors=True) async def _async_render_POST(self, request: SynapseRequest) -> None: diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 8ac3eab2f54f..4938ddf70321 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -21,13 +21,28 @@ Iterable, Iterator, Mapping, - Sequence, Set, + Sized, Tuple, TypeVar, ) +from typing_extensions import Protocol + T = TypeVar("T") +S = TypeVar("S", bound="_SelfSlice") + + +class _SelfSlice(Sized, Protocol): + """A helper protocol that matches types where taking a slice results in the + same type being returned. + + This is more specific than `Sequence`, which allows another `Sequence` to be + returned. + """ + + def __getitem__(self: S, i: slice) -> S: + ... def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: return iter(lambda: tuple(islice(sourceiter, size)), ()) -def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]: +def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: """Split the given sequence into chunks of the given size The last chunk may be shorter than the given size. diff --git a/tests/http/test_additional_resource.py b/tests/http/test_additional_resource.py index 768c2ba4ead3..391196425c38 100644 --- a/tests/http/test_additional_resource.py +++ b/tests/http/test_additional_resource.py @@ -45,7 +45,9 @@ def test_async(self): handler = _AsyncTestCustomEndpoint({}, None).handle_request resource = AdditionalResource(self.hs, handler) - channel = make_request(self.reactor, FakeSite(resource), "GET", "/") + channel = make_request( + self.reactor, FakeSite(resource, self.reactor), "GET", "/" + ) self.assertEqual(channel.code, 200) self.assertEqual(channel.json_body, {"some_key": "some_value_async"}) @@ -54,7 +56,9 @@ def test_sync(self): handler = _SyncTestCustomEndpoint({}, None).handle_request resource = AdditionalResource(self.hs, handler) - channel = make_request(self.reactor, FakeSite(resource), "GET", "/") + channel = make_request( + self.reactor, FakeSite(resource, self.reactor), "GET", "/" + ) self.assertEqual(channel.code, 200) self.assertEqual(channel.json_body, {"some_key": "some_value_sync"}) diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py index 116071692976..f73fcd684e0e 100644 --- a/tests/logging/test_terse_json.py +++ b/tests/logging/test_terse_json.py @@ -152,7 +152,8 @@ def test_with_request_context(self): site = Mock(spec=["site_tag", "server_version_string", "getResourceFor"]) site.site_tag = "test-site" site.server_version_string = "Server v1" - request = SynapseRequest(FakeChannel(site, None)) + site.reactor = Mock() + request = SynapseRequest(FakeChannel(site, None), site) # Call requestReceived to finish instantiating the object. request.content = BytesIO() # Partially skip some of the internal processing of SynapseRequest. diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 01b1b0d4a002..13aa5eb51aa5 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -68,7 +68,7 @@ def _get_media_req( resource = hs.get_media_repository_resource().children[b"download"] channel = make_request( self.reactor, - FakeSite(resource), + FakeSite(resource, self.reactor), "GET", f"/{target}/{media_id}", shorthand=False, diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index febd40b65609..192073c5203f 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -201,7 +201,7 @@ def _ensure_quarantined(self, admin_user_tok, server_and_media_id): """Ensure a piece of media is quarantined when trying to access it.""" channel = make_request( self.reactor, - FakeSite(self.download_resource), + FakeSite(self.download_resource, self.reactor), "GET", server_and_media_id, shorthand=False, @@ -271,7 +271,7 @@ def test_quarantine_media_by_id(self): # Attempt to access the media channel = make_request( self.reactor, - FakeSite(self.download_resource), + FakeSite(self.download_resource, self.reactor), "GET", server_name_and_media_id, shorthand=False, @@ -458,7 +458,7 @@ def test_cannot_quarantine_safe_media(self): # Attempt to access each piece of media channel = make_request( self.reactor, - FakeSite(self.download_resource), + FakeSite(self.download_resource, self.reactor), "GET", server_and_media_id_2, shorthand=False, diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py index 2f02934e72b1..f8138660736b 100644 --- a/tests/rest/admin/test_media.py +++ b/tests/rest/admin/test_media.py @@ -125,7 +125,7 @@ def test_delete_media(self): # Attempt to access media channel = make_request( self.reactor, - FakeSite(download_resource), + FakeSite(download_resource, self.reactor), "GET", server_and_media_id, shorthand=False, @@ -164,7 +164,7 @@ def test_delete_media(self): # Attempt to access media channel = make_request( self.reactor, - FakeSite(download_resource), + FakeSite(download_resource, self.reactor), "GET", server_and_media_id, shorthand=False, @@ -525,7 +525,7 @@ def _access_media(self, server_and_media_id, expect_success=True): channel = make_request( self.reactor, - FakeSite(download_resource), + FakeSite(download_resource, self.reactor), "GET", server_and_media_id, shorthand=False, diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index cc3f16c62ae5..e79e0e1850b8 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -2973,7 +2973,7 @@ def _create_media_and_access( # Try to access a media and to create `last_access_ts` channel = make_request( self.reactor, - FakeSite(download_resource), + FakeSite(download_resource, self.reactor), "GET", server_and_media_id, shorthand=False, diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index b946fca8b367..9e9e953cf4b2 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -312,7 +312,7 @@ def _validate_token(self, link): # Load the password reset confirmation page channel = make_request( self.reactor, - FakeSite(self.submit_token_resource), + FakeSite(self.submit_token_resource, self.reactor), "GET", path, shorthand=False, @@ -326,7 +326,7 @@ def _validate_token(self, link): # Confirm the password reset channel = make_request( self.reactor, - FakeSite(self.submit_token_resource), + FakeSite(self.submit_token_resource, self.reactor), "POST", path, content=b"", diff --git a/tests/rest/client/test_consent.py b/tests/rest/client/test_consent.py index 65c58ce70a84..84d092ca8242 100644 --- a/tests/rest/client/test_consent.py +++ b/tests/rest/client/test_consent.py @@ -61,7 +61,11 @@ def test_render_public_consent(self): """You can observe the terms form without specifying a user""" resource = consent_resource.ConsentResource(self.hs) channel = make_request( - self.reactor, FakeSite(resource), "GET", "/consent?v=1", shorthand=False + self.reactor, + FakeSite(resource, self.reactor), + "GET", + "/consent?v=1", + shorthand=False, ) self.assertEqual(channel.code, 200) @@ -83,7 +87,7 @@ def test_accept_consent(self): ) channel = make_request( self.reactor, - FakeSite(resource), + FakeSite(resource, self.reactor), "GET", consent_uri, access_token=access_token, @@ -98,7 +102,7 @@ def test_accept_consent(self): # POST to the consent page, saying we've agreed channel = make_request( self.reactor, - FakeSite(resource), + FakeSite(resource, self.reactor), "POST", consent_uri + "&v=" + version, access_token=access_token, @@ -110,7 +114,7 @@ def test_accept_consent(self): # changed channel = make_request( self.reactor, - FakeSite(resource), + FakeSite(resource, self.reactor), "GET", consent_uri, access_token=access_token, diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 954ad1a1fdef..e55936d9c3bd 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -372,7 +372,7 @@ def upload_media( path = "/_matrix/media/r0/upload?filename=%s" % (filename,) channel = make_request( self.hs.get_reactor(), - FakeSite(resource), + FakeSite(resource, self.hs.get_reactor()), "POST", path, content=image_data, diff --git a/tests/rest/key/v2/test_remote_key_resource.py b/tests/rest/key/v2/test_remote_key_resource.py index a75c0ea3f04a..4672a6859684 100644 --- a/tests/rest/key/v2/test_remote_key_resource.py +++ b/tests/rest/key/v2/test_remote_key_resource.py @@ -84,7 +84,7 @@ def make_notary_request(self, server_name: str, key_id: str) -> dict: Checks that the response is a 200 and returns the decoded json body. """ channel = FakeChannel(self.site, self.reactor) - req = SynapseRequest(channel) + req = SynapseRequest(channel, self.site) req.content = BytesIO(b"") req.requestReceived( b"GET", @@ -183,7 +183,7 @@ async def post_json(destination, path, data): ) channel = FakeChannel(self.site, self.reactor) - req = SynapseRequest(channel) + req = SynapseRequest(channel, self.site) req.content = BytesIO(encode_canonical_json(data)) req.requestReceived( diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index 9ea1c2bf25c1..44a643d50613 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -252,7 +252,7 @@ def _req(self, content_disposition): channel = make_request( self.reactor, - FakeSite(self.download_resource), + FakeSite(self.download_resource, self.reactor), "GET", self.media_id, shorthand=False, @@ -384,7 +384,7 @@ def test_thumbnail_repeated_thumbnail(self): params = "?width=32&height=32&method=scale" channel = make_request( self.reactor, - FakeSite(self.thumbnail_resource), + FakeSite(self.thumbnail_resource, self.reactor), "GET", self.media_id + params, shorthand=False, @@ -413,7 +413,7 @@ def test_thumbnail_repeated_thumbnail(self): channel = make_request( self.reactor, - FakeSite(self.thumbnail_resource), + FakeSite(self.thumbnail_resource, self.reactor), "GET", self.media_id + params, shorthand=False, @@ -433,7 +433,7 @@ def _test_thumbnail(self, method, expected_body, expected_found): params = "?width=32&height=32&method=" + method channel = make_request( self.reactor, - FakeSite(self.thumbnail_resource), + FakeSite(self.thumbnail_resource, self.reactor), "GET", self.media_id + params, shorthand=False, diff --git a/tests/server.py b/tests/server.py index b861c7b866f8..88dfa8058e62 100644 --- a/tests/server.py +++ b/tests/server.py @@ -19,6 +19,7 @@ IPullProducer, IPushProducer, IReactorPluggableNameResolver, + IReactorTime, IResolverSimple, ITransport, ) @@ -181,13 +182,14 @@ class FakeSite: site_tag = "test" access_logger = logging.getLogger("synapse.access.http.fake") - def __init__(self, resource: IResource): + def __init__(self, resource: IResource, reactor: IReactorTime): """ Args: resource: the resource to be used for rendering all requests """ self._resource = resource + self.reactor = reactor def getResourceFor(self, request): return self._resource @@ -268,7 +270,7 @@ def make_request( channel = FakeChannel(site, reactor, ip=client_ip) - req = request(channel) + req = request(channel, site) req.content = BytesIO(content) # Twisted expects to be at the end of the content when parsing the request. req.content.seek(SEEK_END) diff --git a/tests/test_server.py b/tests/test_server.py index 407e172e41de..f2ffbc895b88 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -65,7 +65,10 @@ def _callback(request, **kwargs): ) make_request( - self.reactor, FakeSite(res), b"GET", b"/_matrix/foo/%E2%98%83?a=%E2%98%83" + self.reactor, + FakeSite(res, self.reactor), + b"GET", + b"/_matrix/foo/%E2%98%83?a=%E2%98%83", ) self.assertEqual(got_kwargs, {"room_id": "\N{SNOWMAN}"}) @@ -84,7 +87,9 @@ def _callback(request, **kwargs): "GET", [re.compile("^/_matrix/foo$")], _callback, "test_servlet" ) - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/_matrix/foo") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/_matrix/foo" + ) self.assertEqual(channel.result["code"], b"500") @@ -100,7 +105,7 @@ def _throw(*args): def _callback(request, **kwargs): d = Deferred() d.addCallback(_throw) - self.reactor.callLater(1, d.callback, True) + self.reactor.callLater(0.5, d.callback, True) return make_deferred_yieldable(d) res = JsonResource(self.homeserver) @@ -108,7 +113,9 @@ def _callback(request, **kwargs): "GET", [re.compile("^/_matrix/foo$")], _callback, "test_servlet" ) - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/_matrix/foo") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/_matrix/foo" + ) self.assertEqual(channel.result["code"], b"500") @@ -126,7 +133,9 @@ def _callback(request, **kwargs): "GET", [re.compile("^/_matrix/foo$")], _callback, "test_servlet" ) - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/_matrix/foo") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/_matrix/foo" + ) self.assertEqual(channel.result["code"], b"403") self.assertEqual(channel.json_body["error"], "Forbidden!!one!") @@ -148,7 +157,9 @@ def _callback(request, **kwargs): "GET", [re.compile("^/_matrix/foo$")], _callback, "test_servlet" ) - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/_matrix/foobar") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/_matrix/foobar" + ) self.assertEqual(channel.result["code"], b"400") self.assertEqual(channel.json_body["error"], "Unrecognized request") @@ -173,7 +184,9 @@ def _callback(request, **kwargs): ) # The path was registered as GET, but this is a HEAD request. - channel = make_request(self.reactor, FakeSite(res), b"HEAD", b"/_matrix/foo") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"HEAD", b"/_matrix/foo" + ) self.assertEqual(channel.result["code"], b"200") self.assertNotIn("body", channel.result) @@ -280,7 +293,9 @@ async def callback(request): res = WrapHtmlRequestHandlerTests.TestResource() res.callback = callback - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/path") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/path" + ) self.assertEqual(channel.result["code"], b"200") body = channel.result["body"] @@ -298,7 +313,9 @@ async def callback(request, **kwargs): res = WrapHtmlRequestHandlerTests.TestResource() res.callback = callback - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/path") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/path" + ) self.assertEqual(channel.result["code"], b"301") headers = channel.result["headers"] @@ -319,7 +336,9 @@ async def callback(request, **kwargs): res = WrapHtmlRequestHandlerTests.TestResource() res.callback = callback - channel = make_request(self.reactor, FakeSite(res), b"GET", b"/path") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"GET", b"/path" + ) self.assertEqual(channel.result["code"], b"304") headers = channel.result["headers"] @@ -338,7 +357,9 @@ async def callback(request): res = WrapHtmlRequestHandlerTests.TestResource() res.callback = callback - channel = make_request(self.reactor, FakeSite(res), b"HEAD", b"/path") + channel = make_request( + self.reactor, FakeSite(res, self.reactor), b"HEAD", b"/path" + ) self.assertEqual(channel.result["code"], b"200") self.assertNotIn("body", channel.result)