From ee236caf027eab70d4a9e6047e2a15e00e3bfe83 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 16:36:22 -0500 Subject: [PATCH 01/20] Update get_pdu to return original, pristine EventBase Split out from https://github.com/matrix-org/synapse/pull/13205 As discussed at: - https://github.com/matrix-org/synapse/pull/13205#discussion_r918365746 - https://github.com/matrix-org/synapse/pull/13205#discussion_r918366125 --- synapse/federation/federation_client.py | 43 ++++++++++++++++++++---- synapse/handlers/federation_event.py | 19 ++++++++--- synapse/storage/databases/main/events.py | 23 +++++++++++-- 3 files changed, 72 insertions(+), 13 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 66e6305562f5..f53dd23eba05 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -53,7 +53,7 @@ RoomVersion, RoomVersions, ) -from synapse.events import EventBase, builder +from synapse.events import EventBase, builder, make_event_from_dict from synapse.federation.federation_base import ( FederationBase, InvalidEventSignatureError, @@ -309,7 +309,7 @@ async def get_pdu_from_destination_raw( ) logger.debug( - "retrieved event id %s from %s: %r", + "get_pdu_from_destination_raw: retrieved event id %s from %s: %r", event_id, destination, transaction_data, @@ -360,9 +360,25 @@ async def get_pdu( # TODO: Rate limit the number of times we try and get the same event. - ev = self._get_pdu_cache.get(event_id) - if ev: - return ev + event_from_cache = self._get_pdu_cache.get(event_id) + if event_from_cache: + assert not event_from_cache.internal_metadata.outlier, ( + "Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " + "We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache." 
+ ) + + # Make sure to return a copy because downstream callers will use + # this event reference directly and change our original, pristine, + # untouched PDU. For example when people mark the event as an + # `outlier` (`event.internal_metadata.outlier = true`), we don't + # want that to propagate back into the cache. + event_copy = make_event_from_dict( + event_from_cache.get_pdu_json(), + event_from_cache.room_version, + internal_metadata_dict=None, + ) + + return event_copy pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) @@ -405,7 +421,22 @@ async def get_pdu( if signed_pdu: self._get_pdu_cache[event_id] = signed_pdu - return signed_pdu + # Make sure to return a copy because downstream callers will use this + # event reference directly and change our original, pristine, untouched + # PDU. For example when people mark the event as an `outlier` + # (`event.internal_metadata.outlier = true`), we don't want that to + # propagate back into the cache. + # + # We could get away with only making a new copy of the event when + # pulling from cache but it's probably better to have good hygiene and + # not dirty the cache in the first place as well. + event_copy = make_event_from_dict( + signed_pdu.get_pdu_json(), + signed_pdu.room_version, + internal_metadata_dict=None, + ) + + return event_copy async def get_room_state_ids( self, destination: str, room_id: str, event_id: str diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index b1dab57447ae..5c2ae66c88c2 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -766,10 +766,21 @@ async def _process_pulled_event( """ logger.info("Processing pulled event %s", event) - # these should not be outliers. - assert ( - not event.internal_metadata.is_outlier() - ), "pulled event unexpectedly flagged as outlier" + # This function should not be used to persist outliers. 
If you happen to + # run into a situation where the event you're trying to process/backfill + # is marked as an `outlier`, then you should update that spot to return + # an `EventBase` copy that doesn't have `outlier` flag set. + # + # `EventBase` is used to represent both an event we have not yet + # persisted, and one that we have persisted and now keep in the cache. + # In an ideal world this method would only be called with the first type + # of event, but it turns out that's not actually the case and for + # example, you could get an event from cache that is marked as an + # `outlier` (fix up that spot though). + assert not event.internal_metadata.is_outlier(), ( + "This is a safe-guard to make sure we're not trying to persist an outlier using this function (use something else). " + "If you're trying to process/backfill an event, this is the right method but you need pass in an event copy that doesn't have `event.internal_metada.outlier = true`." + ) event_id = event.event_id diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fa2266ba2036..1222d50dd580 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1346,9 +1346,24 @@ def _update_outliers_txn( event_id: outlier for event_id, outlier in txn } + logger.debug( + "_update_outliers_txn: events=%s have_persisted=%s", + [ev.event_id for ev, _ in events_and_contexts], + have_persisted, + ) + to_remove = set() for event, context in events_and_contexts: - if event.event_id not in have_persisted: + outlier_persisted = have_persisted.get(event.event_id) + logger.debug( + "_update_outliers_txn: event=%s outlier=%s outlier_persisted=%s", + event.event_id, + event.internal_metadata.is_outlier(), + outlier_persisted, + ) + + # Ignore events which we haven't persisted at all + if outlier_persisted is None: continue to_remove.add(event) @@ -1358,7 +1373,6 @@ def _update_outliers_txn( # was an outlier or not - what we 
have is at least as good. continue - outlier_persisted = have_persisted[event.event_id] if not event.internal_metadata.is_outlier() and outlier_persisted: # We received a copy of an event that we had already stored as # an outlier in the database. We now have some state at that event @@ -1369,7 +1383,10 @@ def _update_outliers_txn( # events down /sync. In general they will be historical events, so that # doesn't matter too much, but that is not always the case. - logger.info("Updating state for ex-outlier event %s", event.event_id) + logger.info( + "_update_outliers_txn: Updating state for ex-outlier event %s", + event.event_id, + ) # insert into event_to_state_groups. try: From 79a1b724d32e68ff3bc3e131c8e0392eade0fed9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 16:55:23 -0500 Subject: [PATCH 02/20] Add changelog --- changelog.d/13320.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13320.bugfix diff --git a/changelog.d/13320.bugfix b/changelog.d/13320.bugfix new file mode 100644 index 000000000000..d33cf3a25af3 --- /dev/null +++ b/changelog.d/13320.bugfix @@ -0,0 +1 @@ +Fix `FederationClient.get_pdu()` returning events from the cache as `outliers` instead of original events we saw over federation. 
From bfd35fde919185e6b5d22b77958f22f9af31a1ca Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 17:00:20 -0500 Subject: [PATCH 03/20] Internal change, no specific bugfix --- changelog.d/{13320.bugfix => 13320.misc} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog.d/{13320.bugfix => 13320.misc} (100%) diff --git a/changelog.d/13320.bugfix b/changelog.d/13320.misc similarity index 100% rename from changelog.d/13320.bugfix rename to changelog.d/13320.misc From e0e20a5c9a2d4e364ad0813e208acb809140a1ff Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 17:26:25 -0500 Subject: [PATCH 04/20] Explain why not --- synapse/handlers/federation_event.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 5c2ae66c88c2..f60b9dcac86b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -766,10 +766,13 @@ async def _process_pulled_event( """ logger.info("Processing pulled event %s", event) - # This function should not be used to persist outliers. If you happen to - # run into a situation where the event you're trying to process/backfill - # is marked as an `outlier`, then you should update that spot to return - # an `EventBase` copy that doesn't have `outlier` flag set. + # This function should not be used to persist outliers (use something + # else) because this does a bunch of operations that aren't necessary + # (extra work; in particular, it makes sure we have all the prev_events + # and resolves the state across those prev events). If you happen to run + # into a situation where the event you're trying to process/backfill is + # marked as an `outlier`, then you should update that spot to return an + # `EventBase` copy that doesn't have `outlier` flag set. 
# # `EventBase` is used to represent both an event we have not yet # persisted, and one that we have persisted and now keep in the cache. From 22410f2a3ce17905ebe7fcecb16df64e17158fc6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 20:10:43 -0500 Subject: [PATCH 05/20] Add tests --- synapse/federation/federation_client.py | 13 +++ tests/federation/test_federation_client.py | 122 +++++++++++++++++++-- 2 files changed, 123 insertions(+), 12 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f53dd23eba05..0b4e820061fb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -358,8 +358,13 @@ async def get_pdu( The requested PDU, or None if we were unable to find it. """ + logger.debug( + "get_pdu: event_id=%s from destinations=%s", event_id, destinations + ) + # TODO: Rate limit the number of times we try and get the same event. + event_copy = None event_from_cache = self._get_pdu_cache.get(event_id) if event_from_cache: assert not event_from_cache.internal_metadata.outlier, ( @@ -378,6 +383,7 @@ async def get_pdu( internal_metadata_dict=None, ) + logger.debug("get_pdu: returning event_from_cache=%s", event_from_cache) return event_copy pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) @@ -387,6 +393,13 @@ async def get_pdu( now = self._clock.time_msec() last_attempt = pdu_attempts.get(destination, 0) if last_attempt + PDU_RETRY_TIME_MS > now: + logger.debug( + "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", + destination, + last_attempt, + PDU_RETRY_TIME_MS, + now, + ) continue try: diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index d2bda0719872..c06515b62cb7 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -22,6 +22,7 @@ from 
twisted.test.proto_helpers import MemoryReactor from synapse.api.room_versions import RoomVersions +from synapse.events import EventBase from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -38,20 +39,24 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer): self._mock_agent = mock.create_autospec(twisted.web.client.Agent, spec_set=True) homeserver.get_federation_http_client().agent = self._mock_agent - def test_get_room_state(self): - creator = f"@creator:{self.OTHER_SERVER_NAME}" - test_room_id = "!room_id" + # Move clock up to somewhat realistic time so the PDU destination retry + # works (`now` needs to be larger than `0 + PDU_RETRY_TIME_MS`). + self.reactor.advance(1000000000) + + self.creator = f"@creator:{self.OTHER_SERVER_NAME}" + self.test_room_id = "!room_id" + def test_get_room_state(self): # mock up some events to use in the response. # In real life, these would have things in `prev_events` and `auth_events`, but that's # a bit annoying to mock up, and the code under test doesn't care, so we don't bother. 
create_event_dict = self.add_hashes_and_signatures_from_other_server( { - "room_id": test_room_id, + "room_id": self.test_room_id, "type": "m.room.create", "state_key": "", - "sender": creator, - "content": {"creator": creator}, + "sender": self.creator, + "content": {"creator": self.creator}, "prev_events": [], "auth_events": [], "origin_server_ts": 500, @@ -59,10 +64,10 @@ def test_get_room_state(self): ) member_event_dict = self.add_hashes_and_signatures_from_other_server( { - "room_id": test_room_id, + "room_id": self.test_room_id, "type": "m.room.member", - "sender": creator, - "state_key": creator, + "sender": self.creator, + "state_key": self.creator, "content": {"membership": "join"}, "prev_events": [], "auth_events": [], @@ -71,9 +76,9 @@ def test_get_room_state(self): ) pl_event_dict = self.add_hashes_and_signatures_from_other_server( { - "room_id": test_room_id, + "room_id": self.test_room_id, "type": "m.room.power_levels", - "sender": creator, + "sender": self.creator, "state_key": "", "content": {}, "prev_events": [], @@ -103,7 +108,7 @@ def test_get_room_state(self): state_resp, auth_resp = self.get_success( self.hs.get_federation_client().get_room_state( "yet_another_server", - test_room_id, + self.test_room_id, "event_id", RoomVersions.V9, ) @@ -130,6 +135,99 @@ def test_get_room_state(self): ["m.room.create", "m.room.member", "m.room.power_levels"], ) + def test_get_pdu_returns_nothing_when_event_does_not_exist(self): + """No event should be returned when there the event does not exist""" + remote_pdu = self.get_success( + self.hs.get_federation_client().get_pdu( + ["yet_another_server"], + "event_should_not_exist", + RoomVersions.V9, + ) + ) + self.assertEqual(remote_pdu, None) + + def test_get_pdu(self): + """Test to make sure an event is returned by `get_pdu()`""" + self._get_pdu_once() + + def test_get_pdu_event_from_cache_is_pristine(self): + """Test that modifications made to events returned by `get_pdu()` + do not propagate back to to the 
internal cache (events returned should + be a copy). + """ + + # Get the PDU in the cache + remote_pdu = self._get_pdu_once() + + # Modify the the event reference. + # This change should not make it back to the `_get_pdu_cache`. + remote_pdu.internal_metadata.outlier = True + + # Get the event again + remote_pdu2 = self.get_success( + self.hs.get_federation_client().get_pdu( + ["yet_another_server"], + remote_pdu.event_id, + RoomVersions.V9, + ) + ) + + # Make sure event does not include modification earlier + self.assertIsNotNone(remote_pdu2) + self.assertEqual(remote_pdu2.internal_metadata.outlier, False) + + def _get_pdu_once(self) -> EventBase: + """Retrieve an event via `get_pdu()` and asserts that an event was returned. + Also used to prime the cache for subsequent test logic. + """ + message_event_dict = self.add_hashes_and_signatures_from_other_server( + { + "room_id": "!room_id", + "type": "m.room.message", + "sender": self.creator, + "state_key": "", + "content": {}, + "prev_events": [], + "auth_events": [], + "origin_server_ts": 700, + "depth": 10, + } + ) + + # mock up the response, and have the agent return it + self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed( + _mock_response( + { + "origin": "yet_another_server", + "origin_server_ts": 900, + "pdus": [ + message_event_dict, + ], + } + ) + ) + + remote_pdu = self.get_success( + self.hs.get_federation_client().get_pdu( + ["yet_another_server"], + "event_id", + RoomVersions.V9, + ) + ) + + # check the right call got made to the agent + self._mock_agent.request.assert_called_once_with( + b"GET", + b"matrix://yet_another_server/_matrix/federation/v1/event/event_id", + headers=mock.ANY, + bodyProducer=None, + ) + + self.assertIsNotNone(remote_pdu) + self.assertEqual(remote_pdu.internal_metadata.outlier, False) + + return remote_pdu + def _mock_response(resp: JsonDict): body = json.dumps(resp).encode("utf-8") From 09c411bb77c85b63b1eff2bc0531dca4e376d9a5 Mon Sep 17 00:00:00 2001 
From: Eric Eastwood Date: Mon, 18 Jul 2022 20:22:31 -0500 Subject: [PATCH 06/20] Some more clarity --- synapse/federation/federation_client.py | 4 ++-- tests/federation/test_federation_client.py | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 0b4e820061fb..f7d6a8df41c4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -367,6 +367,7 @@ async def get_pdu( event_copy = None event_from_cache = self._get_pdu_cache.get(event_id) if event_from_cache: + logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) assert not event_from_cache.internal_metadata.outlier, ( "Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " "We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache." @@ -383,7 +384,6 @@ async def get_pdu( internal_metadata_dict=None, ) - logger.debug("get_pdu: returning event_from_cache=%s", event_from_cache) return event_copy pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) @@ -432,7 +432,7 @@ async def get_pdu( continue if signed_pdu: - self._get_pdu_cache[event_id] = signed_pdu + self._get_pdu_cache[signed_pdu.event_id] = signed_pdu # Make sure to return a copy because downstream callers will use this # event reference directly and change our original, pristine, untouched diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index c06515b62cb7..9551dcb89c6a 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -163,7 +163,7 @@ def test_get_pdu_event_from_cache_is_pristine(self): # This change should not make it back to the `_get_pdu_cache`. remote_pdu.internal_metadata.outlier = True - # Get the event again + # Get the event again. 
This time it should read it from cache. remote_pdu2 = self.get_success( self.hs.get_federation_client().get_pdu( ["yet_another_server"], @@ -172,7 +172,10 @@ def test_get_pdu_event_from_cache_is_pristine(self): ) ) - # Make sure event does not include modification earlier + # Sanity check that we are working against the same event + self.assertEqual(remote_pdu.event_id, remote_pdu2.event_id) + + # Make sure the event does not include modification from earlier self.assertIsNotNone(remote_pdu2) self.assertEqual(remote_pdu2.internal_metadata.outlier, False) From 6029b42086e4b5af2b2fc0cc43a85ab4663aa3ae Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 18 Jul 2022 20:24:35 -0500 Subject: [PATCH 07/20] Re-use room ID --- tests/federation/test_federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index 9551dcb89c6a..c78be65eb1dc 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -185,7 +185,7 @@ def _get_pdu_once(self) -> EventBase: """ message_event_dict = self.add_hashes_and_signatures_from_other_server( { - "room_id": "!room_id", + "room_id": self.test_room_id, "type": "m.room.message", "sender": self.creator, "state_key": "", From 09167b1ea5171ea5658c42b886dd1a12a21c00ef Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:09:01 -0500 Subject: [PATCH 08/20] Better still actionable no-fluff assertion message See https://github.com/matrix-org/synapse/pull/13320#discussion_r923903077 --- synapse/handlers/federation_event.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index f60b9dcac86b..df84c330b88a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -781,8 +781,8 @@ async def _process_pulled_event( # example, you could get an 
event from cache that is marked as an # `outlier` (fix up that spot though). assert not event.internal_metadata.is_outlier(), ( - "This is a safe-guard to make sure we're not trying to persist an outlier using this function (use something else). " - "If you're trying to process/backfill an event, this is the right method but you need pass in an event copy that doesn't have `event.internal_metada.outlier = true`." + "Outlier event passed to _process_pulled_event. " + "To persist an event as a non-outlier, make sure to pass in a copy without `event.internal_metadata.outlier = true`." ) event_id = event.event_id From eb6a291ba28bff27ef26e193192c04e2f07346f6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:16:16 -0500 Subject: [PATCH 09/20] Describe why we use a cache here Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/federation/federation_client.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f7d6a8df41c4..c54c66a1d68c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -365,6 +365,10 @@ async def get_pdu( # TODO: Rate limit the number of times we try and get the same event. event_copy = None + # We might need the same event multiple times in quick succession (before + # it gets persisted to the database), so we cache the results of the lookup. + # Note that this is separate to the regular get_event cache which caches + # events once they have been persisted. event_from_cache = self._get_pdu_cache.get(event_id) if event_from_cache: logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) From 1c4e57c13ef20d63cef76e99680e672003841012 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:17:00 -0500 Subject: [PATCH 10/20] Remove direct access to internal property We'd like to stop doing this altogether in the future. 
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c54c66a1d68c..5b04b48cb58a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -372,7 +372,7 @@ async def get_pdu( event_from_cache = self._get_pdu_cache.get(event_id) if event_from_cache: logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) - assert not event_from_cache.internal_metadata.outlier, ( + assert not event_from_cache.internal_metadata.is_outlier(), ( "Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " "We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache." ) From 488f5ed6e86e6d47989792d1ed16c2ecba45d4d2 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:17:31 -0500 Subject: [PATCH 11/20] Make it obvious that we're pulling and using a different cache Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/federation/federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5b04b48cb58a..93c31d302aa9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -373,7 +373,7 @@ async def get_pdu( if event_from_cache: logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) assert not event_from_cache.internal_metadata.is_outlier(), ( - "Event from cache unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " + "Event from `get_pdu_cache` unexpectedly an `outlier` when it should be pristine and untouched without metadata set. 
" "We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache." ) From 29a526926fd8789b8d889056e442ffc0e241d949 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:20:02 -0500 Subject: [PATCH 12/20] Remove assumption/speculation See https://github.com/matrix-org/synapse/pull/13320#discussion_r924302473 --- synapse/federation/federation_client.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 93c31d302aa9..2003c383c234 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -372,10 +372,9 @@ async def get_pdu( event_from_cache = self._get_pdu_cache.get(event_id) if event_from_cache: logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) - assert not event_from_cache.internal_metadata.is_outlier(), ( - "Event from `get_pdu_cache` unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " - "We are probably not be returning a copy of the event because downstream callers are modifying the event reference we have in the cache." - ) + assert ( + not event_from_cache.internal_metadata.is_outlier() + ), "Event from `get_pdu_cache` unexpectedly an `outlier` when it should be pristine and untouched without metadata set. 
" # Make sure to return a copy because downstream callers will use # this event reference directly and change our original, pristine, From 2688e44d6bdbebac6b2f46eabea35bd279a34a16 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:24:03 -0500 Subject: [PATCH 13/20] Default is already no metadata See https://github.com/matrix-org/synapse/pull/13320#discussion_r924278042 --- synapse/federation/federation_client.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 2003c383c234..fe3f4bbf1540 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -384,7 +384,6 @@ async def get_pdu( event_copy = make_event_from_dict( event_from_cache.get_pdu_json(), event_from_cache.room_version, - internal_metadata_dict=None, ) return event_copy @@ -449,7 +448,6 @@ async def get_pdu( event_copy = make_event_from_dict( signed_pdu.get_pdu_json(), signed_pdu.room_version, - internal_metadata_dict=None, ) return event_copy From 24913e705d4141bc328453ef5d0141c053a8b59e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:41:19 -0500 Subject: [PATCH 14/20] Refactor structure to avoid duplicating the event copy logic See: - https://github.com/matrix-org/synapse/pull/13320#discussion_r924276741 - https://github.com/matrix-org/synapse/pull/13320#discussion_r924283339 --- synapse/federation/federation_client.py | 141 ++++++++++++------------ 1 file changed, 68 insertions(+), 73 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index fe3f4bbf1540..537fe3cf5b2c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -364,91 +364,86 @@ async def get_pdu( # TODO: Rate limit the number of times we try and get the same event. 
- event_copy = None # We might need the same event multiple times in quick succession (before # it gets persisted to the database), so we cache the results of the lookup. # Note that this is separate to the regular get_event cache which caches # events once they have been persisted. event_from_cache = self._get_pdu_cache.get(event_id) - if event_from_cache: - logger.debug("get_pdu: found event_from_cache=%s", event_from_cache) - assert ( - not event_from_cache.internal_metadata.is_outlier() - ), "Event from `get_pdu_cache` unexpectedly an `outlier` when it should be pristine and untouched without metadata set. " - - # Make sure to return a copy because downstream callers will use - # this event reference directly and change our original, pristine, - # untouched PDU. For example when people mark the event as an - # `outlier` (`event.internal_metadata.outlier = true`), we don't - # want that to propagate back into the cache. - event_copy = make_event_from_dict( - event_from_cache.get_pdu_json(), - event_from_cache.room_version, - ) - - return event_copy - pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + # If we don't see the event in the cache, go try to fetch it from the + # provided remote federated destinations + event_from_remote = None + if not event_from_cache: + pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + + for destination in destinations: + now = self._clock.time_msec() + last_attempt = pdu_attempts.get(destination, 0) + if last_attempt + PDU_RETRY_TIME_MS > now: + logger.debug( + "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", + destination, + last_attempt, + PDU_RETRY_TIME_MS, + now, + ) + continue + + try: + event_from_remote = await self.get_pdu_from_destination_raw( + destination=destination, + event_id=event_id, + room_version=room_version, + timeout=timeout, + ) - signed_pdu = None - for destination in destinations: - now = 
self._clock.time_msec() - last_attempt = pdu_attempts.get(destination, 0) - if last_attempt + PDU_RETRY_TIME_MS > now: - logger.debug( - "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", - destination, - last_attempt, - PDU_RETRY_TIME_MS, - now, - ) - continue + pdu_attempts[destination] = now - try: - signed_pdu = await self.get_pdu_from_destination_raw( - destination=destination, - event_id=event_id, - room_version=room_version, - timeout=timeout, - ) + # Prime the cache + self._get_pdu_cache[event_from_remote.event_id] = event_from_remote - pdu_attempts[destination] = now + # FIXME: We should add a `break` here to avoid calling every + # destination after we already found a PDU (will follow-up + # in a separate PR) - except SynapseError as e: - logger.info( - "Failed to get PDU %s from %s because %s", event_id, destination, e - ) - continue - except NotRetryingDestination as e: - logger.info(str(e)) - continue - except FederationDeniedError as e: - logger.info(str(e)) - continue - except Exception as e: - pdu_attempts[destination] = now - - logger.info( - "Failed to get PDU %s from %s because %s", event_id, destination, e - ) - continue + except SynapseError as e: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, + destination, + e, + ) + continue + except NotRetryingDestination as e: + logger.info(str(e)) + continue + except FederationDeniedError as e: + logger.info(str(e)) + continue + except Exception as e: + pdu_attempts[destination] = now + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, + destination, + e, + ) + continue - if signed_pdu: - self._get_pdu_cache[signed_pdu.event_id] = signed_pdu + event = event_from_cache or event_from_remote + if not event: + return None - # Make sure to return a copy because downstream callers will use this - # event reference directly and change our original, pristine, untouched - # PDU. 
For example when people mark the event as an `outlier` - # (`event.internal_metadata.outlier = true`), we don't want that to - # propagate back into the cache. - # - # We could get away with only making a new copy of the event when - # pulling from cache but it's probably better to have good hygiene and - # not dirty the cache in the first place as well. - event_copy = make_event_from_dict( - signed_pdu.get_pdu_json(), - signed_pdu.room_version, - ) + # Make sure to return a copy because downstream callers will use this + # event reference directly and change our original, pristine, untouched + # PDU. For example when people mark the event as an `outlier` + # (`event.internal_metadata.outlier = true`), we don't want that to + # propagate back into the cache. + event_copy = make_event_from_dict( + event.get_pdu_json(), + event.room_version, + ) return event_copy From 0e6dd5a3e8463377b2e858b2b65b2b8a0f0547e9 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:44:14 -0500 Subject: [PATCH 15/20] Pluralization typo Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- tests/federation/test_federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index c78be65eb1dc..af1800e3c57d 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -180,7 +180,7 @@ def test_get_pdu_event_from_cache_is_pristine(self): self.assertEqual(remote_pdu2.internal_metadata.outlier, False) def _get_pdu_once(self) -> EventBase: - """Retrieve an event via `get_pdu()` and asserts that an event was returned. + """Retrieve an event via `get_pdu()` and assert that an event was returned. Also used to prime the cache for subsequent test logic. 
""" message_event_dict = self.add_hashes_and_signatures_from_other_server( From 5bc75edbac7376859e99363e5ebb213f85767733 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:45:50 -0500 Subject: [PATCH 16/20] Explain that we return a copy that is safe to modify See https://github.com/matrix-org/synapse/pull/13320#discussion_r924284408 --- synapse/federation/federation_client.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 537fe3cf5b2c..5acc4db35100 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -299,7 +299,8 @@ async def get_pdu_from_destination_raw( moving to the next destination. None indicates no timeout. Returns: - The requested PDU, or None if we were unable to find it. + A copy of the requested PDU that is safe to modify, or None if we + were unable to find it. Raises: SynapseError, NotRetryingDestination, FederationDeniedError From dea7669fce45c744ddb3e45161cfdd00bc583a8c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 11:55:34 -0500 Subject: [PATCH 17/20] Fix lints --- synapse/federation/federation_client.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5acc4db35100..47d18023355d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -400,12 +400,15 @@ async def get_pdu( pdu_attempts[destination] = now - # Prime the cache - self._get_pdu_cache[event_from_remote.event_id] = event_from_remote - - # FIXME: We should add a `break` here to avoid calling every - # destination after we already found a PDU (will follow-up - # in a separate PR) + if event_from_remote: + # Prime the cache + self._get_pdu_cache[ + event_from_remote.event_id + ] = event_from_remote + + # FIXME: We should add a `break` 
here to avoid calling every + # destination after we already found a PDU (will follow-up + # in a separate PR) except SynapseError as e: logger.info( From 72e65a58bd711f76208d45ae78b59821b6bb35d4 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 19 Jul 2022 12:04:29 -0500 Subject: [PATCH 18/20] Fix description typo --- tests/federation/test_federation_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py index af1800e3c57d..51a94cf27b3e 100644 --- a/tests/federation/test_federation_client.py +++ b/tests/federation/test_federation_client.py @@ -136,7 +136,7 @@ def test_get_room_state(self): ) def test_get_pdu_returns_nothing_when_event_does_not_exist(self): - """No event should be returned when there the event does not exist""" + """No event should be returned when the event does not exist""" remote_pdu = self.get_success( self.hs.get_federation_client().get_pdu( ["yet_another_server"], From 86fe0dcae821784dd2a9dce61b31ba868bad5940 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 20 Jul 2022 14:40:43 -0500 Subject: [PATCH 19/20] Share event throughout See https://github.com/matrix-org/synapse/pull/13320#discussion_r925225704 --- synapse/federation/federation_client.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 47d18023355d..a07bcf7b0ba0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -369,12 +369,11 @@ async def get_pdu( # it gets persisted to the database), so we cache the results of the lookup. # Note that this is separate to the regular get_event cache which caches # events once they have been persisted. 
- event_from_cache = self._get_pdu_cache.get(event_id) + event = self._get_pdu_cache.get(event_id) # If we don't see the event in the cache, go try to fetch it from the # provided remote federated destinations - event_from_remote = None - if not event_from_cache: + if not event: pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) for destination in destinations: @@ -391,7 +390,7 @@ async def get_pdu( continue try: - event_from_remote = await self.get_pdu_from_destination_raw( + event = await self.get_pdu_from_destination_raw( destination=destination, event_id=event_id, room_version=room_version, @@ -400,11 +399,9 @@ async def get_pdu( pdu_attempts[destination] = now - if event_from_remote: + if event: # Prime the cache - self._get_pdu_cache[ - event_from_remote.event_id - ] = event_from_remote + self._get_pdu_cache[event.event_id] = event # FIXME: We should add a `break` here to avoid calling every # destination after we already found a PDU (will follow-up @@ -435,7 +432,6 @@ async def get_pdu( ) continue - event = event_from_cache or event_from_remote if not event: return None From fd879bbf4022e5e6f9fdd922e646e87c50f8b45b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 20 Jul 2022 14:41:30 -0500 Subject: [PATCH 20/20] Different comment See https://github.com/matrix-org/synapse/pull/13320#discussion_r925326664 --- synapse/federation/federation_client.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a07bcf7b0ba0..a08bad461e92 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -435,11 +435,10 @@ async def get_pdu( if not event: return None - # Make sure to return a copy because downstream callers will use this - # event reference directly and change our original, pristine, untouched - # PDU. 
For example when people mark the event as an `outlier` - # (`event.internal_metadata.outlier = true`), we don't want that to - # propagate back into the cache. + # `event` now refers to an object stored in `_get_pdu_cache`. Our + # callers may need to modify the returned object (e.g. to set + # `event.internal_metadata.outlier = True`), so we return a copy + # rather than the original object. event_copy = make_event_from_dict( event.get_pdu_json(), event.room_version,