Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Fix ratelimiting for federation /send requests. #8342

Merged
merged 7 commits into from
Sep 18, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Add ResponseCache to incoming transactions path
  • Loading branch information
erikjohnston committed Sep 17, 2020
commit 71a36f1ab40997d467eec6ce420be0da190b3b2c
27 changes: 23 additions & 4 deletions synapse/federation/federation_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ def __init__(self, hs):
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")

# We cache results for transaction with the same ID
self._transaciton_resp_cache = ResponseCache(
hs, "fed_txn_handler", timeout_ms=30000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about the reasoning about the timeout here. 30 seconds seems a bit short?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We tend to default to 30s for these I think. Note that this is 30s after the response is sent, though given the response won't generally be large we could probably up it some more.

Copy link
Member

@clokep clokep Sep 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I thought it was 30s from when the request was sent. That makes more sense now!

)

self.transaction_actions = TransactionActions(self.store)

self.registry = hs.get_federation_registry()
Expand Down Expand Up @@ -135,14 +140,28 @@ async def on_incoming_transaction(
request_time = self._clock.time_msec()

transaction = Transaction(**transaction_data)
transaction_id = transaction.transaction_id # type: ignore

if not transaction.transaction_id: # type: ignore
if not transaction_id:
raise Exception("Transaction missing transaction_id")

logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore
logger.debug("[%s] Got transaction", transaction_id)

# We wrap in a ResponseCache so that we de-duplicate retried
# transactions.
return await self._transaciton_resp_cache.wrap(
(origin, transaction_id),
self._on_incoming_transaction_inner,
origin,
transaction,
request_time,
)

# use a linearizer to ensure that we don't process the same transaction
# multiple times in parallel.
async def _on_incoming_transaction_inner(
self, origin: str, transaction: Transaction, request_time: int
) -> Tuple[int, Dict[str, Any]]:
# Use a linearizer to ensure that process transactions from a remote in
# order.
with (
await self._transaction_linearizer.queue(
(origin, transaction.transaction_id) # type: ignore
Expand Down