Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in sliding sync when using old DB. (default instance_name to "master") #17398

Merged
merged 10 commits into from
Jul 8, 2024
1 change: 1 addition & 0 deletions changelog.d/17398.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix bug in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint when using an old database.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for trying out Sliding Sync and jumping on a fix for this!

6 changes: 0 additions & 6 deletions synapse/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,6 @@ def _invalidate_state_caches(
self._attempt_to_invalidate_cache(
"get_user_in_room_with_profile", (room_id, user_id)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (user_id,)
)
self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))

# Purge other caches based on room state.
Expand All @@ -148,9 +145,6 @@ def _invalidate_state_caches_all(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,))
self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_summary", (room_id,))

Expand Down
10 changes: 0 additions & 10 deletions synapse/storage/databases/main/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,12 @@ def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]

if data.type == EventTypes.Member:
self.get_rooms_for_user_with_stream_ordering.invalidate( # type: ignore[attr-defined]
(data.state_key,)
)
self.get_rooms_for_user.invalidate((data.state_key,)) # type: ignore[attr-defined]
elif row.type == EventsStreamAllStateRow.TypeId:
assert isinstance(data, EventsStreamAllStateRow)
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
self.get_rooms_for_user_with_stream_ordering.invalidate_all() # type: ignore[attr-defined]
self.get_rooms_for_user.invalidate_all() # type: ignore[attr-defined]
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
Expand Down Expand Up @@ -334,9 +330,6 @@ def _invalidate_caches_for_event(
self._attempt_to_invalidate_cache(
"get_invited_rooms_for_local_user", (state_key,)
)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", (state_key,)
)
self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))

self._attempt_to_invalidate_cache(
Expand Down Expand Up @@ -399,9 +392,6 @@ def _invalidate_caches_for_room_events(self, room_id: str) -> None:
self._attempt_to_invalidate_cache("get_thread_id", None)
self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
self._attempt_to_invalidate_cache(
"get_rooms_for_user_with_stream_ordering", None
)
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("did_forget", None)
self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
Expand Down
3 changes: 2 additions & 1 deletion synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1457,7 +1457,8 @@ def _fetch_event_rows(
event_dict[event_id] = _EventRow(
event_id=event_id,
stream_ordering=row[1],
instance_name=row[2],
# If instance_name is null we default to "master"
instance_name=row[2] or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
internal_metadata=row[3],
json=row[4],
format_version=row[5],
Expand Down
67 changes: 6 additions & 61 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import Sqlite3Engine
from synapse.storage.roommember import (
GetRoomsForUserWithStreamOrdering,
MemberSummary,
ProfileInfo,
RoomsForUser,
)
from synapse.storage.roommember import MemberSummary, ProfileInfo, RoomsForUser
from synapse.types import (
JsonDict,
PersistedEventPosition,
Expand Down Expand Up @@ -494,7 +489,11 @@ def _get_rooms_for_local_user_where_membership_is_txn(
sender=sender,
membership=membership,
event_id=event_id,
event_pos=PersistedEventPosition(instance_name, stream_ordering),
event_pos=PersistedEventPosition(
# If instance_name is null we default to "master"
instance_name or "master",
Copy link
Contributor

@MadLittleMods MadLittleMods Jul 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably have some constant to represent "master" to avoid typos (subtle bugs)

(can iterate in another PR)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The typing from the SQL queries makes this kind of mistake too easy. Kinda wish we had to duck-type all of it to narrow things down (forced by the linter instead of letting it slide because of `Any`).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no real easy way of annotating the types of the returned rows that isn't error-prone, e.g. here we'd probably assume the instance_name column was NOT NULL.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we just need to use the disallow-any-xxx options in mypy so we can't just assign Any to something that expects a str.

Copy link
Contributor

@reivilibre reivilibre Jul 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have thought about this in the past; the best idea I came up with is this: during the trial tests, have Postgres return the type information of the SQL statements (and combine this with an API change on the fetch functions where you tell it what type you're expecting and this is used as the return type). Then it could assert that the Postgres type information matches what is expected. (You'd disable this at real runtime to avoid any performance hits.)
Ultimately this is loosely inspired by the sqlx library in Rust (where type information flows out from the database at or ahead of compile-time) but I think for us it'd be too much work for too little benefit now.

stream_ordering,
),
room_version_id=room_version,
)
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
Expand Down Expand Up @@ -606,53 +605,6 @@ async def get_local_current_membership_for_user_in_room(

return results

@cached(max_entries=500000, iterable=True)
async def get_rooms_for_user_with_stream_ordering(
self, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
"""Returns a set of room_ids the user is currently joined to.

If a remote user only returns rooms this server is currently
participating in.

Args:
user_id

Returns:
Returns the rooms the user is in currently, along with the stream
ordering of the most recent join for that user and room, along with
the room version of the room.
"""
return await self.db_pool.runInteraction(
"get_rooms_for_user_with_stream_ordering",
self._get_rooms_for_user_with_stream_ordering_txn,
user_id,
)

def _get_rooms_for_user_with_stream_ordering_txn(
self, txn: LoggingTransaction, user_id: str
) -> FrozenSet[GetRoomsForUserWithStreamOrdering]:
# We use `current_state_events` here and not `local_current_membership`
# as a) this gets called with remote users and b) this only gets called
# for rooms the server is participating in.
sql = """
SELECT room_id, e.instance_name, e.stream_ordering
FROM current_state_events AS c
INNER JOIN events AS e USING (room_id, event_id)
WHERE
c.type = 'm.room.member'
AND c.state_key = ?
AND c.membership = ?
"""

txn.execute(sql, (user_id, Membership.JOIN))
return frozenset(
GetRoomsForUserWithStreamOrdering(
room_id, PersistedEventPosition(instance, stream_id)
)
for room_id, instance, stream_id in txn
)

async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
Expand Down Expand Up @@ -701,13 +653,6 @@ async def get_rooms_for_user(self, user_id: str) -> FrozenSet[str]:
If a remote user only returns rooms this server is currently
participating in.
"""
rooms = self.get_rooms_for_user_with_stream_ordering.cache.get_immediate(
(user_id,),
None,
update_metrics=False,
)
if rooms:
return frozenset(r.room_id for r in rooms)

room_ids = await self.db_pool.simple_select_onecol(
table="current_state_events",
Expand Down
33 changes: 23 additions & 10 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def _make_generic_sql_bound(
def _filter_results(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
instance_name: Optional[str],
topological_ordering: int,
stream_ordering: int,
) -> bool:
Expand All @@ -384,8 +384,14 @@ def _filter_results(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).

The `instance_name` arg is optional to handle historic rows, and is
interpreted as if it was "master".
"""

if instance_name is None:
instance_name = "master"

event_historical_tuple = (
topological_ordering,
stream_ordering,
Expand Down Expand Up @@ -420,7 +426,7 @@ def _filter_results(
def _filter_results_by_stream(
lower_token: Optional[RoomStreamToken],
upper_token: Optional[RoomStreamToken],
instance_name: str,
instance_name: Optional[str],
stream_ordering: int,
) -> bool:
"""
Expand All @@ -436,7 +442,14 @@ def _filter_results_by_stream(
position maps, which we handle by fetching more than necessary from the DB
and then filtering (rather than attempting to construct a complicated SQL
query).

The `instance_name` arg is optional to handle historic rows, and is
interpreted as if it was "master".
"""

if instance_name is None:
instance_name = "master"

if lower_token:
assert lower_token.topological is None

Expand Down Expand Up @@ -912,7 +925,6 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
prev_sender,
) in txn:
assert room_id is not None
assert instance_name is not None
assert stream_ordering is not None

if _filter_results_by_stream(
Expand All @@ -936,7 +948,8 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
# Event
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
event_id=event_id,
event_pos=PersistedEventPosition(
instance_name=instance_name,
# If instance_name is null we default to "master"
instance_name=instance_name or "master",
MadLittleMods marked this conversation as resolved.
Show resolved Hide resolved
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream=stream_ordering,
),
# When `s.event_id = null`, we won't be able to get respective
Expand All @@ -952,13 +965,11 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
prev_event_id=prev_event_id,
prev_event_pos=(
PersistedEventPosition(
instance_name=prev_instance_name,
# If instance_name is null we default to "master"
instance_name=prev_instance_name or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream=prev_stream_ordering,
)
if (
prev_instance_name is not None
and prev_stream_ordering is not None
)
if (prev_stream_ordering is not None)
else None
),
prev_membership=prev_membership,
Expand Down Expand Up @@ -1257,7 +1268,9 @@ def get_last_event_pos_in_room_before_stream_ordering_txn(
stream_ordering=stream_ordering,
):
return event_id, PersistedEventPosition(
instance_name, stream_ordering
# If instance_name is null we default to "master"
instance_name or "master",
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
stream_ordering,
)

erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
return None
erikjohnston marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
1 change: 0 additions & 1 deletion tests/handlers/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def test_unknown_room_version(self) -> None:
)

# Blow away caches (supported room versions can only change due to a restart).
self.store.get_rooms_for_user_with_stream_ordering.invalidate_all()
self.store.get_rooms_for_user.invalidate_all()
self.store._get_event_cache.clear()
self.store._event_ref.clear()
Expand Down
Loading
Loading