Mark events as read using threaded read receipts from MSC3771. (#13877)

Applies the proper logic for unthreaded and threaded receipts to either
apply to all events in the room or only events in the same thread, respectively.
This commit is contained in:
Patrick Cloke 2022-10-04 10:46:42 -04:00 committed by GitHub
parent f0019f3f3b
commit a7ba457b2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 505 additions and 63 deletions

View file

@ -421,7 +421,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn: LoggingTransaction,
room_id: str,
user_id: str,
receipt_stream_ordering: int,
unthreaded_receipt_stream_ordering: int,
) -> RoomNotifCounts:
"""Get the number of unread messages for a user/room that have happened
since the given stream ordering.
@ -430,9 +430,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn: The database transaction.
room_id: The room ID to get unread counts for.
user_id: The user ID to get unread counts for.
receipt_stream_ordering: The stream ordering of the user's latest
receipt in the room. If there are no receipts, the stream ordering
of the user's join event.
unthreaded_receipt_stream_ordering: The stream ordering of the user's latest
unthreaded receipt in the room. If there are no unthreaded receipts,
the stream ordering of the user's join event.
Returns:
A RoomNotifCounts object containing the notification count, the
@ -448,71 +448,181 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return main_counts
return thread_counts.setdefault(thread_id, NotifCounts())
receipt_types_clause, receipts_args = make_in_list_sql_clause(
self.database_engine,
"receipt_type",
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream
# ordering given. If it doesn't match then a new read receipt has arrived and
# we haven't yet updated the counts in `event_push_summary` to reflect
# that; in that case we simply ignore `event_push_summary` counts
# and do a manual count of all of the rows in the `event_push_actions` table
# for this user/room.
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
# latest receipt for the thread (which may be either the unthreaded read receipt
# or the threaded read receipt).
#
# If `last_receipt_stream_ordering` is null then that means it's up to
# date (as the row was written by an older version of Synapse that
# If it doesn't match then a new read receipt has arrived and we haven't yet
# updated the counts in `event_push_summary` to reflect that; in that case we
# simply ignore `event_push_summary` counts.
#
# We then do a manual count of all the rows in the `event_push_actions` table
# for any user/room/thread which did not have a valid summary found.
#
# If `last_receipt_stream_ordering` is null then that means it's up-to-date
# (as the row was written by an older version of Synapse that
# updated `event_push_summary` synchronously when persisting a new read
# receipt).
txn.execute(
"""
SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
f"""
SELECT notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
WHERE room_id = ? AND user_id = ?
AND (
(last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
OR last_receipt_stream_ordering = ?
(last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?))
OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?)
) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
""",
(room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
(
user_id,
room_id,
unthreaded_receipt_stream_ordering,
*receipts_args,
room_id,
user_id,
unthreaded_receipt_stream_ordering,
unthreaded_receipt_stream_ordering,
),
)
max_summary_stream_ordering = 0
for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
summarised_threads = set()
for notif_count, unread_count, thread_id in txn:
summarised_threads.add(thread_id)
counts = _get_thread(thread_id)
counts.notify_count += notif_count
counts.unread_count += unread_count
# Summaries will only be used if they have not been invalidated by
# a recent receipt; track the latest stream ordering or a valid summary.
#
# Note that since there's only one read receipt in the room per user,
# valid summaries are contiguous.
max_summary_stream_ordering = max(
summary_stream_ordering, max_summary_stream_ordering
)
# Next we need to count highlights, which aren't summarised
sql = """
sql = f"""
SELECT COUNT(*), thread_id FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
WHERE user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
AND highlight = 1
GROUP BY thread_id
"""
txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
txn.execute(
sql,
(
user_id,
room_id,
unthreaded_receipt_stream_ordering,
*receipts_args,
user_id,
room_id,
unthreaded_receipt_stream_ordering,
),
)
for highlight_count, thread_id in txn:
_get_thread(thread_id).highlight_count += highlight_count
# For threads which were summarised we need to count actions since the last
# rotation.
thread_id_clause, thread_id_args = make_in_list_sql_clause(
self.database_engine, "thread_id", summarised_threads
)
# The (inclusive) event stream ordering that was previously summarised.
rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, rotated_upto_stream_ordering
)
for notif_count, unread_count, thread_id in unread_counts:
if thread_id not in summarised_threads:
continue
if thread_id == MAIN_TIMELINE:
counts.notify_count += notif_count
counts.unread_count += unread_count
elif thread_id in thread_counts:
thread_counts[thread_id].notify_count += notif_count
thread_counts[thread_id].unread_count += unread_count
else:
# Previous thread summaries of 0 are discarded above.
#
# TODO If empty summaries are deleted this can be removed.
thread_counts[thread_id] = NotifCounts(
notify_count=notif_count,
unread_count=unread_count,
highlight_count=0,
)
# Finally we need to count push actions that aren't included in the
# summary returned above. This might be due to recent events that haven't
# been summarised yet or the summary is out of date due to a recent read
# receipt.
start_unread_stream_ordering = max(
receipt_stream_ordering, max_summary_stream_ordering
sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
COUNT(CASE WHEN unread = 1 THEN 1 END),
thread_id
FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
WHERE user_id = ?
AND room_id = ?
AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
AND NOT {thread_id_clause}
GROUP BY thread_id
"""
txn.execute(
sql,
(
user_id,
room_id,
unthreaded_receipt_stream_ordering,
*receipts_args,
user_id,
room_id,
unthreaded_receipt_stream_ordering,
*thread_id_args,
),
)
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, start_unread_stream_ordering
)
for notif_count, unread_count, thread_id in unread_counts:
for notif_count, unread_count, thread_id in txn:
counts = _get_thread(thread_id)
counts.notify_count += notif_count
counts.unread_count += unread_count
@ -526,6 +636,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id: str,
stream_ordering: int,
max_stream_ordering: Optional[int] = None,
thread_id: Optional[str] = None,
) -> List[Tuple[int, int, str]]:
"""Returns the notify and unread counts from `event_push_actions` for
the given user/room in the given range.
@ -540,6 +651,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
stream_ordering: The (exclusive) minimum stream ordering to consider.
max_stream_ordering: The (inclusive) maximum stream ordering to consider.
If this is not given, then no maximum is applied.
thread_id: The thread ID to fetch unread counts for. If this is not provided
then the results for *all* threads is returned.
Note that if this is provided the resulting list will only have 0 or
1 tuples in it.
Return:
A tuple of the notif count and unread count in the given range for
@ -551,10 +667,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return []
clause = ""
stream_ordering_clause = ""
args = [user_id, room_id, stream_ordering]
if max_stream_ordering is not None:
clause = "AND ea.stream_ordering <= ?"
stream_ordering_clause = "AND ea.stream_ordering <= ?"
args.append(max_stream_ordering)
# If the max stream ordering is less than the min stream ordering,
@ -562,6 +678,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
if max_stream_ordering <= stream_ordering:
return []
# Either limit the results to a specific thread or fetch all threads.
thread_id_clause = ""
if thread_id is not None:
thread_id_clause = "AND thread_id = ?"
args.append(thread_id)
sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
@ -571,7 +693,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE user_id = ?
AND room_id = ?
AND ea.stream_ordering > ?
{clause}
{stream_ordering_clause}
{thread_id_clause}
GROUP BY thread_id
"""
@ -1086,7 +1209,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
sql = """
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@ -1107,44 +1230,68 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
limit,
),
)
rows = cast(List[Tuple[int, str, str, int]], txn.fetchall())
rows = cast(List[Tuple[int, str, str, Optional[str], int]], txn.fetchall())
# For each new read receipt we delete push actions from before it and
# recalculate the summary.
for _, room_id, user_id, stream_ordering in rows:
#
# Care must be taken of whether it is a threaded or unthreaded receipt.
for _, room_id, user_id, thread_id, stream_ordering in rows:
# Only handle our own read receipts.
if not self.hs.is_mine_id(user_id):
continue
thread_clause = ""
thread_args: Tuple = ()
if thread_id is not None:
thread_clause = "AND thread_id = ?"
thread_args = (thread_id,)
# For each new read receipt we delete push actions from before it and
# recalculate the summary.
txn.execute(
"""
f"""
DELETE FROM event_push_actions
WHERE room_id = ?
AND user_id = ?
AND stream_ordering <= ?
AND highlight = 0
{thread_clause}
""",
(room_id, user_id, stream_ordering),
(room_id, user_id, stream_ordering, *thread_args),
)
# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room(
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
txn,
room_id,
user_id,
stream_ordering,
old_rotate_stream_ordering,
thread_id,
)
# First mark the summary for all threads in the room as cleared.
self.db_pool.simple_update_txn(
txn,
table="event_push_summary",
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={
"notif_count": 0,
"unread_count": 0,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)
# For an unthreaded receipt, mark the summary for all threads in the room
# as cleared.
if thread_id is None:
self.db_pool.simple_update_txn(
txn,
table="event_push_summary",
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={
"notif_count": 0,
"unread_count": 0,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)
# For a threaded receipt, we *always* want to update that receipt,
# event if there are no new notifications in that thread. This ensures
# the stream_ordering & last_receipt_stream_ordering are updated.
elif not unread_counts:
unread_counts = [(0, 0, thread_id)]
# Then any updated threads get their notification count and unread
# count updated.
@ -1153,8 +1300,16 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
table="event_push_summary",
key_names=("room_id", "user_id", "thread_id"),
key_values=[(room_id, user_id, row[2]) for row in unread_counts],
value_names=("notif_count", "unread_count"),
value_values=[(row[0], row[1]) for row in unread_counts],
value_names=(
"notif_count",
"unread_count",
"stream_ordering",
"last_receipt_stream_ordering",
),
value_values=[
(row[0], row[1], old_rotate_stream_ordering, stream_ordering)
for row in unread_counts
],
)
# We always update `event_push_summary_last_receipt_stream_id` to