From 325cadc8c4a64137a93aaf8ea199cc36e6fa30a3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 15 Aug 2022 21:46:07 -0500 Subject: [PATCH 1/7] Add metrics to track how the rate limiter is affecting requests Related to https://github.com/matrix-org/synapse/pull/13499 Mentioned in https://docs.google.com/document/d/1lvUoVfYUiy6UaHB6Rb4HicjaJAU40-APue9Q4vzuW3c/edit#bookmark=id.zjko88lr25j --- synapse/util/ratelimitutils.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 6394cc39ac02..fa6f63c77846 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -20,6 +20,8 @@ from twisted.internet import defer +from prometheus_client.core import Counter + from synapse.api.errors import LimitExceededError from synapse.config.ratelimiting import FederationRatelimitSettings from synapse.logging.context import ( @@ -35,6 +37,11 @@ logger = logging.getLogger(__name__) +rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "", ["host"]) + +rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "", ["host"]) + + class FederationRateLimiter: def __init__(self, clock: Clock, config: FederationRatelimitSettings): def new_limiter() -> "_PerHostRatelimiter": @@ -59,7 +66,7 @@ def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None] Returns: context manager which returns a deferred. """ - return self.ratelimiters[host].ratelimit() + return self.ratelimiters[host].ratelimit(host) class _PerHostRatelimiter: @@ -94,12 +101,14 @@ def __init__(self, clock: Clock, config: FederationRatelimitSettings): self.request_times: List[int] = [] @contextlib.contextmanager - def ratelimit(self) -> "Iterator[defer.Deferred[None]]": + def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]": # `contextlib.contextmanager` takes a generator and turns it into a # context manager. The generator should only yield once with a value # to be returned by manager. # Exceptions will be reraised at the yield. + self.host = host + request_id = object() ret = self._on_enter(request_id) try: @@ -119,6 +128,7 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]": # sleeping or in the ready queue). queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) if queue_size > self.reject_limit: + rate_limit_reject_counter.labels(self.host).inc() raise LimitExceededError( retry_after_ms=int(self.window_size / self.sleep_limit) ) @@ -146,6 +156,7 @@ def queue_request() -> "defer.Deferred[None]": if len(self.request_times) > self.sleep_limit: logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec) + rate_limit_sleep_counter.labels(self.host).inc() ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) self.sleeping_requests.add(request_id) From 326731889fce08f75727de04ec2c2b9a5203ee06 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 15 Aug 2022 21:51:25 -0500 Subject: [PATCH 2/7] Add changelog --- changelog.d/13534.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13534.misc diff --git a/changelog.d/13534.misc b/changelog.d/13534.misc new file mode 100644 index 000000000000..b488bf74c389 --- /dev/null +++ b/changelog.d/13534.misc @@ -0,0 +1 @@ +Add metrics to track how the rate limiter is affecting requests (sleep/reject). From 5679bb2b5ae2563ee4ba2edfaa2db1f4a421d0bd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 15 Aug 2022 22:00:47 -0500 Subject: [PATCH 3/7] Fix lints --- synapse/util/ratelimitutils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index fa6f63c77846..8c364304b329 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,10 +18,10 @@ import typing from typing import Any, DefaultDict, Iterator, List, Set -from twisted.internet import defer - from prometheus_client.core import Counter +from twisted.internet import defer + from synapse.api.errors import LimitExceededError from synapse.config.ratelimiting import FederationRatelimitSettings from synapse.logging.context import ( From 149ac1db8a32f18f9c4ff0c9c7118940328ad34a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 16 Aug 2022 11:58:08 -0500 Subject: [PATCH 4/7] Remove unbounded host from labels See https://github.com/matrix-org/synapse/pull/13534#discussion_r946496912 --- synapse/util/ratelimitutils.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 8c364304b329..b668f9d22541 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -37,9 +37,9 @@ logger = logging.getLogger(__name__) -rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "", ["host"]) - -rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "", ["host"]) +# Track how much the ratelimiter is affecting requests +rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "") +rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "") class FederationRateLimiter: @@ -128,7 +128,8 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]": # sleeping or in the ready queue). queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) if queue_size > self.reject_limit: - rate_limit_reject_counter.labels(self.host).inc() + logger.debug("Ratelimiter(%s): rejecting request", self.host) + rate_limit_reject_counter.inc() raise LimitExceededError( retry_after_ms=int(self.window_size / self.sleep_limit) ) @@ -140,7 +141,8 @@ def queue_request() -> "defer.Deferred[None]": queue_defer: defer.Deferred[None] = defer.Deferred() self.ready_request_queue[request_id] = queue_defer logger.info( - "Ratelimiter: queueing request (queue now %i items)", + "Ratelimiter(%s): queueing request (queue now %i items)", + self.host, len(self.ready_request_queue), ) @@ -149,20 +151,28 @@ def queue_request() -> "defer.Deferred[None]": return defer.succeed(None) logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", + "Ratelimit(%s) [%s]: len(self.request_times)=%d", + self.host, id(request_id), len(self.request_times), ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec) - rate_limit_sleep_counter.labels(self.host).inc() + logger.debug( + "Ratelimiter(%s) [%s]: sleeping request for %f sec", + self.host, + id(request_id), + self.sleep_sec, + ) + rate_limit_sleep_counter.inc() ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) self.sleeping_requests.add(request_id) def on_wait_finished(_: Any) -> "defer.Deferred[None]": - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id) + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -172,7 +182,9 @@ def on_wait_finished(_: Any) -> "defer.Deferred[None]": ret_defer = queue_request() def on_start(r: object) -> object: - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Processing req", self.host, id(request_id) + ) self.current_processing.add(request_id) return r @@ -194,7 +206,7 @@ def on_both(r: object) -> object: return make_deferred_yieldable(ret_defer) def _on_exit(self, request_id: object) -> None: - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id)) self.current_processing.discard(request_id) try: # start processing the next item on the queue. From 8be321f2d54679dab5bd28a3ce810b1bc5483bc4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 16 Aug 2022 13:32:42 -0500 Subject: [PATCH 5/7] Track number of hosts affected by the rate limiter Follow-up to https://github.com/matrix-org/synapse/pull/13534 Part of https://github.com/matrix-org/synapse/issues/13356 --- synapse/metrics/__init__.py | 12 +++++++++ synapse/notifier.py | 12 +-------- synapse/util/ratelimitutils.py | 47 +++++++++++++++++++++++++++++++--- 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 496fce2ecc30..fca6960cb40c 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -78,6 +78,17 @@ def collect() -> Iterable[Metric]: # TODO Do something nicer about this. RegistryProxy = cast(CollectorRegistry, _RegistryProxy) +T = TypeVar("T") + + +def count(func: Callable[[T], bool], it: Iterable[T]) -> int: + """Return the number of items in it for which func returns true.""" + n = 0 + for x in it: + if func(x): + n += 1 + return n + @attr.s(slots=True, hash=True, auto_attribs=True) class LaterGauge(Collector): @@ -475,6 +486,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: "MetricsResource", "generate_latest", "start_http_server", + count, "LaterGauge", "InFlightGauge", "GaugeBucketCollector", diff --git a/synapse/notifier.py b/synapse/notifier.py index c42bb8266add..768d196372fc 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -40,7 +40,7 @@ from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span -from synapse.metrics import LaterGauge +from synapse.metrics import count, LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( JsonDict, @@ -68,16 +68,6 @@ T = TypeVar("T") -# TODO(paul): Should be shared somewhere -def count(func: Callable[[T], bool], it: Iterable[T]) -> int: - """Return the number of items in it for which func returns true.""" - n = 0 - for x in it: - if func(x): - n += 1 - return n - - class _NotificationListener: """This represents a single client connection to the events stream. The events stream handler will have yielded to the deferred, so to diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index b668f9d22541..8d9917852ef2 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ import typing from typing import Any, DefaultDict, Iterator, List, Set -from prometheus_client.core import Counter +from prometheus_client.core import Counter, Gauge from twisted.internet import defer @@ -29,6 +29,7 @@ make_deferred_yieldable, run_in_background, ) +from synapse.metrics import count, LaterGauge from synapse.util import Clock if typing.TYPE_CHECKING: @@ -51,6 +52,34 @@ def new_limiter() -> "_PerHostRatelimiter": str, "_PerHostRatelimiter" ] = collections.defaultdict(new_limiter) + # We track the number of affected hosts per time-period so we can + # differentiate one really noisy homeserver from a general + # ratelimit tuning problem across the federation. + LaterGauge( + "synapse_rate_limit_sleep_affected_hosts", + "Number of hosts that had requests put to sleep", + [], + lambda: count( + bool, + [ + ratelimiter.should_sleep() + for ratelimiter in self.ratelimiters.values() + ], + ), + ) + LaterGauge( + "synapse_rate_limit_reject_affected_hosts", + "Number of hosts that had requests rejected", + [], + lambda: count( + bool, + [ + ratelimiter.should_reject() + for ratelimiter in self.ratelimiters.values() + ], + ), + ) + def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]": """Used to ratelimit an incoming request from a given host @@ -116,6 +145,17 @@ def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]": finally: self._on_exit(request_id) + def should_reject(self): + """ + Reject the request if we already have too many queued up (either + sleeping or in the ready queue). + """ + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + return queue_size > self.reject_limit + + def should_sleep(self): + return len(self.request_times) > self.sleep_limit + def _on_enter(self, request_id: object) -> "defer.Deferred[None]": time_now = self.clock.time_msec() @@ -126,8 +166,7 @@ def _on_enter(self, request_id: object) -> "defer.Deferred[None]": # reject the request if we already have too many queued up (either # sleeping or in the ready queue). - queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) - if queue_size > self.reject_limit: + if self.should_reject(): logger.debug("Ratelimiter(%s): rejecting request", self.host) rate_limit_reject_counter.inc() raise LimitExceededError( @@ -157,7 +196,7 @@ def queue_request() -> "defer.Deferred[None]": len(self.request_times), ) - if len(self.request_times) > self.sleep_limit: + if self.should_sleep(): logger.debug( "Ratelimiter(%s) [%s]: sleeping request for %f sec", self.host, From de1bbb805132dc4faddddf85d2b7405330c9af53 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 16 Aug 2022 13:46:57 -0500 Subject: [PATCH 6/7] Add changelog and fix lints --- changelog.d/13541.misc | 1 + synapse/metrics/__init__.py | 2 +- synapse/notifier.py | 3 +-- synapse/util/ratelimitutils.py | 16 ++++++++++------ 4 files changed, 13 insertions(+), 9 deletions(-) create mode 100644 changelog.d/13541.misc diff --git a/changelog.d/13541.misc b/changelog.d/13541.misc new file mode 100644 index 000000000000..b488bf74c389 --- /dev/null +++ b/changelog.d/13541.misc @@ -0,0 +1 @@ +Add metrics to track how the rate limiter is affecting requests (sleep/reject). diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index fca6960cb40c..c3453b03865d 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -486,7 +486,7 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: "MetricsResource", "generate_latest", "start_http_server", - count, + "count", "LaterGauge", "InFlightGauge", "GaugeBucketCollector", diff --git a/synapse/notifier.py b/synapse/notifier.py index 768d196372fc..8e04b5ddf088 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,7 +19,6 @@ Callable, Collection, Dict, - Iterable, List, Optional, Set, @@ -40,7 +39,7 @@ from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span -from synapse.metrics import count, LaterGauge +from synapse.metrics import LaterGauge, count from synapse.streams.config import PaginationConfig from synapse.types import ( JsonDict, diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 8d9917852ef2..42265d03d306 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,7 +18,7 @@ import typing from typing import Any, DefaultDict, Iterator, List, Set -from prometheus_client.core import Counter, Gauge +from prometheus_client.core import Counter from twisted.internet import defer @@ -29,7 +29,7 @@ make_deferred_yieldable, run_in_background, ) -from synapse.metrics import count, LaterGauge +from synapse.metrics import LaterGauge, count from synapse.util import Clock if typing.TYPE_CHECKING: @@ -145,15 +145,19 @@ def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]": finally: self._on_exit(request_id) - def should_reject(self): + def should_reject(self) -> bool: """ - Reject the request if we already have too many queued up (either - sleeping or in the ready queue). + Whether to reject the request if we already have too many queued up + (either sleeping or in the ready queue). """ queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) return queue_size > self.reject_limit - def should_sleep(self): + def should_sleep(self) -> bool: + """ + Whether to sleep the request if we already have too many requests coming + through within the window. + """ return len(self.request_times) > self.sleep_limit def _on_enter(self, request_id: object) -> "defer.Deferred[None]": From 7dc68690f3af84aba256acf11bef4725978e7f91 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 17 Aug 2022 14:57:58 -0500 Subject: [PATCH 7/7] Use simple sum solution See: - https://github.com/matrix-org/synapse/pull/13541#discussion_r947684275 - https://github.com/matrix-org/synapse/pull/13541#discussion_r947676503 --- synapse/metrics/__init__.py | 12 ------------ synapse/notifier.py | 13 ++++++++++++- synapse/util/ratelimitutils.py | 19 ++++++------------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index c3453b03865d..496fce2ecc30 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -78,17 +78,6 @@ def collect() -> Iterable[Metric]: # TODO Do something nicer about this. RegistryProxy = cast(CollectorRegistry, _RegistryProxy) -T = TypeVar("T") - - -def count(func: Callable[[T], bool], it: Iterable[T]) -> int: - """Return the number of items in it for which func returns true.""" - n = 0 - for x in it: - if func(x): - n += 1 - return n - @attr.s(slots=True, hash=True, auto_attribs=True) class LaterGauge(Collector): @@ -486,7 +475,6 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None: "MetricsResource", "generate_latest", "start_http_server", - "count", "LaterGauge", "InFlightGauge", "GaugeBucketCollector", diff --git a/synapse/notifier.py b/synapse/notifier.py index 8e04b5ddf088..c42bb8266add 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -19,6 +19,7 @@ Callable, Collection, Dict, + Iterable, List, Optional, Set, @@ -39,7 +40,7 @@ from synapse.logging import issue9533_logger from synapse.logging.context import PreserveLoggingContext from synapse.logging.opentracing import log_kv, start_active_span -from synapse.metrics import LaterGauge, count +from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig from synapse.types import ( JsonDict, @@ -67,6 +68,16 @@ T = TypeVar("T") +# TODO(paul): Should be shared somewhere +def count(func: Callable[[T], bool], it: Iterable[T]) -> int: + """Return the number of items in it for which func returns true.""" + n = 0 + for x in it: + if func(x): + n += 1 + return n + + class _NotificationListener: """This represents a single client connection to the events stream. The events stream handler will have yielded to the deferred, so to diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 12af4897ef91..8bb90b0ece7c 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -30,7 +30,7 @@ run_in_background, ) from synapse.logging.opentracing import start_active_span -from synapse.metrics import LaterGauge, count +from synapse.metrics import LaterGauge from synapse.util import Clock if typing.TYPE_CHECKING: @@ -60,24 +60,17 @@ def new_limiter() -> "_PerHostRatelimiter": "synapse_rate_limit_sleep_affected_hosts", "Number of hosts that had requests put to sleep", [], - lambda: count( - bool, - [ - ratelimiter.should_sleep() - for ratelimiter in self.ratelimiters.values() - ], + lambda: sum( + ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values() ), ) LaterGauge( "synapse_rate_limit_reject_affected_hosts", "Number of hosts that had requests rejected", [], - lambda: count( - bool, - [ - ratelimiter.should_reject() - for ratelimiter in self.ratelimiters.values() - ], + lambda: sum( + ratelimiter.should_reject() + for ratelimiter in self.ratelimiters.values() ), )