Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Improve performance of getting unread counts in rooms (#13119)
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Jun 29, 2022
1 parent cdc0259 commit 92a0c18
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/13119.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Reduce DB usage of `/sync` when a large number of unread messages have recently been sent in a room.
3 changes: 3 additions & 0 deletions synapse/_scripts/synapse_port_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ def get_reactor(self) -> ISynapseReactor:
def get_instance_name(self) -> str:
return "master"

def should_send_federation(self) -> bool:
return False


class Porter:
def __init__(
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ class DataStore(
RoomStore,
RoomBatchStore,
RegistrationStore,
StreamWorkerStore,
ProfileStore,
PresenceStore,
TransactionWorkerStore,
Expand All @@ -112,6 +111,7 @@ class DataStore(
SearchStore,
TagsStore,
AccountDataStore,
StreamWorkerStore,
OpenIdStore,
ClientIpWorkerStore,
DeviceStore,
Expand Down
16 changes: 13 additions & 3 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
LoggingDatabaseConnection,
LoggingTransaction,
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached

Expand Down Expand Up @@ -122,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
return DEFAULT_NOTIF_ACTION


class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore):
class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
Expand Down Expand Up @@ -218,7 +218,7 @@ def _get_unread_counts_by_receipt_txn(
retcol="event_id",
)

stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined]
stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)

return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
Expand Down Expand Up @@ -307,12 +307,22 @@ def _get_notif_unread_count_for_user_room(
actions that have been deleted from `event_push_actions` table.
"""

# If there have been no events in the room since the stream ordering,
# there can't be any push actions either.
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return 0, 0

clause = ""
args = [user_id, room_id, stream_ordering]
if max_stream_ordering is not None:
clause = "AND ea.stream_ordering <= ?"
args.append(max_stream_ordering)

# If the max stream ordering is less than the min stream ordering,
# then obviously there are zero push actions in that range.
if max_stream_ordering <= stream_ordering:
return 0, 0

sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
Expand Down
20 changes: 20 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@
Set,
Tuple,
cast,
overload,
)

import attr
from frozendict import frozendict
from typing_extensions import Literal

from twisted.internet import defer

Expand Down Expand Up @@ -795,6 +797,24 @@ async def get_current_room_stream_token_for_room_id(
)
return RoomStreamToken(topo, stream_ordering)

@overload
def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
event_id: str,
allow_none: Literal[False] = False,
) -> int:
...

@overload
def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
event_id: str,
allow_none: bool = False,
) -> Optional[int]:
...

def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,
Expand Down
2 changes: 2 additions & 0 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ def _inject_actions(stream: int, action: list) -> None:
event.internal_metadata.is_outlier.return_value = False
event.depth = stream

self.store._events_stream_cache.entity_has_changed(room_id, stream)

self.get_success(
self.store.db_pool.simple_insert(
table="events",
Expand Down

0 comments on commit 92a0c18

Please sign in to comment.