Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Speed up deleting device messages #16643

Merged
merged 7 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/16643.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Speed up deleting of device messages when deleting a device.
8 changes: 5 additions & 3 deletions synapse/handlers/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,15 +396,17 @@ async def _delete_device_messages(
up_to_stream_id = task.params["up_to_stream_id"]

# Delete the messages in batches to avoid too much DB load.
from_stream_id = None
while True:
res = await self.store.delete_messages_for_device(
from_stream_id = await self.store.delete_messages_for_device_between(
user_id=user_id,
device_id=device_id,
up_to_stream_id=up_to_stream_id,
from_stream_id=from_stream_id,
to_stream_id=up_to_stream_id,
limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
)

if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
if from_stream_id is None:
return TaskStatus.COMPLETE, None, None

await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
Expand Down
66 changes: 66 additions & 0 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,72 @@ def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:

return count

@trace
async def delete_messages_for_device_between(
    self,
    user_id: str,
    device_id: Optional[str],
    from_stream_id: Optional[int],
    to_stream_id: int,
    limit: int,
) -> Optional[int]:
    """Delete up to `limit` device messages between the stream IDs.

    Rows in `device_inbox` for the given user and device whose stream ID is
    in the range `(from_stream_id, to_stream_id]` (exclusive lower bound,
    inclusive upper bound) are deleted, lowest stream IDs first.

    This is more efficient than `delete_messages_for_device` when calling in
    a loop to batch delete messages: pass the returned value back in as
    `from_stream_id` on the next call, and stop once `None` is returned.

    Args:
        user_id: The user whose device messages should be deleted.
        device_id: The device whose messages should be deleted.
        from_stream_id: Exclusive lower bound of stream IDs to delete.
            `None` is treated as 0, i.e. start from the very beginning.
        to_stream_id: Inclusive upper bound of stream IDs to delete.
        limit: The maximum number of rows selected for deletion per call.

    Returns:
        The highest stream ID deleted if a full batch of `limit` rows was
        deleted (so there may be more rows left in the range), or `None` if
        there was nothing to delete or fewer than `limit` rows were deleted
        (so the range is now empty).
    """

    # Keeping track of a lower bound of stream ID where we've deleted
    # everything below makes the queries much faster. Otherwise, every time
    # we scan for rows to delete we'd re-scan across all the rows that have
    # previously been deleted (until the next table VACUUM).

    if from_stream_id is None:
        # Minimum device stream ID is 1.
        from_stream_id = 0

    def delete_messages_for_device_between_txn(
        txn: LoggingTransaction,
    ) -> Optional[int]:
        # Find the stream ID of the last row in this batch, so that the
        # DELETE below can be expressed as a simple bounded range.
        txn.execute(
            """
            SELECT MAX(stream_id) FROM (
                SELECT stream_id FROM device_inbox
                WHERE user_id = ? AND device_id = ?
                    AND ? < stream_id AND stream_id <= ?
                ORDER BY stream_id
                LIMIT ?
            ) AS d
            """,
            (user_id, device_id, from_stream_id, to_stream_id, limit),
        )
        row = txn.fetchone()
        # MAX() over an empty set yields a single NULL row, so check both
        # for a missing row and for a NULL value.
        if row is None or row[0] is None:
            return None

        (max_stream_id,) = row

        txn.execute(
            """
            DELETE FROM device_inbox
            WHERE user_id = ? AND device_id = ?
                AND ? < stream_id AND stream_id <= ?
            """,
            (user_id, device_id, from_stream_id, max_stream_id),
        )

        # A short batch means there were fewer than `limit` rows left in
        # the range, so tell the caller there is nothing more to delete.
        if txn.rowcount < limit:
            return None

        return max_stream_id

    return await self.db_pool.runInteraction(
        "delete_messages_for_device_between",
        delete_messages_for_device_between_txn,
        db_autocommit=True,  # We don't need to run in a transaction
    )

@trace
async def get_new_device_msgs_for_remote(
self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
Expand Down
2 changes: 1 addition & 1 deletion synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async def update_task(
result: Optional[JsonMapping] = None,
error: Optional[str] = None,
) -> bool:
"""Update some task associated values. This is exposed publically so it can
"""Update some task associated values. This is exposed publicly so it can
be used inside task functions, mainly to update the result and be able to
resume a task at a specific step after a restart of synapse.

Expand Down
Loading