From 31a2e5e417cc99f381f4ac6c8248a55de8f80a5a Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:08:34 +0100 Subject: [PATCH 01/12] Add option for `get_state_ids_for_event(s)` to return partial state Signed-off-by: Sean Quah --- synapse/storage/controllers/state.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 0d480f101432..cbab7a65fe86 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -232,6 +232,7 @@ async def get_state_ids_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None, + await_full_state: Optional[bool] = None, ) -> Dict[str, StateMap[str]]: """ Get the state dicts corresponding to a list of events, containing the event_ids @@ -240,6 +241,9 @@ async def get_state_ids_for_events( Args: event_ids: events whose state should be returned state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at these events. Defaults to `True` unless `state_filter` can be + completely satisfied with partial state. Returns: A dict from event_id -> (type, state_key) -> event_id @@ -248,9 +252,13 @@ async def get_state_ids_for_events( RuntimeError if we don't have a state group for one or more of the events (ie they are outliers or unknown) """ - await_full_state = True - if state_filter and not state_filter.must_await_full_state(self._is_mine_id): - await_full_state = False + if await_full_state is None: + if state_filter and not state_filter.must_await_full_state( + self._is_mine_id + ): + await_full_state = False + else: + await_full_state = True event_to_groups = await self.get_state_group_for_events( event_ids, await_full_state=await_full_state @@ -292,7 +300,10 @@ async def get_state_for_event( @trace async def get_state_ids_for_event( - self, event_id: str, state_filter: Optional[StateFilter] = None + self, + event_id: str, + state_filter: Optional[StateFilter] = None, + await_full_state: Optional[bool] = None, ) -> StateMap[str]: """ Get the state dict corresponding to a particular event @@ -300,6 +311,9 @@ async def get_state_ids_for_event( Args: event_id: event whose state should be returned state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the event. Defaults to `True` unless `state_filter` can be completely + satisfied with partial state. Returns: A dict from (type, state_key) -> state_event_id @@ -309,7 +323,9 @@ async def get_state_ids_for_event( outlier or is unknown) """ state_map = await self.get_state_ids_for_events( - [event_id], state_filter or StateFilter.all() + [event_id], + state_filter or StateFilter.all(), + await_full_state=await_full_state, ) return state_map[event_id] From 0a539b9b574bc479dc17b6f66a9f91e2227b0091 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:20:53 +0100 Subject: [PATCH 02/12] Add option to `get_state_at/after_event` to return partial state Signed-off-by: Sean Quah --- synapse/handlers/sync.py | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 3ca01391c9cf..2aec08e56c9b 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -637,7 +637,10 @@ async def _load_filtered_recents( ) async def get_state_after_event( - self, event_id: str, state_filter: Optional[StateFilter] = None + self, + event_id: str, + state_filter: Optional[StateFilter] = None, + await_full_state: Optional[bool] = None, ) -> StateMap[str]: """ Get the room state after the given event @@ -645,9 +648,14 @@ async def get_state_after_event( Args: event_id: event of interest state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the event. Defaults to `True` unless `state_filter` can be completely + satisfied with partial state. """ state_ids = await self._state_storage_controller.get_state_ids_for_event( - event_id, state_filter=state_filter or StateFilter.all() + event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, ) # using get_metadata_for_events here (instead of get_event) sidesteps an issue @@ -670,6 +678,7 @@ async def get_state_at( room_id: str, stream_position: StreamToken, state_filter: Optional[StateFilter] = None, + await_full_state: Optional[bool] = None, ) -> StateMap[str]: """Get the room state at a particular stream position @@ -677,6 +686,10 @@ async def get_state_at( room_id: room for which to get state stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. + await_full_state: if `True`, will block if we do not yet have complete state + at the last event in the room before `stream_position`. Defaults to + `True` unless `state_filter` can be completely satisfied with partial + state. """ # FIXME: This gets the state at the latest event before the stream ordering, # which might not be the same as the "current state" of the room at the time @@ -688,7 +701,9 @@ async def get_state_at( if last_event_id: state = await self.get_state_after_event( - last_event_id, state_filter=state_filter or StateFilter.all() + last_event_id, + state_filter=state_filter or StateFilter.all(), + await_full_state=await_full_state, ) else: From ad8bab8548ce28b7263bd5b37b57ccb3df44541d Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:28:43 +0100 Subject: [PATCH 03/12] Do not wait for full state in a few cases in `_load_filtered_recents` --- synapse/handlers/sync.py | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2aec08e56c9b..cf6a2d18865d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -517,10 +517,17 @@ async def _load_filtered_recents( # ensure that we always include current state in the timeline current_state_ids: FrozenSet[str] = frozenset() if any(e.is_state() for e in recents): + # FIXME(faster_joins): We use the partial state here as + # we don't want to block `/sync` on finishing a lazy join. + # Which should be fine once + # https://github.com/matrix-org/synapse/issues/12989 is resolved, + # since we shouldn't reach here anymore? + # Note that we use the current state as a whitelist for filtering + # `recents`, so partial state is only a problem when a membership + # event turns up in `recents` but has not made it into the current + # state. current_state_ids_map = ( - await self._state_storage_controller.get_current_state_ids( - room_id - ) + await self.store.get_partial_current_state_ids(room_id) ) current_state_ids = frozenset(current_state_ids_map.values()) @@ -589,7 +596,13 @@ async def _load_filtered_recents( if any(e.is_state() for e in loaded_recents): # FIXME(faster_joins): We use the partial state here as # we don't want to block `/sync` on finishing a lazy join. - # Is this the correct way of doing it? + # Which should be fine once + # https://github.com/matrix-org/synapse/issues/12989 is resolved, + # since we shouldn't reach here anymore? + # Note that we use the current state as a whitelist for filtering + # `loaded_recents`, so partial state is only a problem when a + # membership event turns up in `loaded_recents` but has not made it + # into the current state. current_state_ids_map = ( await self.store.get_partial_current_state_ids(room_id) ) From 10013eaaccd6610486a73196f25460e32fcebe08 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:29:37 +0100 Subject: [PATCH 04/12] Do not wait for full state in a few cases in `_get_rooms_changed` --- synapse/handlers/sync.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cf6a2d18865d..a10a61363b0c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1758,7 +1758,11 @@ async def _get_rooms_changed( continue if room_id in sync_result_builder.joined_room_ids or has_join: - old_state_ids = await self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at( + room_id, + since_token, + state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]), + ) old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None) old_mem_ev = None if old_mem_ev_id: @@ -1784,7 +1788,13 @@ async def _get_rooms_changed( newly_left_rooms.append(room_id) else: if not old_state_ids: - old_state_ids = await self.get_state_at(room_id, since_token) + old_state_ids = await self.get_state_at( + room_id, + since_token, + state_filter=StateFilter.from_types( + [(EventTypes.Member, user_id)] + ), + ) old_mem_ev_id = old_state_ids.get( (EventTypes.Member, user_id), None ) From 5274c8779b714e84a61e4807fb68f3961efaf1f6 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:31:50 +0100 Subject: [PATCH 05/12] Do not wait for full state in `compute_state_delta` Signed-off-by: Sean Quah --- synapse/handlers/sync.py | 38 ++++++++++++++++++++++++++++++-------- 1 file changed, 30 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a10a61363b0c..d2672378a164 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -964,19 +964,26 @@ async def compute_state_delta( if batch: state_at_timeline_end = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) ) state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) ) else: state_at_timeline_end = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) state_at_timeline_start = state_at_timeline_end @@ -992,14 +999,19 @@ async def compute_state_delta( if batch: state_at_timeline_start = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, state_filter=state_filter + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) ) else: # We can get here if the user has ignored the senders of all # the recent events. state_at_timeline_start = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) # for now, we disable LL for gappy syncs - see @@ -1021,20 +1033,28 @@ async def compute_state_delta( # is indeed the case. assert since_token is not None state_at_previous_sync = await self.get_state_at( - room_id, stream_position=since_token, state_filter=state_filter + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) if batch: state_at_timeline_end = ( await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, state_filter=state_filter + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) ) else: # We can get here if the user has ignored the senders of all # the recent events. state_at_timeline_end = await self.get_state_at( - room_id, stream_position=now_token, state_filter=state_filter + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=not lazy_load_members, ) state_ids = _calculate_state( @@ -1064,8 +1084,10 @@ async def compute_state_delta( (EventTypes.Member, member) for member in members_to_fetch ), + await_full_state=False, ) + # FIXME: `state_ids` may be missing memberships for partial state rooms. # At this point, if `lazy_load_members` is enabled, `state_ids` includes # the memberships of all event senders in the timeline. This is because we # may not have sent the memberships in a previous sync. From 04bee9e70907e4cdf05735d6f095286f5da45aa0 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 5 Aug 2022 22:35:42 +0100 Subject: [PATCH 06/12] Dig up memberships for lazy-loading syncs in partial state rooms Signed-off-by: Sean Quah --- synapse/handlers/sync.py | 114 +++++++++++++++++++++++++++++++++------ 1 file changed, 99 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index d2672378a164..eb1aa9e263ec 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -918,8 +918,14 @@ async def compute_state_delta( with Measure(self.clock, "compute_state_delta"): # The memberships needed for events in the timeline. + # A dictionary with user IDs as keys and the first event in the timeline + # requiring each member as values. # Only calculated when `lazy_load_members` is on. - members_to_fetch = None + members_to_fetch: Optional[Dict[str, Optional[EventBase]]] = None + + # The contribution to the room state from state events in the timeline. + # Only contains the last event for any given state key. + timeline_state: StateMap[str] lazy_load_members = sync_config.filter_collection.lazy_load_members() include_redundant_members = ( @@ -930,29 +936,38 @@ async def compute_state_delta( # We only request state for the members needed to display the # timeline: - members_to_fetch = { - event.sender # FIXME: we also care about invite targets etc. - for event in batch.events - } + timeline_state = {} + + members_to_fetch = {} + for event in batch.events: + # We need the event's sender, unless their membership was in a + # previous timeline event. + if ( + EventTypes.Member, + event.sender, + ) not in timeline_state and event.sender not in members_to_fetch: + members_to_fetch[event.sender] = event + # FIXME: we also care about invite targets etc. + + if event.is_state(): + timeline_state[(event.type, event.state_key)] = event.event_id if full_state: # always make sure we LL ourselves so we know we're in the room # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 # We only need apply this on full state syncs given we disabled # LL for incr syncs in #3840. - members_to_fetch.add(sync_config.user.to_string()) + members_to_fetch[sync_config.user.to_string()] = None state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) else: - state_filter = StateFilter.all() + timeline_state = { + (event.type, event.state_key): event.event_id + for event in batch.events + if event.is_state() + } - # The contribution to the room state from state events in the timeline. - # Only contains the last event for any given state key. - timeline_state = { - (event.type, event.state_key): event.event_id - for event in batch.events - if event.is_state() - } + state_filter = StateFilter.all() # Now calculate the state to return in the sync response for the room. # This is more or less the change in state between the end of the previous @@ -1087,7 +1102,76 @@ async def compute_state_delta( await_full_state=False, ) - # FIXME: `state_ids` may be missing memberships for partial state rooms. + # If we only have partial state for the room, `state_ids` may be missing the + # memberships we wanted. We attempt to find some by digging through the auth + # events of timeline events. + if lazy_load_members: + assert members_to_fetch is not None + + is_partial_state = await self.store.is_partial_state_room(room_id) + if is_partial_state: + additional_state_ids: MutableStateMap[str] = {} + + # Tracks the missing members for logging purposes. + missing_members = {} + + # Pick out the auth events of timeline events whose sender + # memberships are missing. + auth_event_ids: Set[str] = set() + for member, first_referencing_event in members_to_fetch.items(): + if ( + first_referencing_event is None + or (EventTypes.Member, member) in state_ids + ): + continue + + missing_members[member] = first_referencing_event + auth_event_ids.update(first_referencing_event.auth_event_ids()) + + auth_events = await self.store.get_events(auth_event_ids) + + # Run through the events with missing sender memberships once more, + # picking out the memberships from the pile of auth events we have + # just fetched. + for member, first_referencing_event in members_to_fetch.items(): + if ( + first_referencing_event is None + or (EventTypes.Member, member) in state_ids + ): + continue + + # Dig through the auth events to find the sender's membership. + for auth_event_id in first_referencing_event.auth_event_ids(): + # We only store events once we have all their auth events, + # so the auth event must be in the pile we have just + # fetched. + auth_event = auth_events[auth_event_id] + + if ( + auth_event.type == EventTypes.Member + and auth_event.state_key == event.sender + ): + missing_members.pop(member) + additional_state_ids[ + (EventTypes.Member, event.sender) + ] = auth_event.event_id + break + + # Now merge in the state we have scrounged up. + state_ids = {**state_ids, **additional_state_ids} + + if missing_members: + # There really shouldn't be any missing memberships now. + # For an event to appear in the timeline, we must have its auth + # events, which must include its sender's membership. + logger.error( + "Failed to find memberships for %s in partial state room " + "%s in the auth events of %s.", + list(missing_members.keys()), + room_id, + list(missing_members.values()), + ) + # At this point, if `lazy_load_members` is enabled, `state_ids` includes # the memberships of all event senders in the timeline. This is because we # may not have sent the memberships in a previous sync. From ad8a2b3c6ec9a454e3617adcb9790705ecd6b5fd Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Wed, 10 Aug 2022 13:03:39 +0100 Subject: [PATCH 07/12] Add newsfile --- changelog.d/13477.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/13477.misc diff --git a/changelog.d/13477.misc b/changelog.d/13477.misc new file mode 100644 index 000000000000..5d21ae9d7a8d --- /dev/null +++ b/changelog.d/13477.misc @@ -0,0 +1 @@ +Faster room joins: Avoid blocking lazy-loading `/sync`s during partial joins due to remote memberships. Pull remote memberships from auth events instead of the room state. From f728bacf043b8e9529bb006b92aa14cb3d0d442f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Thu, 11 Aug 2022 16:18:12 +0100 Subject: [PATCH 08/12] fixup: make `await_full_state` a boolean again --- synapse/handlers/sync.py | 13 ++++++------- synapse/storage/controllers/state.py | 26 +++++++++++++------------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index eb1aa9e263ec..f098107bb979 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -653,7 +653,7 @@ async def get_state_after_event( self, event_id: str, state_filter: Optional[StateFilter] = None, - await_full_state: Optional[bool] = None, + await_full_state: bool = True, ) -> StateMap[str]: """ Get the room state after the given event @@ -662,8 +662,8 @@ async def get_state_after_event( event_id: event of interest state_filter: The state filter used to fetch state from the database. await_full_state: if `True`, will block if we do not yet have complete state - at the event. Defaults to `True` unless `state_filter` can be completely - satisfied with partial state. + at the event and `state_filter` is not satisfied by partial state. + Defaults to `True`. """ state_ids = await self._state_storage_controller.get_state_ids_for_event( event_id, @@ -691,7 +691,7 @@ async def get_state_at( room_id: str, stream_position: StreamToken, state_filter: Optional[StateFilter] = None, - await_full_state: Optional[bool] = None, + await_full_state: bool = True, ) -> StateMap[str]: """Get the room state at a particular stream position @@ -700,9 +700,8 @@ async def get_state_at( stream_position: point at which to get state state_filter: The state filter used to fetch state from the database. await_full_state: if `True`, will block if we do not yet have complete state - at the last event in the room before `stream_position`. Defaults to - `True` unless `state_filter` can be completely satisfied with partial - state. + at the last event in the room before `stream_position` and + `state_filter` is not satisfied by partial state. Defaults to `True`. """ # FIXME: This gets the state at the latest event before the stream ordering, # which might not be the same as the "current state" of the room at the time diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index cbab7a65fe86..1556d53bbd2d 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -232,7 +232,7 @@ async def get_state_ids_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None, - await_full_state: Optional[bool] = None, + await_full_state: bool = True, ) -> Dict[str, StateMap[str]]: """ Get the state dicts corresponding to a list of events, containing the event_ids @@ -242,8 +242,8 @@ async def get_state_ids_for_events( event_ids: events whose state should be returned state_filter: The state filter used to fetch state from the database. await_full_state: if `True`, will block if we do not yet have complete state - at these events. Defaults to `True` unless `state_filter` can be - completely satisfied with partial state. + at these events and `state_filter` is not satisfied by partial state. + Defaults to `True`. Returns: A dict from event_id -> (type, state_key) -> event_id @@ -252,13 +252,13 @@ async def get_state_ids_for_events( RuntimeError if we don't have a state group for one or more of the events (ie they are outliers or unknown) """ - if await_full_state is None: - if state_filter and not state_filter.must_await_full_state( - self._is_mine_id - ): - await_full_state = False - else: - await_full_state = True + if ( + await_full_state + and state_filter + and not state_filter.must_await_full_state(self._is_mine_id) + ): + # Full state is not required if the state filter is restrictive enough. + await_full_state = False event_to_groups = await self.get_state_group_for_events( event_ids, await_full_state=await_full_state @@ -303,7 +303,7 @@ async def get_state_ids_for_event( self, event_id: str, state_filter: Optional[StateFilter] = None, - await_full_state: Optional[bool] = None, + await_full_state: bool = True, ) -> StateMap[str]: """ Get the state dict corresponding to a particular event @@ -312,8 +312,8 @@ async def get_state_ids_for_event( event_id: event whose state should be returned state_filter: The state filter used to fetch state from the database. await_full_state: if `True`, will block if we do not yet have complete state - at the event. Defaults to `True` unless `state_filter` can be completely - satisfied with partial state. + at the event and `state_filter` is not satisfied by partial state. + Defaults to `True`. Returns: A dict from (type, state_key) -> state_event_id From 24821a26290577fe935de426c0bd50b268a36426 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 12 Aug 2022 12:49:19 +0100 Subject: [PATCH 09/12] fixup: explain why `await_full_state` is off when lazy loading members --- synapse/handlers/sync.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index f098107bb979..c6734ff70eea 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -959,6 +959,12 @@ async def compute_state_delta( members_to_fetch[sync_config.user.to_string()] = None state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + + # We are happy to use partial state to compute the `/sync` response. + # Since partial state may not include the lazy-loaded memberships we + # require, we fix up the state response afterwards with memberships from + # auth events. + await_full_state = False else: timeline_state = { (event.type, event.state_key): event.event_id @@ -967,6 +973,7 @@ async def compute_state_delta( } state_filter = StateFilter.all() + await_full_state = True # Now calculate the state to return in the sync response for the room. # This is more or less the change in state between the end of the previous @@ -980,7 +987,7 @@ async def compute_state_delta( await self._state_storage_controller.get_state_ids_for_event( batch.events[-1].event_id, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) ) @@ -988,7 +995,7 @@ async def compute_state_delta( await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) ) @@ -997,7 +1004,7 @@ async def compute_state_delta( room_id, stream_position=now_token, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) state_at_timeline_start = state_at_timeline_end @@ -1015,7 +1022,7 @@ async def compute_state_delta( await self._state_storage_controller.get_state_ids_for_event( batch.events[0].event_id, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) ) else: @@ -1025,7 +1032,7 @@ async def compute_state_delta( room_id, stream_position=now_token, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) # for now, we disable LL for gappy syncs - see @@ -1050,7 +1057,7 @@ async def compute_state_delta( room_id, stream_position=since_token, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) if batch: @@ -1058,7 +1065,7 @@ async def compute_state_delta( await self._state_storage_controller.get_state_ids_for_event( batch.events[-1].event_id, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) ) else: @@ -1068,7 +1075,7 @@ async def compute_state_delta( room_id, stream_position=now_token, state_filter=state_filter, - await_full_state=not lazy_load_members, + await_full_state=await_full_state, ) state_ids = _calculate_state( From 8f8c5bdb40fcbf2585e2c57df009108f3049ed4f Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 12 Aug 2022 13:28:14 +0100 Subject: [PATCH 10/12] fixup: Don't include own membership in `members_to_fetch` --- synapse/handlers/sync.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c6734ff70eea..c8094fc82f6c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -919,8 +919,10 @@ async def compute_state_delta( # The memberships needed for events in the timeline. # A dictionary with user IDs as keys and the first event in the timeline # requiring each member as values. + # The syncing user's own membership is always implicitly required for + # `full_state` syncs and may not be present in the dictionary. # Only calculated when `lazy_load_members` is on. - members_to_fetch: Optional[Dict[str, Optional[EventBase]]] = None + members_to_fetch: Optional[Dict[str, EventBase]] = None # The contribution to the room state from state events in the timeline. # Only contains the last event for any given state key. @@ -956,9 +958,15 @@ async def compute_state_delta( # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 # We only need apply this on full state syncs given we disabled # LL for incr syncs in #3840. - members_to_fetch[sync_config.user.to_string()] = None - - state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + state_filter = StateFilter.from_lazy_load_member_list( + itertools.chain( + members_to_fetch.keys(), [sync_config.user.to_string()] + ) + ) + else: + state_filter = StateFilter.from_lazy_load_member_list( + members_to_fetch + ) # We are happy to use partial state to compute the `/sync` response. # Since partial state may not include the lazy-loaded memberships we @@ -1125,10 +1133,7 @@ async def compute_state_delta( # memberships are missing. auth_event_ids: Set[str] = set() for member, first_referencing_event in members_to_fetch.items(): - if ( - first_referencing_event is None - or (EventTypes.Member, member) in state_ids - ): + if (EventTypes.Member, member) in state_ids: continue missing_members[member] = first_referencing_event @@ -1140,10 +1145,7 @@ async def compute_state_delta( # picking out the memberships from the pile of auth events we have # just fetched. for member, first_referencing_event in members_to_fetch.items(): - if ( - first_referencing_event is None - or (EventTypes.Member, member) in state_ids - ): + if (EventTypes.Member, member) in state_ids: continue # Dig through the auth events to find the sender's membership. From 7fe265464d6a503567b801609fcd345d4d20cd60 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Fri, 12 Aug 2022 14:31:06 +0100 Subject: [PATCH 11/12] fixup: factor out logic to find missing memberships in auth events --- synapse/handlers/sync.py | 147 ++++++++++++++++++++++++--------------- 1 file changed, 89 insertions(+), 58 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index c8094fc82f6c..120d150527a8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -19,6 +19,7 @@ Dict, FrozenSet, List, + Mapping, Optional, Sequence, Set, @@ -1119,66 +1120,15 @@ async def compute_state_delta( # If we only have partial state for the room, `state_ids` may be missing the # memberships we wanted. We attempt to find some by digging through the auth # events of timeline events. - if lazy_load_members: + if lazy_load_members and await self.store.is_partial_state_room(room_id): assert members_to_fetch is not None - is_partial_state = await self.store.is_partial_state_room(room_id) - if is_partial_state: - additional_state_ids: MutableStateMap[str] = {} - - # Tracks the missing members for logging purposes. - missing_members = {} - - # Pick out the auth events of timeline events whose sender - # memberships are missing. - auth_event_ids: Set[str] = set() - for member, first_referencing_event in members_to_fetch.items(): - if (EventTypes.Member, member) in state_ids: - continue - - missing_members[member] = first_referencing_event - auth_event_ids.update(first_referencing_event.auth_event_ids()) - - auth_events = await self.store.get_events(auth_event_ids) - - # Run through the events with missing sender memberships once more, - # picking out the memberships from the pile of auth events we have - # just fetched. - for member, first_referencing_event in members_to_fetch.items(): - if (EventTypes.Member, member) in state_ids: - continue - - # Dig through the auth events to find the sender's membership. - for auth_event_id in first_referencing_event.auth_event_ids(): - # We only store events once we have all their auth events, - # so the auth event must be in the pile we have just - # fetched. - auth_event = auth_events[auth_event_id] - - if ( - auth_event.type == EventTypes.Member - and auth_event.state_key == event.sender - ): - missing_members.pop(member) - additional_state_ids[ - (EventTypes.Member, event.sender) - ] = auth_event.event_id - break - - # Now merge in the state we have scrounged up. - state_ids = {**state_ids, **additional_state_ids} - - if missing_members: - # There really shouldn't be any missing memberships now. - # For an event to appear in the timeline, we must have its auth - # events, which must include its sender's membership. - logger.error( - "Failed to find memberships for %s in partial state room " - "%s in the auth events of %s.", - list(missing_members.keys()), - room_id, - list(missing_members.values()), - ) + additional_state_ids = ( + await self._find_missing_partial_state_memberships( + room_id, members_to_fetch, state_ids + ) + ) + state_ids = {**state_ids, **additional_state_ids} # At this point, if `lazy_load_members` is enabled, `state_ids` includes # the memberships of all event senders in the timeline. This is because we @@ -1228,6 +1178,87 @@ async def compute_state_delta( if e.type != EventTypes.Aliases # until MSC2261 or alternative solution } + async def _find_missing_partial_state_memberships( + self, + room_id: str, + members_to_fetch: Mapping[str, EventBase], + found_state_ids: StateMap[str], + ) -> StateMap[str]: + """Finds missing memberships from a set of auth events and returns them as a + state map. + + Args: + room_id: The partial state room to find the remaining memberships for. + members_to_fetch: The memberships to find. A dictionary whose keys are + user IDs and whose values are events whose auth events are known to + contain the desired memberships. + found_state_ids: A dict from (type, state_key) -> state_event_id, containing + memberships that have been previously found. Entries in + `members_to_fetch` that have a membership in `found_state_ids` are + ignored. + + Returns: + A dict from ("m.room.member", state_key) -> state_event_id, containing the + memberships missing from `found_state_ids`. + """ + additional_state_ids: MutableStateMap[str] = {} + + # Tracks the missing members for logging purposes. + missing_members = {} + + # Identify memberships missing from `found_state_ids` and pick out the auth + # events in which to look for them. + auth_event_ids: Set[str] = set() + for member, event_with_membership_auth in members_to_fetch.items(): + if (EventTypes.Member, member) in found_state_ids: + continue + + missing_members[member] = event_with_membership_auth + auth_event_ids.update(event_with_membership_auth.auth_event_ids()) + + auth_events = await self.store.get_events(auth_event_ids) + + # Run through the missing memberships once more, picking out the memberships + # from the pile of auth events we have just fetched. + for member, event_with_membership_auth in members_to_fetch.items(): + if (EventTypes.Member, member) in found_state_ids: + continue + + # Dig through the auth events to find the desired membership. + for auth_event_id in event_with_membership_auth.auth_event_ids(): + # We only store events once we have all their auth events, + # so the auth event must be in the pile we have just + # fetched. + auth_event = auth_events[auth_event_id] + + if ( + auth_event.type == EventTypes.Member + and auth_event.state_key == member + ): + missing_members.pop(member) + additional_state_ids[ + (EventTypes.Member, member) + ] = auth_event.event_id + break + + if missing_members: + # There really shouldn't be any missing memberships now. Either: + # * we couldn't find an auth event, which shouldn't happen because we do + # not persist events with persisting their auth events first, or + # * the set of auth events did not contain a membership we wanted, which + # means our caller didn't compute the events in `members_to_fetch` + # correctly, or we somehow accepted an event whose auth events were + # dodgy. + logger.error( + "Failed to find memberships for %s in partial state room " + "%s in the auth events of %s.", + list(missing_members.keys()), + room_id, + list(missing_members.values()), + ) + + return additional_state_ids + async def unread_notifs_for_room_id( self, room_id: str, sync_config: SyncConfig ) -> NotifCounts: From 1a33404903effa48e61a1ebf42d7aa065f5a0e01 Mon Sep 17 00:00:00 2001 From: Sean Quah Date: Tue, 16 Aug 2022 18:21:48 +0100 Subject: [PATCH 12/12] fixup: Split members_to_fetch into a set and a dictionary --- synapse/handlers/sync.py | 74 +++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 120d150527a8..b4d3f3958ca4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -16,6 +16,7 @@ from typing import ( TYPE_CHECKING, Any, + Collection, Dict, FrozenSet, List, @@ -918,12 +919,12 @@ async def compute_state_delta( with Measure(self.clock, "compute_state_delta"): # The memberships needed for events in the timeline. - # A dictionary with user IDs as keys and the first event in the timeline - # requiring each member as values. - # The syncing user's own membership is always implicitly required for - # `full_state` syncs and may not be present in the dictionary. # Only calculated when `lazy_load_members` is on. - members_to_fetch: Optional[Dict[str, EventBase]] = None + members_to_fetch: Optional[Set[str]] = None + + # A dictionary mapping user IDs to the first event in the timeline sent by + # them. Only calculated when `lazy_load_members` is on. + first_event_by_sender_map: Optional[Dict[str, EventBase]] = None # The contribution to the room state from state events in the timeline. # Only contains the last event for any given state key. @@ -940,15 +941,17 @@ async def compute_state_delta( timeline_state = {} - members_to_fetch = {} + members_to_fetch = set() + first_event_by_sender_map = {} for event in batch.events: + # Build the map from user IDs to the first timeline event they sent. + if event.sender not in first_event_by_sender_map: + first_event_by_sender_map[event.sender] = event + # We need the event's sender, unless their membership was in a # previous timeline event. - if ( - EventTypes.Member, - event.sender, - ) not in timeline_state and event.sender not in members_to_fetch: - members_to_fetch[event.sender] = event + if (EventTypes.Member, event.sender) not in timeline_state: + members_to_fetch.add(event.sender) # FIXME: we also care about invite targets etc. if event.is_state(): @@ -959,15 +962,9 @@ async def compute_state_delta( # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 # We only need apply this on full state syncs given we disabled # LL for incr syncs in #3840. - state_filter = StateFilter.from_lazy_load_member_list( - itertools.chain( - members_to_fetch.keys(), [sync_config.user.to_string()] - ) - ) - else: - state_filter = StateFilter.from_lazy_load_member_list( - members_to_fetch - ) + members_to_fetch.add(sync_config.user.to_string()) + + state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) # We are happy to use partial state to compute the `/sync` response. # Since partial state may not include the lazy-loaded memberships we @@ -1122,10 +1119,11 @@ async def compute_state_delta( # events of timeline events. if lazy_load_members and await self.store.is_partial_state_room(room_id): assert members_to_fetch is not None + assert first_event_by_sender_map is not None additional_state_ids = ( await self._find_missing_partial_state_memberships( - room_id, members_to_fetch, state_ids + room_id, members_to_fetch, first_event_by_sender_map, state_ids ) ) state_ids = {**state_ids, **additional_state_ids} @@ -1181,7 +1179,8 @@ async def compute_state_delta( async def _find_missing_partial_state_memberships( self, room_id: str, - members_to_fetch: Mapping[str, EventBase], + members_to_fetch: Collection[str], + events_with_membership_auth: Mapping[str, EventBase], found_state_ids: StateMap[str], ) -> StateMap[str]: """Finds missing memberships from a set of auth events and returns them as a @@ -1189,9 +1188,9 @@ async def _find_missing_partial_state_memberships( Args: room_id: The partial state room to find the remaining memberships for. - members_to_fetch: The memberships to find. A dictionary whose keys are - user IDs and whose values are events whose auth events are known to - contain the desired memberships. + members_to_fetch: The memberships to find. + events_with_membership_auth: A mapping from user IDs to events whose auth + events are known to contain their membership. found_state_ids: A dict from (type, state_key) -> state_event_id, containing memberships that have been previously found. Entries in `members_to_fetch` that have a membership in `found_state_ids` are @@ -1200,30 +1199,38 @@ async def _find_missing_partial_state_memberships( Returns: A dict from ("m.room.member", state_key) -> state_event_id, containing the memberships missing from `found_state_ids`. + + Raises: + KeyError: if `events_with_membership_auth` does not have an entry for a + missing membership. Memberships in `found_state_ids` do not need an + entry in `events_with_membership_auth`. """ additional_state_ids: MutableStateMap[str] = {} # Tracks the missing members for logging purposes. - missing_members = {} + missing_members = set() # Identify memberships missing from `found_state_ids` and pick out the auth # events in which to look for them. auth_event_ids: Set[str] = set() - for member, event_with_membership_auth in members_to_fetch.items(): + for member in members_to_fetch: if (EventTypes.Member, member) in found_state_ids: continue - missing_members[member] = event_with_membership_auth + missing_members.add(member) + event_with_membership_auth = events_with_membership_auth[member] auth_event_ids.update(event_with_membership_auth.auth_event_ids()) auth_events = await self.store.get_events(auth_event_ids) # Run through the missing memberships once more, picking out the memberships # from the pile of auth events we have just fetched. - for member, event_with_membership_auth in members_to_fetch.items(): + for member in members_to_fetch: if (EventTypes.Member, member) in found_state_ids: continue + event_with_membership_auth = events_with_membership_auth[member] + # Dig through the auth events to find the desired membership. for auth_event_id in event_with_membership_auth.auth_event_ids(): # We only store events once we have all their auth events, @@ -1235,7 +1242,7 @@ async def _find_missing_partial_state_memberships( auth_event.type == EventTypes.Member and auth_event.state_key == member ): - missing_members.pop(member) + missing_members.remove(member) additional_state_ids[ (EventTypes.Member, member) ] = auth_event.event_id @@ -1252,9 +1259,12 @@ async def _find_missing_partial_state_memberships( logger.error( "Failed to find memberships for %s in partial state room " "%s in the auth events of %s.", - list(missing_members.keys()), + missing_members, room_id, - list(missing_members.values()), + [ + events_with_membership_auth[member].event_id + for member in missing_members + ], ) return additional_state_ids