Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Add support for stable prefixes for MSC2285: private read receipts #13273

Merged
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
83b7af1
Switch to stable prefixes for MSC2285
SimonBrandner Jul 14, 2022
01fbb6d
Add changelog
SimonBrandner Jul 14, 2022
cafe4be
Avoid leaking unstable private read receipts
SimonBrandner Jul 14, 2022
967f8ed
Delint
SimonBrandner Jul 14, 2022
a9f2329
Fix code
SimonBrandner Jul 14, 2022
08d874d
Merge remote-tracking branch 'upstream/develop' into SimonBrandner/fe…
SimonBrandner Jul 23, 2022
ba20652
Use a const
SimonBrandner Jul 23, 2022
d18a485
Convert list to tuple.
clokep Jul 25, 2022
8323358
Merge remote-tracking branch 'upstream/develop' into SimonBrandner/fe…
SimonBrandner Jul 28, 2022
404459f
Add background update to remove unstable private read receipts
SimonBrandner Jul 29, 2022
57251c8
Remove handling for unstable private read receipts
SimonBrandner Jul 29, 2022
e12706c
Don't bump `SCHEMA_VERSION`
SimonBrandner Jul 29, 2022
c8cf10e
Remove `READ_PRIVATE_UNSTABLE` from `ReceiptTypes`
SimonBrandner Jul 29, 2022
84299f6
Assume return value
SimonBrandner Jul 29, 2022
208e97a
Remove non-relevant test
SimonBrandner Jul 29, 2022
a7c3a1c
Don't bother with a migration script
SimonBrandner Jul 29, 2022
252146d
Remove non-schema thing
SimonBrandner Jul 29, 2022
3cbd80b
Only use single `'`
SimonBrandner Jul 29, 2022
8854e35
Use `!=`
SimonBrandner Jul 29, 2022
baa9e84
Merge pull request #1 from SimonBrandner/SimonBrandner/feat/db-rr
SimonBrandner Jul 29, 2022
4455232
Changelog
SimonBrandner Aug 4, 2022
de9005f
Merge remote-tracking branch 'upstream/develop' into SimonBrandner/fe…
SimonBrandner Aug 4, 2022
c0b3c90
Support both stable and unstable private RRs
SimonBrandner Aug 4, 2022
3612ceb
Test both stable and unstable private RRs
SimonBrandner Aug 4, 2022
a37a17d
Advertise unstable prefix
SimonBrandner Aug 4, 2022
14b36a3
Check for all receipt types
SimonBrandner Aug 4, 2022
746351c
Delint
SimonBrandner Aug 4, 2022
50bffe4
Further dlint
SimonBrandner Aug 4, 2022
1b66814
Remove `(`
SimonBrandner Aug 4, 2022
bcdbb5d
This won't work...
SimonBrandner Aug 4, 2022
0f498e0
Reformat...
SimonBrandner Aug 4, 2022
660c776
Get tests passing
SimonBrandner Aug 4, 2022
b1ddf1a
Merge remote-tracking branch 'origin/develop' into SimonBrandner/feat…
clokep Aug 4, 2022
3094b39
Fix type hints.
clokep Aug 4, 2022
011d6d2
Put experimental flag back in place
SimonBrandner Aug 5, 2022
ee6512f
Only advertise `org.matrix.msc2285` if enabled
SimonBrandner Aug 5, 2022
2cd29bd
Fix tests
SimonBrandner Aug 5, 2022
89d5bd4
Merge remote-tracking branch 'origin/SimonBrandner/feat/disable-rr' i…
SimonBrandner Aug 5, 2022
f0c531d
Fix tests
SimonBrandner Aug 5, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Check for all receipt types
Signed-off-by: Šimon Brandner <[email protected]>
  • Loading branch information
SimonBrandner committed Aug 4, 2022
commit 14b36a372ac69cd891e78155c23f45268867f288
149 changes: 90 additions & 59 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from synapse.api.constants import ReceiptTypes
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
Expand Down Expand Up @@ -380,35 +380,49 @@ async def get_unread_push_actions_for_user_in_range_for_http(
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""

receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)

# find rooms that have a read receipt in them and return the next
# push actions
def get_after_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
# find rooms that have a read receipt in them and return the next
# push actions
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight "
" FROM ("
" SELECT room_id,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
") AS rl,"
" event_push_actions AS ep"
" WHERE"
" ep.room_id = rl.room_id"
" AND ep.stream_ordering > rl.stream_ordering"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"

sql = f"""(
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM (
SELECT room_id,
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
WHERE
ep.room_id = rl.room_id
AND ep.stream_ordering > rl.stream_ordering
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
args.extend(
user_id, user_id, min_stream_ordering, max_stream_ordering, limit
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())

Expand All @@ -422,24 +436,26 @@ def get_after_receipt(
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight "
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
" ep.room_id NOT IN ("
" SELECT room_id FROM receipts_linearized"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
" )"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering ASC LIMIT ?"
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
args.extends(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())

Expand Down Expand Up @@ -489,20 +505,31 @@ async def get_unread_push_actions_for_user_in_range_for_email(
The list will be ordered by descending received_ts.
The list will have between 0~limit entries.
"""

receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)

# find rooms that have a read receipt in them and return the most recent
# push actions
def get_after_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
sql = (
sql = f"""
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight, e.received_ts"
" FROM ("
" SELECT room_id,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" WHERE {receipt_types_clause} AND user_id = ?"
" GROUP BY room_id"
") AS rl,"
" event_push_actions AS ep"
Expand All @@ -515,8 +542,10 @@ def get_after_receipt(
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
"""
args.extends(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())

Expand All @@ -530,24 +559,26 @@ def get_after_receipt(
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
sql = (
"SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
" ep.highlight, e.received_ts"
" FROM event_push_actions AS ep"
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
" ep.room_id NOT IN ("
" SELECT room_id FROM receipts_linearized"
" WHERE receipt_type = 'm.read' AND user_id = ?"
" GROUP BY room_id"
" )"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
" AND ep.notif = 1"
" ORDER BY ep.stream_ordering DESC LIMIT ?"
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM event_push_actions AS ep
INNER JOIN events AS e USING (room_id, event_id)
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
AND ep.stream_ordering > ?
AND ep.stream_ordering <= ?
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
args.extend(
user_id, user_id, min_stream_ordering, max_stream_ordering, limit
)
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())

Expand Down