From 924ae2b0d4a28b568d2fd40e482d860605074650 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 17:15:03 -0500 Subject: [PATCH 01/32] Track when the pulled event signature fails Part of https://github.com/matrix-org/synapse/issues/13700 --- synapse/federation/federation_base.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index abe2c1971a19..23175847e19d 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -80,7 +80,13 @@ async def _check_sigs_and_hash( InvalidEventSignatureError if the signature check failed. Nothing will be logged in this case. """ - await _check_sigs_on_pdu(self.keyring, room_version, pdu) + try: + await _check_sigs_on_pdu(self.keyring, room_version, pdu) + except Exception as exc: + await self._store.record_event_failed_pull_attempt( + pdu.room_id, pdu.event_id, str(exc) + ) + raise exc if not check_event_content_hash(pdu): # let's try to distinguish between failures because the event was @@ -116,6 +122,9 @@ async def _check_sigs_and_hash( "event_id": pdu.event_id, } ) + await self._store.record_event_failed_pull_attempt( + pdu.room_id, pdu.event_id, "Event content has been tampered with" + ) return redacted_event spam_check = await self.spam_checker.check_event_for_spam(pdu) From d240aeb48e72226f3e7b575d5abe774eebebca91 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 18:02:08 -0500 Subject: [PATCH 02/32] Add changelog --- changelog.d/13815.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13815.feature diff --git a/changelog.d/13815.feature b/changelog.d/13815.feature new file mode 100644 index 000000000000..ba411f5067b6 --- /dev/null +++ b/changelog.d/13815.feature @@ -0,0 +1 @@ +Keep track when an event pulled over federation fails its signature check so we can intelligently back-off in the future. 
From cfb4e88d0e66f72ed77d2528fb56ccdd7af925bc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 18:24:54 -0500 Subject: [PATCH 03/32] Fix reference --- synapse/federation/federation_base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 23175847e19d..8d25e92a67fb 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -83,7 +83,7 @@ async def _check_sigs_and_hash( try: await _check_sigs_on_pdu(self.keyring, room_version, pdu) except Exception as exc: - await self._store.record_event_failed_pull_attempt( + await self.store.record_event_failed_pull_attempt( pdu.room_id, pdu.event_id, str(exc) ) raise exc @@ -122,7 +122,7 @@ async def _check_sigs_and_hash( "event_id": pdu.event_id, } ) - await self._store.record_event_failed_pull_attempt( + await self.store.record_event_failed_pull_attempt( pdu.room_id, pdu.event_id, "Event content has been tampered with" ) return redacted_event From 9b390a35546420521ba661f7a006a9f99d3f554b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 14 Sep 2022 19:06:57 -0500 Subject: [PATCH 04/32] Stop getting missing prev_events after we already know their signature is invalid MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Related to - https://github.com/matrix-org/synapse/issues/13622 - https://github.com/matrix-org/synapse/pull/13635 - https://github.com/matrix-org/synapse/issues/13676 Follow-up to https://github.com/matrix-org/synapse/pull/13815 which tracks event signature failures. This PR aims to stop us from trying to `_get_state_ids_after_missing_prev_event` after we already know that the prev_event will fail from a previous attempt To explain an exact scenario around `/messages` -> backfill, we call `/backfill` and first check the signatures of the 100 events. 
We see bad signature for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` and `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` (both member events). Then we process the 98 events remaining that have valid signatures but one of the events references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event`. So we have to do the whole `_get_state_ids_after_missing_prev_event` rigmarole which pulls in those same events which fail again because the signatures are still invalid. - `backfill` - `outgoing-federation-request` `/backfill` - `_check_sigs_and_hash_and_fetch` - `_check_sigs_and_hash_and_fetch_one` for each event received over backfill - ❗ `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)` - ❗ `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` fails with `Signature on retrieved event was invalid.`: `unable to verify signature for sender domain xxx: 401: Failed to find any key to satisfy: _FetchKeyRequest(...)` - `_process_pulled_events` - `_process_pulled_event` for each validated event - ❗ Event `$Q0iMdqtz3IJYfZQU2Xk2WjB5NDF8Gg8cFSYYyKQgKJ0` references `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` as a `prev_event` which is missing so we try to get it - `_get_state_ids_after_missing_prev_event` - `outgoing-federation-request` `/state_ids` - ❗ `get_pdu` for `$luA4l7QHhf_jadH3mI-AyFqho0U2Q-IXXUbGSMq6h6M` which fails the signature check again - ❗ `get_pdu` for `$zuOn2Rd2vsC7SUia3Hp3r6JSkSFKcc5j3QTTqW_0jDw` which fails the signature check With this PR, we no longer call the burdensome `_get_state_ids_after_missing_prev_event` because the signature failure will count as an attempt before we try to run this. 
--- synapse/handlers/federation_event.py | 6 ++ .../databases/main/event_federation.py | 82 ++++++++++++++++++- 2 files changed, 87 insertions(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9e065e1116b5..50a24e88183f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -955,6 +955,12 @@ async def _compute_event_context_with_maybe_missing_prevs( seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen + # Filter out events we've tried to pull recently + prevs_to_ignore = await self.store.filter_events_with_pull_attempt_backoff( + room_id, missing_prevs + ) + missing_prevs = missing_prevs - prevs_to_ignore + if not missing_prevs: return await self._state_handler.compute_event_context(event) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index ef477978ed63..8b9de2ae4f09 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import datetime import itertools import logging from queue import Empty, PriorityQueue @@ -21,6 +22,7 @@ Iterable, List, Optional, + Sequence, Set, Tuple, cast, @@ -43,7 +45,7 @@ ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -72,6 +74,12 @@ logger = logging.getLogger(__name__) +PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int( + datetime.timedelta(days=7).total_seconds() +) +PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int( + datetime.timedelta(hours=1).total_seconds() +) # All the info we need while iterating the DAG while backfilling @attr.s(frozen=True, slots=True, auto_attribs=True) @@ -1339,6 +1347,78 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) + @trace + async def filter_events_with_pull_attempt_backoff( + self, + room_id: str, + event_ids: Sequence[str], + ) -> List[str]: + """ + Filter out events that we've failed to pull before + recently. Uses exponential backoff. 
+ + Args: + room_id: The room that the events belong to + event_ids: A list of events to filter down + + Returns: + List of event_ids that can be attempted to be pulled + """ + return await self.db_pool.runInteraction( + "filter_events_with_pull_attempt_backoff", + self._filter_events_with_pull_attempt_backoff_txn, + room_id, + event_ids, + ) + + def _filter_events_with_pull_attempt_backoff_txn( + self, + txn: LoggingTransaction, + room_id: str, + event_ids: Sequence[str], + ) -> None: + where_event_ids_match_clause, values = make_in_list_sql_clause( + txn.database_engine, "event_id", event_ids + ) + + sql = """ + SELECT event_id FROM event_failed_pull_attempts + WHERE + room_id = ? + %s /* where_event_ids_match_clause */ + /** + * Exponential back-off (up to the upper bound) so we don't try to + * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ( + event_id IS NULL + OR ? /* current_time */ >= last_attempt_ts + /*least*/%s((1 << num_attempts) * ? /* step */, ? 
/* upper bound */) + ) + """ + + if isinstance(self.database_engine, PostgresEngine): + least_function = "least" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "min" + else: + raise RuntimeError("Unknown database engine") + + txn.execute( + sql % (where_event_ids_match_clause, least_function), + ( + room_id, + *values, + self._clock.time_msec(), + 1000 * PULLED_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS, + 1000 * PULLED_EVENT_BACKOFF_UPPER_BOUND_SECONDS, + ), + ) + async def get_missing_events( self, room_id: str, From 88a75cf53bdde623fd2d32524c78f8273cb6eb6b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 23 Sep 2022 18:04:30 -0500 Subject: [PATCH 05/32] Use callback pattern to record signature failures See https://github.com/matrix-org/synapse/pull/13815#discussion_r971432765 --- synapse/federation/federation_base.py | 21 +++++++++++------- synapse/federation/federation_client.py | 29 ++++++++++++++++++------- tests/test_federation.py | 4 ++-- 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 8d25e92a67fb..1388b9123788 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Awaitable, Callable, Optional from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership from synapse.api.errors import Codes, SynapseError @@ -58,7 +58,12 @@ def __init__(self, hs: "HomeServer"): @trace async def _check_sigs_and_hash( - self, room_version: RoomVersion, pdu: EventBase + self, + room_version: RoomVersion, + pdu: EventBase, + record_failure_callback: Optional[ + Callable[[EventBase, str], Awaitable[None]] + ] = None, ) -> EventBase: """Checks that event is correctly signed by the sending server. @@ -83,9 +88,8 @@ async def _check_sigs_and_hash( try: await _check_sigs_on_pdu(self.keyring, room_version, pdu) except Exception as exc: - await self.store.record_event_failed_pull_attempt( - pdu.room_id, pdu.event_id, str(exc) - ) + if record_failure_callback: + await record_failure_callback(pdu, str(exc)) raise exc if not check_event_content_hash(pdu): @@ -122,9 +126,10 @@ async def _check_sigs_and_hash( "event_id": pdu.event_id, } ) - await self.store.record_event_failed_pull_attempt( - pdu.room_id, pdu.event_id, "Event content has been tampered with" - ) + if record_failure_callback: + await record_failure_callback( + pdu, "Event content has been tampered with" + ) return redacted_event spam_check = await self.spam_checker.check_event_for_spam(pdu) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 464672a3da81..7bdbf7e27780 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -278,7 +278,7 @@ async def backfill( pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus] # Check signatures and hash of pdus, removing any from the list that fail checks - pdus[:] = await self._check_sigs_and_hash_and_fetch( + pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch( dest, pdus, room_version=room_version ) @@ -547,24 
+547,28 @@ async def get_room_state( len(auth_event_map), ) - valid_auth_events = await self._check_sigs_and_hash_and_fetch( + valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, auth_event_map.values(), room_version ) - valid_state_events = await self._check_sigs_and_hash_and_fetch( - destination, state_event_map.values(), room_version + valid_state_events = ( + await self._check_sigs_and_hash_for_pulled_events_and_fetch( + destination, state_event_map.values(), room_version + ) ) return valid_state_events, valid_auth_events @trace - async def _check_sigs_and_hash_and_fetch( + async def _check_sigs_and_hash_for_pulled_events_and_fetch( self, origin: str, pdus: Collection[EventBase], room_version: RoomVersion, ) -> List[EventBase]: - """Checks the signatures and hashes of a list of events. + """ + Checks the signatures and hashes of a list of pulled events we got from + federation and records any signature failures as failed pull attempts. If a PDU fails its signature check then we check if we have it in the database, and if not then request it from the sender's server (if that @@ -597,11 +601,17 @@ async def _check_sigs_and_hash_and_fetch( valid_pdus: List[EventBase] = [] + async def _record_failure_callback(event: EventBase, cause: str) -> None: + await self.store.record_event_failed_pull_attempt( + event.room_id, event.event_id, cause + ) + async def _execute(pdu: EventBase) -> None: valid_pdu = await self._check_sigs_and_hash_and_fetch_one( pdu=pdu, origin=origin, room_version=room_version, + record_failure_callback=_record_failure_callback, ) if valid_pdu: @@ -618,6 +628,9 @@ async def _check_sigs_and_hash_and_fetch_one( pdu: EventBase, origin: str, room_version: RoomVersion, + record_failure_callback: Optional[ + Callable[[EventBase, str], Awaitable[None]] + ] = None, ) -> Optional[EventBase]: """Takes a PDU and checks its signatures and hashes. 
@@ -694,7 +707,7 @@ async def get_event_auth( auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]] - signed_auth = await self._check_sigs_and_hash_and_fetch( + signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, auth_chain, room_version=room_version ) @@ -1401,7 +1414,7 @@ async def get_missing_events( event_from_pdu_json(e, room_version) for e in content.get("events", []) ] - signed_events = await self._check_sigs_and_hash_and_fetch( + signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, events, room_version=room_version ) except HttpResponseException as e: diff --git a/tests/test_federation.py b/tests/test_federation.py index 779fad1f6398..80e5c590d836 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -86,8 +86,8 @@ async def _check_event_auth(origin, event, context): federation_event_handler._check_event_auth = _check_event_auth self.client = self.homeserver.get_federation_client() - self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed( - pdus + self.client._check_sigs_and_hash_for_pulled_events_and_fetch = ( + lambda dest, pdus, **k: succeed(pdus) ) # Send the join, it should return None (which is not an error) From d29ac0bd5fdec5bc232fb9e33756cf210d252495 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 23 Sep 2022 19:06:38 -0500 Subject: [PATCH 06/32] Add docstring --- synapse/federation/federation_base.py | 5 +++++ synapse/federation/federation_client.py | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 1388b9123788..c5505f19643c 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -75,6 +75,11 @@ async def _check_sigs_and_hash( Args: room_version: The room version of the PDU pdu: the event to be checked + record_failure_callback: A callback to run whenever the given event + 
fails signature or hash checks. This includes exceptions + that would be normally be thrown/raised but also things like + checking for event tampering where we just return the redacted + event. Returns: * the original event if the checks pass diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7bdbf7e27780..6c22a540ebe3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -647,6 +647,11 @@ async def _check_sigs_and_hash_and_fetch_one( origin pdu room_version + record_failure_callback: A callback to run whenever the given event + fails signature or hash checks. This includes exceptions + that would be normally be thrown/raised but also things like + checking for event tampering where we just return the redacted + event. Returns: The PDU (possibly redacted) if it has valid signatures and hashes. From 14e39ee5b92275b168b44d2485b4b13ddcf7ba65 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 23 Sep 2022 20:16:00 -0500 Subject: [PATCH 07/32] Record failure from get_pdu and add test --- synapse/federation/federation_client.py | 16 ++++- tests/federation/test_federation_client.py | 75 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6c22a540ebe3..4dca711cd28d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -328,7 +328,17 @@ async def get_pdu_from_destination_raw( # Check signatures are correct. 
try: - signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + + async def _record_failure_callback( + event: EventBase, cause: str + ) -> None: + await self.store.record_event_failed_pull_attempt( + event.room_id, event.event_id, cause + ) + + signed_pdu = await self._check_sigs_and_hash( + room_version, pdu, _record_failure_callback + ) except InvalidEventSignatureError as e: errmsg = f"event id {pdu.event_id}: {e}" logger.warning("%s", errmsg) @@ -659,7 +669,9 @@ async def _check_sigs_and_hash_and_fetch_one( """ try: - return await self._check_sigs_and_hash(room_version, pdu) + return await self._check_sigs_and_hash( + room_version, pdu, record_failure_callback + ) except InvalidEventSignatureError as e: logger.warning( "Signature on retrieved event %s was invalid (%s). " diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index 50e376f69574..a538215931e3 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -23,14 +23,23 @@ from synapse.api.room_versions import RoomVersions from synapse.events import EventBase +from synapse.rest import admin +from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock +from tests.test_utils import event_injection from tests.unittest import FederatingHomeserverTestCase class FederationClientTest(FederatingHomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer): super().prepare(reactor, clock, homeserver) @@ -231,6 +240,72 @@ def _get_pdu_once(self) -> EventBase: return remote_pdu + def test_backfill_invalid_signature_records_failed_pull_attempts( + self, + ) -> None: + """ + Test to make sure that events from /backfill with invalid signatures get + recorded as failed pull attempts. 
+ """ + OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" + main_store = self.hs.get_datastores().main + + # Create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + + # We purposely don't run `add_hashes_and_signatures_from_other_server` + # over this because we want the signature check to fail. + pulled_event, _ = self.get_success( + event_injection.create_event( + self.hs, + room_id=room_id, + sender=OTHER_USER, + type="test_event_type", + content={"body": "garply"}, + ) + ) + + # We expect an outbound request to /backfill, so stub that out + self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed( + _mock_response( + { + "origin": "yet.another.server", + "origin_server_ts": 900, + # Mimic the other server returning our new `pulled_event` + "pdus": [pulled_event.get_pdu_json()], + } + ) + ) + + self.get_success( + self.hs.get_federation_client().backfill( + # We use "yet.another.server" instead of + # `self.OTHER_SERVER_NAME` because we want to see the behavior + # from `_check_sigs_and_hash_and_fetch_one` where it tries to + # fetch the PDU again from the origin server if the signature + # fails. Just want to make sure that the failure is counted from + # both code paths. 
+ dest="yet.another.server", + room_id=room_id, + limit=1, + extremities=[pulled_event.event_id], + ), + ) + + # Make sure our failed pull attempt was recorded + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_failed_pull_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + # This is 2 because it failed once from `self.OTHER_SERVER_NAME` and the + # other from "yet.another.server" + self.assertEqual(backfill_num_attempts, 2) + def _mock_response(resp: JsonDict): body = json.dumps(resp).encode("utf-8") From 7898371c25872f98e5bfab5cce8b2fd8c91a995b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 29 Sep 2022 21:17:01 -0500 Subject: [PATCH 08/32] Be more selective about which errors to care about See https://github.com/matrix-org/synapse/pull/13815#discussion_r983384698 So we can avoid things like `CancelledError` which is a valid error but probably should not count as an error --- synapse/federation/federation_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index c5505f19643c..6bd4742140c4 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -92,7 +92,7 @@ async def _check_sigs_and_hash( """ try: await _check_sigs_on_pdu(self.keyring, room_version, pdu) - except Exception as exc: + except InvalidEventSignatureError as exc: if record_failure_callback: await record_failure_callback(pdu, str(exc)) raise exc From e24db414bf284d41b81bd1e14dc983e08a5047fd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 3 Oct 2022 22:42:12 -0500 Subject: [PATCH 09/32] Weird name and add tests --- synapse/handlers/federation_event.py | 2 +- .../databases/main/event_federation.py | 105 +++++++++--------- tests/storage/test_event_federation.py | 65 +++++++++++ 3 files changed, 117 insertions(+), 55 deletions(-) diff --git
a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 3d78171c8fd0..6910d11924b5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -956,7 +956,7 @@ async def _compute_event_context_with_maybe_missing_prevs( missing_prevs = prevs - seen # Filter out events we've tried to pull recently - prevs_to_ignore = await self.store.filter_events_with_pull_attempt_backoff( + prevs_to_ignore = await self.store.filter_event_ids_with_pull_attempt_backoff( room_id, missing_prevs ) missing_prevs = missing_prevs - prevs_to_ignore diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 9c5d97a7c503..2accc84d1a08 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1532,78 +1532,75 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) @trace - async def filter_events_with_pull_attempt_backoff( + async def filter_event_ids_with_pull_attempt_backoff( self, room_id: str, event_ids: Sequence[str], ) -> List[str]: """ - Filter out events that we've failed to pull before - recently. Uses exponential backoff. + Filter down the events to ones that we've failed to pull before recently. Uses + exponential backoff. 
Args: room_id: The room that the events belong to event_ids: A list of events to filter down Returns: - List of event_ids that can be attempted to be pulled + List of event_ids that should not be attempted to be pulled """ - return await self.db_pool.runInteraction( - "filter_events_with_pull_attempt_backoff", - self._filter_events_with_pull_attempt_backoff_txn, - room_id, - event_ids, - ) - def _filter_events_with_pull_attempt_backoff_txn( - self, - txn: LoggingTransaction, - room_id: str, - event_ids: Sequence[str], - ) -> None: - where_event_ids_match_clause, values = make_in_list_sql_clause( - txn.database_engine, "event_id", event_ids - ) + def _filter_event_ids_with_pull_attempt_backoff_txn( + txn: LoggingTransaction, + ) -> List[str]: + where_event_ids_match_clause, values = make_in_list_sql_clause( + txn.database_engine, "event_id", event_ids + ) - sql = """ - SELECT event_id FROM event_failed_pull_attempts as failed_backfill_attempt_info - WHERE - failed_backfill_attempt_info.room_id = ? - %s /* where_event_ids_match_clause */ - /** - * Exponential back-off (up to the upper bound) so we don't try to - * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. - * - * We use `1 << n` as a power of 2 equivalent for compatibility - * with older SQLites. The left shift equivalent only works with - * powers of 2 because left shift is a binary operation (base-2). - * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. - */ - AND ( - failed_backfill_attempt_info.event_id IS NULL - OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + ( - (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? 
/* max doubling steps */)) + if isinstance(self.database_engine, PostgresEngine): + least_function = "least" + elif isinstance(self.database_engine, Sqlite3Engine): + least_function = "min" + else: + raise RuntimeError("Unknown database engine") + + sql = f""" + SELECT event_id FROM event_failed_pull_attempts + WHERE + room_id = ? + AND {where_event_ids_match_clause} + /** + * Exponential back-off (up to the upper bound) so we don't try to + * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + * + * We use `1 << n` as a power of 2 equivalent for compatibility + * with older SQLites. The left shift equivalent only works with + * powers of 2 because left shift is a binary operation (base-2). + * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. + */ + AND ? /* current_time */ < last_attempt_ts + ( + (1 << {least_function}(num_attempts, ? /* max doubling steps */)) * ? /* step */ ) - ) - """ + """ - if isinstance(self.database_engine, PostgresEngine): - least_function = "least" - elif isinstance(self.database_engine, Sqlite3Engine): - least_function = "min" - else: - raise RuntimeError("Unknown database engine") + txn.execute( + sql, + ( + room_id, + *values, + self._clock.time_msec(), + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, + ), + ) - txn.execute( - sql % (where_event_ids_match_clause, least_function), - ( - room_id, - *values, - self._clock.time_msec(), - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, - ), + event_ids_to_ignore_result = cast(List[Tuple[str]], txn.fetchall()) + + return [event_id for event_id, in event_ids_to_ignore_result] + + return await self.db_pool.runInteraction( + "filter_event_ids_with_pull_attempt_backoff_txn", + _filter_event_ids_with_pull_attempt_backoff_txn, ) async def get_missing_events( diff --git a/tests/storage/test_event_federation.py 
b/tests/storage/test_event_federation.py index 59b891090726..264495263f1c 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -27,6 +27,8 @@ RoomVersion, ) from synapse.events import _EventInternalMetadata +from synapse.rest import admin +from synapse.rest.client import login, room from synapse.server import HomeServer from synapse.storage.database import LoggingTransaction from synapse.types import JsonDict @@ -43,6 +45,12 @@ class _BackfillSetupInfo: class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main @@ -1122,6 +1130,63 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertEqual(backfill_event_ids, ["insertion_eventA"]) + def test_filter_event_ids_with_pull_attempt_backoff( + self, + ): + """ + Test to make sure only event IDs we should backoff from are returned. + """ + + # Create the room + user_id = self.register_user("alice", "test") + tok = self.login("alice", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id", "fake cause" + ) + ) + + event_ids_to_backoff = self.get_success( + self.store.filter_event_ids_with_pull_attempt_backoff( + room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] + ) + ) + + self.assertEqual(event_ids_to_backoff, ["$failed_event_id"]) + + def test_filter_event_ids_with_pull_attempt_backoff_retry_after_backoff_duration( + self, + ): + """ + Test to make sure only event IDs are returned after the backoff duration has + elapsed. 
+ """ + + # Create the room + user_id = self.register_user("alice", "test") + tok = self.login("alice", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + + self.get_success( + self.store.record_event_failed_pull_attempt( + room_id, "$failed_event_id", "fake cause" + ) + ) + + # Now advance time by 2 hours so we wait long enough for the single failed + # attempt (2^1 hours). + self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) + + event_ids_to_backoff = self.get_success( + self.store.filter_event_ids_with_pull_attempt_backoff( + room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] + ) + ) + + self.assertEqual(event_ids_to_backoff, []) + @attr.s class FakeEvent: From f853e78469b4f97942bf623923c773428db69051 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 00:38:24 -0500 Subject: [PATCH 10/32] Add changelog --- changelog.d/13816.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13816.feature diff --git a/changelog.d/13816.feature b/changelog.d/13816.feature new file mode 100644 index 000000000000..5eaa936b0823 --- /dev/null +++ b/changelog.d/13816.feature @@ -0,0 +1 @@ +Stop fetching missing `prev_events` after we already know their signature is invalid. 
From 43fb6b871bb9739146da0c5a67f04bb8e586d8bc Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 01:20:35 -0500 Subject: [PATCH 11/32] Bail early and fix lints See https://github.com/matrix-org/synapse/pull/13816#discussion_r986373714 --- synapse/handlers/federation_event.py | 14 +++++++++++--- synapse/storage/databases/main/event_federation.py | 3 +-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 6910d11924b5..1bfed552662f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -955,11 +955,19 @@ async def _compute_event_context_with_maybe_missing_prevs( seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen - # Filter out events we've tried to pull recently - prevs_to_ignore = await self.store.filter_event_ids_with_pull_attempt_backoff( + # If we've already recently attempted to pull this missing event recently, don't + # try it again so soon. Since we have to fetch all of the prev_events, we can + # bail early here if we find any to ignore. 
+ prevs_to_ignore = await self._store.filter_event_ids_with_pull_attempt_backoff( room_id, missing_prevs ) - missing_prevs = missing_prevs - prevs_to_ignore + if len(prevs_to_ignore) > 0: + raise FederationError( + "ERROR", + 429, + "Not attempting to pull event that we already tried recently (backing off).", + affected=prevs_to_ignore[0], + ) if not missing_prevs: return await self._state_handler.compute_event_context(event) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2accc84d1a08..92851a414b8a 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -22,7 +22,6 @@ Iterable, List, Optional, - Sequence, Set, Tuple, cast, @@ -1535,7 +1534,7 @@ def _record_event_failed_pull_attempt_upsert_txn( async def filter_event_ids_with_pull_attempt_backoff( self, room_id: str, - event_ids: Sequence[str], + event_ids: Collection[str], ) -> List[str]: """ Filter down the events to ones that we've failed to pull before recently. 
Uses From 03f23b720b280d4fa9f243d337b35fd1e1e0e848 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 11:54:19 -0500 Subject: [PATCH 12/32] Add integration test with backfill --- synapse/handlers/federation_event.py | 7 +- tests/handlers/test_federation_event.py | 185 +++++++++++++++++++++++- 2 files changed, 190 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 1bfed552662f..f351d62a5e4a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -906,6 +906,11 @@ async def _process_pulled_event( if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) + elif e.code == 429: + logger.warning( + "Not attempting to pull event %s that we already tried to pull recently (backing off).", + event_id, + ) else: raise @@ -965,7 +970,7 @@ async def _compute_event_context_with_maybe_missing_prevs( raise FederationError( "ERROR", 429, - "Not attempting to pull event that we already tried recently (backing off).", + "Not attempting to pull event that we already tried to pull recently (backing off).", affected=prevs_to_ignore[0], ) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 918010cddbf9..ae065a719823 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -43,7 +43,7 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase): def make_homeserver(self, reactor, clock): # mock out the federation transport client self.mock_federation_transport_client = mock.Mock( - spec=["get_room_state_ids", "get_room_state", "get_event"] + spec=["get_room_state_ids", "get_room_state", "get_event", "backfill"] ) return super().setup_test_homeserver( federation_transport_client=self.mock_federation_transport_client @@ -459,6 +459,189 @@ def test_process_pulled_event_clears_backfill_attempts_after_being_successfully_ ) 
self.assertIsNotNone(persisted, "pulled event was not persisted at all") + def test_backfill_signature_failure_does_not_fetch_same_prev_event_later( + self, + ) -> None: + """ + Test to make sure we backoff and don't try to fetch a missing prev_event when we + already know it has a invalid signature from checking the signatures of all of + the events in the backfill response. + """ + OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}" + main_store = self.hs.get_datastores().main + + # Create the room + user_id = self.register_user("kermit", "test") + tok = self.login("kermit", "test") + room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(main_store.get_room_version(room_id)) + + # Allow the remote user to send state events + self.helper.send_state( + room_id, + "m.room.power_levels", + {"events_default": 0, "state_default": 0}, + tok=tok, + ) + + # Add the remote user to the room + member_event = self.get_success( + event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join") + ) + + initial_state_map = self.get_success( + main_store.get_partial_current_state_ids(room_id) + ) + + auth_event_ids = [ + initial_state_map[("m.room.create", "")], + initial_state_map[("m.room.power_levels", "")], + member_event.event_id, + ] + + # We purposely don't run `add_hashes_and_signatures_from_other_server` + # over this because we want the signature check to fail. + pulled_event_without_signatures = make_event_from_dict( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [member_event.event_id], + "auth_events": auth_event_ids, + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled_event_without_signatures"}, + }, + room_version, + ) + + # Create a regular event that should pass except for the + # `pulled_event_without_signatures` in the `prev_event`. 
+ pulled_event = make_event_from_dict( + self.add_hashes_and_signatures_from_other_server( + { + "type": "test_regular_type", + "room_id": room_id, + "sender": OTHER_USER, + "prev_events": [ + member_event.event_id, + pulled_event_without_signatures.event_id, + ], + "auth_events": auth_event_ids, + "origin_server_ts": 1, + "depth": 12, + "content": {"body": "pulled_event"}, + } + ), + room_version, + ) + + # We expect an outbound request to /backfill, so stub that out + self.mock_federation_transport_client.backfill.return_value = make_awaitable( + { + "origin": self.OTHER_SERVER_NAME, + "origin_server_ts": 123, + "pdus": [ + # This is one of the important aspects of this test: we include + # `pulled_event_without_signatures` so it fails the signature check + # when we filter down the backfill response down to events which + # have valid signatures in + # `_check_sigs_and_hash_for_pulled_events_and_fetch` + pulled_event_without_signatures.get_pdu_json(), + # Then later when we process this valid signature event, when we + # fetch the missing `prev_event`s, we want to make sure that we + # backoff and don't try and fetch `pulled_event_without_signatures` + # again since we know it just had an invalid signature. 
+ pulled_event.get_pdu_json(), + ], + } + ) + + event_endpoint_requested_count = 0 + room_state_ids_endpoint_requested_count = 0 + room_state_endpoint_requested_count = 0 + + async def get_event( + destination: str, event_id: str, timeout: Optional[int] = None + ) -> JsonDict: + nonlocal event_endpoint_requested_count + event_endpoint_requested_count += 1 + + async def get_room_state_ids( + destination: str, room_id: str, event_id: str + ) -> JsonDict: + nonlocal room_state_ids_endpoint_requested_count + room_state_ids_endpoint_requested_count += 1 + + async def get_room_state( + room_version: RoomVersion, destination: str, room_id: str, event_id: str + ) -> StateRequestResponse: + nonlocal room_state_endpoint_requested_count + room_state_endpoint_requested_count += 1 + + # We don't expect an outbound request to `/event`, `/state_ids`, or `/state` in + # the happy path but if the logic is sneaking around what we expect, stub that + # out so we can detect that failure + self.mock_federation_transport_client.get_event.side_effect = get_event + self.mock_federation_transport_client.get_room_state_ids.side_effect = ( + get_room_state_ids + ) + self.mock_federation_transport_client.get_room_state.side_effect = ( + get_room_state + ) + + # The function under test: try to backfill and process the pulled event + with LoggingContext("test"): + self.get_success( + self.hs.get_federation_event_handler().backfill( + self.OTHER_SERVER_NAME, + room_id, + limit=1, + extremities=["$some_extremity"], + ) + ) + + if event_endpoint_requested_count > 0: + self.fail( + "We don't expect an outbound request to /event in the happy path but if " + + "the logic is sneaking around what we expect, make sure to fail the test " + ) + + if room_state_ids_endpoint_requested_count > 0: + self.fail( + "We don't expect an outbound request to /state_ids in the happy path but if " + + "the logic is sneaking around what we expect, make sure to fail the test " + ) + + if 
room_state_endpoint_requested_count > 0: + self.fail( + "We don't expect an outbound request to /state in the happy path but if " + + "the logic is sneaking around what we expect, make sure to fail the test " + ) + + # Make sure we only recorded a single failure which corresponds to the signature + # failure initially in `_check_sigs_and_hash_for_pulled_events_and_fetch` before + # we process all of the pulled events. + backfill_num_attempts_for_event_without_signatures = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_failed_pull_attempts", + keyvalues={"event_id": pulled_event_without_signatures.event_id}, + retcol="num_attempts", + ) + ) + self.assertEqual(backfill_num_attempts_for_event_without_signatures, 1) + + # And we recorded a failure for the event that has the missing prev_event + # since we weren't able to go and fetch it. + backfill_num_attempts = self.get_success( + main_store.db_pool.simple_select_one_onecol( + table="event_failed_pull_attempts", + keyvalues={"event_id": pulled_event.event_id}, + retcol="num_attempts", + ) + ) + self.assertEqual(backfill_num_attempts, 1) + def test_process_pulled_event_with_rejected_missing_state(self) -> None: """Ensure that we correctly handle pulled events with missing state containing a rejected state event From 99d3e79d8ef75743f0e290c55cf732c25e120e92 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 17:03:08 -0500 Subject: [PATCH 13/32] Fix lints and better message --- tests/handlers/test_federation_event.py | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index ae065a719823..3a4e3f1a4802 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -557,25 +557,26 @@ def test_backfill_signature_failure_does_not_fetch_same_prev_event_later( } ) + # Keep track of the count and make sure we don't make any 
of these requests event_endpoint_requested_count = 0 room_state_ids_endpoint_requested_count = 0 room_state_endpoint_requested_count = 0 async def get_event( destination: str, event_id: str, timeout: Optional[int] = None - ) -> JsonDict: + ) -> None: nonlocal event_endpoint_requested_count event_endpoint_requested_count += 1 async def get_room_state_ids( destination: str, room_id: str, event_id: str - ) -> JsonDict: + ) -> None: nonlocal room_state_ids_endpoint_requested_count room_state_ids_endpoint_requested_count += 1 async def get_room_state( room_version: RoomVersion, destination: str, room_id: str, event_id: str - ) -> StateRequestResponse: + ) -> None: nonlocal room_state_endpoint_requested_count room_state_endpoint_requested_count += 1 @@ -604,19 +605,28 @@ async def get_room_state( if event_endpoint_requested_count > 0: self.fail( "We don't expect an outbound request to /event in the happy path but if " - + "the logic is sneaking around what we expect, make sure to fail the test " + "the logic is sneaking around what we expect, make sure to fail the test. " + "We don't expect it because the signature failure should cause us to backoff " + "and not asking about pulled_event_without_signatures=" + f"{pulled_event_without_signatures.event_id} again" ) if room_state_ids_endpoint_requested_count > 0: self.fail( "We don't expect an outbound request to /state_ids in the happy path but if " - + "the logic is sneaking around what we expect, make sure to fail the test " + "the logic is sneaking around what we expect, make sure to fail the test. 
" + "We don't expect it because the signature failure should cause us to backoff " + "and not asking about pulled_event_without_signatures=" + f"{pulled_event_without_signatures.event_id} again" ) if room_state_endpoint_requested_count > 0: self.fail( "We don't expect an outbound request to /state in the happy path but if " - + "the logic is sneaking around what we expect, make sure to fail the test " + "the logic is sneaking around what we expect, make sure to fail the test. " + "We don't expect it because the signature failure should cause us to backoff " + "and not asking about pulled_event_without_signatures=" + f"{pulled_event_without_signatures.event_id} again" ) # Make sure we only recorded a single failure which corresponds to the signature From f11f5b51eb81d5aa3653e4a4895982b9c0721436 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 17:06:34 -0500 Subject: [PATCH 14/32] Fix test description --- tests/storage/test_event_federation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 264495263f1c..6ef9cf8bf42c 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1160,7 +1160,7 @@ def test_filter_event_ids_with_pull_attempt_backoff_retry_after_backoff_duration self, ): """ - Test to make sure only event IDs are returned after the backoff duration has + Test to make sure event IDs are returned after the backoff duration has elapsed. 
""" From 6878faa4f266c6340078063db61d858550259c7b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 4 Oct 2022 17:06:46 -0500 Subject: [PATCH 15/32] Fix test descriptions --- tests/storage/test_event_federation.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 6ef9cf8bf42c..7ca379859adc 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1136,7 +1136,6 @@ def test_filter_event_ids_with_pull_attempt_backoff( """ Test to make sure only event IDs we should backoff from are returned. """ - # Create the room user_id = self.register_user("alice", "test") tok = self.login("alice", "test") @@ -1160,10 +1159,8 @@ def test_filter_event_ids_with_pull_attempt_backoff_retry_after_backoff_duration self, ): """ - Test to make sure event IDs are returned after the backoff duration has - elapsed. + Test to make sure event IDs are returned after the backoff duration has elapsed. 
""" - # Create the room user_id = self.register_user("alice", "test") tok = self.login("alice", "test") From f3b443deb19adaa27e92ea6fdd0edd2ce5f91fba Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 03:30:58 -0500 Subject: [PATCH 16/32] Scratch debug changes --- synapse/handlers/federation.py | 17 ++++- synapse/handlers/federation_event.py | 10 ++- synapse/handlers/message.py | 9 +++ synapse/storage/controllers/persist_events.py | 2 + .../databases/main/event_federation.py | 6 ++ synapse/storage/databases/main/events.py | 5 ++ .../storage/databases/main/purge_events.py | 65 +++++++++++++++++++ 7 files changed, 110 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 986ffed3d592..f099d9dfa578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1005,12 +1005,25 @@ async def on_invite_request( await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) try: + logger.info( + "on_invite_request persist_events_and_notify=%s sender=%s", + event.event_id, + event.sender, + ) + logger.info( + "on_invite_request debug_event=%s", + event.get_pdu_json(), + ) + # await self._federation_event_handler.backfill_event_id( + # origin, event.room_id, event.event_id + # ) await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) - except Exception: + except Exception as exc: + logger.info("on_invite_request persist exception=%s", str(exc)) await self.store.remove_push_actions_from_staging(event.event_id) - raise + raise exc return event diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f351d62a5e4a..e0648aa9df0f 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -908,8 +908,9 @@ async def _process_pulled_event( logger.warning("Pulled event %s failed history check.", event_id) elif e.code == 429: logger.warning( - "Not 
attempting to pull event %s that we already tried to pull recently (backing off).", + "Not attempting to pull event=%s because of affected=%s that we already tried to pull recently (backing off).", event_id, + e.affected, ) else: raise @@ -966,11 +967,16 @@ async def _compute_event_context_with_maybe_missing_prevs( prevs_to_ignore = await self._store.filter_event_ids_with_pull_attempt_backoff( room_id, missing_prevs ) + logger.info( + "_compute_event_context_with_maybe_missing_prevs(event=%s) prevs_to_ignore=%s", + event.event_id, + prevs_to_ignore, + ) if len(prevs_to_ignore) > 0: raise FederationError( "ERROR", 429, - "Not attempting to pull event that we already tried to pull recently (backing off).", + f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).", affected=prevs_to_ignore[0], ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 00e7645ba5cc..ff94d8da87ee 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1719,6 +1719,11 @@ async def persist_and_notify_client_events( # way? If we have been invited by a remote server, we need # to get them to sign the event. + logger.info( + "asdf send_invite event=%s outlier=%s", + event.event_id, + event.internal_metadata.outlier, + ) returned_invite = await federation_handler.send_invite( invitee.domain, event ) @@ -1727,6 +1732,10 @@ async def persist_and_notify_client_events( # TODO: Make sure the signatures actually are correct. 
event.signatures.update(returned_invite.signatures) + logger.info( + "asdf send_invite debug event=%s", event.get_pdu_json() + ) + if event.content["membership"] == Membership.KNOCK: event.unsigned[ "knock_room_state" diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 06e71a8053c7..773dd35c00ec 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -392,6 +392,7 @@ async def persist_events( partitioned.setdefault(event.room_id, []).append((event, ctx)) event_ids.append(event.event_id) + logger.info("persist_events event_ids=%s", event_ids) set_tag( SynapseTags.FUNC_ARG_PREFIX + "event_ids", str(event_ids), @@ -442,6 +443,7 @@ async def enqueue( async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: + logger.info("persist_event event=%s", event.event_id) """ Returns: The event, stream ordering of `event`, and the stream ordering of the diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 92851a414b8a..09db96e6799f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1501,6 +1501,12 @@ async def record_event_failed_pull_attempt( event_id: The event that failed to be fetched or processed cause: The error message or reason that we failed to pull the event """ + logger.info( + "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s", + room_id, + event_id, + cause, + ) await self.db_pool.runInteraction( "record_event_failed_pull_attempt", self._record_event_failed_pull_attempt_upsert_txn, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 3e1582798656..d80e1ab63b29 100644 --- a/synapse/storage/databases/main/events.py +++ 
b/synapse/storage/databases/main/events.py @@ -182,6 +182,11 @@ async def _persist_events_and_state_updates( a room that has been un-partial stated. """ + logger.info( + "_persist_events_and_state_updates events=%s", + [x[0].event_id for x in events_and_contexts], + ) + # We want to calculate the stream orderings as late as possible, as # we only notify after all events with a lesser stream ordering have # been persisted. I.e. if we spend 10s inside the with block then diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 9213ce0b5abb..46f289e81342 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -57,6 +57,48 @@ async def purge_history( delete_local_events, ) + def asdf_get_debug_events_in_room_ordered_by_depth_txn( + self, txn: LoggingTransaction, room_id: str + ) -> Any: + """ + Output all events in the room in a nice table as a string to log + + Based on https://github.com/matrix-org/matrix-react-sdk/pull/8354 + """ + sql = ( + "SELECT depth, stream_ordering, type, state_key, outlier, event_id FROM events" + " WHERE events.room_id = ?" + " ORDER BY depth DESC, stream_ordering DESC;" + ) + txn.execute( + sql, + (room_id,), + ) + + headerColumnLengthMap = { + "depth": 4, + "stream_ordering": 12, + "type": 30, + "state_key": 40, + "outlier": 4, + # event_ids are 44 long but we don't need the extra 5 padding + # because it's the last item and we're just cheating by building + # this into the value instead of the format code. 
+ "event_id": 39, + } + + output = "" + row_format = "".join( + [ + "{:<" + str(columnLength + 5) + "}" + for columnLength in headerColumnLengthMap.values() + ] + ) + output += row_format.format(*headerColumnLengthMap.keys()) + "\n" + for row in txn: + output += row_format.format(*[str(x) for x in row]) + "\n" + return output + def _purge_history_txn( self, txn: LoggingTransaction, @@ -134,6 +176,11 @@ def _purge_history_txn( should_delete_params += (room_id, token.topological) + logger.info( + "before purge asdf_get_debug_events_in_room_ordered_by_depth\n%s", + self.asdf_get_debug_events_in_room_ordered_by_depth_txn(txn, room_id), + ) + # Note that we insert events that are outliers and aren't going to be # deleted, as nothing will happen to them. txn.execute( @@ -214,6 +261,19 @@ def _purge_history_txn( "WHERE event_id IN (SELECT event_id from events_to_purge)" ) + txn.execute( + """ + SELECT event_id FROM events_to_purge + """ + ) + events_to_purge_results = txn.fetchall() + # events_to_purge = [x[0] for x in events_to_purge_results] + logger.info( + "qwer delete_local_events=%s events_to_purge=%s", + delete_local_events, + events_to_purge_results, + ) + # Delete all remote non-state events for table in ( "event_edges", @@ -308,6 +368,11 @@ def _purge_history_txn( logger.info("[purge] done") + logger.info( + "after purge asdf_get_debug_events_in_room_ordered_by_depth\n%s", + self.asdf_get_debug_events_in_room_ordered_by_depth_txn(txn, room_id), + ) + return referenced_state_groups async def purge_room(self, room_id: str) -> List[int]: From d135d419669e819f33d4d15bb44a6237c32b2c4c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 03:52:58 -0500 Subject: [PATCH 17/32] Stop cascading backoff errors See https://github.com/matrix-org/synapse/pull/13816#discussion_r987676306 --- synapse/handlers/federation_event.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation_event.py 
b/synapse/handlers/federation_event.py index e0648aa9df0f..e5e80ec59d1a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -900,19 +900,28 @@ async def _process_pulled_event( backfilled=backfilled, ) except FederationError as e: - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(e) - ) - if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(e) + ) elif e.code == 429: logger.warning( "Not attempting to pull event=%s because of affected=%s that we already tried to pull recently (backing off).", event_id, e.affected, ) + # We do not record a failed pull attempt when we backoff fetching a + # missing `prev_event` because not being able to fetch the `prev_events` for + # a given event, doesn't mean we won't be able to fetch the given event as a + # `prev_event` for another downstream event. + # + # This avoids a cascade of backoff for all events in the DAG downstream + # from one backoff attempt. 
else: + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(e) + ) raise @trace From 4effca913157911b9cf8157f569ddf36a619a340 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 03:58:34 -0500 Subject: [PATCH 18/32] Remove scratch changes --- synapse/handlers/federation.py | 17 +---- synapse/handlers/federation_event.py | 5 -- synapse/handlers/message.py | 9 --- synapse/storage/controllers/persist_events.py | 2 - .../databases/main/event_federation.py | 2 +- synapse/storage/databases/main/events.py | 5 -- .../storage/databases/main/purge_events.py | 65 ------------------- 7 files changed, 3 insertions(+), 102 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f099d9dfa578..986ffed3d592 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1005,25 +1005,12 @@ async def on_invite_request( await self._bulk_push_rule_evaluator.action_for_event_by_user(event, context) try: - logger.info( - "on_invite_request persist_events_and_notify=%s sender=%s", - event.event_id, - event.sender, - ) - logger.info( - "on_invite_request debug_event=%s", - event.get_pdu_json(), - ) - # await self._federation_event_handler.backfill_event_id( - # origin, event.room_id, event.event_id - # ) await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) - except Exception as exc: - logger.info("on_invite_request persist exception=%s", str(exc)) + except Exception: await self.store.remove_push_actions_from_staging(event.event_id) - raise exc + raise return event diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e5e80ec59d1a..469062f098c2 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -976,11 +976,6 @@ async def _compute_event_context_with_maybe_missing_prevs( prevs_to_ignore = await self._store.filter_event_ids_with_pull_attempt_backoff( 
room_id, missing_prevs ) - logger.info( - "_compute_event_context_with_maybe_missing_prevs(event=%s) prevs_to_ignore=%s", - event.event_id, - prevs_to_ignore, - ) if len(prevs_to_ignore) > 0: raise FederationError( "ERROR", diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index ff94d8da87ee..00e7645ba5cc 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1719,11 +1719,6 @@ async def persist_and_notify_client_events( # way? If we have been invited by a remote server, we need # to get them to sign the event. - logger.info( - "asdf send_invite event=%s outlier=%s", - event.event_id, - event.internal_metadata.outlier, - ) returned_invite = await federation_handler.send_invite( invitee.domain, event ) @@ -1732,10 +1727,6 @@ async def persist_and_notify_client_events( # TODO: Make sure the signatures actually are correct. event.signatures.update(returned_invite.signatures) - logger.info( - "asdf send_invite debug event=%s", event.get_pdu_json() - ) - if event.content["membership"] == Membership.KNOCK: event.unsigned[ "knock_room_state" diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 773dd35c00ec..06e71a8053c7 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -392,7 +392,6 @@ async def persist_events( partitioned.setdefault(event.room_id, []).append((event, ctx)) event_ids.append(event.event_id) - logger.info("persist_events event_ids=%s", event_ids) set_tag( SynapseTags.FUNC_ARG_PREFIX + "event_ids", str(event_ids), @@ -443,7 +442,6 @@ async def enqueue( async def persist_event( self, event: EventBase, context: EventContext, backfilled: bool = False ) -> Tuple[EventBase, PersistedEventPosition, RoomStreamToken]: - logger.info("persist_event event=%s", event.event_id) """ Returns: The event, stream ordering of `event`, and the stream ordering of the diff --git 
a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 09db96e6799f..2853840e7112 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1501,7 +1501,7 @@ async def record_event_failed_pull_attempt( event_id: The event that failed to be fetched or processed cause: The error message or reason that we failed to pull the event """ - logger.info( + logger.debug( "record_event_failed_pull_attempt room_id=%s, event_id=%s, cause=%s", room_id, event_id, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index d80e1ab63b29..3e1582798656 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -182,11 +182,6 @@ async def _persist_events_and_state_updates( a room that has been un-partial stated. """ - logger.info( - "_persist_events_and_state_updates events=%s", - [x[0].event_id for x in events_and_contexts], - ) - # We want to calculate the stream orderings as late as possible, as # we only notify after all events with a lesser stream ordering have # been persisted. I.e. if we spend 10s inside the with block then diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 46f289e81342..9213ce0b5abb 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -57,48 +57,6 @@ async def purge_history( delete_local_events, ) - def asdf_get_debug_events_in_room_ordered_by_depth_txn( - self, txn: LoggingTransaction, room_id: str - ) -> Any: - """ - Output all events in the room in a nice table as a string to log - - Based on https://github.com/matrix-org/matrix-react-sdk/pull/8354 - """ - sql = ( - "SELECT depth, stream_ordering, type, state_key, outlier, event_id FROM events" - " WHERE events.room_id = ?" 
- " ORDER BY depth DESC, stream_ordering DESC;" - ) - txn.execute( - sql, - (room_id,), - ) - - headerColumnLengthMap = { - "depth": 4, - "stream_ordering": 12, - "type": 30, - "state_key": 40, - "outlier": 4, - # event_ids are 44 long but we don't need the extra 5 padding - # because it's the last item and we're just cheating by building - # this into the value instead of the format code. - "event_id": 39, - } - - output = "" - row_format = "".join( - [ - "{:<" + str(columnLength + 5) + "}" - for columnLength in headerColumnLengthMap.values() - ] - ) - output += row_format.format(*headerColumnLengthMap.keys()) + "\n" - for row in txn: - output += row_format.format(*[str(x) for x in row]) + "\n" - return output - def _purge_history_txn( self, txn: LoggingTransaction, @@ -176,11 +134,6 @@ def _purge_history_txn( should_delete_params += (room_id, token.topological) - logger.info( - "before purge asdf_get_debug_events_in_room_ordered_by_depth\n%s", - self.asdf_get_debug_events_in_room_ordered_by_depth_txn(txn, room_id), - ) - # Note that we insert events that are outliers and aren't going to be # deleted, as nothing will happen to them. 
txn.execute( @@ -261,19 +214,6 @@ def _purge_history_txn( "WHERE event_id IN (SELECT event_id from events_to_purge)" ) - txn.execute( - """ - SELECT event_id FROM events_to_purge - """ - ) - events_to_purge_results = txn.fetchall() - # events_to_purge = [x[0] for x in events_to_purge_results] - logger.info( - "qwer delete_local_events=%s events_to_purge=%s", - delete_local_events, - events_to_purge_results, - ) - # Delete all remote non-state events for table in ( "event_edges", @@ -368,11 +308,6 @@ def _purge_history_txn( logger.info("[purge] done") - logger.info( - "after purge asdf_get_debug_events_in_room_ordered_by_depth\n%s", - self.asdf_get_debug_events_in_room_ordered_by_depth_txn(txn, room_id), - ) - return referenced_state_groups async def purge_room(self, room_id: str) -> List[int]: From e3cc0546c071a2ae889080dfee5c098ff907be76 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 04:21:30 -0500 Subject: [PATCH 19/32] Use custom FederationPullAttemptBackoffError See https://github.com/matrix-org/synapse/pull/13816#discussion_r987685635 --- synapse/api/errors.py | 25 ++++++++++++++++ synapse/handlers/federation_event.py | 43 ++++++++++++---------------- 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index c6062075690e..95c6ea257d8c 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -274,6 +274,31 @@ def __init__(self, destination: Optional[str]): ) +class FederationPullAttemptBackoffError(SynapseError): + """ + Raised to indicate that we are are deliberately not attempting to pull the given + event over federation because we've already done so recently and are backing off. 
+ + Attributes: + event_id: The event_id which we are refusing to pull + message: A custom error message that gives more context + """ + + def __init__(self, event_id: str, message: Optional[str]): + self.event_id = event_id + + if message: + error_message = message + else: + error_message = f"Not attempting to pull event={self.event_id} because we already tried to pull it recently (backing off)." + + super().__init__( + code=403, + msg=error_message, + errcode=Codes.FORBIDDEN, + ) + + class InteractiveAuthIncompleteError(Exception): """An error raised when UI auth is not yet complete diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 469062f098c2..cd089e7c310a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -44,6 +44,7 @@ AuthError, Codes, FederationError, + FederationPullAttemptBackoffError, HttpResponseException, RequestSendFailed, SynapseError, @@ -900,29 +901,25 @@ async def _process_pulled_event( backfilled=backfilled, ) except FederationError as e: + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(e) + ) + if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(e) - ) - elif e.code == 429: - logger.warning( - "Not attempting to pull event=%s because of affected=%s that we already tried to pull recently (backing off).", - event_id, - e.affected, - ) - # We do not record a failed pull attempt when we backoff fetching a - # missing `prev_event` because not being able to fetch the `prev_events` for - # a given event, doesn't mean we won't be able to fetch the given event as a - # `prev_event` for another downstream event. - # - # This avoids a cascade of backoff for all events in the DAG downstream - # from one backoff attempt. 
else: - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(e) - ) raise + except FederationPullAttemptBackoffError as exc: + # Log a warning about why we failed to process the event (the error message + # is pretty good) + logger.warning(str(exc)) + # We do not record a failed pull attempt when we backoff fetching a + # missing `prev_event` because not being able to fetch the `prev_events` for + # a given event, doesn't mean we won't be able to fetch the given event as a + # `prev_event` for another downstream event. + # + # This avoids a cascade of backoff for all events in the DAG downstream + # from one backoff attempt. @trace async def _compute_event_context_with_maybe_missing_prevs( @@ -977,11 +974,9 @@ async def _compute_event_context_with_maybe_missing_prevs( room_id, missing_prevs ) if len(prevs_to_ignore) > 0: - raise FederationError( - "ERROR", - 429, - f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).", - affected=prevs_to_ignore[0], + raise FederationPullAttemptBackoffError( + event_id=prevs_to_ignore[0], + message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).", ) if not missing_prevs: From 74f9e03381d0b0822440a49636c6f1e084eea671 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 04:21:46 -0500 Subject: [PATCH 20/32] Fix test to reflect no more cascade See https://github.com/matrix-org/synapse/pull/13816#discussion_r987676306 --- tests/handlers/test_federation_event.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 3a4e3f1a4802..2fb2f045e491 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -14,7 
+14,7 @@ from typing import Optional from unittest import mock -from synapse.api.errors import AuthError +from synapse.api.errors import AuthError, StoreError from synapse.api.room_versions import RoomVersion from synapse.event_auth import ( check_state_dependent_auth_rules, @@ -641,16 +641,18 @@ async def get_room_state( ) self.assertEqual(backfill_num_attempts_for_event_without_signatures, 1) - # And we recorded a failure for the event that has the missing prev_event - # since we weren't able to go and fetch it. - backfill_num_attempts = self.get_success( + # And make sure we didn't record a failure for the event that has the missing + # prev_event because we don't want to cause a cascade of failures. Just because + # we can't pull a `prev_event` for this `pulled_event`, doesn't mean we won't be + # able to fetch `pulled_event` as a `prev_event` for another event downstream. + self.get_failure( main_store.db_pool.simple_select_one_onecol( table="event_failed_pull_attempts", keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", - ) + ), + StoreError, ) - self.assertEqual(backfill_num_attempts, 1) def test_process_pulled_event_with_rejected_missing_state(self) -> None: """Ensure that we correctly handle pulled events with missing state containing a From 0b900e14b8e3b8c96ee4a486b587f0d7a33ab553 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 05:01:29 -0500 Subject: [PATCH 21/32] Better comments from what I've gathered See https://github.com/matrix-org/synapse/pull/13816#discussion_r987740318 --- synapse/handlers/federation_event.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index cd089e7c310a..e649cb4f3554 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -911,15 +911,16 @@ async def _process_pulled_event( raise except FederationPullAttemptBackoffError as exc: # Log a 
warning about why we failed to process the event (the error message - # is pretty good) + # for `FederationPullAttemptBackoffError` is pretty good) logger.warning(str(exc)) - # We do not record a failed pull attempt when we backoff fetching a - # missing `prev_event` because not being able to fetch the `prev_events` for - # a given event, doesn't mean we won't be able to fetch the given event as a - # `prev_event` for another downstream event. + # We do not record a failed pull attempt when we backoff fetching a missing + # `prev_event` because not being able to fetch the `prev_events` just means + # we won't be able to de-outlier the pulled event. But we can still use an + # `outlier` in the state/auth chain for another event. So we shouldn't stop + # a downstream event from trying to pull it. # - # This avoids a cascade of backoff for all events in the DAG downstream - # from one backoff attempt. + # This avoids a cascade of backoff for all events in the DAG downstream from + # one event backoff upstream. 
@trace async def _compute_event_context_with_maybe_missing_prevs( From 3cb28264e48d65f00aaf15debda43202b394d5d7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 05:02:58 -0500 Subject: [PATCH 22/32] Clarify which error in comments --- tests/handlers/test_federation_event.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index 2fb2f045e491..f878f974036b 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -651,6 +651,7 @@ async def get_room_state( keyvalues={"event_id": pulled_event.event_id}, retcol="num_attempts", ), + # StoreError: 404: No row found StoreError, ) From 5f313df4f157d46158dca9161d0c929bf98d302a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 5 Oct 2022 05:05:58 -0500 Subject: [PATCH 23/32] Add some clarification to the test comment --- tests/handlers/test_federation_event.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py index f878f974036b..e448cb1901e0 100644 --- a/tests/handlers/test_federation_event.py +++ b/tests/handlers/test_federation_event.py @@ -642,9 +642,10 @@ async def get_room_state( self.assertEqual(backfill_num_attempts_for_event_without_signatures, 1) # And make sure we didn't record a failure for the event that has the missing - # prev_event because we don't want to cause a cascade of failures. Just because - # we can't pull a `prev_event` for this `pulled_event`, doesn't mean we won't be - # able to fetch `pulled_event` as a `prev_event` for another event downstream. + # prev_event because we don't want to cause a cascade of failures. Not being + # able to fetch the `prev_events` just means we won't be able to de-outlier the + # pulled event. But we can still use an `outlier` in the state/auth chain for + # another event. 
So we shouldn't stop a downstream event from trying to pull it. self.get_failure( main_store.db_pool.simple_select_one_onecol( table="event_failed_pull_attempts", From 7f4fdd2a4960498d7860dfc9a2c259dc6b41b922 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 13:52:22 -0500 Subject: [PATCH 24/32] Not a SynapseError Also can pass multiple event_ids, see https://github.com/matrix-org/synapse/pull/13816#discussion_r993260296 --- synapse/api/errors.py | 46 +++++++++++++--------------- synapse/handlers/federation_event.py | 5 ++- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 95c6ea257d8c..e0873b191309 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -274,31 +274,6 @@ def __init__(self, destination: Optional[str]): ) -class FederationPullAttemptBackoffError(SynapseError): - """ - Raised to indicate that we are are deliberately not attempting to pull the given - event over federation because we've already done so recently and are backing off. - - Attributes: - event_id: The event_id which we are refusing to pull - message: A custom error message that gives more context - """ - - def __init__(self, event_id: str, message: Optional[str]): - self.event_id = event_id - - if message: - error_message = message - else: - error_message = f"Not attempting to pull event={self.event_id} because we already tried to pull it recently (backing off)." - - super().__init__( - code=403, - msg=error_message, - errcode=Codes.FORBIDDEN, - ) - - class InteractiveAuthIncompleteError(Exception): """An error raised when UI auth is not yet complete @@ -665,6 +640,27 @@ def get_dict(self) -> "JsonDict": } +class FederationPullAttemptBackoffError(RuntimeError): + """ + Raised to indicate that we are are deliberately not attempting to pull the given + event over federation because we've already done so recently and are backing off. 
+ + Attributes: + event_id: The event_id which we are refusing to pull + message: A custom error message that gives more context + """ + + def __init__(self, event_ids: List[str], message: Optional[str]): + self.event_ids = event_ids + + if message: + error_message = message + else: + error_message = f"Not attempting to pull event_ids={self.event_ids} because we already tried to pull them recently (backing off)." + + super().__init__(error_message) + + class HttpResponseException(CodeMessageException): """ Represents an HTTP-level failure of an outbound request diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index d3b9192a8473..c6bcd28f647a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -958,6 +958,9 @@ async def _compute_event_context_with_maybe_missing_prevs( The event context. Raises: + FederationPullAttemptBackoffError if we are are deliberately not attempting + to pull the given event over federation because we've already done so + recently and are backing off. FederationError if we fail to get the state from the remote server after any missing `prev_event`s. 
""" @@ -976,7 +979,7 @@ async def _compute_event_context_with_maybe_missing_prevs( ) if len(prevs_to_ignore) > 0: raise FederationPullAttemptBackoffError( - event_id=prevs_to_ignore[0], + event_ids=prevs_to_ignore, message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).", ) From 89ffbcb36e61e2448bcd38c5cc01d2f3b65a90d2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 14:05:59 -0500 Subject: [PATCH 25/32] Make sure usages of _compute_event_context_with_maybe_missing_prevs handle FederationPullAttemptBackoffError See https://github.com/matrix-org/synapse/pull/13816#discussion_r993261712 --- synapse/handlers/federation.py | 16 ++++++++++++++++ synapse/handlers/federation_event.py | 23 +++++++++++++---------- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 44e70c6c3c30..e87fe5fe014a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,7 @@ Codes, FederationDeniedError, FederationError, + FederationPullAttemptBackoffError, HttpResponseException, LimitExceededError, NotFoundError, @@ -1720,7 +1721,22 @@ async def _sync_partial_state_room( destination, event ) break + except FederationPullAttemptBackoffError as exc: + # Log a warning about why we failed to process the event (the error message + # for `FederationPullAttemptBackoffError` is pretty good) + logger.warning("_sync_partial_state_room: " + str(exc)) + # We do not record a failed pull attempt when we backoff fetching a missing + # `prev_event` because not being able to fetch the `prev_events` just means + # we won't be able to de-outlier the pulled event. But we can still use an + # `outlier` in the state/auth chain for another event. So we shouldn't stop + # a downstream event from trying to pull it. 
+ # + # This avoids a cascade of backoff for all events in the DAG downstream from + # one event backoff upstream. except FederationError as e: + # TODO: We should `record_event_failed_pull_attempt` here, + # see https://github.com/matrix-org/synapse/issues/13700 + if attempt == len(destinations) - 1: # We have tried every remote server for this event. Give up. # TODO(faster_joins) giving up isn't the right thing to do diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index c6bcd28f647a..d6837b54bcad 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -566,6 +566,9 @@ async def update_state_for_partial_state_event( event: partial-state event to be de-partial-stated Raises: + FederationPullAttemptBackoffError if we are are deliberately not attempting + to pull the given event over federation because we've already done so + recently and are backing off. FederationError if we fail to request state from the remote server. """ logger.info("Updating state for %s", event.event_id) @@ -900,19 +903,10 @@ async def _process_pulled_event( context, backfilled=backfilled, ) - except FederationError as e: - await self._store.record_event_failed_pull_attempt( - event.room_id, event_id, str(e) - ) - - if e.code == 403: - logger.warning("Pulled event %s failed history check.", event_id) - else: - raise except FederationPullAttemptBackoffError as exc: # Log a warning about why we failed to process the event (the error message # for `FederationPullAttemptBackoffError` is pretty good) - logger.warning(str(exc)) + logger.warning("_process_pulled_event: " + str(exc)) # We do not record a failed pull attempt when we backoff fetching a missing # `prev_event` because not being able to fetch the `prev_events` just means # we won't be able to de-outlier the pulled event. 
But we can still use an @@ -921,6 +915,15 @@ async def _process_pulled_event( # # This avoids a cascade of backoff for all events in the DAG downstream from # one event backoff upstream. + except FederationError as e: + await self._store.record_event_failed_pull_attempt( + event.room_id, event_id, str(e) + ) + + if e.code == 403: + logger.warning("Pulled event %s failed history check.", event_id) + else: + raise @trace async def _compute_event_context_with_maybe_missing_prevs( From 354f68297a60c381c47a98bcc45c843db680d544 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 14:07:22 -0500 Subject: [PATCH 26/32] Remove double "recently" --- synapse/handlers/federation_event.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index d6837b54bcad..0cedd24581cb 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -974,7 +974,7 @@ async def _compute_event_context_with_maybe_missing_prevs( seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen - # If we've already recently attempted to pull this missing event recently, don't + # If we've already recently attempted to pull this missing event, don't # try it again so soon. Since we have to fetch all of the prev_events, we can # bail early here if we find any to ignore. 
prevs_to_ignore = await self._store.filter_event_ids_with_pull_attempt_backoff( From e0b04472a2a81d534164be5a63241b29a2f787e0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 14:50:30 -0500 Subject: [PATCH 27/32] Do the calculation in Python because it's more clear when we need results for all given events anyway See https://github.com/matrix-org/synapse/pull/13816#discussion_r993254667 --- .../databases/main/event_federation.py | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 2853840e7112..a4570d5de974 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1561,31 +1561,11 @@ def _filter_event_ids_with_pull_attempt_backoff_txn( txn.database_engine, "event_id", event_ids ) - if isinstance(self.database_engine, PostgresEngine): - least_function = "least" - elif isinstance(self.database_engine, Sqlite3Engine): - least_function = "min" - else: - raise RuntimeError("Unknown database engine") - sql = f""" - SELECT event_id FROM event_failed_pull_attempts + SELECT event_id, last_attempt_ts, num_attempts FROM event_failed_pull_attempts WHERE room_id = ? AND {where_event_ids_match_clause} - /** - * Exponential back-off (up to the upper bound) so we don't try to - * pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. - * - * We use `1 << n` as a power of 2 equivalent for compatibility - * with older SQLites. The left shift equivalent only works with - * powers of 2 because left shift is a binary operation (base-2). - * Otherwise, we would use `power(2, n)` or the power operator, `2^n`. - */ - AND ? /* current_time */ < last_attempt_ts + ( - (1 << {least_function}(num_attempts, ? /* max doubling steps */)) - * ? 
/* step */ - ) """ txn.execute( @@ -1593,15 +1573,30 @@ def _filter_event_ids_with_pull_attempt_backoff_txn( ( room_id, *values, - self._clock.time_msec(), - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS, ), ) - event_ids_to_ignore_result = cast(List[Tuple[str]], txn.fetchall()) + event_failed_pull_attempts = cast( + List[Tuple[str, int, int]], txn.fetchall() + ) - return [event_id for event_id, in event_ids_to_ignore_result] + current_time = self._clock.time_msec() + return [ + event_id + for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts + # Exponential back-off (up to the upper bound) so we don't try to + # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. + if current_time + < last_attempt_ts + + ( + 2 + ** min( + num_attempts, + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, + ) + ) + * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS + ] return await self.db_pool.runInteraction( "filter_event_ids_with_pull_attempt_backoff_txn", From e72c4e54d748789ae00fd3aafe309baf296aa4e4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 14:59:32 -0500 Subject: [PATCH 28/32] Use built-in select many function --- .../databases/main/event_federation.py | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a4570d5de974..1d1ca80c65eb 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1557,41 +1557,31 @@ async def filter_event_ids_with_pull_attempt_backoff( def _filter_event_ids_with_pull_attempt_backoff_txn( txn: LoggingTransaction, ) -> List[str]: - where_event_ids_match_clause, values = make_in_list_sql_clause( - txn.database_engine, "event_id", event_ids - ) - - sql = f""" - SELECT event_id, last_attempt_ts, num_attempts 
FROM event_failed_pull_attempts - WHERE - room_id = ? - AND {where_event_ids_match_clause} - """ - - txn.execute( - sql, - ( - room_id, - *values, + event_failed_pull_attempts = self.db_pool.simple_select_many_txn( + txn, + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + "num_attempts", ), ) - event_failed_pull_attempts = cast( - List[Tuple[str, int, int]], txn.fetchall() - ) - current_time = self._clock.time_msec() return [ - event_id - for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts + event_failed_pull_attempt["event_id"] + for event_failed_pull_attempt in event_failed_pull_attempts # Exponential back-off (up to the upper bound) so we don't try to # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. if current_time - < last_attempt_ts + < event_failed_pull_attempt["last_attempt_ts"] + ( 2 ** min( - num_attempts, + event_failed_pull_attempt["num_attempts"], BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, ) ) From 4e0803982522d522f8023dda92461aef1c68b45d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 15:04:03 -0500 Subject: [PATCH 29/32] No need for txn --- .../databases/main/event_federation.py | 65 ++++++++----------- 1 file changed, 28 insertions(+), 37 deletions(-) diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 1d1ca80c65eb..17c42f3c8f8b 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1553,45 +1553,36 @@ async def filter_event_ids_with_pull_attempt_backoff( Returns: List of event_ids that should not be attempted to be pulled """ + event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + 
"num_attempts", + ), + desc="filter_event_ids_with_pull_attempt_backoff", + ) - def _filter_event_ids_with_pull_attempt_backoff_txn( - txn: LoggingTransaction, - ) -> List[str]: - event_failed_pull_attempts = self.db_pool.simple_select_many_txn( - txn, - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=( - "event_id", - "last_attempt_ts", - "num_attempts", - ), - ) - - current_time = self._clock.time_msec() - return [ - event_failed_pull_attempt["event_id"] - for event_failed_pull_attempt in event_failed_pull_attempts - # Exponential back-off (up to the upper bound) so we don't try to - # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. - if current_time - < event_failed_pull_attempt["last_attempt_ts"] - + ( - 2 - ** min( - event_failed_pull_attempt["num_attempts"], - BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, - ) + current_time = self._clock.time_msec() + return [ + event_failed_pull_attempt["event_id"] + for event_failed_pull_attempt in event_failed_pull_attempts + # Exponential back-off (up to the upper bound) so we don't try to + # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. 
+ if current_time + < event_failed_pull_attempt["last_attempt_ts"] + + ( + 2 + ** min( + event_failed_pull_attempt["num_attempts"], + BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, ) - * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS - ] - - return await self.db_pool.runInteraction( - "filter_event_ids_with_pull_attempt_backoff_txn", - _filter_event_ids_with_pull_attempt_backoff_txn, - ) + ) + * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS + ] async def get_missing_events( self, From b0606524ee6472fb84562aa5558bc65770103eb1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 15:12:41 -0500 Subject: [PATCH 30/32] Rename function to reflect functionality See https://github.com/matrix-org/synapse/pull/13816#discussion_r987741403 --- synapse/handlers/federation_event.py | 2 +- synapse/storage/databases/main/event_federation.py | 4 ++-- tests/storage/test_event_federation.py | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 0cedd24581cb..e9cb28b17f6c 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -977,7 +977,7 @@ async def _compute_event_context_with_maybe_missing_prevs( # If we've already recently attempted to pull this missing event, don't # try it again so soon. Since we have to fetch all of the prev_events, we can # bail early here if we find any to ignore. 
- prevs_to_ignore = await self._store.filter_event_ids_with_pull_attempt_backoff( + prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff( room_id, missing_prevs ) if len(prevs_to_ignore) > 0: diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 17c42f3c8f8b..309a4ba6643c 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1537,7 +1537,7 @@ def _record_event_failed_pull_attempt_upsert_txn( txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause)) @trace - async def filter_event_ids_with_pull_attempt_backoff( + async def get_event_ids_to_not_pull_from_backoff( self, room_id: str, event_ids: Collection[str], @@ -1563,7 +1563,7 @@ async def filter_event_ids_with_pull_attempt_backoff( "last_attempt_ts", "num_attempts", ), - desc="filter_event_ids_with_pull_attempt_backoff", + desc="get_event_ids_to_not_pull_from_backoff", ) current_time = self._clock.time_msec() diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 7ca379859adc..f139022869a7 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1130,7 +1130,7 @@ def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_ backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points] self.assertEqual(backfill_event_ids, ["insertion_eventA"]) - def test_filter_event_ids_with_pull_attempt_backoff( + def test_get_event_ids_to_not_pull_from_backoff( self, ): """ @@ -1148,14 +1148,14 @@ def test_filter_event_ids_with_pull_attempt_backoff( ) event_ids_to_backoff = self.get_success( - self.store.filter_event_ids_with_pull_attempt_backoff( + self.store.get_event_ids_to_not_pull_from_backoff( room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] ) ) self.assertEqual(event_ids_to_backoff, ["$failed_event_id"]) - 
def test_filter_event_ids_with_pull_attempt_backoff_retry_after_backoff_duration( + def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration( self, ): """ @@ -1177,7 +1177,7 @@ def test_filter_event_ids_with_pull_attempt_backoff_retry_after_backoff_duration self.reactor.advance(datetime.timedelta(hours=2).total_seconds()) event_ids_to_backoff = self.get_success( - self.store.filter_event_ids_with_pull_attempt_backoff( + self.store.get_event_ids_to_not_pull_from_backoff( room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] ) ) From b1a0c1b4c9b6ece204e864818adfb7753989d3cf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Oct 2022 15:15:44 -0500 Subject: [PATCH 31/32] Fix test description to make it accurate --- tests/storage/test_event_federation.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index f139022869a7..853db930d6af 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -1159,7 +1159,8 @@ def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration( self, ): """ - Test to make sure event IDs are returned after the backoff duration has elapsed. + Test to make sure no event IDs are returned after the backoff duration has + elapsed. """ # Create the room user_id = self.register_user("alice", "test") @@ -1181,7 +1182,8 @@ def test_get_event_ids_to_not_pull_from_backoff_retry_after_backoff_duration( room_id=room_id, event_ids=["$failed_event_id", "$normal_event_id"] ) ) - + # Since this function only returns events we should backoff from, time has + # elapsed past the backoff range so there is no events to backoff from. 
self.assertEqual(event_ids_to_backoff, []) From bccd802ddc4fa04e0068c295643c72366c7fe8bf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Fri, 14 Oct 2022 23:26:13 -0500 Subject: [PATCH 32/32] Use more standard string interpolation with `logger` Co-authored-by: Erik Johnston --- synapse/handlers/federation.py | 2 +- synapse/handlers/federation_event.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e87fe5fe014a..5f7e0a1f7937 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1724,7 +1724,7 @@ async def _sync_partial_state_room( except FederationPullAttemptBackoffError as exc: # Log a warning about why we failed to process the event (the error message # for `FederationPullAttemptBackoffError` is pretty good) - logger.warning("_sync_partial_state_room: " + str(exc)) + logger.warning("_sync_partial_state_room: %s", exc) # We do not record a failed pull attempt when we backoff fetching a missing # `prev_event` because not being able to fetch the `prev_events` just means # we won't be able to de-outlier the pulled event. But we can still use an diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e9cb28b17f6c..a1832197ee61 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -906,7 +906,7 @@ async def _process_pulled_event( except FederationPullAttemptBackoffError as exc: # Log a warning about why we failed to process the event (the error message # for `FederationPullAttemptBackoffError` is pretty good) - logger.warning("_process_pulled_event: " + str(exc)) + logger.warning("_process_pulled_event: %s", exc) # We do not record a failed pull attempt when we backoff fetching a missing # `prev_event` because not being able to fetch the `prev_events` just means # we won't be able to de-outlier the pulled event. But we can still use an