Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Improve event caching code #10119

Merged
merged 16 commits into from
Aug 4, 2021
Merged
6 changes: 5 additions & 1 deletion synapse/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def __get__(self, instance, owner=None):


class _EventInternalMetadata:
__slots__ = ["_dict", "stream_ordering", "outlier"]
__slots__ = ["_dict", "stream_ordering", "outlier", "redacted_by"]

def __init__(self, internal_metadata_dict: JsonDict):
# we have to copy the dict, because it turns out that the same dict is
Expand All @@ -111,6 +111,10 @@ def __init__(self, internal_metadata_dict: JsonDict):
# in the DAG)
self.outlier = False

# Whether this event has a valid redaction event pointing at it (i.e.
# whether it should be redacted before giving to clients).
self.redacted_by: Optional[str] = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm super dubious about stuffing more things in here that don't form part of the internal_metadata column in the database.

Should we start a new attrs class for things like this, stream_ordering and outlier? (it also looks like before, after, order might fall into the same category).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also: it seems confusing to have both a redacted_by and a redacted property.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will be split out into its own PR.


out_of_band_membership: bool = DictProperty("out_of_band_membership")
send_on_behalf_of: str = DictProperty("send_on_behalf_of")
recheck_redaction: bool = DictProperty("recheck_redaction")
Expand Down
41 changes: 38 additions & 3 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import logging
import threading
import weakref
from typing import (
Collection,
Container,
Expand Down Expand Up @@ -174,6 +175,10 @@ def __init__(self, database: DatabasePool, db_conn, hs):
# Dict[str, _EventCacheEntry].
self._current_event_fetches: Dict[str, ObservableDeferred] = {}

# We keep track of the events we have currently loaded in memory so that
# we can reuse them even if they've been evicted from the cache.
self._event_ref: Dict[str, EventBase] = weakref.WeakValueDictionary()

self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
Expand Down Expand Up @@ -588,6 +593,8 @@ async def _get_events_from_cache_or_db(

def _invalidate_get_event_cache(self, event_id):
self._get_event_cache.invalidate((event_id,))
self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
Expand All @@ -601,13 +608,34 @@ def _get_events_from_cache(
event_map = {}

for event_id in events:
# First check if its in the event cache
ret = self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if not ret:
continue
if ret:
event_map[event_id] = ret

# Otherwise check if we still have the event in memory.
event = self._event_ref.get(event_id)
if event:
redacted_event = None
if event.internal_metadata.redacted_by is not None:
# The event has been redacted, so we generate a redacted
# version.
redacted_event = prune_event(event)
redacted_event.unsigned[
"redacted_by"
] = event.internal_metadata.redacted_by
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not need redacted_because here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will be split out into its own PR.


cache_entry = _EventCacheEntry(
event=event,
redacted_event=redacted_event,
)
event_map[event_id] = cache_entry

event_map[event_id] = ret
# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we refactor the similar code in _get_events_from_db/_maybe_redact_event_row so that we can reuse some common code rather than have two similar but slightly different impls?

For example: have _maybe_redact_event_row just return a boolean rather than the redaction; have _get_events_from_db poke internal_metadata.redacted_by based on that boolean; then move these lines here out to a build_cache_entry which can be called by _get_events_from_db.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code will be split out into its own PR.


return event_map

Expand Down Expand Up @@ -877,13 +905,20 @@ async def _get_events_from_db(
original_ev, redactions, event_map
)

if redacted_event:
original_ev.internal_metadata.redacted_by = redacted_event.unsigned[
"redacted_by"
]

cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)

self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

self._event_ref[event_id] = original_ev

return result_map

async def _enqueue_events(self, events):
Expand Down