diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5cbe89f4fddf..d604a1bc7afb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,7 +37,6 @@ AuthError, Codes, ConsentNotGivenError, - LimitExceededError, NotFoundError, ShadowBanError, SynapseError, @@ -1417,34 +1416,23 @@ async def handle_new_client_event( # We now persist the event (and update the cache in parallel, since we # don't want to block on it). event, context = events_and_context[0] - try: - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_events, - requester=requester, - events_and_context=events_and_context, - ratelimit=ratelimit, - extra_users=extra_users, - ), - run_in_background( - self.cache_joined_hosts_for_events, events_and_context - ).addErrback( - log_failure, "cache_joined_hosts_for_event failed" - ), + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_events, + requester=requester, + events_and_context=events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ), - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) - except PartialStateConflictError as e: - # The event context needs to be recomputed. - # Turn the error into a 429, as a hint to the client to try again. - logger.info( - "Room %s was un-partial stated while persisting client event.", - event.room_id, + run_in_background( + self.cache_joined_hosts_for_events, events_and_context + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), + ), + consumeErrors=True, ) - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) + ).addErrback(unwrapFirstError) return result diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index a5174cf1087a..918ff1287533 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -34,6 +34,7 @@ from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.logging import opentracing from synapse.module_api import NOT_SPAM +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.state import StateFilter from synapse.types import ( JsonDict, @@ -392,60 +393,77 @@ async def _local_membership_update( event_pos = await self.store.get_position_for_event(existing_event_id) return existing_event_id, event_pos.stream - event, context = await self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": user_id, - # For backwards compatibility: - "membership": membership, - "origin_server_ts": origin_server_ts, - }, - txn_id=txn_id, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - depth=depth, - require_consent=require_consent, - outlier=outlier, - historical=historical, - ) - - prev_state_ids = await context.get_prev_state_ids( - StateFilter.from_types([(EventTypes.Member, None)]) - ) + event_handled = False + while not event_handled: + try: + event, context = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": user_id, + # For backwards compatibility: + "membership": membership, + "origin_server_ts": origin_server_ts, + }, + txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + require_consent=require_consent, + outlier=outlier, + historical=historical, + ) - prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + prev_state_ids = await context.get_prev_state_ids( + StateFilter.from_types([(EventTypes.Member, None)]) + ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False + prev_member_event_id = prev_state_ids.get( + (EventTypes.Member, user_id), None ) - with opentracing.start_active_span("handle_new_client_event"): - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - events_and_context=[(event, context)], - extra_users=[target], - ratelimit=ratelimit, - ) - if event.membership == Membership.LEAVE: - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - if prev_member_event.membership == Membership.JOIN: - await self._user_left_room(target, room_id) + if event.membership == Membership.JOIN: + newly_joined = True + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + newly_joined = prev_member_event.membership != Membership.JOIN + + # Only rate-limit if the user actually joined the room, otherwise we'll end + # up blocking profile updates. + if newly_joined and ratelimit: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + with opentracing.start_active_span("handle_new_client_event"): + result_event = ( + await self.event_creation_handler.handle_new_client_event( + requester, + events_and_context=[(event, context)], + extra_users=[target], + ratelimit=ratelimit, + ) + ) + + if event.membership == Membership.LEAVE: + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + if prev_member_event.membership == Membership.JOIN: + await self._user_left_room(target, room_id) + + event_handled = True + except PartialStateConflictError: + # persisting couldn't happen because the room got un-partial in the meantime + # and context needs to be recomputed, so let's do so + pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering