From aa5384012b4759c09b92022c0a3bc7dd1a0159b4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Dec 2021 13:18:59 -0500 Subject: [PATCH 1/4] Add helpers for fetching the bundled aggregations for events. --- synapse/storage/databases/main/relations.py | 128 +++++++++++++++++++- 1 file changed, 125 insertions(+), 3 deletions(-) diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 4ff6aed253a3..c6c4bd18da3e 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,14 +13,30 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) import attr +from frozendict import frozendict -from synapse.api.constants import RelationTypes +from synapse.api.constants import EventTypes, RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -29,10 +45,24 @@ ) from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class RelationsWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._msc1849_enabled = hs.config.experimental.msc1849_enabled + self._msc3440_enabled = hs.config.experimental.msc3440_enabled + @cached(tree=True) async def get_relations_for_event( self, @@ -515,6 +545,98 @@ def _get_if_user_has_annotated_event(txn: LoggingTransaction) -> bool: "get_if_user_has_annotated_event", _get_if_user_has_annotated_event ) + async def _get_bundled_aggregation_for_event( + self, event: EventBase + ) -> Optional[Dict[str, Any]]: + """Generate bundled aggregations for an event. + + Note that this does not use a cache, but depends on cached methods. + + Args: + event: The event to calculate bundled aggregations for. + + Returns: + The bundled aggregations for an event, if bundled aggregations are + enabled and the event can have bundled aggregations. + """ + # State events and redacted events do not get bundled aggregations. + if event.is_state() or event.internal_metadata.is_redacted(): + return None + + # Do not bundle aggregations for an event which represents an edit or an + # annotation. It does not make sense for them to have related events. + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + return None + + event_id = event.event_id + room_id = event.room_id + + # The bundled aggregations to include, a mapping of relation type to a + # type-specific value. Some types include the direct return type here + # while others need more processing during serialization. + aggregations: Dict[str, Any] = {} + + annotations = await self.get_aggregation_groups_for_event(event_id, room_id) + if annotations.chunk: + aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + + references = await self.get_relations_for_event( + event_id, room_id, RelationTypes.REFERENCE, direction="f" + ) + if references.chunk: + aggregations[RelationTypes.REFERENCE] = references.to_dict() + + edit = None + if event.type == EventTypes.Message: + edit = await self.get_applicable_edit(event_id, room_id) + + if edit: + aggregations[RelationTypes.REPLACE] = edit + + # If this event is the start of a thread, include a summary of the replies. + if self._msc3440_enabled: + ( + thread_count, + latest_thread_event, + ) = await self.get_thread_summary(event_id, room_id) + if latest_thread_event: + aggregations[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. + "latest_event": latest_thread_event, + "count": thread_count, + } + + # Store the bundled aggregations in the event metadata for later use. + return aggregations + + async def get_bundled_aggregations( + self, events: Iterable[EventBase] + ) -> Dict[str, Dict[str, Any]]: + """Generate bundled aggregations for events. + + Args: + events: The iterable of events to calculate bundled aggregations for. + + Returns: + A map of event ID to the bundled aggregation for the event. Not all + events may have bundled aggregations in the results. + """ + # If bundled aggregations are disabled, nothing to do. + if not self._msc1849_enabled: + return {} + + # TODO Parallelize. + results = {} + for event in events: + event_result = await self._get_bundled_aggregation_for_event(event) + if event_result is not None: + results[event.event_id] = event_result + + return results + class RelationsStore(RelationsWorkerStore): pass From 78bba4bc1560403530283e43d9ea49407a4a8bbf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Dec 2021 14:08:04 -0500 Subject: [PATCH 2/4] Pre-calculate and pass in bundles to serialize events. --- synapse/events/utils.py | 115 +++++++++---------------------- synapse/handlers/pagination.py | 4 +- synapse/handlers/room.py | 10 +++ synapse/rest/admin/rooms.py | 6 +- synapse/rest/client/relations.py | 5 +- synapse/rest/client/room.py | 18 +++-- synapse/rest/client/sync.py | 32 +++++---- synapse/server.py | 2 +- 8 files changed, 85 insertions(+), 107 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2038e72924a9..dbe62230d024 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,17 +14,7 @@ # limitations under the License. import collections.abc import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Union, -) +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union from frozendict import frozendict @@ -37,9 +27,6 @@ from . import EventBase -if TYPE_CHECKING: - from synapse.server import HomeServer - # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' # (? JsonDict: """Serializes a single event. @@ -418,66 +400,41 @@ async def serialize_event( serialized_event = serialize_event(event, time_now, **kwargs) # Check if there are any bundled aggregations to include with the event. - # - # Do not bundle aggregations if any of the following at true: - # - # * Support is disabled via the configuration or the caller. - # * The event is a state event. - # * The event has been redacted. - if ( - self._msc1849_enabled - and bundle_aggregations - and not event.is_state() - and not event.internal_metadata.is_redacted() - ): - await self._injected_bundled_aggregations(event, time_now, serialized_event) + if bundle_aggregations: + event_aggregations = bundle_aggregations.get(event.event_id) + if event_aggregations: + await self._injected_bundled_aggregations( + event, + time_now, + bundle_aggregations[event.event_id], + serialized_event, + ) return serialized_event async def _injected_bundled_aggregations( - self, event: EventBase, time_now: int, serialized_event: JsonDict + self, + event: EventBase, + time_now: int, + aggregations: JsonDict, + serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. Args: event: The event being serialized. time_now: The current time in milliseconds + aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. """ - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return - - event_id = event.event_id - room_id = event.room_id - - # The bundled aggregations to include. - aggregations = {} - - annotations = await self.store.get_aggregation_groups_for_event( - event_id, room_id - ) - if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() - - references = await self.store.get_relations_for_event( - event_id, room_id, RelationTypes.REFERENCE, direction="f" - ) - if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() + # Make a copy in-case the object is cached. + aggregations = aggregations.copy() - edit = None - if event.type == EventTypes.Message: - edit = await self.store.get_applicable_edit(event_id, room_id) - - if edit: + if RelationTypes.REPLACE in aggregations: # If there is an edit replace the content, preserving existing # relations. + edit = aggregations[RelationTypes.REPLACE] # Ensure we take copies of the edit content, otherwise we risk modifying # the original event. @@ -502,26 +459,20 @@ async def _injected_bundled_aggregations( } # If this event is the start of a thread, include a summary of the replies. - if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id, room_id) - if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "count": thread_count, - } - - # If any bundled aggregations were found, include them. - if aggregations: - serialized_event["unsigned"].setdefault("m.relations", {}).update( - aggregations + if RelationTypes.THREAD in aggregations: + # Serialize the latest thread event. + latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] + + # Don't bundle aggregations as this could recurse forever. + aggregations[RelationTypes.THREAD][ + "latest_event" + ] = await self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=None ) + # Include the bundled aggregations in the event. + serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) + async def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7469cc55a251..b34ddd67d149 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -537,6 +537,8 @@ async def get_messages( state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() + aggregations = await self.store.get_bundled_aggregations(events) + time_now = self.clock.time_msec() chunk = { @@ -544,7 +546,7 @@ async def get_messages( await self._event_serializer.serialize_events( events, time_now, - bundle_aggregations=True, + bundle_aggregations=aggregations, as_client_event=as_client_event, ) ), diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b9c1cbffa5c5..0e86ea9c5f83 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1177,6 +1177,16 @@ async def filter_evts(events: List[EventBase]) -> List[EventBase]: # `filtered` rather than the event we retrieved from the datastore. results["event"] = filtered[0] + # Fetch the aggregations. + aggregations = await self.store.get_bundled_aggregations([results["event"]]) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_before"]) + ) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_after"]) + ) + results["aggregations"] = aggregations + if results["events_after"]: last_event_id = results["events_after"][-1].event_id else: diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6030373ebcbb..fcb913ae6d49 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -747,17 +747,17 @@ async def on_GET( results["events_before"] = await self._event_serializer.serialize_events( results["events_before"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) results["event"] = await self._event_serializer.serialize_event( results["event"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) results["events_after"] = await self._event_serializer.serialize_events( results["events_after"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) results["state"] = await self._event_serializer.serialize_events( results["state"], time_now diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 5815650ee6cb..bf8ddb50fb97 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -228,12 +228,13 @@ async def on_GET( # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. original_event = await self._event_serializer.serialize_event( - event, now, bundle_aggregations=False + event, now, bundle_aggregations=None ) # The relations returned for the requested event do include their # bundled aggregations. + aggregations = await self.store.get_bundled_aggregations(events) serialized_events = await self._event_serializer.serialize_events( - events, now, bundle_aggregations=True + events, now, bundle_aggregations=aggregations ) return_value = pagination_chunk.to_dict() diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 40330749e546..09d8070cadc8 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.clock = hs.get_clock() + self._store = hs.get_datastore() self.event_handler = hs.get_event_handler() self._event_serializer = hs.get_event_client_serializer() self.auth = hs.get_auth() @@ -660,10 +661,13 @@ async def on_GET( # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) - time_now = self.clock.time_msec() if event: + # Ensure there are bundled aggregations available. + aggregations = await self._store.get_bundled_aggregations([event]) + + time_now = self.clock.time_msec() event_dict = await self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=True + event, time_now, bundle_aggregations=aggregations ) return 200, event_dict @@ -709,13 +713,17 @@ async def on_GET( time_now = self.clock.time_msec() results["events_before"] = await self._event_serializer.serialize_events( - results["events_before"], time_now, bundle_aggregations=True + results["events_before"], + time_now, + bundle_aggregations=results["aggregations"], ) results["event"] = await self._event_serializer.serialize_event( - results["event"], time_now, bundle_aggregations=True + results["event"], time_now, bundle_aggregations=results["aggregations"] ) results["events_after"] = await self._event_serializer.serialize_events( - results["events_after"], time_now, bundle_aggregations=True + results["events_after"], + time_now, + bundle_aggregations=results["aggregations"], ) results["state"] = await self._event_serializer.serialize_events( results["state"], time_now diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index e99a943d0d4b..e174b6f8e90a 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -525,21 +525,14 @@ async def encode_room( The room, encoded in our response format """ - def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]: + def serialize( + events: Iterable[EventBase], + aggregations: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> Awaitable[List[JsonDict]]: return self._event_serializer.serialize_events( events, time_now=time_now, - # Don't bother to bundle aggregations if the timeline is unlimited, - # as clients will have all the necessary information. - # bundle_aggregations=room.timeline.limited, - # - # richvdh 2021-12-15: disable this temporarily as it has too high an - # overhead for initialsyncs. We need to figure out a way that the - # bundling can be done *before* the events are stored in the - # SyncResponseCache so that this part can be synchronous. - # - # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. - bundle_aggregations=False, + bundle_aggregations=aggregations, token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, @@ -562,7 +555,20 @@ def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]: ) serialized_state = await serialize(state_events) - serialized_timeline = await serialize(timeline_events) + # Don't bother to bundle aggregations if the timeline is unlimited, + # as clients will have all the necessary information. + # bundle_aggregations=room.timeline.limited, + # + # richvdh 2021-12-15: disable this temporarily as it has too high an + # overhead for initialsyncs. We need to figure out a way that the + # bundling can be done *before* the events are stored in the + # SyncResponseCache so that this part can be synchronous. + # + # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. + # if room.timeline.limited: + # aggregations = await self.store.get_bundled_aggregations(timeline_events) + aggregations = None + serialized_timeline = await serialize(timeline_events, aggregations) account_data = room.account_data diff --git a/synapse/server.py b/synapse/server.py index 185e40e4da0f..3032f0b738a8 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -759,7 +759,7 @@ def get_oidc_handler(self) -> "OidcHandler": @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: From 9870e0832e5faeabd12d77fdefe4c746025acdc3 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Dec 2021 14:27:12 -0500 Subject: [PATCH 3/4] Event serialization is no longer async. --- synapse/events/utils.py | 19 ++++++++----------- synapse/handlers/events.py | 2 +- synapse/handlers/initial_sync.py | 16 +++++++--------- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 4 ++-- synapse/handlers/search.py | 10 ++++------ synapse/rest/admin/rooms.py | 10 +++++----- synapse/rest/client/events.py | 2 +- synapse/rest/client/notifications.py | 2 +- synapse/rest/client/relations.py | 6 +++--- synapse/rest/client/room.py | 10 +++++----- synapse/rest/client/sync.py | 11 +++++------ tests/rest/client/test_retention.py | 2 +- 13 files changed, 44 insertions(+), 52 deletions(-) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index dbe62230d024..de0e0c17312a 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -22,7 +22,6 @@ from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion from synapse.types import JsonDict -from synapse.util.async_helpers import yieldable_gather_results from synapse.util.frozenutils import unfreeze from . import EventBase @@ -372,7 +371,7 @@ class EventClientSerializer: clients. """ - async def serialize_event( + def serialize_event( self, event: Union[JsonDict, EventBase], time_now: int, @@ -403,7 +402,7 @@ async def serialize_event( if bundle_aggregations: event_aggregations = bundle_aggregations.get(event.event_id) if event_aggregations: - await self._injected_bundled_aggregations( + self._injected_bundled_aggregations( event, time_now, bundle_aggregations[event.event_id], @@ -412,7 +411,7 @@ async def serialize_event( return serialized_event - async def _injected_bundled_aggregations( + def _injected_bundled_aggregations( self, event: EventBase, time_now: int, @@ -464,16 +463,14 @@ async def _injected_bundled_aggregations( latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] # Don't bundle aggregations as this could recurse forever. - aggregations[RelationTypes.THREAD][ - "latest_event" - ] = await self.serialize_event( + aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event( latest_thread_event, time_now, bundle_aggregations=None ) # Include the bundled aggregations in the event. serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) - async def serialize_events( + def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: """Serializes multiple events. @@ -486,9 +483,9 @@ async def serialize_events( Returns: The list of serialized events """ - return await yieldable_gather_results( - self.serialize_event, events, time_now=time_now, **kwargs - ) + return [ + self.serialize_event(event, time_now=time_now, **kwargs) for event in events + ] def copy_power_levels_contents( diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b996c420d20..a3add8a58679 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -119,7 +119,7 @@ async def get_stream( events.extend(to_add) - chunks = await self._event_serializer.serialize_events( + chunks = self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 601bab67f9cc..346a06ff49b7 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -170,7 +170,7 @@ async def handle_room(event: RoomsForUser) -> None: d["inviter"] = event.sender invite_event = await self.store.get_event(event.event_id) - d["invite"] = await self._event_serializer.serialize_event( + d["invite"] = self._event_serializer.serialize_event( invite_event, time_now, as_client_event=as_client_event, @@ -222,7 +222,7 @@ async def handle_room(event: RoomsForUser) -> None: d["messages"] = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( messages, time_now=time_now, as_client_event=as_client_event, @@ -232,7 +232,7 @@ async def handle_room(event: RoomsForUser) -> None: "end": await end_token.to_string(self.store), } - d["state"] = await self._event_serializer.serialize_events( + d["state"] = self._event_serializer.serialize_events( current_state.values(), time_now=time_now, as_client_event=as_client_event, @@ -376,16 +376,14 @@ async def _room_initial_sync_parted( "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), }, "state": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - room_state.values(), time_now - ) + self._event_serializer.serialize_events(room_state.values(), time_now) ), "presence": [], "receipts": [], @@ -404,7 +402,7 @@ async def _room_initial_sync_joined( # TODO: These concurrently time_now = self.clock.time_msec() # Don't bundle aggregations as this is a deprecated API. - state = await self._event_serializer.serialize_events( + state = self._event_serializer.serialize_events( current_state.values(), time_now ) @@ -480,7 +478,7 @@ async def get_receipts() -> List[JsonDict]: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5e3d3886eb1d..b37250aa3895 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -246,7 +246,7 @@ async def get_state_events( room_state = room_state_events[membership_event_id] now = self.clock.time_msec() - events = await self._event_serializer.serialize_events(room_state.values(), now) + events = self._event_serializer.serialize_events(room_state.values(), now) return events async def get_joined_members(self, requester: Requester, room_id: str) -> dict: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index b34ddd67d149..472688f04506 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -543,7 +543,7 @@ async def get_messages( chunk = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( events, time_now, bundle_aggregations=aggregations, @@ -555,7 +555,7 @@ async def get_messages( } if state: - chunk["state"] = await self._event_serializer.serialize_events( + chunk["state"] = self._event_serializer.serialize_events( state, time_now, as_client_event=as_client_event ) diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ab7eaab2fb56..0b153a682261 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -420,10 +420,10 @@ async def search( time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = await self._event_serializer.serialize_events( + context["events_before"] = self._event_serializer.serialize_events( context["events_before"], time_now ) - context["events_after"] = await self._event_serializer.serialize_events( + context["events_after"] = self._event_serializer.serialize_events( context["events_after"], time_now ) @@ -441,9 +441,7 @@ async def search( results.append( { "rank": rank_map[e.event_id], - "result": ( - await self._event_serializer.serialize_event(e, time_now) - ), + "result": self._event_serializer.serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } ) @@ -457,7 +455,7 @@ async def search( if state_results: s = {} for room_id, state_events in state_results.items(): - s[room_id] = await self._event_serializer.serialize_events( + s[room_id] = self._event_serializer.serialize_events( state_events, time_now ) diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index fcb913ae6d49..2e714ac87bfe 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -424,7 +424,7 @@ async def on_GET( event_ids = await self.store.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() - room_state = await self._event_serializer.serialize_events(events.values(), now) + room_state = self._event_serializer.serialize_events(events.values(), now) ret = {"state": room_state} return HTTPStatus.OK, ret @@ -744,22 +744,22 @@ async def on_GET( ) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( + results["events_before"] = self._event_serializer.serialize_events( results["events_before"], time_now, bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( + results["event"] = self._event_serializer.serialize_event( results["event"], time_now, bundle_aggregations=results["aggregations"], ) - results["events_after"] = await self._event_serializer.serialize_events( + results["events_after"] = self._event_serializer.serialize_events( results["events_after"], time_now, bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py index 13b72a045a4a..672c821061ff 100644 --- a/synapse/rest/client/events.py +++ b/synapse/rest/client/events.py @@ -91,7 +91,7 @@ async def on_GET( time_now = self.clock.time_msec() if event: - result = await self._event_serializer.serialize_event(event, time_now) + result = self._event_serializer.serialize_event(event, time_now) return 200, result else: return 404, "Event not found." diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index acd0c9e135d1..8e427a96a320 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -72,7 +72,7 @@ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: "actions": pa.actions, "ts": pa.received_ts, "event": ( - await self._event_serializer.serialize_event( + self._event_serializer.serialize_event( notif_events[pa.event_id], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index bf8ddb50fb97..d7b353f9701f 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -227,13 +227,13 @@ async def on_GET( now = self.clock.time_msec() # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. - original_event = await self._event_serializer.serialize_event( + original_event = self._event_serializer.serialize_event( event, now, bundle_aggregations=None ) # The relations returned for the requested event do include their # bundled aggregations. aggregations = await self.store.get_bundled_aggregations(events) - serialized_events = await self._event_serializer.serialize_events( + serialized_events = self._event_serializer.serialize_events( events, now, bundle_aggregations=aggregations ) @@ -423,7 +423,7 @@ async def on_GET( ) now = self.clock.time_msec() - serialized_events = await self._event_serializer.serialize_events(events, now) + serialized_events = self._event_serializer.serialize_events(events, now) return_value = result.to_dict() return_value["chunk"] = serialized_events diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 09d8070cadc8..da6014900a9c 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -666,7 +666,7 @@ async def on_GET( aggregations = await self._store.get_bundled_aggregations([event]) time_now = self.clock.time_msec() - event_dict = await self._event_serializer.serialize_event( + event_dict = self._event_serializer.serialize_event( event, time_now, bundle_aggregations=aggregations ) return 200, event_dict @@ -712,20 +712,20 @@ async def on_GET( raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( + results["events_before"] = self._event_serializer.serialize_events( results["events_before"], time_now, bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( + results["event"] = self._event_serializer.serialize_event( results["event"], time_now, bundle_aggregations=results["aggregations"] ) - results["events_after"] = await self._event_serializer.serialize_events( + results["events_after"] = self._event_serializer.serialize_events( results["events_after"], time_now, bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index e174b6f8e90a..a3e57e4b20ab 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -17,7 +17,6 @@ from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, Dict, Iterable, @@ -395,7 +394,7 @@ async def encode_invited( """ invited = {} for room in rooms: - invite = await self._event_serializer.serialize_event( + invite = self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, @@ -432,7 +431,7 @@ async def encode_knocked( """ knocked = {} for room in rooms: - knock = await self._event_serializer.serialize_event( + knock = self._event_serializer.serialize_event( room.knock, time_now, token_id=token_id, @@ -528,7 +527,7 @@ async def encode_room( def serialize( events: Iterable[EventBase], aggregations: Optional[Dict[str, Dict[str, Any]]] = None, - ) -> Awaitable[List[JsonDict]]: + ) -> List[JsonDict]: return self._event_serializer.serialize_events( events, time_now=time_now, @@ -554,7 +553,7 @@ def serialize( event.room_id, ) - serialized_state = await serialize(state_events) + serialized_state = serialize(state_events) # Don't bother to bundle aggregations if the timeline is unlimited, # as clients will have all the necessary information. # bundle_aggregations=room.timeline.limited, @@ -568,7 +567,7 @@ def serialize( # if room.timeline.limited: # aggregations = await self.store.get_bundled_aggregations(timeline_events) aggregations = None - serialized_timeline = await serialize(timeline_events, aggregations) + serialized_timeline = serialize(timeline_events, aggregations) account_data = room.account_data diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index b58452195a82..fe5b536d9705 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -228,7 +228,7 @@ def get_event(self, event_id, expect_none=False): self.assertIsNotNone(event) time_now = self.clock.time_msec() - serialized = self.get_success(self.serializer.serialize_event(event, time_now)) + serialized = self.serializer.serialize_event(event, time_now) return serialized From 2fa075c3dfafe43194f42c96d05f07aee2f89c21 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 20 Dec 2021 09:38:08 -0500 Subject: [PATCH 4/4] Newsfragment --- changelog.d/11612.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/11612.misc diff --git a/changelog.d/11612.misc b/changelog.d/11612.misc new file mode 100644 index 000000000000..2d886169c547 --- /dev/null +++ b/changelog.d/11612.misc @@ -0,0 +1 @@ +Avoid database access in the JSON serialization process.