
Commit

Merge pull request #5480 from matrix-org/erikj/extremities_dummy_events
Add experimental option to reduce extremities.
erikjohnston authored Jun 19, 2019
2 parents ceb2fa6 + 5546092 commit e0be8d7
Showing 7 changed files with 166 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog.d/5480.misc
@@ -0,0 +1 @@
Add an EXPERIMENTAL config option to try and periodically clean up extremities by sending dummy events.
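
The option is read in synapse/config/server.py below and defaults to off. A minimal, hypothetical sketch of how such a flag is picked up from an already-parsed configuration dict (illustrative only; the real read happens inside ServerConfig.read_config, and this PR does not add the key to the sample config):

# Hypothetical illustration: `parsed_config` stands in for the dict Synapse
# builds from homeserver.yaml.
parsed_config = {"cleanup_extremities_with_dummy_events": True}

cleanup_extremities_with_dummy_events = parsed_config.get(
    "cleanup_extremities_with_dummy_events", False,
)
print(cleanup_extremities_with_dummy_events)  # True; omit the key to keep it off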
6 changes: 6 additions & 0 deletions synapse/config/server.py
@@ -317,6 +317,12 @@ def read_config(self, config):

        _check_resource_config(self.listeners)

        # An experimental option to try and periodically clean up extremities
        # by sending dummy events.
        self.cleanup_extremities_with_dummy_events = config.get(
            "cleanup_extremities_with_dummy_events", False,
        )

    def has_tls_listener(self):
        return any(l["tls"] for l in self.listeners)

12 changes: 12 additions & 0 deletions synapse/events/__init__.py
@@ -92,6 +92,18 @@ def is_soft_failed(self):
        """
        return getattr(self, "soft_failed", False)

    def should_proactively_send(self):
        """Whether the event, if ours, should be sent to other clients and
        servers.

        This is used for sending dummy events internally. Servers and clients
        can still explicitly fetch the event.

        Returns:
            bool
        """
        return getattr(self, "proactively_send", True)


def _event_dict_property(key):
    # We want to be able to use hasattr with the event dict properties.
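
should_proactively_send() works purely through the optional proactively_send attribute: when it is absent the event is sent as normal, and only the dummy events created later in this PR set it to False. A self-contained sketch of the same getattr pattern (the class below is a stand-in for illustration, not Synapse's _EventInternalMetadata):

class FakeInternalMetadata:
    """Stand-in for the internal-metadata object this diff extends."""

    def should_proactively_send(self):
        # A missing attribute means the event is federated as usual;
        # dummy events explicitly set proactively_send = False.
        return getattr(self, "proactively_send", True)


normal = FakeInternalMetadata()
dummy = FakeInternalMetadata()
dummy.proactively_send = False

assert normal.should_proactively_send() is True
assert dummy.should_proactively_send() is False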
3 changes: 3 additions & 0 deletions synapse/federation/sender/__init__.py
@@ -168,6 +168,9 @@ def handle_event(event):
            if not is_mine and send_on_behalf_of is None:
                return

            if not event.internal_metadata.should_proactively_send():
                return

            try:
                # Get the state from before the event.
                # We need to make sure that this is the state from before
75 changes: 74 additions & 1 deletion synapse/handlers/message.py
@@ -34,9 +34,10 @@
from synapse.api.room_versions import RoomVersions
from synapse.api.urls import ConsentURIBuilder
from synapse.events.validator import EventValidator
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.types import RoomAlias, UserID, create_requester
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
@@ -261,6 +262,18 @@ def __init__(self, hs):
        if self._block_events_without_consent_error:
            self._consent_uri_builder = ConsentURIBuilder(self.config)

        if (
            not self.config.worker_app
            and self.config.cleanup_extremities_with_dummy_events
        ):
            self.clock.looping_call(
                lambda: run_as_background_process(
                    "send_dummy_events_to_fill_extremities",
                    self._send_dummy_events_to_fill_extremities
                ),
                5 * 60 * 1000,
            )

    @defer.inlineCallbacks
    def create_event(self, requester, event_dict, token_id=None, txn_id=None,
                     prev_events_and_hashes=None, require_consent=True):
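
On a non-worker process the handler schedules the cleanup every five minutes (clock.looping_call takes milliseconds, hence 5 * 60 * 1000). A rough standalone analogue using Twisted's LoopingCall directly, with a placeholder task body; this is a sketch, not Synapse's scheduling code:

from twisted.internet import reactor, task


def send_dummy_events_to_fill_extremities():
    # Placeholder: the real method queries for rooms with many forward
    # extremities and sends a dummy event into each of them.
    print("checking for rooms with too many forward extremities")


# LoopingCall.start() takes seconds; Synapse's clock.looping_call wrapper
# takes milliseconds, which is why the diff passes 5 * 60 * 1000.
loop = task.LoopingCall(send_dummy_events_to_fill_extremities)
loop.start(5 * 60, now=False)

reactor.callLater(20 * 60, reactor.stop)  # run a few iterations, then exit
reactor.run()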
@@ -874,3 +887,63 @@ def _bump_active_time(self, user):
            yield presence.bump_presence_active_time(user)
        except Exception:
            logger.exception("Error bumping presence active time")

    @defer.inlineCallbacks
    def _send_dummy_events_to_fill_extremities(self):
        """Background task to send dummy events into rooms that have a large
        number of extremities
        """

        room_ids = yield self.store.get_rooms_with_many_extremities(
            min_count=10, limit=5,
        )

        for room_id in room_ids:
            # For each room we need to find a joined member we can use to send
            # the dummy event with.

            prev_events_and_hashes = yield self.store.get_prev_events_for_room(
                room_id,
            )

            latest_event_ids = (
                event_id for (event_id, _, _) in prev_events_and_hashes
            )

            members = yield self.state.get_current_users_in_room(
                room_id, latest_event_ids=latest_event_ids,
            )

            user_id = None
            for member in members:
                if self.hs.is_mine_id(member):
                    user_id = member
                    break

            if not user_id:
                # We don't have a joined user.
                # TODO: We should do something here to stop the room from
                # appearing next time.
                continue

            requester = create_requester(user_id)

            event, context = yield self.create_event(
                requester,
                {
                    "type": "org.matrix.dummy_event",
                    "content": {},
                    "room_id": room_id,
                    "sender": user_id,
                },
                prev_events_and_hashes=prev_events_and_hashes,
            )

            event.internal_metadata.proactively_send = False

            yield self.send_nonmember_event(
                requester,
                event,
                context,
                ratelimit=False,
            )
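
The loop above picks the first joined member whose user ID belongs to the local server; if there is none, the room is skipped for now. A self-contained sketch of that selection step (is_mine_id here is an assumed simplification of Synapse's domain check, not its actual implementation):

def is_mine_id(user_id, server_name):
    # Matrix user IDs look like "@localpart:domain"; a homeserver can only
    # send events on behalf of users whose domain matches its own.
    return user_id.split(":", 1)[1] == server_name


def pick_local_sender(members, server_name):
    for member in members:
        if is_mine_id(member, server_name):
            return member
    return None  # no joined local user, so the room is skipped


members = ["@remote:example.org", "@alice:test", "@bob:test"]
assert pick_local_sender(members, "test") == "@alice:test"
assert pick_local_sender(["@remote:example.org"], "test") is None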
29 changes: 29 additions & 0 deletions synapse/storage/event_federation.py
@@ -190,6 +190,35 @@ def get_latest_event_ids_and_hashes_in_room(self, room_id):
            room_id,
        )

    def get_rooms_with_many_extremities(self, min_count, limit):
        """Get the top rooms with more than `min_count` extremities.

        Args:
            min_count (int): Only rooms with more than this many extremities
                are returned.
            limit (int): The maximum number of rooms to return.

        Returns:
            Deferred[list]: At most `limit` room IDs that have more than
                `min_count` extremities, sorted by extremity count (highest
                first).
        """

        def _get_rooms_with_many_extremities_txn(txn):
            sql = """
                SELECT room_id FROM event_forward_extremities
                GROUP BY room_id
                HAVING count(*) > ?
                ORDER BY count(*) DESC
                LIMIT ?
            """

            txn.execute(sql, (min_count, limit))
            return [room_id for room_id, in txn]

        return self.runInteraction(
            "get_rooms_with_many_extremities",
            _get_rooms_with_many_extremities_txn,
        )
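
The query simply groups the forward-extremity table by room and keeps the rooms whose count exceeds min_count. A standalone sketch of the same SQL against an in-memory SQLite table (the column layout is simplified to what the query touches):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_forward_extremities (event_id TEXT, room_id TEXT)")

# One room with 12 forward extremities, one with only 3.
rows = [("$e%d" % i, "!busy:test") for i in range(12)]
rows += [("$f%d" % i, "!quiet:test") for i in range(3)]
conn.executemany("INSERT INTO event_forward_extremities VALUES (?, ?)", rows)

sql = """
    SELECT room_id FROM event_forward_extremities
    GROUP BY room_id
    HAVING count(*) > ?
    ORDER BY count(*) DESC
    LIMIT ?
"""

# min_count=10 and limit=5 are the values the message handler passes above.
print([room_id for room_id, in conn.execute(sql, (10, 5))])  # ['!busy:test']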

    @cached(max_entries=5000, iterable=True)
    def get_latest_event_ids_in_room(self, room_id):
        return self._simple_select_onecol(
41 changes: 41 additions & 0 deletions tests/storage/test_cleanup_extrems.py
@@ -222,3 +222,44 @@ def test_forked_graph_cleanup(self):
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c]))


class CleanupExtremDummyEventsTestCase(HomeserverTestCase):
    def make_homeserver(self, reactor, clock):
        config = self.default_config()
        config["cleanup_extremities_with_dummy_events"] = True
        return self.setup_test_homeserver(config=config)

    def prepare(self, reactor, clock, homeserver):
        self.store = homeserver.get_datastore()
        self.room_creator = homeserver.get_room_creation_handler()

        # Create a test user and room
        self.user = UserID("alice", "test")
        self.requester = Requester(self.user, None, False, None, None)
        info = self.get_success(self.room_creator.create_room(self.requester, {}))
        self.room_id = info["room_id"]

    def test_send_dummy_event(self):
        # Create a bushy graph with 50 extremities.

        event_id_start = self.create_and_send_event(self.room_id, self.user)

        for _ in range(50):
            self.create_and_send_event(
                self.room_id, self.user, prev_event_ids=[event_id_start]
            )

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertEqual(len(latest_event_ids), 50)

        # Pump the reactor repeatedly so that the background updates have a
        # chance to run.
        self.pump(10 * 60)

        latest_event_ids = self.get_success(
            self.store.get_latest_event_ids_in_room(self.room_id)
        )
        self.assertTrue(len(latest_event_ids) < 10, len(latest_event_ids))
