This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Support pagination tokens from sync/messages in the relations API #11952

Merged · 9 commits · Feb 10, 2022
Changes from 1 commit
7 changes: 3 additions & 4 deletions synapse/rest/client/relations.py
@@ -43,7 +43,7 @@

 async def _parse_token(
     store: "DataStore", token: Optional[str]
-) -> Optional[RoomStreamToken]:
+) -> Optional[StreamToken]:
     """
     For backwards compatibility support RelationPaginationToken, but new pagination
     tokens are generated as full StreamTokens, to be compatible with /sync and /messages.
@@ -55,10 +55,10 @@ async def _parse_token
     # RoomStreamToken (as part of StreamToken), but RelationPaginationToken uses
     # "-" only for separators.
     if "_" in token:
-        stream_token = await StreamToken.from_string(store, token)
+        return await StreamToken.from_string(store, token)
     else:
         relation_token = RelationPaginationToken.from_string(token)
-        stream_token = StreamToken(
+        return StreamToken(
             room_key=RoomStreamToken(relation_token.topological, relation_token.stream),
             presence_key=0,
             typing_key=0,
@@ -69,7 +69,6 @@ async def _parse_token
             device_list_key=0,
             groups_key=0,
         )
-    return stream_token.room_key


 class RelationPaginationServlet(RestServlet):
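For orientation (not part of the PR's diff): a minimal sketch of the two token shapes that _parse_token now accepts, distinguished by their separator. The literal token strings below are made up for illustration.

# Illustration only -- hypothetical token values, not taken from the PR.
legacy_token = "1-2"                 # RelationPaginationToken: "<topological>-<stream>", "-" separators only
stream_token = "s5_0_0_0_0_0_0_0_0"  # serialized StreamToken, "_"-separated, as returned by /sync and /messages

# _parse_token keys off the separator: an "_" means a full StreamToken;
# otherwise the string is parsed as a legacy RelationPaginationToken and
# wrapped into a StreamToken whose room_key carries the topological/stream pair.
assert "_" in stream_token and "_" not in legacy_token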
45 changes: 26 additions & 19 deletions synapse/storage/databases/main/relations.py
@@ -39,12 +39,8 @@
 )
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.engines import PostgresEngine
-from synapse.storage.relations import (
-    AggregationPaginationToken,
-    PaginationChunk,
-    RelationPaginationToken,
-)
-from synapse.types import JsonDict, RoomStreamToken
+from synapse.storage.relations import AggregationPaginationToken, PaginationChunk
+from synapse.types import JsonDict, RoomStreamToken, StreamToken
 from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
@@ -99,8 +95,8 @@ async def get_relations_for_event(
         aggregation_key: Optional[str] = None,
         limit: int = 5,
         direction: str = "b",
-        from_token: Optional[RoomStreamToken] = None,
-        to_token: Optional[RoomStreamToken] = None,
+        from_token: Optional[StreamToken] = None,
+        to_token: Optional[StreamToken] = None,
     ) -> PaginationChunk:
         """Get a list of relations for an event, ordered by topological ordering.

@@ -139,8 +135,10 @@ async def get_relations_for_event(
         pagination_clause = generate_pagination_where_clause(
             direction=direction,
             column_names=("topological_ordering", "stream_ordering"),
-            from_token=from_token.as_historical_tuple() if from_token else None,
-            to_token=to_token.as_historical_tuple() if to_token else None,
+            from_token=from_token.room_key.as_historical_tuple()
+            if from_token
+            else None,
+            to_token=to_token.room_key.as_historical_tuple() if to_token else None,
             engine=self.database_engine,
         )

@@ -178,18 +176,27 @@ def _get_recent_references_for_event_txn(
                 last_topo_id = row[1]
                 last_stream_id = row[2]

-            next_batch = None
+            # If there are more events, generate the next pagination key.
+            next_token = None
             if len(events) > limit and last_topo_id and last_stream_id:
-                next_batch = RelationPaginationToken(last_topo_id, last_stream_id)
-            prev_batch = None
-            if from_token:
-                assert from_token.topological is not None
-                prev_batch = RelationPaginationToken(
-                    from_token.topological, from_token.stream
-                )
+                next_key = RoomStreamToken(last_topo_id, last_stream_id)
+                if from_token:
+                    next_token = from_token.copy_and_replace("room_key", next_key)
+                else:
+                    next_token = StreamToken(
+                        room_key=next_key,
+                        presence_key=0,
+                        typing_key=0,
+                        receipt_key=0,
+                        account_data_key=0,
+                        push_rules_key=0,
+                        to_device_key=0,
+                        device_list_key=0,
+                        groups_key=0,
+                    )

             return PaginationChunk(
-                chunk=list(events[:limit]), next_batch=next_batch, prev_batch=prev_batch
+                chunk=list(events[:limit]), next_batch=next_token, prev_batch=from_token
             )

         return await self.db_pool.runInteraction(
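A rough usage sketch, in the style of the tests below, of the round trip this storage change enables (assuming a HomeserverTestCase with self.room, self.parent_id and self.user_token set up as in test_relations.py): the next_batch generated here is a full StreamToken, so a client can feed it straight into /messages.

# Hypothetical sketch -- not part of the PR's diff.
# Page the relations endpoint once; its next_batch is now a serialized StreamToken.
channel = self.make_request(
    "GET",
    f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?limit=1",
    access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)
next_batch = channel.json_body["next_batch"]

# Because it is a StreamToken, /messages accepts it directly as a "from" parameter.
channel = self.make_request(
    "GET",
    f"/rooms/{self.room}/messages?from={next_batch}&dir=b",
    access_token=self.user_token,
)
self.assertEquals(200, channel.code, channel.json_body)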
54 changes: 54 additions & 0 deletions tests/rest/client/test_relations.py
@@ -241,6 +241,60 @@ def test_repeated_paginate_relations(self):
         found_event_ids.reverse()
         self.assertEquals(found_event_ids, expected_event_ids)

+    def test_pagination_from_sync_messages(self):
+        """Pagination tokens from sync and messages should work."""
+        channel = self._send_relation(RelationTypes.ANNOTATION, "m.reaction", "A")
+        self.assertEquals(200, channel.code, channel.json_body)
+        annotation_id = channel.json_body["event_id"]
+        # Send an event after the relation events.
+        self.helper.send(self.room, body="Latest event", tok=self.user_token)
+
+        # Request sync, but limit the timeline to get a pagination token before
+        # the reaction.
+        filter = urllib.parse.quote_plus(
+            '{"room": {"timeline": {"limit": 1}}}'.encode()
+        )
[Review thread]
Contributor: Is this 'syncing from the top' as it were, so a later sync would include the 'A' reaction?
Member (author): I'm not sure what you mean by "from the top", this returns the latest events, as a normal sync does.
Contributor: That is confusing to me, since it says the pagination token will be before the reaction; what am I missing?

+        channel = self.make_request(
+            "GET", f"/sync?filter={filter}", access_token=self.user_token
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"]
+        sync_prev_batch = room_timeline["prev_batch"]
+        self.assertIsNotNone(sync_prev_batch)
+        # Ensure the event above is not in the batch.
+        self.assertNotIn(
+            annotation_id, [ev["event_id"] for ev in room_timeline["events"]]
+        )
+
+        channel = self.make_request(
+            "GET",
+            f"/rooms/{self.room}/messages?dir=b&limit=1",
+            access_token=self.user_token,
+        )
+        self.assertEquals(200, channel.code, channel.json_body)
+        messages_end = channel.json_body["end"]
+        self.assertIsNotNone(messages_end)
+        # Ensure the event above is not in the chunk.
+        self.assertNotIn(
+            annotation_id, [ev["event_id"] for ev in channel.json_body["chunk"]]
+        )
+
+        # Request the relations with the token from sync/messages -- this is a
+        # tiny bit silly since how do we know the parent ID? Just assume we had
+        # it from an earlier sync and the above are gappy.
+        for from_token in (sync_prev_batch, messages_end):
+            channel = self.make_request(
+                "GET",
+                f"/_matrix/client/unstable/rooms/{self.room}/relations/{self.parent_id}?from={from_token}",
+                access_token=self.user_token,
+            )
+            self.assertEquals(200, channel.code, channel.json_body)
+
+            # The relation should be in the above.
+            self.assertIn(
+                annotation_id, [ev["event_id"] for ev in channel.json_body["chunk"]]
+            )

     def test_aggregation_pagination_groups(self):
         """Test that we can paginate annotation groups correctly."""
