Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Improve event caching code #10119

Merged
merged 16 commits into from
Aug 4, 2021
Merged
29 changes: 14 additions & 15 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_ids, allow_rejected=allow_rejected
event_ids,
)

missing_events_ids = [e for e in event_ids if e not in event_entry_map]
Expand All @@ -510,22 +510,30 @@ async def _get_events_from_cache_or_db(self, event_ids, allow_rejected=False):
# of the database to check it.
#
missing_events = await self._get_events_from_db(
missing_events_ids, allow_rejected=allow_rejected
missing_events_ids,
)

event_entry_map.update(missing_events)

if not allow_rejected:
event_entry_map = {
event_id: entry
for event_id, entry in event_entry_map.items()
if not entry.event.rejected_reason
}

return event_entry_map

def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))

def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
def _get_events_from_cache(
self, events, update_metrics=True
) -> Dict[str, _EventCacheEntry]:
"""Fetch events from the caches

Args:
events (Iterable[str]): list of event_ids to fetch
allow_rejected (bool): Whether to return events that were rejected
update_metrics (bool): Whether to update the cache hit ratio metrics

Returns:
Expand All @@ -542,10 +550,7 @@ def _get_events_from_cache(self, events, allow_rejected, update_metrics=True):
if not ret:
continue

if allow_rejected or not ret.event.rejected_reason:
event_map[event_id] = ret
else:
event_map[event_id] = None
event_map[event_id] = ret

return event_map

Expand Down Expand Up @@ -672,7 +677,7 @@ def fire(evs, exc):
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list, e)

async def _get_events_from_db(self, event_ids, allow_rejected=False):
async def _get_events_from_db(self, event_ids):
"""Fetch a bunch of events from the database.

Returned events will be added to the cache for future lookups.
Expand All @@ -682,9 +687,6 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch

allow_rejected (bool): Whether to include rejected events. If False,
rejected events are omitted from the response.

Returns:
Dict[str, _EventCacheEntry]:
map from event id to result. May return extra events which
Expand Down Expand Up @@ -717,9 +719,6 @@ async def _get_events_from_db(self, event_ids, allow_rejected=False):

rejected_reason = row["rejected_reason"]

if not allow_rejected and rejected_reason:
continue

# If the event or metadata cannot be parsed, log the error and act
# as if the event is unknown.
try:
Expand Down
6 changes: 2 additions & 4 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,14 +629,12 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(
member_event_ids, allow_rejected=False, update_metrics=False
)
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)

missing_member_event_ids = []
for event_id in member_event_ids:
ev_entry = event_map.get(event_id)
if ev_entry:
if ev_entry and not ev_entry.event.rejected_reason:
if ev_entry.event.membership == Membership.JOIN:
users_in_room[ev_entry.event.state_key] = ProfileInfo(
display_name=ev_entry.event.content.get("displayname", None),
Expand Down