Support stable identifiers for MSC2285: private read receipts. (#13273)

This adds support for the stable identifiers of MSC2285 while
continuing to support the unstable identifiers behind the configuration
flag. These will be removed in a future version.
This commit is contained in:
Šimon Brandner 2022-08-05 17:09:33 +02:00 committed by GitHub
parent e2ed1b7155
commit ab18441573
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 246 additions and 94 deletions

View file

@ -80,7 +80,7 @@ import attr
from synapse.api.constants import ReceiptTypes
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@ -259,7 +259,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn,
user_id,
room_id,
receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
receipt_types=(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
stream_ordering = None
@ -448,6 +452,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will be ordered by ascending stream_ordering.
The list will have between 0~limit entries.
"""
# find rooms that have a read receipt in them and return the next
# push actions
def get_after_receipt(
@ -455,7 +460,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
) -> List[Tuple[str, str, int, str, bool]]:
# find rooms that have a read receipt in them and return the next
# push actions
sql = """
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM (
@ -463,10 +479,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
WHERE receipt_type = 'm.read' AND user_id = ?
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
event_push_actions AS ep
WHERE
ep.room_id = rl.room_id
AND ep.stream_ordering > rl.stream_ordering
@ -476,7 +492,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
args.extend(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@ -490,7 +508,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool]]:
sql = """
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight
FROM event_push_actions AS ep
@ -498,7 +526,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
WHERE receipt_type = 'm.read' AND user_id = ?
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
@ -507,7 +535,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering ASC LIMIT ?
"""
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
args.extend(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool]], txn.fetchall())
@ -557,12 +587,23 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
The list will be ordered by descending received_ts.
The list will have between 0~limit entries.
"""
# find rooms that have a read receipt in them and return the most recent
# push actions
def get_after_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
sql = """
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM (
@ -570,7 +611,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
MAX(stream_ordering) as stream_ordering
FROM events
INNER JOIN receipts_linearized USING (room_id, event_id)
WHERE receipt_type = 'm.read' AND user_id = ?
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
) AS rl,
event_push_actions AS ep
@ -584,7 +625,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
args.extend(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())
@ -598,7 +641,17 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
def get_no_receipt(
txn: LoggingTransaction,
) -> List[Tuple[str, str, int, str, bool, int]]:
sql = """
receipt_types_clause, args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(
ReceiptTypes.READ,
ReceiptTypes.READ_PRIVATE,
ReceiptTypes.UNSTABLE_READ_PRIVATE,
),
)
sql = f"""
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
ep.highlight, e.received_ts
FROM event_push_actions AS ep
@ -606,7 +659,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE
ep.room_id NOT IN (
SELECT room_id FROM receipts_linearized
WHERE receipt_type = 'm.read' AND user_id = ?
WHERE {receipt_types_clause} AND user_id = ?
GROUP BY room_id
)
AND ep.user_id = ?
@ -615,7 +668,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
AND ep.notif = 1
ORDER BY ep.stream_ordering DESC LIMIT ?
"""
args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
args.extend(
(user_id, user_id, min_stream_ordering, max_stream_ordering, limit)
)
txn.execute(sql, args)
return cast(List[Tuple[str, str, int, str, bool, int]], txn.fetchall())