From ce45cc1d44dcc9b469b23b4daab8426c5dd2c31f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 17:18:48 -0500 Subject: [PATCH 01/27] Sliding sync sort stub --- synapse/handlers/sliding_sync.py | 18 +++++++++++++++++- synapse/storage/databases/main/stream.py | 20 +++++++++++--------- synapse/types/rest/client/__init__.py | 15 --------------- 3 files changed, 28 insertions(+), 25 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 1c37f83a2be..7379a007cd9 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -169,7 +169,7 @@ async def current_sync_for_user( # `["m.room.member", "$LAZY"]` filtered_room_ids = room_id_set # TODO: Apply sorts - sorted_room_ids = sorted(filtered_room_ids) + sorted_room_ids = await self.sort_rooms(filtered_room_ids, to_token) ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: @@ -439,3 +439,19 @@ async def get_sync_room_ids_for_user( sync_room_id_set.add(room_id) return sync_room_id_set + + async def sort_rooms( + self, + room_id_set: AbstractSet[str], + to_token: StreamToken, + ) -> List[str]: + """ + Sort by recency of the last event in the room (stream_ordering). In order to get + a stable sort, we tie-break by room ID. + + Args: + room_id_set: Set of room IDs to sort + to_token: We sort based on the events in the room at this token + """ + # TODO: `get_last_event_in_room_before_stream_ordering()` + pass diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7ab6003f61e..188bba0f1fd 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -914,11 +914,13 @@ async def get_last_event_in_room_before_stream_ordering( def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[str]: - # We need to handle the fact that the stream tokens can be vector - # clocks. 
We do this by getting all rows between the minimum and - # maximum stream ordering in the token, plus one row less than the - # minimum stream ordering. We then filter the results against the - # token and return the first row that matches. + # We need to handle the fact that the stream tokens can be vector clocks. We + # do this by getting all rows between the minimum and maximum stream + # ordering in the token, plus one row less than the minimum stream ordering + # (TODO: Why?). We then filter the results against the token and return the + # first row that matches. + min_stream = end_token.stream + max_stream = end_token.get_max_stream_pos() sql = """ SELECT * FROM ( @@ -926,7 +928,7 @@ def get_last_event_in_room_before_stream_ordering_txn( FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? - AND ? < stream_ordering AND stream_ordering <= ? + AND stream_ordering > ? AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL ORDER BY stream_ordering DESC @@ -948,10 +950,10 @@ def get_last_event_in_room_before_stream_ordering_txn( sql, ( room_id, - end_token.stream, - end_token.get_max_stream_pos(), + min_stream, + max_stream, room_id, - end_token.stream, + min_stream, ), ) diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index ef261518a05..4bae162161c 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -175,20 +175,6 @@ class SlidingSyncList(CommonRoomParameters): ranges: Sliding window ranges. If this field is missing, no sliding window is used and all rooms are returned in this list. Integers are *inclusive*. - sort: How the list should be sorted on the server. The first value is - applied first, then tiebreaks are performed with each subsequent sort - listed. - - FIXME: Furthermore, it's not currently defined how servers should behave - if they encounter a filter or sort operation they do not recognise. 
If - the server rejects the request with an HTTP 400 then that will break - backwards compatibility with new clients vs old servers. However, the - client would be otherwise unaware that only some of the sort/filter - operations have taken effect. We may need to include a "warnings" - section to indicate which sort/filter operations are unrecognised, - allowing for some form of graceful degradation of service. - -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions - slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with sliding windows). When true, the `ranges` and `sort` fields are ignored. required_state: Required state for each room returned. An array of event @@ -253,7 +239,6 @@ class Filters(RequestBodyModel): ranges: Optional[List[Tuple[int, int]]] = None else: ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type] - sort: Optional[List[StrictStr]] = None slow_get_all_rooms: Optional[StrictBool] = False include_heroes: Optional[StrictBool] = False filters: Optional[Filters] = None From 75b701f0ee522b9c7d75e3e122e6c4a96bb33e3f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jun 2024 17:21:09 -0500 Subject: [PATCH 02/27] Add changelog --- changelog.d/17293.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17293.feature diff --git a/changelog.d/17293.feature b/changelog.d/17293.feature new file mode 100644 index 00000000000..bd5c27a8a77 --- /dev/null +++ b/changelog.d/17293.feature @@ -0,0 +1 @@ +Add recency sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From c8a240f59d7b00e00d51ad7a50f0b62ea1243134 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 16:00:02 -0500 Subject: [PATCH 03/27] Prefer `? 
< a AND a <= ?` See https://github.com/element-hq/synapse/pull/17293#discussion_r1634863810 > FWIW I mildly prefer the styling `? < a AND a <= ?` as then my brain can more easily convert that to `a in (?...?]` --- synapse/storage/databases/main/stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 188bba0f1fd..b02d42ac097 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -928,7 +928,7 @@ def get_last_event_in_room_before_stream_ordering_txn( FROM events LEFT JOIN rejections USING (event_id) WHERE room_id = ? - AND stream_ordering > ? AND stream_ordering <= ? + AND ? < stream_ordering AND stream_ordering <= ? AND NOT outlier AND rejections.event_id IS NULL ORDER BY stream_ordering DESC From 2a82ac0cbe7ccd8a308d4af88499f56c350dbe1f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:14:01 -0500 Subject: [PATCH 04/27] Fix `get_last_event_in_room_before_stream_ordering(...)` not finding the last event Previously, it would use the event with the lowest `stream_ordering` that was fetched (we want the highest `stream_ordering`). `union` does not work how you might think at first and does not preserve the ordering of the operand queries. `union all` also doesn't necessarily have that guarantee but it does behave that way.
See https://dba.stackexchange.com/questions/316818/are-results-from-union-all-clauses-always-appended-in-order/316835#316835 --- synapse/storage/databases/main/stream.py | 2 +- tests/storage/test_stream.py | 270 ++++++++++++++++++++++- 2 files changed, 269 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b02d42ac097..47c7366a035 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -933,7 +933,7 @@ def get_last_event_in_room_before_stream_ordering_txn( AND rejections.event_id IS NULL ORDER BY stream_ordering DESC ) AS a - UNION + UNION ALL SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id FROM events diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2029cd9c685..de01107ee1d 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -19,7 +19,10 @@ # # -from typing import List +import logging +from typing import List, Tuple + +from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor @@ -28,11 +31,13 @@ from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken from synapse.util import Clock from tests.unittest import HomeserverTestCase +logger = logging.getLogger(__name__) + class PaginationTestCase(HomeserverTestCase): """ @@ -268,3 +273,264 @@ def test_filter_not_rel_types(self) -> None: } chunk = self._filter_messages(filter) self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none]) + + +class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): + """ + Test `get_last_event_in_room_before_stream_ordering(...)` + """ + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, 
+ ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _update_persisted_instance_name_for_event( + self, event_id: str, instance_name: str + ) -> None: + """ + Update the `instance_name` that persisted the the event in the database. + """ + return self.get_success( + self.store.db_pool.simple_update_one( + "events", + keyvalues={"event_id": event_id}, + updatevalues={"instance_name": instance_name}, + ) + ) + + def _send_event_on_instance( + self, instance_name: str, room_id: str, access_token: str + ) -> Tuple[JsonDict, PersistedEventPosition]: + """ + Send an event in a room and mimic that it was persisted by a specific + instance/worker. + """ + event_response = self.helper.send( + room_id, f"{instance_name} message", tok=access_token + ) + + self._update_persisted_instance_name_for_event( + event_response["event_id"], instance_name + ) + + event_pos = self.get_success( + self.store.get_position_for_event(event_response["event_id"]) + ) + + return event_response, event_pos + + def test_before_room_created(self) -> None: + """ + Test that no event is returned if we are using a token before the room was even created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=before_room_token.room_key, + ) + ) + + self.assertIsNone(last_event) + + def test_after_room_created(self) -> None: + """ + Test that an event returned if we are using a token after the room was created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, 
tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=after_room_token.room_key, + ) + ) + + self.assertIsNotNone(last_event) + + def test_activity_in_other_rooms(self) -> None: + """ + Test to make sure that the last event in room is returned even if the + `stream_ordering` has advanced past it (and that's from the correct room) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + # Create another room to advance the stream_ordering + self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the event we expect (which also means we know it's from the + # correct room) + self.assertEqual(last_event, event_response["event_id"]) + + def test_activity_after_token_has_no_effect(self) -> None: + """ + Test to make sure we return the last event before the token even if there is + activity after it. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the last event before the token + self.assertEqual(last_event, event_response["event_id"]) + + def test_last_event_within_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *within* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing + that we can find an event within the tokens minimum and instance + `stream_ordering`. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event1 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos1.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event before the token in room1 + self.assertEqual( + last_event, + event_response3["event_id"], + f"We expected {event_response3["event_id"]} but saw {last_event} which corresponds to" + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + "event3": event_response3["event_id"], + } + ), + ) + + def test_last_event_before_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *before* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event3 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos3.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event before the token in room1 + self.assertEqual( + last_event, + event_response2["event_id"], + f"We expected {event_response2["event_id"]} but saw {last_event} which corresponds to" + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + } + ), + ) From 2e1d1428929b9c0a50c01a4ac790eb05ef1238a3 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:29:40 -0500 Subject: [PATCH 05/27] Add actual guranteed order for UNION We use `union all` because we don't need any of the deduplication logic (`union` is really a union + distinct). 
`UNION ALL` does preserve the ordering of the operand queries but there is no actual guarantee that it has this behavior in all scenarios so we need the extra `ORDER BY` at the bottom. See https://dba.stackexchange.com/questions/316818/are-results-from-union-all-clauses-always-appended-in-order/316835#316835 --- synapse/storage/databases/main/stream.py | 8 ++++++++ tests/storage/test_stream.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 47c7366a035..70c3a3cb07e 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -922,6 +922,13 @@ def get_last_event_in_room_before_stream_ordering_txn( min_stream = end_token.stream max_stream = end_token.get_max_stream_pos() + # We use `union all` because we don't need any of the deduplication logic + # (`union` is really a union + distinct). `UNION ALL`` does preserve the + # ordering of the operand queries but there is no actual gurantee that it + # has this behavior in all scenarios so we need the extra `ORDER BY` at the + # bottom. + # + # We're using the subquery syntax for SQLite compatibility.
sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id @@ -945,6 +952,7 @@ def get_last_event_in_room_before_stream_ordering_txn( ORDER BY stream_ordering DESC LIMIT 1 ) AS b + ORDER BY stream_ordering DESC """ txn.execute( sql, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index de01107ee1d..7488eae0eed 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -449,7 +449,7 @@ def test_last_event_within_sharded_token(self) -> None: # Assemble a token that encompasses event1 -> event4 on worker1 end_token = RoomStreamToken( - stream=event_pos1.stream, + stream=event_pos2.stream, instance_map=immutabledict({"worker1": event_pos4.stream}), ) From b1af992113a1939707ff5e50946ae816ae009beb Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:30:32 -0500 Subject: [PATCH 06/27] Some clean-up --- synapse/handlers/sliding_sync.py | 5 ++++- tests/handlers/test_sliding_sync.py | 3 --- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 7379a007cd9..352e00c79ca 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -454,4 +454,7 @@ async def sort_rooms( to_token: We sort based on the events in the room at this token """ # TODO: `get_last_event_in_room_before_stream_ordering()` - pass + + # TODO: Handle when people are left/banned from the room and shouldn't see past that point + + return list(room_id_set) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 5c27474b966..6389afdd603 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1047,7 +1047,6 @@ def test_sharded_event_persisters(self) -> None: # Get a token while things are stuck after our activity stuck_activity_token = self.event_sources.get_current_token() - logger.info("stuck_activity_token %s", 
stuck_activity_token) # Let's make sure we're working with a token that has an `instance_map` self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0) @@ -1057,7 +1056,6 @@ def test_sharded_event_persisters(self) -> None: join_on_worker2_pos = self.get_success( self.store.get_position_for_event(join_on_worker2_response["event_id"]) ) - logger.info("join_on_worker2_pos %s", join_on_worker2_pos) # Ensure the join technially came after our token self.assertGreater( join_on_worker2_pos.stream, @@ -1076,7 +1074,6 @@ def test_sharded_event_persisters(self) -> None: join_on_worker3_pos = self.get_success( self.store.get_position_for_event(join_on_worker3_response["event_id"]) ) - logger.info("join_on_worker3_pos %s", join_on_worker3_pos) # Ensure the join came after the min but still encapsulated by the token self.assertGreaterEqual( join_on_worker3_pos.stream, From 901ce62d079a40deb380e683a4c3b2b69dad3fd6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:48:02 -0500 Subject: [PATCH 07/27] Try to better explain why See https://github.com/element-hq/synapse/pull/17293#discussion_r1633904606 --- synapse/storage/databases/main/stream.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 70c3a3cb07e..680014bad52 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -914,11 +914,15 @@ async def get_last_event_in_room_before_stream_ordering( def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[str]: - # We need to handle the fact that the stream tokens can be vector clocks. We - # do this by getting all rows between the minimum and maximum stream - # ordering in the token, plus one row less than the minimum stream ordering - # (TODO: Why?). We then filter the results against the token and return the - # first row that matches. 
+ # We're looking for the closest event at or before the token. We need to + # handle the fact that the stream token can be a vector clock (with an + # `instance_map`) and events can be persisted on different instances + # (sharded event persisters). The first subquery handles the events that + # would be within the vector clock and gets all rows between the minimum and + # maximum stream ordering in the token which need to be filtered against the + # `instance_map`. The second subquery handles the "before" case and finds a + # row before the token. We then filter out any results past the token's + # vector clock and return the first row that matches. min_stream = end_token.stream max_stream = end_token.get_max_stream_pos() From 87ad458d77729f5d9e37a082e1e46cfc88e36acf Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:14:01 -0500 Subject: [PATCH 08/27] Fix `get_last_event_in_room_before_stream_ordering(...)` not finding the last event Previously, it would use the event with the lowest `stream_ordering` that was fetched (we want the highest `stream_ordering`). `union` does not work how you might think at first and does not preserve the ordering of the operand queries. `union all` also doesn't necessarily have that guarantee but it does behave that way.
See https://dba.stackexchange.com/questions/316818/are-results-from-union-all-clauses-always-appended-in-order/316835#316835 --- synapse/storage/databases/main/stream.py | 2 +- tests/storage/test_stream.py | 270 ++++++++++++++++++++++- 2 files changed, 269 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7ab6003f61e..a6712989676 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -931,7 +931,7 @@ def get_last_event_in_room_before_stream_ordering_txn( AND rejections.event_id IS NULL ORDER BY stream_ordering DESC ) AS a - UNION + UNION ALL SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id FROM events diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2029cd9c685..de01107ee1d 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -19,7 +19,10 @@ # # -from typing import List +import logging +from typing import List, Tuple + +from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor @@ -28,11 +31,13 @@ from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken from synapse.util import Clock from tests.unittest import HomeserverTestCase +logger = logging.getLogger(__name__) + class PaginationTestCase(HomeserverTestCase): """ @@ -268,3 +273,264 @@ def test_filter_not_rel_types(self) -> None: } chunk = self._filter_messages(filter) self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none]) + + +class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): + """ + Test `get_last_event_in_room_before_stream_ordering(...)` + """ + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, 
+ ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _update_persisted_instance_name_for_event( + self, event_id: str, instance_name: str + ) -> None: + """ + Update the `instance_name` that persisted the the event in the database. + """ + return self.get_success( + self.store.db_pool.simple_update_one( + "events", + keyvalues={"event_id": event_id}, + updatevalues={"instance_name": instance_name}, + ) + ) + + def _send_event_on_instance( + self, instance_name: str, room_id: str, access_token: str + ) -> Tuple[JsonDict, PersistedEventPosition]: + """ + Send an event in a room and mimic that it was persisted by a specific + instance/worker. + """ + event_response = self.helper.send( + room_id, f"{instance_name} message", tok=access_token + ) + + self._update_persisted_instance_name_for_event( + event_response["event_id"], instance_name + ) + + event_pos = self.get_success( + self.store.get_position_for_event(event_response["event_id"]) + ) + + return event_response, event_pos + + def test_before_room_created(self) -> None: + """ + Test that no event is returned if we are using a token before the room was even created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=before_room_token.room_key, + ) + ) + + self.assertIsNone(last_event) + + def test_after_room_created(self) -> None: + """ + Test that an event returned if we are using a token after the room was created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, 
tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=after_room_token.room_key, + ) + ) + + self.assertIsNotNone(last_event) + + def test_activity_in_other_rooms(self) -> None: + """ + Test to make sure that the last event in room is returned even if the + `stream_ordering` has advanced past it (and that's from the correct room) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + # Create another room to advance the stream_ordering + self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the event we expect (which also means we know it's from the + # correct room) + self.assertEqual(last_event, event_response["event_id"]) + + def test_activity_after_token_has_no_effect(self) -> None: + """ + Test to make sure we return the last event before the token even if there is + activity after it. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the last event before the token + self.assertEqual(last_event, event_response["event_id"]) + + def test_last_event_within_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *within* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing + that we can find an event within the tokens minimum and instance + `stream_ordering`. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event1 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos1.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event before the token in room1 + self.assertEqual( + last_event, + event_response3["event_id"], + f"We expected {event_response3["event_id"]} but saw {last_event} which corresponds to" + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + "event3": event_response3["event_id"], + } + ), + ) + + def test_last_event_before_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *before* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event3 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos3.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event before the token in room1 + self.assertEqual( + last_event, + event_response2["event_id"], + f"We expected {event_response2["event_id"]} but saw {last_event} which corresponds to" + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + } + ), + ) From 431b31e0f2529cf58be09d561b59926a508eee0f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:29:40 -0500 Subject: [PATCH 09/27] Add actual guranteed order for UNION We use `union all` because we don't need any of the deduplication logic (`union` is really a union + distinct). 
`UNION ALL` does preserve the ordering of the operand queries but there is no actual guarantee that it has this behavior in all scenarios, so we need the extra `ORDER BY` at the bottom. See https://dba.stackexchange.com/questions/316818/are-results-from-union-all-clauses-always-appended-in-order/316835#316835 --- synapse/storage/databases/main/stream.py | 8 ++++++++ tests/storage/test_stream.py | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index a6712989676..11af5fdea84 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -920,6 +920,13 @@ def get_last_event_in_room_before_stream_ordering_txn( # minimum stream ordering. We then filter the results against the # token and return the first row that matches. + # We use `union all` because we don't need any of the deduplication logic + # (`union` is really a union + distinct). `UNION ALL`` does preserve the + # ordering of the operand queries but there is no actual gurantee that it + # has this behavior in all scenarios so we need the extra `ORDER BY` at the + # bottom. + # + # We're using the subquery syntax for SQLite compatibility.
sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id @@ -943,6 +950,7 @@ def get_last_event_in_room_before_stream_ordering_txn( ORDER BY stream_ordering DESC LIMIT 1 ) AS b + ORDER BY stream_ordering DESC """ txn.execute( sql, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index de01107ee1d..7488eae0eed 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -449,7 +449,7 @@ def test_last_event_within_sharded_token(self) -> None: # Assemble a token that encompasses event1 -> event4 on worker1 end_token = RoomStreamToken( - stream=event_pos1.stream, + stream=event_pos2.stream, instance_map=immutabledict({"worker1": event_pos4.stream}), ) From d7f40aedf701f4d75d05d0c37aff8064b07d3f5e Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 20:48:02 -0500 Subject: [PATCH 10/27] Try to better explain why See https://github.com/element-hq/synapse/pull/17293#discussion_r1633904606 --- synapse/storage/databases/main/stream.py | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 11af5fdea84..680014bad52 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -914,11 +914,17 @@ async def get_last_event_in_room_before_stream_ordering( def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[str]: - # We need to handle the fact that the stream tokens can be vector - # clocks. We do this by getting all rows between the minimum and - # maximum stream ordering in the token, plus one row less than the - # minimum stream ordering. We then filter the results against the - # token and return the first row that matches. + # We're looking for the closest event at or before the token. 
We need to + # handle the fact that the stream token can be a vector clock (with an + # `instance_map`) and events can be persisted on different instances + # (sharded event persisters). The first subquery handles the events that + # would be within the vector clock and gets all rows between the minimum and + # maximum stream ordering in the token which need to be filtered against the + # `instance_map`. The second subquery handles the "before" case and finds a + # row before the token. We then filter out any results past the token's + # vector clock and return the first row that matches. + min_stream = end_token.stream + max_stream = end_token.get_max_stream_pos() # We use `union all` because we don't need any of the deduplication logic # (`union` is really a union + distinct). `UNION ALL`` does preserve the @@ -956,10 +962,10 @@ def get_last_event_in_room_before_stream_ordering_txn( sql, ( room_id, - end_token.stream, - end_token.get_max_stream_pos(), + min_stream, + max_stream, room_id, - end_token.stream, + min_stream, ), ) From a8056ae67da27bf5effab35775ec5e37d935b4fd Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 22:04:48 -0500 Subject: [PATCH 11/27] Add changelog --- changelog.d/17295.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/17295.bugfix diff --git a/changelog.d/17295.bugfix b/changelog.d/17295.bugfix new file mode 100644 index 00000000000..4484253bb8a --- /dev/null +++ b/changelog.d/17295.bugfix @@ -0,0 +1 @@ +Fix edge case in `/sync` returning the wrong the state when using sharded event persisters. 
From 3f317a9929eb9c32fd733764ff5f9389b318c75b Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 22:05:27 -0500 Subject: [PATCH 12/27] We're actually using sub-query syntax so we can ORDER each query --- synapse/storage/databases/main/stream.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 680014bad52..dd5f76dd53d 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -931,8 +931,6 @@ def get_last_event_in_room_before_stream_ordering_txn( # ordering of the operand queries but there is no actual gurantee that it # has this behavior in all scenarios so we need the extra `ORDER BY` at the # bottom. - # - # We're using the subquery syntax for SQLite compatibility. sql = """ SELECT * FROM ( SELECT instance_name, stream_ordering, topological_ordering, event_id From 54bdc0ca92002287bb5280089c7b5089e7b14714 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 11 Jun 2024 22:36:15 -0500 Subject: [PATCH 13/27] Fix invalid syntax --- tests/storage/test_stream.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 7488eae0eed..2dd40831994 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -468,7 +468,7 @@ def test_last_event_within_sharded_token(self) -> None: self.assertEqual( last_event, event_response3["event_id"], - f"We expected {event_response3["event_id"]} but saw {last_event} which corresponds to" + f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " + str( { "event1": event_response1["event_id"], @@ -526,7 +526,7 @@ def test_last_event_before_sharded_token(self) -> None: self.assertEqual( last_event, event_response2["event_id"], - f"We expected {event_response2["event_id"]} but saw {last_event} which corresponds to" + f"We expected {event_response2['event_id']} 
but saw {last_event} which corresponds to " + str( { "event1": event_response1["event_id"], From c94550d54278ace5bc0d22544612bfed92dbe17d Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 14:53:36 -0500 Subject: [PATCH 14/27] Add `event.internal_metadata.instance_name` and event position to `get_last_event_in_room_before_stream_ordering(...)` --- synapse/events/utils.py | 2 + synapse/handlers/message.py | 3 +- synapse/handlers/sync.py | 10 +++-- synapse/storage/databases/main/events.py | 1 + .../storage/databases/main/events_worker.py | 16 +++++--- synapse/storage/databases/main/stream.py | 34 +++++++++++++++-- synapse/synapse_rust/events.pyi | 2 + tests/events/test_utils.py | 3 ++ tests/storage/test_event_chain.py | 1 + tests/storage/test_stream.py | 38 ++++++++++++------- 10 files changed, 82 insertions(+), 28 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 07724723125..b997d82d71f 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -90,6 +90,7 @@ def prune_event(event: EventBase) -> EventBase: pruned_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + pruned_event.internal_metadata.instance_name = event.internal_metadata.instance_name pruned_event.internal_metadata.outlier = event.internal_metadata.outlier # Mark the event as redacted @@ -116,6 +117,7 @@ def clone_event(event: EventBase) -> EventBase: new_event.internal_metadata.stream_ordering = ( event.internal_metadata.stream_ordering ) + new_event.internal_metadata.instance_name = event.internal_metadata.instance_name new_event.internal_metadata.outlier = event.internal_metadata.outlier return new_event diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index de5bd44a5fc..16d01efc671 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -201,7 +201,7 @@ async def get_state_events( if at_token: last_event_id = ( - await 
self.store.get_last_event_in_room_before_stream_ordering( + await self.store.get_last_event_id_in_room_before_stream_ordering( room_id, end_token=at_token.room_key, ) @@ -1551,6 +1551,7 @@ async def _persist_events( # stream_ordering entry manually (as it was persisted on # another worker). event.internal_metadata.stream_ordering = stream_id + event.internal_metadata.instance_name = writer_instance return event diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 39964726c5b..881f66e2da7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1038,9 +1038,11 @@ async def get_state_at( # FIXME: This gets the state at the latest event before the stream ordering, # which might not be the same as the "current state" of the room at the time # of the stream token if there were multiple forward extremities at the time. - last_event_id = await self.store.get_last_event_in_room_before_stream_ordering( - room_id, - end_token=stream_position.room_key, + last_event_id = ( + await self.store.get_last_event_id_in_room_before_stream_ordering( + room_id, + end_token=stream_position.room_key, + ) ) if last_event_id: @@ -1521,7 +1523,7 @@ async def _compute_state_delta_for_incremental_sync( # We need to make sure the first event in our batch points to the # last event in the previous batch. 
last_event_id_prev_batch = ( - await self.store.get_last_event_in_room_before_stream_ordering( + await self.store.get_last_event_id_in_room_before_stream_ordering( room_id, end_token=since_token.room_key, ) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index f1bd85aa276..66428e6c8ee 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -207,6 +207,7 @@ async def _persist_events_and_state_updates( async with stream_ordering_manager as stream_orderings: for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream + event.internal_metadata.instance_name = self._instance_name await self.db_pool.runInteraction( "persist_events", diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c06c44deb1f..e264d36f025 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -156,6 +156,7 @@ class _EventRow: event_id: str stream_ordering: int + instance_name: str json: str internal_metadata: str format_version: Optional[int] @@ -1354,6 +1355,7 @@ async def _fetch_event_ids_and_get_outstanding_redactions( rejected_reason=rejected_reason, ) original_ev.internal_metadata.stream_ordering = row.stream_ordering + original_ev.internal_metadata.instance_name = row.instance_name original_ev.internal_metadata.outlier = row.outlier # Consistency check: if the content of the event has been modified in the @@ -1439,6 +1441,7 @@ def _fetch_event_rows( SELECT e.event_id, e.stream_ordering, + e.instance_name, ej.internal_metadata, ej.json, ej.format_version, @@ -1462,13 +1465,14 @@ def _fetch_event_rows( event_dict[event_id] = _EventRow( event_id=event_id, stream_ordering=row[1], - internal_metadata=row[2], - json=row[3], - format_version=row[4], - room_version_id=row[5], - rejected_reason=row[6], + instance_name=row[2], 
+ internal_metadata=row[3], + json=row[4], + format_version=row[5], + room_version_id=row[6], + rejected_reason=row[7], redactions=[], - outlier=bool(row[7]), # This is an int in SQLite3 + outlier=bool(row[8]), # This is an int in SQLite3 ) # check for redactions diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index dd5f76dd53d..d7cd5ae4141 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -895,7 +895,7 @@ def _f(txn: LoggingTransaction) -> Optional[Tuple[int, int, str]]: "get_room_event_before_stream_ordering", _f ) - async def get_last_event_in_room_before_stream_ordering( + async def get_last_event_id_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, @@ -910,10 +910,36 @@ async def get_last_event_in_room_before_stream_ordering( The ID of the most recent event, or None if there are no events in the room before this stream ordering. """ + last_event_result = await self.get_last_event_in_room_before_stream_ordering( + room_id, end_token + ) + + if last_event_result: + return last_event_result[0] + + return None + + async def get_last_event_in_room_before_stream_ordering( + self, + room_id: str, + end_token: RoomStreamToken, + ) -> Optional[Tuple[str, PersistedEventPosition]]: + """ + Returns the ID and event position of the last event in a room at or before a + stream ordering. + + Args: + room_id + end_token: The token used to stream from + + Returns: + The ID of the most recent event and it's position, or None if there are no + events in the room before this stream ordering. + """ def get_last_event_in_room_before_stream_ordering_txn( txn: LoggingTransaction, - ) -> Optional[str]: + ) -> Optional[Tuple[str, PersistedEventPosition]]: # We're looking for the closest event at or before the token. 
We need to # handle the fact that the stream token can be a vector clock (with an # `instance_map`) and events can be persisted on different instances @@ -975,7 +1001,9 @@ def get_last_event_in_room_before_stream_ordering_txn( topological_ordering=topological_ordering, stream_ordering=stream_ordering, ): - return event_id + return event_id, PersistedEventPosition( + instance_name, stream_ordering + ) return None diff --git a/synapse/synapse_rust/events.pyi b/synapse/synapse_rust/events.pyi index 69837617f55..1682d0d151a 100644 --- a/synapse/synapse_rust/events.pyi +++ b/synapse/synapse_rust/events.pyi @@ -19,6 +19,8 @@ class EventInternalMetadata: stream_ordering: Optional[int] """the stream ordering of this event. None, until it has been persisted.""" + instance_name: Optional[str] + """the instance name of the server that persisted this event. None, until it has been persisted.""" outlier: bool """whether this event is an outlier (ie, whether we have the state at that diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index d5ac66a6ed9..30f87877588 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -625,6 +625,8 @@ def test_unsigned_is_copied(self) -> None: ) original.internal_metadata.stream_ordering = 1234 self.assertEqual(original.internal_metadata.stream_ordering, 1234) + original.internal_metadata.instance_name = "worker1" + self.assertEqual(original.internal_metadata.instance_name, "worker1") cloned = clone_event(original) cloned.unsigned["b"] = 3 @@ -632,6 +634,7 @@ def test_unsigned_is_copied(self) -> None: self.assertEqual(original.unsigned, {"a": 1, "b": 2}) self.assertEqual(cloned.unsigned, {"a": 1, "b": 3}) self.assertEqual(cloned.internal_metadata.stream_ordering, 1234) + self.assertEqual(cloned.internal_metadata.instance_name, "worker1") self.assertEqual(cloned.internal_metadata.txn_id, "txn") diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 27d5b0125fd..81feb3ec299 
100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -431,6 +431,7 @@ def persist( for e in events: e.internal_metadata.stream_ordering = self._next_stream_ordering + e.internal_metadata.instance_name = self.hs.get_instance_name() self._next_stream_ordering += 1 def _persist(txn: LoggingTransaction) -> None: diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2dd40831994..b8dd796a718 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -336,14 +336,14 @@ def test_before_room_created(self) -> None: room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id, end_token=before_room_token.room_key, ) ) - self.assertIsNone(last_event) + self.assertIsNone(last_event_result) def test_after_room_created(self) -> None: """ @@ -356,14 +356,16 @@ def test_after_room_created(self) -> None: after_room_token = self.event_sources.get_current_token() - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result - self.assertIsNotNone(last_event) + self.assertIsNotNone(last_event_id) def test_activity_in_other_rooms(self) -> None: """ @@ -380,16 +382,18 @@ def test_activity_in_other_rooms(self) -> None: after_room_token = self.event_sources.get_current_token() - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Make sure it's the event we expect (which also means we know it's from the # correct room) - 
self.assertEqual(last_event, event_response["event_id"]) + self.assertEqual(last_event_id, event_response["event_id"]) def test_activity_after_token_has_no_effect(self) -> None: """ @@ -408,15 +412,17 @@ def test_activity_after_token_has_no_effect(self) -> None: self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Make sure it's the last event before the token - self.assertEqual(last_event, event_response["event_id"]) + self.assertEqual(last_event_id, event_response["event_id"]) def test_last_event_within_sharded_token(self) -> None: """ @@ -457,18 +463,20 @@ def test_last_event_within_sharded_token(self) -> None: self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, end_token=end_token, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Should find closest event before the token in room1 self.assertEqual( - last_event, + last_event_id, event_response3["event_id"], - f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " + f"We expected {event_response3['event_id']} but saw {last_event_id} which corresponds to " + str( { "event1": event_response1["event_id"], @@ -515,18 +523,20 @@ def test_last_event_before_sharded_token(self) -> None: self.helper.send(room_id1, "after1", tok=user1_tok) self.helper.send(room_id1, "after2", tok=user1_tok) - last_event = self.get_success( + last_event_result = self.get_success( self.store.get_last_event_in_room_before_stream_ordering( room_id=room_id1, 
end_token=end_token, ) ) + assert last_event_result is not None + last_event_id, _ = last_event_result # Should find closest event before the token in room1 self.assertEqual( - last_event, + last_event_id, event_response2["event_id"], - f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to " + f"We expected {event_response2['event_id']} but saw {last_event_id} which corresponds to " + str( { "event1": event_response1["event_id"], From afb6627b6ff82b0bf30ec021e440b936908139ea Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 15:11:29 -0500 Subject: [PATCH 15/27] Add rust changes for `event.internal_metadata.instance_name` --- rust/src/events/internal_metadata.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rust/src/events/internal_metadata.rs b/rust/src/events/internal_metadata.rs index 63774fbd547..ad87825f163 100644 --- a/rust/src/events/internal_metadata.rs +++ b/rust/src/events/internal_metadata.rs @@ -204,6 +204,8 @@ pub struct EventInternalMetadata { /// The stream ordering of this event. None, until it has been persisted. 
#[pyo3(get, set)] stream_ordering: Option, + #[pyo3(get, set)] + instance_name: Option, /// whether this event is an outlier (ie, whether we have the state at that /// point in the DAG) @@ -232,6 +234,7 @@ impl EventInternalMetadata { Ok(EventInternalMetadata { data, stream_ordering: None, + instance_name: None, outlier: false, }) } From af60f7b508e2f6cb29476408aa3a8329d6020b7c Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 15:46:37 -0500 Subject: [PATCH 16/27] First pass on `sort_rooms` and refactor to include room membership alongside the sync rooms --- synapse/handlers/sliding_sync.py | 115 ++++++++++++++++++++++------ tests/handlers/test_sliding_sync.py | 44 +++++------ 2 files changed, 114 insertions(+), 45 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 352e00c79ca..d22191f1b9a 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -24,7 +24,14 @@ from synapse.api.constants import Membership from synapse.events import EventBase -from synapse.types import Requester, RoomStreamToken, StreamToken, UserID +from synapse.storage.roommember import RoomsForUser +from synapse.types import ( + PersistedEventPosition, + Requester, + RoomStreamToken, + StreamToken, + UserID, +) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult if TYPE_CHECKING: @@ -33,6 +40,27 @@ logger = logging.getLogger(__name__) +def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser: + """ + Quick helper to convert an event to a `RoomsForUser` object. 
+ """ + # These fields should be present for all persisted events + assert event.internal_metadata.stream_ordering is not None + assert event.internal_metadata.instance_name is not None + + return RoomsForUser( + room_id=event.room_id, + sender=event.sender, + membership=event.membership, + event_id=event.event_id, + event_pos=PersistedEventPosition( + event.internal_metadata.instance_name, + event.internal_metadata.stream_ordering, + ), + room_version_id=event.room_version.identifier, + ) + + def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool: """ Returns True if the membership event should be included in the sync response, @@ -151,25 +179,25 @@ async def current_sync_for_user( # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() - # Get all of the room IDs that the user should be able to see in the sync - # response - room_id_set = await self.get_sync_room_ids_for_user( - sync_config.user, - from_token=from_token, - to_token=to_token, - ) - # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} if sync_config.lists: + # Get all of the room IDs that the user should be able to see in the sync + # response + sync_room_map = await self.get_sync_room_ids_for_user( + sync_config.user, + from_token=from_token, + to_token=to_token, + ) + for list_key, list_config in sync_config.lists.items(): # TODO: Apply filters # # TODO: Exclude partially stated rooms unless the `required_state` has # `["m.room.member", "$LAZY"]` - filtered_room_ids = room_id_set + filtered_room_map = sync_room_map # TODO: Apply sorts - sorted_room_ids = await self.sort_rooms(filtered_room_ids, to_token) + sorted_room_ids = await self.sort_rooms(filtered_room_map, to_token) ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: @@ -200,7 +228,7 @@ async def get_sync_room_ids_for_user( user: UserID, to_token: StreamToken, from_token: Optional[StreamToken] = None, - ) -> 
AbstractSet[str]: + ) -> Dict[str, RoomsForUser]: """ Fetch room IDs that should be listed for this user in the sync response (the full room list that will be filtered, sorted, and sliced). @@ -217,6 +245,15 @@ async def get_sync_room_ids_for_user( `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way to tell when a room was forgotten at the moment so we can't factor it into the from/to range. + + Args: + user: User to fetch rooms for + to_token: The token to fetch rooms up to. + from_token: The point in the stream to sync from. + + Returns: + A dictionary of room IDs that should be listed in the sync response along + with membership information in that room at the time of `to_token`. """ user_id = user.to_string() @@ -236,11 +273,11 @@ async def get_sync_room_ids_for_user( # If the user has never joined any rooms before, we can just return an empty list if not room_for_user_list: - return set() + return {} # Our working list of rooms that can show up in the sync response sync_room_id_set = { - room_for_user.room_id + room_for_user.room_id: room_for_user for room_for_user in room_for_user_list if filter_membership_for_sync( membership=room_for_user.membership, @@ -390,7 +427,9 @@ async def get_sync_room_ids_for_user( not was_last_membership_already_included and should_prev_membership_be_included ): - sync_room_id_set.add(room_id) + sync_room_id_set[room_id] = convert_event_to_rooms_for_user( + last_membership_change_after_to_token + ) # 1b) Remove rooms that the user joined (hasn't left) after the `to_token` # # For example, if the last membership event after the `to_token` is a "join" @@ -401,7 +440,7 @@ async def get_sync_room_ids_for_user( was_last_membership_already_included and not should_prev_membership_be_included ): - sync_room_id_set.discard(room_id) + del sync_room_id_set[room_id] # 2) ----------------------------------------------------- # We fix-up newly_left rooms after the first fixup because it may have removed @@ -436,13 
+475,15 @@ async def get_sync_room_ids_for_user( # include newly_left rooms because the last event that the user should see # is their own leave event if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - sync_room_id_set.add(room_id) + sync_room_id_set[room_id] = convert_event_to_rooms_for_user( + last_membership_change_in_from_to_range + ) return sync_room_id_set async def sort_rooms( self, - room_id_set: AbstractSet[str], + sync_room_map: Dict[str, RoomsForUser], to_token: StreamToken, ) -> List[str]: """ @@ -450,11 +491,39 @@ async def sort_rooms( a stable sort, we tie-break by room ID. Args: - room_id_set: Set of room IDs to sort - to_token: We sort based on the events in the room at this token + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + to_token: We sort based on the events in the room at this token (<= `to_token`) """ - # TODO: `get_last_event_in_room_before_stream_ordering()` - # TODO: Handle when people are left/banned from the room and shouldn't see past that point + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} + for room_id, room_for_user in sync_room_map.items(): + # If they are fully-joined to the room, let's find the latest activity + # at/before the `to_token`. + if room_for_user.membership == Membership.JOIN: + last_event_result = ( + await self.store.get_last_event_in_room_before_stream_ordering( + room_id, to_token.room_key + ) + ) - return list(room_id_set) + # If the room has no events at/before the `to_token`, this is probably a + # mistake in the code that generates the `sync_room_map` since that should + # only give us rooms that the user had membership in during the token range. 
+ assert last_event_result is not None + + _, event_pos = last_event_result + + last_activity_in_room_map[room_id] = event_pos.stream + else: + # Otherwise, if the user left/banned from the room, they shouldn't see + # past that point. (same for invites/knocks) + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + + return sorted( + sync_room_map.keys(), + # Sort by the last activity (stream_ordering) in the room, tie-break on room_id + key=lambda room_id: (last_activity_in_room_map[room_id], room_id), + ) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 6389afdd603..354d48c41d1 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -78,7 +78,7 @@ def test_no_rooms(self) -> None: ) ) - self.assertEqual(room_id_results, set()) + self.assertEqual(room_id_results.keys(), set()) def test_get_newly_joined_room(self) -> None: """ @@ -102,7 +102,7 @@ def test_get_newly_joined_room(self) -> None: ) ) - self.assertEqual(room_id_results, {room_id}) + self.assertEqual(room_id_results.keys(), {room_id}) def test_get_already_joined_room(self) -> None: """ @@ -123,7 +123,7 @@ def test_get_already_joined_room(self) -> None: ) ) - self.assertEqual(room_id_results, {room_id}) + self.assertEqual(room_id_results.keys(), {room_id}) def test_get_invited_banned_knocked_room(self) -> None: """ @@ -179,7 +179,7 @@ def test_get_invited_banned_knocked_room(self) -> None: # Ensure that the invited, ban, and knock rooms show up self.assertEqual( - room_id_results, + room_id_results.keys(), { invited_room_id, ban_room_id, @@ -225,7 +225,7 @@ def test_get_kicked_room(self) -> None: ) # The kicked room should show up - self.assertEqual(room_id_results, {kick_room_id}) + self.assertEqual(room_id_results.keys(), {kick_room_id}) def test_forgotten_rooms(self) -> None: """ @@ -307,7 +307,7 @@ def test_forgotten_rooms(self) -> None: ) # We shouldn't see the room because it was forgotten - 
self.assertEqual(room_id_results, set()) + self.assertEqual(room_id_results.keys(), set()) def test_only_newly_left_rooms_show_up(self) -> None: """ @@ -339,7 +339,7 @@ def test_only_newly_left_rooms_show_up(self) -> None: ) # Only the newly_left room should show up - self.assertEqual(room_id_results, {room_id2}) + self.assertEqual(room_id_results.keys(), {room_id2}) def test_no_joins_after_to_token(self) -> None: """ @@ -367,7 +367,7 @@ def test_no_joins_after_to_token(self) -> None: ) ) - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_join_during_range_and_left_room_after_to_token(self) -> None: """ @@ -397,7 +397,7 @@ def test_join_during_range_and_left_room_after_to_token(self) -> None: # We should still see the room because we were joined during the # from_token/to_token time period. - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_join_before_range_and_left_room_after_to_token(self) -> None: """ @@ -424,7 +424,7 @@ def test_join_before_range_and_left_room_after_to_token(self) -> None: ) # We should still see the room because we were joined before the `from_token` - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_kicked_before_range_and_left_after_to_token(self) -> None: """ @@ -472,7 +472,7 @@ def test_kicked_before_range_and_left_after_to_token(self) -> None: ) # We shouldn't see the room because it was forgotten - self.assertEqual(room_id_results, {kick_room_id}) + self.assertEqual(room_id_results.keys(), {kick_room_id}) def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None: """ @@ -509,7 +509,7 @@ def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None: ) # Room should still show up because it's newly_left during the from/to range - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), 
{room_id1}) def test_newly_left_during_range_and_join_after_to_token(self) -> None: """ @@ -545,7 +545,7 @@ def test_newly_left_during_range_and_join_after_to_token(self) -> None: ) # Room should still show up because it's newly_left during the from/to range - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_no_from_token(self) -> None: """ @@ -586,7 +586,7 @@ def test_no_from_token(self) -> None: ) # Only rooms we were joined to before the `to_token` should show up - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_from_token_ahead_of_to_token(self) -> None: """ @@ -647,7 +647,7 @@ def test_from_token_ahead_of_to_token(self) -> None: # # There won't be any newly_left rooms because the `from_token` is ahead of the # `to_token` and that range will give no membership changes to check. - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_leave_before_range_and_join_leave_after_to_token(self) -> None: """ @@ -682,7 +682,7 @@ def test_leave_before_range_and_join_leave_after_to_token(self) -> None: ) # Room shouldn't show up because it was left before the `from_token` - self.assertEqual(room_id_results, set()) + self.assertEqual(room_id_results.keys(), set()) def test_leave_before_range_and_join_after_to_token(self) -> None: """ @@ -716,7 +716,7 @@ def test_leave_before_range_and_join_after_to_token(self) -> None: ) # Room shouldn't show up because it was left before the `from_token` - self.assertEqual(room_id_results, set()) + self.assertEqual(room_id_results.keys(), set()) def test_join_leave_multiple_times_during_range_and_after_to_token( self, @@ -758,7 +758,7 @@ def test_join_leave_multiple_times_during_range_and_after_to_token( ) # Room should show up because it was newly_left and joined during the from/to range - self.assertEqual(room_id_results, {room_id1}) + 
self.assertEqual(room_id_results.keys(), {room_id1}) def test_join_leave_multiple_times_before_range_and_after_to_token( self, @@ -798,7 +798,7 @@ def test_join_leave_multiple_times_before_range_and_after_to_token( ) # Room should show up because we were joined before the from/to range - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_invite_before_range_and_join_leave_after_to_token( self, @@ -835,7 +835,7 @@ def test_invite_before_range_and_join_leave_after_to_token( ) # Room should show up because we were invited before the from/to range - self.assertEqual(room_id_results, {room_id1}) + self.assertEqual(room_id_results.keys(), {room_id1}) def test_multiple_rooms_are_not_confused( self, @@ -888,7 +888,7 @@ def test_multiple_rooms_are_not_confused( ) self.assertEqual( - room_id_results, + room_id_results.keys(), { # `room_id1` shouldn't show up because we left before the from/to range # @@ -1099,7 +1099,7 @@ def test_sharded_event_persisters(self) -> None: ) self.assertEqual( - room_id_results, + room_id_results.keys(), { room_id1, # room_id2 shouldn't show up because we left before the from/to range From bd49c3415b11d062045c5344468266c52c430f42 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 17:53:24 -0500 Subject: [PATCH 17/27] Add some tests --- synapse/handlers/sliding_sync.py | 29 ++++-- tests/handlers/test_sliding_sync.py | 135 ++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index d22191f1b9a..c84e7d265d6 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,7 +18,7 @@ # # import logging -from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from immutabledict import immutabledict @@ -197,7 +197,7 @@ async def current_sync_for_user( # `["m.room.member", 
"$LAZY"]` filtered_room_map = sync_room_map # TODO: Apply sorts - sorted_room_ids = await self.sort_rooms(filtered_room_map, to_token) + sorted_room_info = await self.sort_rooms(filtered_room_map, to_token) ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: @@ -206,12 +206,17 @@ async def current_sync_for_user( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=sorted_room_ids[range[0] : range[1]], + room_ids=[ + room_id + for room_id, rooms_for_user in sorted_room_info.items()[ + range[0] : range[1] + ] + ], ) ) lists[list_key] = SlidingSyncResult.SlidingWindowList( - count=len(sorted_room_ids), + count=len(sorted_room_info), ops=ops, ) @@ -485,15 +490,18 @@ async def sort_rooms( self, sync_room_map: Dict[str, RoomsForUser], to_token: StreamToken, - ) -> List[str]: + ) -> List[Tuple[str, RoomsForUser]]: """ - Sort by recency of the last event in the room (stream_ordering). In order to get + Sort by stream_ordering of the last event in the room. In order to get a stable sort, we tie-break by room ID. Args: sync_room_map: Dictionary of room IDs to sort along with membership information in the room at the time of `to_token`. to_token: We sort based on the events in the room at this token (<= `to_token`) + + Returns: + A sorted list of room IDs by stream_ordering along with membership information. 
""" # Assemble a map of room ID to the `stream_ordering` of the last activity that the @@ -523,7 +531,12 @@ async def sort_rooms( last_activity_in_room_map[room_id] = room_for_user.event_pos.stream return sorted( - sync_room_map.keys(), + sync_room_map.items(), # Sort by the last activity (stream_ordering) in the room, tie-break on room_id - key=lambda room_id: (last_activity_in_room_map[room_id], room_id), + key=lambda room_info: ( + last_activity_in_room_map[room_info[0]], + room_info[0], + ), + # We want descending order + reverse=True, ) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 354d48c41d1..bd1ca8f4ef0 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1113,3 +1113,138 @@ def test_sharded_event_persisters(self) -> None: room_id3, }, ) + + +class SortRoomsTestCase(HomeserverTestCase): + """ + Tests Sliding Sync handler `sort_rooms()` to make sure it sorts/orders rooms + correctly. + """ + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def test_sort_activity_basic(self) -> None: + """ + Rooms with newer activity are sorted first. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + room_id2 = self.helper.create_room_as( + user1_id, + tok=user1_tok, + ) + + after_rooms_token = self.event_sources.get_current_token() + + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + + sorted_room_info = self.get_success( + self.sliding_sync_handler.sort_rooms( + sync_room_map=sync_room_map, + to_token=after_rooms_token, + ) + ) + + self.assertEqual( + [room_id for room_id, _ in sorted_room_info], + [room_id2, room_id1], + ) + + # @parameterized.expand( + # [ + # (Membership.INVITE,), + # (Membership.KNOCK,), + # (Membership.INVITE,), + # ] + # ) + def test_activity_after_invite(self) -> None: + """ + When someone is invited to a room, they shouldn't take anything into account after the invite. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_rooms_token = self.event_sources.get_current_token() + + # Create the rooms as user2 so we can have user1 with a clean slate to work from + # and join in whatever order we need for the tests. 
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Here is the activity with user1 that will determine the sort of the rooms + self.helper.join(room_id3, user1_id, tok=user1_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # Activity before the token but the user is only invited to this room so it + # shouldn't be taken into account + self.helper.send(room_id1, "activity in room1", tok=user2_tok) + + after_rooms_token = self.event_sources.get_current_token() + + # Activity after the token. Just make it in a different order than what we + # expect to make sure we're not taking the activity after the token into + # account. + self.helper.send(room_id1, "activity in room1", tok=user2_tok) + self.helper.send(room_id2, "activity in room2", tok=user2_tok) + self.helper.send(room_id3, "activity in room3", tok=user2_tok) + + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_rooms_token, + to_token=after_rooms_token, + ) + ) + + # Sort the rooms (what we're testing) + sorted_room_info = self.get_success( + self.sliding_sync_handler.sort_rooms( + sync_room_map=sync_room_map, + to_token=after_rooms_token, + ) + ) + + self.assertEqual( + [room_id for room_id, _ in sorted_room_info], + [room_id2, room_id1, room_id3], + "Corresponding map to disambiguate the opaque room IDs: " + + str( + { + "room_id1": room_id1, + "room_id2": room_id2, + "room_id3": room_id3, + } + ), + ) From 50605880120237b5722b45a19578395d8c37a0b5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 18:18:20 -0500 Subject: [PATCH 18/27] Fix newly_left not being added back 
if we returned early (when `membership_snapshot_token.is_before_or_eq(to_token.room_key)`) Wasn't caught in the tests because the test was wrong --- synapse/handlers/sliding_sync.py | 26 ++++++++++---------- tests/handlers/test_sliding_sync.py | 38 +++++++++++++++++++++-------- 2 files changed, 41 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c84e7d265d6..21cf16e0086 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -317,12 +317,6 @@ async def get_sync_room_ids_for_user( instance_map=immutabledict(instance_to_max_stream_ordering_map), ) - # If our `to_token` is already the same or ahead of the latest room membership - # for the user, we can just straight-up return the room list (nothing has - # changed) - if membership_snapshot_token.is_before_or_eq(to_token.room_key): - return sync_room_id_set - # Since we fetched the users room list at some point in time after the from/to # tokens, we need to revert/rewind some membership changes to match the point in # time of the `to_token`. 
In particular, we need to make these fixups: @@ -342,14 +336,20 @@ async def get_sync_room_ids_for_user( # 1) Fetch membership changes that fall in the range from `to_token` up to # `membership_snapshot_token` - membership_change_events_after_to_token = ( - await self.store.get_membership_changes_for_user( - user_id, - from_key=to_token.room_key, - to_key=membership_snapshot_token, - excluded_rooms=self.rooms_to_exclude_globally, + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) + membership_change_events_after_to_token = [] + if not membership_snapshot_token.is_before_or_eq(to_token.room_key): + membership_change_events_after_to_token = ( + await self.store.get_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_rooms=self.rooms_to_exclude_globally, + ) ) - ) # 1) Assemble a list of the last membership events in some given ranges. 
Someone # could have left and joined multiple times during the given range but we only diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index bd1ca8f4ef0..5dfd2d671a9 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -20,6 +20,8 @@ import logging from unittest.mock import patch +from parameterized import parameterized + from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, JoinRules, Membership @@ -326,7 +328,7 @@ def test_only_newly_left_rooms_show_up(self) -> None: # Leave during the from_token/to_token range (newly_left) room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) after_room2_token = self.event_sources.get_current_token() @@ -1177,14 +1179,15 @@ def test_sort_activity_basic(self) -> None: [room_id2, room_id1], ) - # @parameterized.expand( - # [ - # (Membership.INVITE,), - # (Membership.KNOCK,), - # (Membership.INVITE,), - # ] - # ) - def test_activity_after_invite(self) -> None: + @parameterized.expand( + [ + (Membership.LEAVE,), + (Membership.INVITE,), + (Membership.KNOCK,), + (Membership.BAN,), + ] + ) + def test_activity_after_xxx(self, room1_membership: Membership) -> None: """ When someone is invited to a room, they shouldn't take anything into account after the invite. """ @@ -1198,12 +1201,27 @@ def test_activity_after_invite(self) -> None: # Create the rooms as user2 so we can have user1 with a clean slate to work from # and join in whatever order we need for the tests. 
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + if room1_membership == Membership.KNOCK: + self.helper.send_state( + room_id1, + EventTypes.JoinRules, + {"join_rule": JoinRules.KNOCK}, + tok=user2_tok, + ) room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Here is the activity with user1 that will determine the sort of the rooms self.helper.join(room_id3, user1_id, tok=user1_tok) - self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + if room1_membership == Membership.LEAVE: + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + elif room1_membership == Membership.INVITE: + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + elif room1_membership == Membership.KNOCK: + self.helper.knock(room_id1, user1_id, tok=user1_tok) + elif room1_membership == Membership.BAN: + self.helper.ban(room_id1, user1_id, user2_id) self.helper.join(room_id2, user1_id, tok=user1_tok) # Activity before the token but the user is only invited to this room so it From 5243a30e0073524b29b98068c7a59919646f5892 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 18:20:47 -0500 Subject: [PATCH 19/27] Fix ban test case --- tests/handlers/test_sliding_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 5dfd2d671a9..f38b607f689 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1221,7 +1221,7 @@ def test_activity_after_xxx(self, room1_membership: Membership) -> None: elif room1_membership == Membership.KNOCK: self.helper.knock(room_id1, user1_id, tok=user1_tok) elif room1_membership == Membership.BAN: - self.helper.ban(room_id1, user1_id, user2_id) + self.helper.ban(room_id1, src=user2_id, 
targ=user1_id, tok=user2_tok) self.helper.join(room_id2, user1_id, tok=user1_tok) # Activity before the token but the user is only invited to this room so it From d5929f15af9172024d198d7ed72ed1bff491d208 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 18:25:20 -0500 Subject: [PATCH 20/27] Adjust wording --- synapse/handlers/sliding_sync.py | 4 ++-- tests/handlers/test_sliding_sync.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 21cf16e0086..c216467c296 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -526,8 +526,8 @@ async def sort_rooms( last_activity_in_room_map[room_id] = event_pos.stream else: - # Otherwise, if the user left/banned from the room, they shouldn't see - # past that point. (same for invites/knocks) + # Otherwise, if the user has left/been invited/knocked/been banned from + # a room, they shouldn't see anything past that point. last_activity_in_room_map[room_id] = room_for_user.event_pos.stream return sorted( diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index f38b607f689..89c823fdc5b 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1189,7 +1189,8 @@ def test_sort_activity_basic(self) -> None: ) def test_activity_after_xxx(self, room1_membership: Membership) -> None: """ - When someone is invited to a room, they shouldn't take anything into account after the invite. + When someone has left/been invited/knocked/been banned from a room, they + shouldn't take anything into account after that membership event. 
""" user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") From 8935c6c81a762cf2bb38acd6495e6da9bc06e453 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 12 Jun 2024 18:29:39 -0500 Subject: [PATCH 21/27] Fix lints --- synapse/handlers/sliding_sync.py | 2 +- tests/handlers/test_sliding_sync.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index c216467c296..f5356cdd892 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -208,7 +208,7 @@ async def current_sync_for_user( range=range, room_ids=[ room_id - for room_id, rooms_for_user in sorted_room_info.items()[ + for room_id, _ in sorted_room_info[ range[0] : range[1] ] ], diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 89c823fdc5b..a3817959710 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1187,7 +1187,7 @@ def test_sort_activity_basic(self) -> None: (Membership.BAN,), ] ) - def test_activity_after_xxx(self, room1_membership: Membership) -> None: + def test_activity_after_xxx(self, room1_membership: str) -> None: """ When someone has left/been invited/knocked/been banned from a room, they shouldn't take anything into account after that membership event. 
From 35808b38db7e89a9be341353b19cf14407e379a6 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Jun 2024 14:53:23 -0500 Subject: [PATCH 22/27] Fix filtering --- synapse/handlers/sliding_sync.py | 28 ++++++++++++++++++---------- tests/handlers/test_sliding_sync.py | 23 +++++++++++++++++------ 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index ff0d3ba4f31..67471ac362a 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,7 +18,7 @@ # # import logging -from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from immutabledict import immutabledict @@ -210,14 +210,16 @@ async def current_sync_for_user( for list_key, list_config in sync_config.lists.items(): # Apply filters - filtered_room_ids = room_id_set + filtered_sync_room_map = sync_room_map if list_config.filters is not None: - filtered_room_ids = await self.filter_rooms( - sync_config.user, room_id_set, list_config.filters, to_token + filtered_sync_room_map = await self.filter_rooms( + sync_config.user, sync_room_map, list_config.filters, to_token ) # TODO: Apply sorts - sorted_room_info = await self.sort_rooms(filtered_room_map, to_token) + sorted_room_info = await self.sort_rooms( + filtered_sync_room_map, to_token + ) ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: @@ -509,18 +511,23 @@ async def get_sync_room_ids_for_user( async def filter_rooms( self, user: UserID, - room_id_set: AbstractSet[str], + sync_room_map: Dict[str, RoomsForUser], filters: SlidingSyncConfig.SlidingSyncList.Filters, to_token: StreamToken, - ) -> AbstractSet[str]: + ) -> Dict[str, RoomsForUser]: """ Filter rooms based on the sync request. 
Args: user: User to filter rooms for - room_id_set: Set of room IDs to filter down + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. filters: Filters to apply to_token: We filter based on the state of the room at this token + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. """ user_id = user.to_string() @@ -529,7 +536,7 @@ async def filter_rooms( # TODO: Exclude partially stated rooms unless the `required_state` has # `["m.room.member", "$LAZY"]` - filtered_room_id_set = set(room_id_set) + filtered_room_id_set = set(sync_room_map.keys()) # Filter for Direct-Message (DM) rooms if filters.is_dm is not None: @@ -585,7 +592,8 @@ async def filter_rooms( if filters.not_tags: raise NotImplementedError() - return filtered_room_id_set + # Assemble a new sync room map but only with the `filtered_room_id_set` + return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} async def sort_rooms( self, diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 755086e43d9..f6b6d579ae7 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1216,11 +1216,20 @@ def test_filter_dm_rooms(self) -> None: after_rooms_token = self.event_sources.get_current_token() + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + # Try with `is_dm=True` - truthy_filtered_room_ids = self.get_success( + truthy_filtered_room_map = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), - {room_id, dm_room_id}, + sync_room_map, SlidingSyncConfig.SlidingSyncList.Filters( is_dm=True, ), @@ -1228,13 +1237,13 @@ def test_filter_dm_rooms(self) -> None: ) ) - 
self.assertEqual(truthy_filtered_room_ids, {dm_room_id}) + self.assertEqual(truthy_filtered_room_map.keys(), {dm_room_id}) # Try with `is_dm=False` - falsy_filtered_room_ids = self.get_success( + falsy_filtered_room_map = self.get_success( self.sliding_sync_handler.filter_rooms( UserID.from_string(user1_id), - {room_id, dm_room_id}, + sync_room_map, SlidingSyncConfig.SlidingSyncList.Filters( is_dm=False, ), @@ -1242,7 +1251,7 @@ def test_filter_dm_rooms(self) -> None: ) ) - self.assertEqual(falsy_filtered_room_ids, {room_id}) + self.assertEqual(falsy_filtered_room_map.keys(), {room_id}) class SortRoomsTestCase(HomeserverTestCase): @@ -1287,6 +1296,7 @@ def test_sort_activity_basic(self) -> None: after_rooms_token = self.event_sources.get_current_token() + # Get the rooms the user should be syncing with sync_room_map = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( UserID.from_string(user1_id), @@ -1295,6 +1305,7 @@ def test_sort_activity_basic(self) -> None: ) ) + # Sort the rooms (what we're testing) sorted_room_info = self.get_success( self.sliding_sync_handler.sort_rooms( sync_room_map=sync_room_map, From a917edad8ce76fe363d60a984fad26f322562138 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Jun 2024 15:47:33 -0500 Subject: [PATCH 23/27] No more sort option We always just sort by stream_ordering --- synapse/handlers/sliding_sync.py | 1 - synapse/types/rest/client/__init__.py | 9 +-------- tests/rest/client/test_sync.py | 4 ---- 3 files changed, 1 insertion(+), 13 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 67471ac362a..800f918198d 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -216,7 +216,6 @@ async def current_sync_for_user( sync_config.user, sync_room_map, list_config.filters, to_token ) - # TODO: Apply sorts sorted_room_info = await self.sort_rooms( filtered_sync_room_map, to_token ) diff --git 
a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index 8da0df40779..e2c79c41064 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -176,7 +176,7 @@ class SlidingSyncList(CommonRoomParameters): is used and all rooms are returned in this list. Integers are *inclusive*. slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with - sliding windows). When true, the `ranges` and `sort` fields are ignored. + sliding windows). When true, the `ranges` field is ignored. required_state: Required state for each room returned. An array of event type and state key tuples. Elements in this array are ORd together to produce the final set of state events to return. @@ -215,12 +215,6 @@ class SlidingSyncList(CommonRoomParameters): `user_id` and optionally `avatar_url` and `displayname`) for the users used to calculate the room name. filters: Filters to apply to the list before sorting. - bump_event_types: Allowlist of event types which should be considered recent activity - when sorting `by_recency`. By omitting event types from this field, - clients can ensure that uninteresting events (e.g. a profile rename) do - not cause a room to jump to the top of its list(s). Empty or omitted - `bump_event_types` have no effect—all events in a room will be - considered recent activity. 
""" class Filters(RequestBodyModel): @@ -289,7 +283,6 @@ class Filters(RequestBodyModel): slow_get_all_rooms: Optional[StrictBool] = False include_heroes: Optional[StrictBool] = False filters: Optional[Filters] = None - bump_event_types: Optional[List[StrictStr]] = None class RoomSubscription(CommonRoomParameters): pass diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 40870b2cfe7..bbc53b216d3 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1299,7 +1299,6 @@ def test_sync_list(self) -> None: "lists": { "foo-list": { "ranges": [[0, 99]], - "sort": ["by_notification_level", "by_recency", "by_name"], "required_state": [ ["m.room.join_rules", ""], ["m.room.history_visibility", ""], @@ -1361,7 +1360,6 @@ def test_wait_for_sync_token(self) -> None: "lists": { "foo-list": { "ranges": [[0, 99]], - "sort": ["by_notification_level", "by_recency", "by_name"], "required_state": [ ["m.room.join_rules", ""], ["m.room.history_visibility", ""], @@ -1415,14 +1413,12 @@ def test_filter_list(self) -> None: "lists": { "dms": { "ranges": [[0, 99]], - "sort": ["by_recency"], "required_state": [], "timeline_limit": 1, "filters": {"is_dm": True}, }, "foo-list": { "ranges": [[0, 99]], - "sort": ["by_recency"], "required_state": [], "timeline_limit": 1, "filters": {"is_dm": False}, From 84eaeeafa8aab42bbdf26123e05bbb1f0466def7 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Jun 2024 16:07:33 -0500 Subject: [PATCH 24/27] Add rest test --- tests/handlers/test_sliding_sync.py | 4 +- tests/rest/client/test_sync.py | 57 +++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index f6b6d579ae7..af48041f1f8 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -1341,6 +1341,7 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: # Create the rooms as user2 
so we can have user1 with a clean slate to work from # and join in whatever order we need for the tests. room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # If we're testing knocks, set the room to knock if room1_membership == Membership.KNOCK: self.helper.send_state( room_id1, @@ -1352,6 +1353,7 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Here is the activity with user1 that will determine the sort of the rooms + # (room2, room1, room3) self.helper.join(room_id3, user1_id, tok=user1_tok) if room1_membership == Membership.LEAVE: self.helper.join(room_id1, user1_id, tok=user1_tok) @@ -1364,7 +1366,7 @@ def test_activity_after_xxx(self, room1_membership: str) -> None: self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) self.helper.join(room_id2, user1_id, tok=user1_tok) - # Activity before the token but the user is only invited to this room so it + # Activity before the token but the user is only been xxx to this room so it # shouldn't be taken into account self.helper.send(room_id1, "activity in room1", tok=user2_tok) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index bbc53b216d3..2b06767b8a8 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -1459,3 +1459,60 @@ def test_filter_list(self) -> None: ], list(channel.json_body["lists"]["foo-list"]), ) + + def test_sort_list(self) -> None: + """ + Test that the lists are sorted by `stream_ordering` + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + # Activity that will order the rooms + self.helper.send(room_id3, 
"activity in room3", tok=user1_tok) + self.helper.send(room_id1, "activity in room1", tok=user1_tok) + self.helper.send(room_id2, "activity in room2", tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure the list is sorted in the way we expect + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [room_id2, room_id1, room_id3], + } + ], + channel.json_body["lists"]["foo-list"], + ) From 99ed0128cf45f557bf8553c59a4485fb15691f81 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 13 Jun 2024 16:08:47 -0500 Subject: [PATCH 25/27] Update changelog --- changelog.d/17293.feature | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/17293.feature b/changelog.d/17293.feature index bd5c27a8a77..60ca7721a0e 100644 --- a/changelog.d/17293.feature +++ b/changelog.d/17293.feature @@ -1 +1 @@ -Add recency sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. +Add `stream_ordering` sort to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. From ef92f3cbf91dba8bd958736e20e05c67eb019fea Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 10:46:18 -0500 Subject: [PATCH 26/27] Stable sort with just `stream_ordering` > FWIW `stream_ordering` is unique, so we don't need a tie break. 
See https://github.com/element-hq/synapse/pull/17293#discussion_r1639481062 --- synapse/handlers/sliding_sync.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 800f918198d..7c73be944ea 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -600,8 +600,8 @@ async def sort_rooms( to_token: StreamToken, ) -> List[Tuple[str, RoomsForUser]]: """ - Sort by stream_ordering of the last event in the room. In order to get - a stable sort, we tie-break by room ID. + Sort by `stream_ordering` of the last event that the user should see in the + room. `stream_ordering` is unique so we get a stable sort. Args: sync_room_map: Dictionary of room IDs to sort along with membership @@ -609,7 +609,7 @@ async def sort_rooms( to_token: We sort based on the events in the room at this token (<= `to_token`) Returns: - A sorted list of room IDs by stream_ordering along with membership information. + A sorted list of room IDs by `stream_ordering` along with membership information. 
""" # Assemble a map of room ID to the `stream_ordering` of the last activity that the @@ -640,11 +640,8 @@ async def sort_rooms( return sorted( sync_room_map.items(), - # Sort by the last activity (stream_ordering) in the room, tie-break on room_id - key=lambda room_info: ( - last_activity_in_room_map[room_info[0]], - room_info[0], - ), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: last_activity_in_room_map[room_info[0]], # We want descending order reverse=True, ) From 63ff8f95cd9f94822cb8603426326185fd8afe22 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 17 Jun 2024 10:48:33 -0500 Subject: [PATCH 27/27] Rename to `get_last_event_pos_in_room_before_stream_ordering(...)` See https://github.com/element-hq/synapse/pull/17293#discussion_r1639496753 --- synapse/handlers/sliding_sync.py | 2 +- synapse/storage/databases/main/stream.py | 14 ++++++++------ tests/storage/test_stream.py | 14 +++++++------- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 7c73be944ea..b84cf67f7d7 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -620,7 +620,7 @@ async def sort_rooms( # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: last_event_result = ( - await self.store.get_last_event_in_room_before_stream_ordering( + await self.store.get_last_event_pos_in_room_before_stream_ordering( room_id, to_token.room_key ) ) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 31a387f37d1..ff0d723684d 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -910,8 +910,10 @@ async def get_last_event_id_in_room_before_stream_ordering( The ID of the most recent event, or None if there are no events in the room before this stream ordering. 
""" - last_event_result = await self.get_last_event_in_room_before_stream_ordering( - room_id, end_token + last_event_result = ( + await self.get_last_event_pos_in_room_before_stream_ordering( + room_id, end_token + ) ) if last_event_result: @@ -919,7 +921,7 @@ async def get_last_event_id_in_room_before_stream_ordering( return None - async def get_last_event_in_room_before_stream_ordering( + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, end_token: RoomStreamToken, @@ -937,7 +939,7 @@ async def get_last_event_in_room_before_stream_ordering( events in the room before this stream ordering. """ - def get_last_event_in_room_before_stream_ordering_txn( + def get_last_event_pos_in_room_before_stream_ordering_txn( txn: LoggingTransaction, ) -> Optional[Tuple[str, PersistedEventPosition]]: # We're looking for the closest event at or before the token. We need to @@ -1008,8 +1010,8 @@ def get_last_event_in_room_before_stream_ordering_txn( return None return await self.db_pool.runInteraction( - "get_last_event_in_room_before_stream_ordering", - get_last_event_in_room_before_stream_ordering_txn, + "get_last_event_pos_in_room_before_stream_ordering", + get_last_event_pos_in_room_before_stream_ordering_txn, ) async def get_current_room_stream_token_for_room_id( diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 36bfbd81977..fe1e873e154 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -277,7 +277,7 @@ def test_filter_not_rel_types(self) -> None: class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): """ - Test `get_last_event_in_room_before_stream_ordering(...)` + Test `get_last_event_pos_in_room_before_stream_ordering(...)` """ servlets = [ @@ -337,7 +337,7 @@ def test_before_room_created(self) -> None: room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) last_event_result = self.get_success( - 
self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id, end_token=before_room_token.room_key, ) @@ -357,7 +357,7 @@ def test_after_room_created(self) -> None: after_room_token = self.event_sources.get_current_token() last_event_result = self.get_success( - self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id, end_token=after_room_token.room_key, ) @@ -383,7 +383,7 @@ def test_activity_in_other_rooms(self) -> None: after_room_token = self.event_sources.get_current_token() last_event_result = self.get_success( - self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) @@ -413,7 +413,7 @@ def test_activity_after_token_has_no_effect(self) -> None: self.helper.send(room_id1, "after2", tok=user1_tok) last_event_result = self.get_success( - self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id1, end_token=after_room_token.room_key, ) @@ -464,7 +464,7 @@ def test_last_event_within_sharded_token(self) -> None: self.helper.send(room_id1, "after2", tok=user1_tok) last_event_result = self.get_success( - self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id1, end_token=end_token, ) @@ -523,7 +523,7 @@ def test_last_event_before_sharded_token(self) -> None: self.helper.send(room_id1, "after2", tok=user1_tok) last_event_result = self.get_success( - self.store.get_last_event_in_room_before_stream_ordering( + self.store.get_last_event_pos_in_room_before_stream_ordering( room_id=room_id1, end_token=end_token, )