Fix bug in sliding sync when using old DB. (#17398)
We don't necessarily have an `instance_name` for old events (from before we
supported multiple event persisters). We treat those as if the `instance_name`
was "master".

---------

Co-authored-by: Eric Eastwood <[email protected]>
erikjohnston and MadLittleMods authored Jul 8, 2024
1 parent 3fef535 commit 8cdd2d2
Showing 8 changed files with 33 additions and 212 deletions.
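
The fix applied across the diffs below comes down to one pattern: when a row's `instance_name` is NULL (the event was persisted before Synapse had multiple event persisters), coalesce it to "master" before constructing a stream position. The sketch below illustrates that pattern in isolation; the `position_for_row` helper and the cut-down `PersistedEventPosition` class are illustrative stand-ins, not Synapse's actual API.

from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PersistedEventPosition:
    """Cut-down stand-in for synapse.types.PersistedEventPosition."""

    instance_name: str
    stream: int


def position_for_row(
    instance_name: Optional[str], stream_ordering: int
) -> PersistedEventPosition:
    # Rows written before Synapse supported multiple event persisters have a
    # NULL instance_name; treat them as if they were written by "master".
    return PersistedEventPosition(instance_name or "master", stream_ordering)


if __name__ == "__main__":
    assert position_for_row(None, 42) == PersistedEventPosition("master", 42)
    assert position_for_row("persister-1", 43) == PersistedEventPosition("persister-1", 43)

Note that `instance_name or "master"` also maps an empty string to "master", whereas the explicit `if instance_name is None` check added to `_filter_results` below only handles NULL; for rows whose `instance_name` is either NULL or a real worker name, the two behave the same.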
1 change: 1 addition & 0 deletions changelog.d/17398.bugfix
@@ -0,0 +1 @@
+ Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
6 changes: 0 additions & 6 deletions synapse/storage/_base.py
@@ -119,9 +119,6 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (user_id,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))

# Purge other caches based on room state.
@@ -148,9 +145,6 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

10 changes: 0 additions & 10 deletions synapse/storage/databases/main/cache.py
@@ -268,16 +268,12 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]

if data.type == EventTypes.Member:
- self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
- (data.state_key,)
- )
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
- self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -334,9 +330,6 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", (state_key,)
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))

self._attempt_to_invalidate_cache(
@@ -399,9 +392,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
- self._attempt_to_invalidate_cache(
- "get_rooms_for_user_with_stream_ordering", None
- )
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/events_worker.py
@@ -1457,7 +1457,8 @@ def _fetch_event_rows(
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
- instance_name=row[2],
+ # If instance_name is null we default to "master"
+ instance_name=row[2] or "master",
internal_metadata=row[3],
json=row[4],
format_version=row[5],
67 changes: 6 additions & 61 deletions synapse/storage/databases/main/roommember.py
@@ -50,12 +50,7 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
- from synapse.storage.roommember import (
- GetRoomsForUserWithStreamOrdering,
- MemberSummary,
- ProfileInfo,
- RoomsForUser,
- )
+ from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.types import (
JsonDict,
PersistedEventPosition,
@@ -494,7 +489,11 @@ def _get_rooms_for_local_user_where_membership_is_txn(
sender=sender,
membership=membership,
event_id=event_id,
- event_pos=PersistedEventPosition(instance_name, stream_ordering),
+ event_pos=PersistedEventPosition(
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
+ ),
room_version_id=room_version,
)
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
@@ -606,53 +605,6 @@ async def get_local_current_membership_for_user_in_room(

return results

- @cached(max_entries=500000, iterable=True)
- async def get_rooms_for_user_with_stream_ordering(
- self, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- """Returns a set of room_ids the user is currently joined to.
- If a remote user only returns rooms this server is currently
- participating in.
- Args:
- user_id
- Returns:
- Returns the rooms the user is in currently, along with the stream
- ordering of the most recent join for that user and room, along with
- the room version of the room.
- """
- return await self.db_pool.runInteraction(
- "get_rooms_for_user_with_stream_ordering",
- self._get_rooms_for_user_with_stream_ordering_txn,
- user_id,
- )
-
- def _get_rooms_for_user_with_stream_ordering_txn(
- self, txn: LoggingTransaction, user_id: str
- ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
- # We use `current_state_events` here and not `local_current_membership`
- # as a) this gets called with remote users and b) this only gets called
- # for rooms the server is participating in.
- sql = """
- SELECT room_id, e.instance_name, e.stream_ordering
- FROM current_state_events AS c
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- c.type = 'm.room.member'
- AND c.state_key = ?
- AND c.membership = ?
- """
-
- txn.execute(sql, (user_id, Membership.JOIN))
- return frozenset(
- GetRoomsForUserWithStreamOrdering(
- room_id, PersistedEventPosition(instance, stream_id)
- )
- for room_id, instance, stream_id in txn
- )

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
@@ -701,13 +653,6 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
If a remote user only returns rooms this server is currently
participating in.
"""
- rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
- (user_id,),
- None,
- update_metrics=False,
- )
- if rooms:
- return frozenset(r.room_id for r in rooms)

room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
33 changes: 23 additions & 10 deletions synapse/storage/databases/main/stream.py
@@ -371,7 +371,7 @@ def _make_generic_sql_bound(
def _filter_results(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
topological_ordering: int,
stream_ordering: int,
) -> bool:
@@ -384,8 +384,14 @@ def _filter_results(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""

+ if instance_name is None:
+ instance_name = "master"

event_historical_tuple = (
topological_ordering,
stream_ordering,
@@ -420,7 +426,7 @@ def _filter_results_by_stream(
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
- instance_name: str,
+ instance_name: Optional[str],
stream_ordering: int,
) -> bool:
"""
@@ -436,7 +442,14 @@ def _filter_results_by_stream(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).
+ The `instance_name` arg is optional to handle historic rows, and is
+ interpreted as if it was "master".
"""

+ if instance_name is None:
+ instance_name = "master"

if lower_token:
assert lower_token.topological is None

@@ -912,7 +925,6 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
prev_sender,
) in txn:
assert room_id is not None
- assert instance_name is not None
assert stream_ordering is not None

if _filter_results_by_stream(
@@ -936,7 +948,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
# Event
event_id=event_id,
event_pos=PersistedEventPosition(
- instance_name=instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=instance_name or "master",
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
@@ -952,13 +965,11 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
- instance_name=prev_instance_name,
+ # If instance_name is null we default to "master"
+ instance_name=prev_instance_name or "master",
stream=prev_stream_ordering,
)
- if (
- prev_instance_name is not None
- and prev_stream_ordering is not None
- )
+ if (prev_stream_ordering is not None)
else None
),
prev_membership=prev_membership,
@@ -1270,7 +1281,9 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
- instance_name, stream_ordering
+ # If instance_name is null we default to "master"
+ instance_name or "master",
+ stream_ordering,
)

return None
1 change: 0 additions & 1 deletion tests/handlers/test_sync.py
@@ -210,7 +210,6 @@ def test_unknown_room_version(self) -> None:
)

# Blow away caches (supported room versions can only change due to a restart).
- self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()
Expand Down
