Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add ResponseCache tests #9458

Merged
merged 8 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/9458.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add tests to ResponseCache.
2 changes: 1 addition & 1 deletion synapse/appservice/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(self, hs):
self.clock = hs.get_clock()

self.protocol_meta_cache = ResponseCache(
hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
) # type: ResponseCache[Tuple[str, str]]

async def query_user(self, service, user_id):
Expand Down
13 changes: 8 additions & 5 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
Awaitable,
Callable,
Dict,
Iterable,
List,
Optional,
Tuple,
Expand Down Expand Up @@ -99,7 +100,7 @@


class FederationServer(FederationBase):
def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)

self.auth = hs.get_auth()
Expand All @@ -119,7 +120,7 @@ def __init__(self, hs):

# We cache results for transaction with the same ID
self._transaction_resp_cache = ResponseCache(
hs, "fed_txn_handler", timeout_ms=30000
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]

self.transaction_actions = TransactionActions(self.store)
Expand All @@ -129,10 +130,10 @@ def __init__(self, hs):
# We cache responses to state queries, as they take a while and often
# come in waves.
self._state_resp_cache = ResponseCache(
hs, "state_resp", timeout_ms=30000
hs.get_clock(), "state_resp", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]
self._state_ids_resp_cache = ResponseCache(
hs, "state_ids_resp", timeout_ms=30000
hs.get_clock(), "state_ids_resp", timeout_ms=30000
) # type: ResponseCache[Tuple[str, str]]

self._federation_metrics_domains = (
Expand Down Expand Up @@ -455,7 +456,9 @@ async def _on_context_state_request_compute(
self, room_id: str, event_id: str
) -> Dict[str, list]:
if event_id:
pdus = await self.handler.get_state_for_pdu(room_id, event_id)
pdus = await self.handler.get_state_for_pdu(
room_id, event_id
) # type: Iterable[EventBase]
else:
pdus = (await self.state.get_current_state(room_id)).values()

Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/initial_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.validator = EventValidator()
self.snapshot_cache = ResponseCache(
hs, "initial_sync_cache"
hs.get_clock(), "initial_sync_cache"
) # type: ResponseCache[Tuple[str, Optional[StreamToken], Optional[StreamToken], str, Optional[int], bool, bool]]
self._event_serializer = hs.get_event_client_serializer()
self.storage = hs.get_storage()
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def __init__(self, hs: "HomeServer"):
# succession, only process the first attempt and return its result to
# subsequent requests
self._upgrade_response_cache = ResponseCache(
hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
hs.get_clock(), "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
) # type: ResponseCache[Tuple[str, str]]
self._server_notices_mxid = hs.config.server_notices_mxid

Expand Down
4 changes: 2 additions & 2 deletions synapse/handlers/room_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
self.response_cache = ResponseCache(
hs, "room_list"
hs.get_clock(), "room_list"
) # type: ResponseCache[Tuple[Optional[int], Optional[str], ThirdPartyInstanceID]]
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
hs.get_clock(), "remote_room_list", timeout_ms=30 * 1000
) # type: ResponseCache[Tuple[str, Optional[int], Optional[str], bool, Optional[str]]]

async def get_local_public_room_list(
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ def __init__(self, hs: "HomeServer"):
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
self.response_cache = ResponseCache(
hs, "sync"
hs.get_clock(), "sync"
) # type: ResponseCache[Tuple[Any, ...]]
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
Expand Down
9 changes: 6 additions & 3 deletions synapse/replication/http/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import re
import urllib
from inspect import signature
from typing import Dict, List, Tuple
from typing import TYPE_CHECKING, Dict, List, Tuple

from prometheus_client import Counter, Gauge

Expand All @@ -28,6 +28,9 @@
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
from synapse.server import HomeServer

logger = logging.getLogger(__name__)

_pending_outgoing_requests = Gauge(
Expand Down Expand Up @@ -88,10 +91,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
CACHE = True
RETRY_ON_TIMEOUT = True

def __init__(self, hs):
def __init__(self, hs: "HomeServer"):
if self.CACHE:
self.response_cache = ResponseCache(
hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000
hs.get_clock(), "repl." + self.NAME, timeout_ms=30 * 60 * 1000
) # type: ResponseCache[str]

# We reserve `instance_name` as a parameter to sending requests, so we
Expand Down
10 changes: 4 additions & 6 deletions synapse/util/caches/response_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import TYPE_CHECKING, Any, Callable, Dict, Generic, Optional, TypeVar
from typing import Any, Callable, Dict, Generic, Optional, TypeVar

from twisted.internet import defer

from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.util import Clock
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import register_cache

if TYPE_CHECKING:
from synapse.app.homeserver import HomeServer

logger = logging.getLogger(__name__)

T = TypeVar("T")
Expand All @@ -37,11 +35,11 @@ class ResponseCache(Generic[T]):
used rather than trying to compute a new response.
"""

def __init__(self, hs: "HomeServer", name: str, timeout_ms: float = 0):
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
# Requests that haven't finished yet.
self.pending_result_cache = {} # type: Dict[T, ObservableDeferred]

self.clock = hs.get_clock()
self.clock = clock
self.timeout_sec = timeout_ms / 1000.0

self._name = name
Expand Down
125 changes: 125 additions & 0 deletions tests/util/caches/test_responsecache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Copyright 2021 Vector Creations Ltd
ShadowJonathan marked this conversation as resolved.
Show resolved Hide resolved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet.task import Clock

from synapse.util import Clock as SynapseClock
from synapse.util.caches.response_cache import ResponseCache

from tests.unittest import TestCase

# A few notes about test naming here:
# 'wait': denotes tests that have an element of "waiting" before their wrapped result becomes available
# 'expire': denotes tests that check expiry after assured existence
ShadowJonathan marked this conversation as resolved.
Show resolved Hide resolved


class DeferredCacheTestCase(TestCase):
def setUp(self):
    """Build a deterministic twisted reactor and wrap it in a synapse Clock."""
    reactor = Clock()
    self.reactor = reactor
    self.synapse_clock = SynapseClock(reactor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want to use tests.server.get_clock here (and it would be nice to name them the same as reactor and clock (to match HomeserverTestCase).

It might even make sense to create the cache object here and then use it for each test (this simplifies each individual test-case, which makes it easier to see what each test-case is testing). I know the timeouts are currently different on them, but I think the tests could be slightly modified to take that into account.

Copy link
Contributor Author

@ShadowJonathan ShadowJonathan Mar 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use get_clock and rename to clock 👍, but I won't move the cache here, as they need to be variable for tests (see the naming and timeout_ms on each).

One thing I will do is make it a separate function (which accepts a name and optionally ms=), so that future tests can just use that instead of instantiating it directly every time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the instantiation to with_cache to make it easier on the eyes; please let me know if this is wrong.


@staticmethod
async def instant_return(o: str) -> str:
return o

async def delayed_return(self, o: str) -> str:
    """Return the given value only after one (reactor) second has elapsed."""
    await self.synapse_clock.sleep(1)
    return o

def test_cache_hit(self):
    """A value stored with a long timeout can immediately be fetched back."""
    cache = ResponseCache(self.synapse_clock, "keeping_cache", timeout_ms=9001)

    expected = "howdy"
    wrap_deferred = cache.wrap(0, self.instant_return, expected)

    # wrap() should hand back the wrapped function's result...
    self.assertEqual(
        expected,
        self.successResultOf(wrap_deferred),
        "initial wrap result should be the same",
    )
    # ...and the cache should retain that result for later lookups.
    self.assertEqual(
        expected,
        self.successResultOf(cache.get(0)),
        "cache should have the result",
    )

def test_cache_miss(self):
    """With a zero timeout, a completed result is not retained at all."""
    cache = ResponseCache(self.synapse_clock, "trashing_cache", timeout_ms=0)

    expected = "howdy"
    wrap_deferred = cache.wrap(0, self.instant_return, expected)

    # the caller still gets the result...
    self.assertEqual(
        expected,
        self.successResultOf(wrap_deferred),
        "initial wrap result should be the same",
    )
    # ...but a subsequent lookup finds nothing.
    self.assertIsNone(cache.get(0), "cache should not have the result now")

def test_cache_expire(self):
    """An entry with a finite timeout is evicted once that time passes."""
    cache = ResponseCache(self.synapse_clock, "short_cache", timeout_ms=1000)

    expected = "howdy"
    wrap_deferred = cache.wrap(0, self.instant_return, expected)

    self.assertEqual(expected, self.successResultOf(wrap_deferred))
    self.assertEqual(
        expected,
        self.successResultOf(cache.get(0)),
        "cache should still have the result",
    )

    # advance well past the 1s timeout so the eviction callLater fires
    self.reactor.pump((2,))

    self.assertIsNone(cache.get(0), "cache should not have the result now")

def test_cache_wait_hit(self):
    """A slow wrapped function only resolves once the reactor advances."""
    cache = ResponseCache(self.synapse_clock, "neutral_cache")

    expected = "howdy"
    wrap_deferred = cache.wrap(0, self.delayed_return, expected)

    # the wrapped coroutine is still sleeping, so there is no result yet
    self.assertNoResult(wrap_deferred)

    # advance time so delayed_return's 1s sleep completes
    self.reactor.pump((2,))

    self.assertEqual(expected, self.successResultOf(wrap_deferred))

def test_cache_wait_expire(self):
    """A slow result is cached once it completes, then evicted on timeout."""
    cache = ResponseCache(self.synapse_clock, "short_cache", timeout_ms=3000)

    expected = "howdy"
    wrap_deferred = cache.wrap(0, self.delayed_return, expected)
    self.assertNoResult(wrap_deferred)

    # two 1s steps: the first lets delayed_return's sleep fire (completing
    # the result and, presumably, arming the 3s eviction timer at t=1s —
    # confirm against ResponseCache), the second moves the clock to t=2s.
    self.reactor.pump((1, 1))

    self.assertEqual(expected, self.successResultOf(wrap_deferred))
    self.assertEqual(
        expected,
        self.successResultOf(cache.get(0)),
        "cache should still have the result",
    )

    # pump a further 2s (total elapsed 1 + 1 + 2 = 4s > 3.0s timeout), so
    # the eviction timer has now fired and the entry is gone
    self.reactor.pump((2,))

    self.assertIsNone(cache.get(0), "cache should not have the result now")