Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Wait for room stream change cache position #14269

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/14269.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race conditions with stream change cache invalidation when using synapse workers. Contributed by Nick @ Beeper (@fizzadar).
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,9 @@ async def update_current_state(
stream_id=stream_ordering,
)

# Flag the stream change cache for this room
self.store._events_stream_cache.entity_has_changed(room_id, stream_ordering)

def _update_current_state_txn(
self,
txn: LoggingTransaction,
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ async def get_room_events_stream_for_rooms(
- list of recent events in the room
- stream ordering key for the start of the chunk of events returned.
"""
await self._events_stream_cache.wait_for_position(to_key.stream)
room_ids = self._events_stream_cache.get_entities_changed(
room_ids, from_key.stream
)
Expand Down
29 changes: 29 additions & 0 deletions synapse/util/caches/stream_change_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

import logging
import math
from collections import defaultdict
from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union

from sortedcontainers import SortedDict

from twisted.internet import defer

from synapse.util import caches

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -50,6 +53,10 @@ def __init__(
# map from stream id to the a set of entities which changed at that stream id.
self._cache: SortedDict[int, Set[EntityType]] = SortedDict()

# Maximum known stream position to wait on if behind the current persisted position.
self.max_stream_pos = current_stream_pos
self.wait_for_pos_deferreds: Dict[int, List[defer.Deferred]] = defaultdict(list)

# the earliest stream_pos for which we can reliably answer
# get_all_entities_changed. In other words, one less than the earliest
# stream_pos for which we know _cache is valid.
Expand All @@ -64,6 +71,22 @@ def __init__(
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)

async def wait_for_position(self, pos: int) -> None:
    """Wait until this cache has observed a change at or beyond ``pos``.

    Returns immediately when the maximum stream position the cache has
    seen is already at or ahead of ``pos``. Otherwise parks a deferred
    under ``pos`` and waits for it to be resolved.

    NOTE(review): the parked deferred is resolved only when a change
    arrives at exactly ``pos`` (see ``entity_has_changed``); confirm a
    change is always persisted at every waited-on position, otherwise
    this can wait indefinitely.
    """
    # Fast path: the cache is already caught up to the requested token.
    if pos <= self.max_stream_pos:
        return

    # We are behind the requested token; log loudly since this indicates
    # a lagging stream, then park until the position is reached.
    logger.warning(
        "Waiting on lagging stream cache %s with current position %s "
        "behind requested token position %s",
        self.name,
        self.max_stream_pos,
        pos,
    )

    waiter: defer.Deferred[None] = defer.Deferred()
    self.wait_for_pos_deferreds[pos].append(waiter)
    await waiter

def set_cache_factor(self, factor: float) -> bool:
"""
Set the cache factor for this individual cache.
Expand Down Expand Up @@ -195,6 +218,12 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
for entity in r:
del self._entity_to_key[entity]

if stream_pos > self.max_stream_pos:
self.max_stream_pos = stream_pos

for d in self.wait_for_pos_deferreds[stream_pos]:
d.callback(None)

def _evict(self) -> None:
while len(self._cache) > self._max_size:
k, r = self._cache.popitem(0)
Expand Down