Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Fix logic for dropping old events in fed queue #11806

Merged
merged 6 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/11806.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug introduced in v1.40.0 that caused Synapse to fail to process incoming federation traffic after handling a large amount of events in a v1 room.
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion synapse/storage/databases/main/event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,7 +1432,10 @@ async def prune_staged_events_in_room(

if room_version.event_format == EventFormatVersions.V1:
for prev_event_tuple in prev_events:
if not isinstance(prev_event_tuple, list) or len(prev_events) != 2:
if (
not isinstance(prev_event_tuple, list)
or len(prev_event_tuple) != 2
):
logger.info("Invalid prev_events for %s", event_id)
break

Expand Down
29 changes: 23 additions & 6 deletions tests/storage/test_event_federation.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Tuple, Union

import attr
from parameterized import parameterized

from synapse.api.room_versions import RoomVersions
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.events import _EventInternalMetadata
from synapse.util import json_encoder

Expand Down Expand Up @@ -506,11 +508,24 @@ def insert_event(txn):
)
self.assertSetEqual(difference, set())

def test_prune_inbound_federation_queue(self):
"Test that pruning of inbound federation queues work"
@parameterized.expand(
anoadragon453 marked this conversation as resolved.
Show resolved Hide resolved
[
(RoomVersions.V1,),
(RoomVersions.V9,),
]
)
def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
babolivier marked this conversation as resolved.
Show resolved Hide resolved
"""Test that pruning of inbound federation queues work"""

room_id = "some_room_id"

def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
"""Account for differences in prev_events format across room versions"""
if room_version == RoomVersions.V1:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the test parameters to include all known room versions, this condition needs to also include v2 rooms according to the spec

return prev_event_id, {}

return prev_event_id

# Insert a bunch of events that all reference the previous one.
self.get_success(
self.store.db_pool.simple_insert_many(
Expand All @@ -529,7 +544,9 @@ def test_prune_inbound_federation_queue(self):
room_id,
0,
f"$fake_event_id_{i + 1}",
json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}),
json_encoder.encode(
{"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
),
"{}",
)
for i in range(500)
Expand All @@ -541,12 +558,12 @@ def test_prune_inbound_federation_queue(self):
# Calling prune once should return True, i.e. a prune happen. The second
# time it shouldn't.
pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
self.store.prune_staged_events_in_room(room_id, room_version)
)
self.assertTrue(pruned)

pruned = self.get_success(
self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
self.store.prune_staged_events_in_room(room_id, room_version)
)
self.assertFalse(pruned)

Expand Down