From 2292dc35fc99f19f3c5397818716a8a5bec1fb8b Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Sep 2019 00:54:05 +0100 Subject: [PATCH 01/27] Add experimental "dont_push" push action to suppress push for notifications This is a potential solution to https://github.com/vector-im/riot-web/issues/3374 and https://github.com/vector-im/riot-web/issues/5953 as raised by Mozilla at https://github.com/vector-im/riot-web/issues/10868. This lets you define a push rule action which increases the badge count (unread notification count) on a given room, but doesn't actually send a push for that notification via email or HTTP. We might want to define this as the default behaviour for group chats in future to solve https://github.com/vector-im/riot-web/issues/3268 at last. This is implemented as a string action rather than a tweak because: * Other pushers don't care about the tweak, given they won't ever get pushed * The DB can store the tweak more efficiently using the existing `notify` table. * It avoids breaking the default_notif/highlight_action optimisations. Clients which generate their own notifs (e.g. desktop notifs from Riot/Web) would need to be aware of the new push action to uphold it. An alternative way to do this would be to maintain a `msg_count` alongside `highlight_count` and `notification_count` in `unread_notifications` in sync responses. However, doing this by counting the rows in `events` since the `stream_position` of the user's last read receipt turns out to be painfully slow (~200ms), perhaps due to the size of the events table. So instead, we use the highly optimised existing event_push_actions (and event_push_actions_staging) table to maintain the counts - using the code paths which already exist for tracking unread notification counts efficiently. These queries are typically ~3ms or so. The biggest issues I see here are: * We're slightly repurposing the `notif` field on `event_push_actions` to track whether a given action actually sent a `push` or not. This doesn't seem unreasonable, but it's slightly naughty given that previously the field explicitly tracked whether `notify` was true for the action (and as a result, it was uselessly always set to 1 in the DB). * We're going to put more load on the `event_push_actions` table for all the random group chats which people had previously muted. In practice I don't think there are many of these though. * There isn't an MSC for this yet (although this comment could become one). --- synapse/storage/event_push_actions.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 22025effb..b01a12528 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -124,8 +124,8 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): # First get number of notifications. - # We don't need to put a notif=1 clause as all rows always have - # notif=1 + # We ignore the notif column, given we want unread counts irrespective of + # whether the notification actually sent a push or not. sql = ( "SELECT count(*)" " FROM event_push_actions ea" " WHERE" " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" ) @@ -223,6 +223,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?"
) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -251,6 +252,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -323,6 +325,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -351,6 +354,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" + " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -400,7 +404,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_if_maybe_push_in_range_for_user_txn(txn): sql = """ SELECT 1 FROM event_push_actions - WHERE user_id = ? AND stream_ordering > ? + WHERE user_id = ? AND stream_ordering > ? AND notif = 1 LIMIT 1 """ @@ -429,14 +433,15 @@ class EventPushActionsWorkerStore(SQLBaseStore): return # This is a helper function for generating the necessary tuple that - # can be used to inert into the `event_push_actions_staging` table. + # can be used to insert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 + notif = 0 if "dont_push" in actions else 1 return ( event_id, # event_id column user_id, # user_id column _serialize_action(actions, is_highlight), # actions column - 1, # notif column + notif, # notif column is_highlight, # highlight column ) From dd8e24f42ee430803ce65bda744672a2ec92af88 Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Thu, 19 Sep 2019 01:14:17 +0100 Subject: [PATCH 02/27] changelog --- changelog.d/6061.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/6061.feature diff --git a/changelog.d/6061.feature b/changelog.d/6061.feature new file mode 100644 index 000000000..d85c497d9 --- /dev/null +++ b/changelog.d/6061.feature @@ -0,0 +1 @@ +Add experimental "no_push" push rule action From 6f6a4bfc079c51f130630237cf86488179bae63e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 14:24:01 +0100 Subject: [PATCH 03/27] Rename dont_push into mark_unread --- synapse/rest/client/v1/push_rule.py | 4 ++-- synapse/storage/data_stores/main/event_push_actions.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 9fd490813..c27e05d1d 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2014-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce"]: + if a in ["notify", "dont_notify", "coalesce", "mark_unread"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 8ad7a306f..a86a6a1be 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2015 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2015-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -437,7 +436,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # can be used to insert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 - notif = 0 if "dont_push" in actions else 1 + notif = 0 if "mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column From ef345c5a7b544aafa9c37bc2c4f626dfcef529f9 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 16:21:16 +0100 Subject: [PATCH 04/27] Add a new unread_counter to sync responses --- synapse/handlers/sync.py | 1 + synapse/push/push_tools.py | 5 +++- .../data_stores/main/event_push_actions.py | 25 +++++++++++++++++-- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6bdb24baf..cec0ca427 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1895,6 +1895,7 @@ class SyncHandler(object): if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] + unread_notifications["unread_count"] = notifs["unread_count"] sync_result_builder.joined.append(room_sync) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 5dae4648c..9f264ca4a 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -39,7 +39,10 @@ def get_badge_count(store, user_id): ) # return one badge count per conversation, as count per # message is so noisy as to be almost useless - badge += 1 if notifs["notify_count"] else 0 + # We're populating this badge using the unread_count (instead of the + # notify_count) as this badge is the number of missed messages, not the + # number of missed notifications. + badge += 1 if notifs["unread_count"] else 0 return badge diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index a86a6a1be..9922fda50 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -133,6 +133,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" + " AND notif = 1" ) txn.execute(sql, (user_id, room_id, stream_ordering)) @@ -150,6 +151,22 @@ class EventPushActionsWorkerStore(SQLBaseStore): if rows: notify_count += rows[0][0] + # Now get the number of unread messages in the room, i.e. messages that matched + # both a mark_unread rule and a notify one. + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" 
+ " AND room_id = ?" + " AND stream_ordering > ?" + " AND notif = 0" + ) + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + unread_count = row[0] if row else 0 + unread_count += notify_count + # Now get the number of highlights sql = ( "SELECT count(*)" @@ -165,7 +182,11 @@ class EventPushActionsWorkerStore(SQLBaseStore): row = txn.fetchone() highlight_count = row[0] if row else 0 - return {"notify_count": notify_count, "highlight_count": highlight_count} + return { + "notify_count": notify_count, + "highlight_count": highlight_count, + "unread_count": unread_count, + } @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -831,7 +852,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? - AND highlight = 0 + AND highlight = 0 AND notif = 1 GROUP BY user_id, room_id ) AS upd LEFT JOIN event_push_summary AS old USING (user_id, room_id) From c7b99a1180439752c2864e883e0a6d7c72b7f116 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 17:54:33 +0100 Subject: [PATCH 05/27] Use a more efficient way of calculating counters --- .../data_stores/main/event_push_actions.py | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 9922fda50..7ba741cce 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -123,22 +123,31 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): - # First get number of notifications. - # We ignore the notif column, given we want unread counts irrespective of - # whether the notification actually sent a push or not. + # First get number of actions, grouped on whether the action notifies. sql = ( - "SELECT count(*)" + "SELECT count(*), notif" " FROM event_push_actions ea" " WHERE" " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" - " AND notif = 1" + " GROUP BY notif" ) - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - notify_count = row[0] if row else 0 + rows = txn.fetchall() + + # We should get a maximum number of two rows: one for notif = 0, which is the + # number of actions that contribute to the unread_count but not to the + # notify_count, and one for notif = 1, which is the number of actions that + # contribute to both counters. If one or both rows don't appear, then the + # value for the matching counter should be 0. + unread_count = 0 + notify_count = 0 + for row in rows: + if row[1] == 0: + unread_count = row[0] + if row[1] == 1: + notify_count = row[0] txn.execute( """ @@ -151,20 +160,8 @@ class EventPushActionsWorkerStore(SQLBaseStore): if rows: notify_count += rows[0][0] - # Now get the number of unread messages in the room, i.e. messages that matched - # both a mark_unread rule and a notify one. - sql = ( - "SELECT count(*)" - " FROM event_push_actions ea" - " WHERE" - " user_id = ?" - " AND room_id = ?" - " AND stream_ordering > ?" 
- " AND notif = 0" - ) - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() - unread_count = row[0] if row else 0 + # Now that we've got the final notify_count, add it to unread_count, as notify + # actions also contribute to the unread count. unread_count += notify_count # Now get the number of highlights @@ -183,9 +180,9 @@ class EventPushActionsWorkerStore(SQLBaseStore): highlight_count = row[0] if row else 0 return { + "unread_count": unread_count, "notify_count": notify_count, "highlight_count": highlight_count, - "unread_count": unread_count, } @defer.inlineCallbacks From 476a89707ada05c0767324063d9c5814547d3ae1 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 17:55:03 +0100 Subject: [PATCH 06/27] Fix tests --- .../replication/slave/storage/test_events.py | 6 ++-- tests/storage/test_event_push_actions.py | 32 +++++++++++-------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 1a88c7fb8..bc667454c 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -160,7 +160,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 0}, + {"highlight_count": 0, "notify_count": 0, "unread_count": 0}, ) self.persist( @@ -173,7 +173,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 1}, + {"highlight_count": 0, "notify_count": 1, "unread_count": 1}, ) self.persist( @@ -188,7 +188,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 1, "notify_count": 2}, + {"highlight_count": 1, "notify_count": 2, "unread_count": 2}, ) def test_get_rooms_for_user_with_stream_ordering(self): diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index b45bc9c11..79a88a148 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -55,13 +55,17 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): user_id = "@user1235:example.com" @defer.inlineCallbacks - def _assert_counts(noitf_count, highlight_count): + def _assert_counts(unread_count, notif_count, highlight_count): counts = yield self.store.db.runInteraction( "", self.store._get_unread_counts_by_pos_txn, room_id, user_id, 0 ) self.assertEquals( counts, - {"notify_count": noitf_count, "highlight_count": highlight_count}, + { + "unread_count": unread_count, + "notify_count": notif_count, + "highlight_count": highlight_count, + }, ) @defer.inlineCallbacks @@ -96,23 +100,23 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): stream, ) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) yield _inject_actions(1, PlAIN_NOTIF) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _rotate(2) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _inject_actions(3, PlAIN_NOTIF) - yield _assert_counts(2, 0) + yield _assert_counts(2, 2, 0) yield _rotate(4) - yield _assert_counts(2, 0) + yield _assert_counts(2, 2, 0) yield _inject_actions(5, PlAIN_NOTIF) yield _mark_read(3, 3) - yield 
_assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _mark_read(5, 5) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) yield _inject_actions(6, PlAIN_NOTIF) yield _rotate(7) @@ -121,17 +125,17 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): table="event_push_actions", keyvalues={"1": 1}, desc="" ) - yield _assert_counts(1, 0) + yield _assert_counts(1, 1, 0) yield _mark_read(7, 7) - yield _assert_counts(0, 0) + yield _assert_counts(0, 0, 0) yield _inject_actions(8, HIGHLIGHT) - yield _assert_counts(1, 1) + yield _assert_counts(1, 1, 1) yield _rotate(9) - yield _assert_counts(1, 1) + yield _assert_counts(1, 1, 1) yield _rotate(10) - yield _assert_counts(1, 1) + yield _assert_counts(1, 1, 1) @defer.inlineCallbacks def test_find_first_stream_ordering_after_ts(self): From aad40e38e1492049c361df8aba23310eaffac008 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 17:56:33 +0100 Subject: [PATCH 07/27] Changelog --- changelog.d/6061.feature | 1 - changelog.d/7673.feature | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 changelog.d/6061.feature create mode 100644 changelog.d/7673.feature diff --git a/changelog.d/6061.feature b/changelog.d/6061.feature deleted file mode 100644 index d85c497d9..000000000 --- a/changelog.d/6061.feature +++ /dev/null @@ -1 +0,0 @@ -Add experimental "no_push" push rule action diff --git a/changelog.d/7673.feature b/changelog.d/7673.feature new file mode 100644 index 000000000..74e2059ad --- /dev/null +++ b/changelog.d/7673.feature @@ -0,0 +1 @@ +Add a per-room counter for unread messages in responses to `/sync` requests. From df3323a7cfe831813c00df32c85b983587f8529e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 20:32:01 +0100 Subject: [PATCH 08/27] Use temporary prefixes as per the MSC --- synapse/handlers/sync.py | 4 +++- synapse/rest/client/v1/push_rule.py | 2 +- synapse/storage/data_stores/main/event_push_actions.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cec0ca427..5a38f3e9a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1895,7 +1895,9 @@ class SyncHandler(object): if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - unread_notifications["unread_count"] = notifs["unread_count"] + unread_notifications["org.matrix.msc2625.unread_count"] = ( + notifs["unread_count"] + ) sync_result_builder.joined.append(room_sync) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c27e05d1d..f563b3dc3 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce", "mark_unread"]: + if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 7ba741cce..52dcc7be4 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -454,7 +454,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # can be used to insert into the `event_push_actions_staging` table. 
def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 - notif = 0 if "mark_unread" in actions else 1 + notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column From 243f0ba6ced0b99a7022c16324484e0825803483 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 20:35:35 +0100 Subject: [PATCH 09/27] Lint --- synapse/handlers/sync.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5a38f3e9a..44ddb5504 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1895,9 +1895,9 @@ class SyncHandler(object): if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - unread_notifications["org.matrix.msc2625.unread_count"] = ( - notifs["unread_count"] - ) + unread_notifications["org.matrix.msc2625.unread_count"] = notifs[ + "unread_count", + ] sync_result_builder.joined.append(room_sync) From 9dbd006607349dd87dde5653e10dfccd3bed00d4 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 20:44:24 +0100 Subject: [PATCH 10/27] Appease mypy --- synapse/handlers/sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 44ddb5504..15cf64773 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1896,7 +1896,7 @@ class SyncHandler(object): unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] unread_notifications["org.matrix.msc2625.unread_count"] = notifs[ - "unread_count", + "unread_count" ] sync_result_builder.joined.append(room_sync) From ea8f6e611bdc4c2ee3f6fea76893650ba8f0facd Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 15:30:42 +0100 Subject: [PATCH 11/27] Actually act on mark_unread --- synapse/push/bulk_push_rule_evaluator.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index e75d964ac..f7c3db582 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -191,9 +191,13 @@ class BulkPushRuleEvaluator(object): ) if matches: actions = [x for x in rule["actions"] if x != "dont_notify"] - if actions and "notify" in actions: - # Push rules say we should notify the user of this event - actions_by_user[uid] = actions + if actions: + if ( + "notify" in actions + or "org.matrix.msc2625.mark_unread" in actions + ): + # Push rules say we should act on this event. 
+ actions_by_user[uid] = actions break # Mark in the DB staging area the push actions for users who should be From ce74a6685d2fa57e9bbb54d0826344ee48ee7f57 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 17:58:26 +0100 Subject: [PATCH 12/27] Save the count of unread messages to event_push_summary --- .../data_stores/main/event_push_actions.py | 53 ++++++++++++------- .../delta/59/00push_summary_unread_count.sql | 18 +++++++ synapse/storage/prepare_database.py | 2 +- 3 files changed, 53 insertions(+), 20 deletions(-) create mode 100644 synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 52dcc7be4..2b56e1d10 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -144,14 +144,15 @@ class EventPushActionsWorkerStore(SQLBaseStore): unread_count = 0 notify_count = 0 for row in rows: - if row[1] == 0: - unread_count = row[0] + # We always increment unread_count because actions that notify also + # contribute to it. + unread_count += row[0] if row[1] == 1: notify_count = row[0] txn.execute( """ - SELECT notif_count FROM event_push_summary + SELECT notif_count, unread_count FROM event_push_summary WHERE room_id = ? AND user_id = ? AND stream_ordering > ? """, (room_id, user_id, stream_ordering), @@ -159,10 +160,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): rows = txn.fetchall() if rows: notify_count += rows[0][0] - - # Now that we've got the final notify_count, add it to unread_count, as notify - # actions also contribute to the unread count. - unread_count += notify_count + unread_count += rows[0][1] # Now get the number of highlights sql = ( @@ -841,23 +839,35 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, - coalesce(old.notif_count, 0) + upd.notif_count, + coalesce(old.%s, 0) + upd.%s, upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as notif_count, + SELECT user_id, room_id, count(*) as unread_count, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? - AND highlight = 0 AND notif = 1 + AND highlight = 0 + %s GROUP BY user_id, room_id ) AS upd LEFT JOIN event_push_summary AS old USING (user_id, room_id) """ - txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering)) + # First get the count of unread messages. + txn.execute( + sql % ("unread_count", "unread_count", ""), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) rows = txn.fetchall() + # Then get the count of notifications. 
+ txn.execute( + sql % ("notify_count", "notify_count", "notif = 1"), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + notif_rows = txn.fetchall() + logger.info("Rotating notifications, handling %d rows", len(rows)) # If the `old.user_id` above is NULL then we know there isn't already an @@ -868,22 +878,27 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": row[0], - "room_id": row[1], - "notif_count": row[2], - "stream_ordering": row[3], + "user_id": rows[i][0], + "room_id": rows[i][1], + "notif_count": notif_rows[i][2], + "unread_count": rows[i][2], + "stream_ordering": rows[i][3], } - for row in rows - if row[4] is None + for i, _ in enumerate(rows) + if rows[i][4] is None ], ) txn.executemany( """ - UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? + UPDATE event_push_summary + SET notif_count = ?, unread_count = ?, stream_ordering = ? WHERE user_id = ? AND room_id = ? """, - ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None), + ( + (notif_rows[i][2], rows[i][2], rows[i][3], rows[i][0], rows[i][1]) + for i, _ in enumerate(rows) if rows[i][4] is not None + ), ) txn.execute( diff --git a/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql new file mode 100644 index 000000000..298516020 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Store the number of unread messages, i.e. messages that triggered either a notify +-- action or a mark_unread one. +ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL; diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 9cc3b51fe..bec8da7f6 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) # XXX: If you're about to bump this to 59 (or higher) please create an update # that drops the unused `cache_invalidation_stream` table, as per #7436! # XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! 
-SCHEMA_VERSION = 58 +SCHEMA_VERSION = 59 dir_path = os.path.abspath(os.path.dirname(__file__)) From d0f095625c996c8b831e27609ed88704df3b2845 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 18:04:43 +0100 Subject: [PATCH 13/27] Lint --- synapse/storage/data_stores/main/event_push_actions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 2b56e1d10..af0ab6cbc 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -897,7 +897,8 @@ class EventPushActionsStore(EventPushActionsWorkerStore): """, ( (notif_rows[i][2], rows[i][2], rows[i][3], rows[i][0], rows[i][1]) - for i, _ in enumerate(rows) if rows[i][4] is not None + for i, _ in enumerate(rows) + if rows[i][4] is not None ), ) From 34fd1f7ab52d4cccdd650ebda3962a44f7f2db23 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 18:12:12 +0100 Subject: [PATCH 14/27] Fix schema update --- .../main/schema/delta/59/00push_summary_unread_count.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql index 298516020..560858d88 100644 --- a/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql +++ b/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql @@ -15,4 +15,4 @@ -- Store the number of unread messages, i.e. messages that triggered either a notify -- action or a mark_unread one. -ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL; +ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0; From 803291728cad3c29e7800d0f92f79eea73a169ef Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 18:25:25 +0100 Subject: [PATCH 15/27] Fix SQL --- synapse/storage/data_stores/main/event_push_actions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index af0ab6cbc..7cd3ae6ae 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -843,7 +843,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as unread_count, + SELECT user_id, room_id, count(*) as %s, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? @@ -856,14 +856,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # First get the count of unread messages. txn.execute( - sql % ("unread_count", "unread_count", ""), + sql % ("unread_count", "unread_count", "unread_count", ""), (old_rotate_stream_ordering, rotate_to_stream_ordering), ) rows = txn.fetchall() # Then get the count of notifications. 
txn.execute( - sql % ("notify_count", "notify_count", "notif = 1"), + sql % ("notif_count", "notif_count", "notif_count", "AND notif = 1"), (old_rotate_stream_ordering, rotate_to_stream_ordering), ) notif_rows = txn.fetchall() From cb6d4d07b1cbeff3be55be4ddeedfb2cc13ff959 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 11 Jun 2020 18:29:20 +0100 Subject: [PATCH 16/27] Log for invalid values of notif --- synapse/storage/data_stores/main/event_push_actions.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 7cd3ae6ae..14eb79cc4 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -149,6 +149,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): unread_count += row[0] if row[1] == 1: notify_count = row[0] + elif row[1] != 0: + logger.warning( + "Unexpected value %d for column 'notif' in table" + " 'event_push_actions'", + row[1], + ) txn.execute( """ From 3cc7f43e8d5f24532e6f65ebe44dde6f7d40ab01 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 11:07:26 +0100 Subject: [PATCH 17/27] Fix summary rotation --- .../data_stores/main/event_push_actions.py | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 14eb79cc4..eb4ce2f76 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -865,7 +865,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): sql % ("unread_count", "unread_count", "unread_count", ""), (old_rotate_stream_ordering, rotate_to_stream_ordering), ) - rows = txn.fetchall() + unread_rows = txn.fetchall() # Then get the count of notifications. txn.execute( @@ -874,7 +874,24 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ) notif_rows = txn.fetchall() - logger.info("Rotating notifications, handling %d rows", len(rows)) + # We need to merge both lists into a single object because we might not have the + # same amount of rows in each of them. In this case we use a dict indexed on the + # user ID and room ID to make it easier to populate. + summaries = {} + for row in unread_rows: + summaries[(row[0], row[1])] = { + "unread_count": row[2], + "stream_ordering": row[3], + "old_user_id": row[4], + "notif_count": 0, + } + + # notif_rows is populated based on a subset of the query used to populate + # unread_rows, so we can be sure that there will be no KeyError here. + for row in notif_rows: + summaries[(row[0], row[1])]["notif_count"] = row[2] + + logger.info("Rotating notifications, handling %d rows", len(summaries)) # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. 
Otherwise we update the @@ -884,14 +901,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": rows[i][0], - "room_id": rows[i][1], - "notif_count": notif_rows[i][2], - "unread_count": rows[i][2], - "stream_ordering": rows[i][3], + "user_id": key[0], + "room_id": key[1], + "notif_count": summary["notif_count"], + "unread_count": summary["unread_count"], + "stream_ordering": summary["stream_ordering"], } - for i, _ in enumerate(rows) - if rows[i][4] is None + for key, summary in summaries.items() + if summary["old_user_id"] is None ], ) @@ -902,9 +919,15 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE user_id = ? AND room_id = ? """, ( - (notif_rows[i][2], rows[i][2], rows[i][3], rows[i][0], rows[i][1]) - for i, _ in enumerate(rows) - if rows[i][4] is not None + ( + summary["notif_count"], + summary["unread_count"], + summary["stream_ordering"], + key[0], + key[1], + ) + for key, summary in summaries.items() + if summary["old_user_id"] is not None ), ) From 2a07c5ded67f598376d82c37057ead6571a4276d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 11:08:05 +0100 Subject: [PATCH 18/27] Test that a mark_unread action updates the right counter --- tests/storage/test_event_push_actions.py | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 79a88a148..1e6ec9531 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -17,11 +17,16 @@ from mock import Mock from twisted.internet import defer +from tests import unittest import tests.unittest import tests.utils USER_ID = "@user:example.com" +MARK_UNREAD = [ + "org.matrix.msc2625.mark_unread", + {"set_tweak": "highlight", "value": False}, +] PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}] HIGHLIGHT = [ "notify", @@ -49,6 +54,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): USER_ID, 0, 1000, 20 ) + @unittest.DEBUG @defer.inlineCallbacks def test_count_aggregation(self): room_id = "!foo:example.com" @@ -130,12 +136,17 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): yield _mark_read(7, 7) yield _assert_counts(0, 0, 0) - yield _inject_actions(8, HIGHLIGHT) - yield _assert_counts(1, 1, 1) + yield _inject_actions(8, MARK_UNREAD) + yield _assert_counts(1, 0, 0) yield _rotate(9) - yield _assert_counts(1, 1, 1) - yield _rotate(10) - yield _assert_counts(1, 1, 1) + yield _assert_counts(1, 0, 0) + + yield _inject_actions(10, HIGHLIGHT) + yield _assert_counts(2, 1, 1) + yield _rotate(11) + yield _assert_counts(2, 1, 1) + yield _rotate(12) + yield _assert_counts(2, 1, 1) @defer.inlineCallbacks def test_find_first_stream_ordering_after_ts(self): From 63d9a00bf11b5d0f50c173258a0d24ddc0fb7bdf Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 11:13:30 +0100 Subject: [PATCH 19/27] Remove debug logging --- tests/storage/test_event_push_actions.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 1e6ec9531..303dc8571 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -17,7 +17,6 @@ from mock import Mock from twisted.internet import defer -from tests import unittest import tests.unittest import tests.utils @@ -54,7 +53,6 @@ class 
EventPushActionsStoreTestCase(tests.unittest.TestCase): USER_ID, 0, 1000, 20 ) - @unittest.DEBUG @defer.inlineCallbacks def test_count_aggregation(self): room_id = "!foo:example.com" From 6b1fa3293d5e834b6b66c4b9d83a5f938cbcabde Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 11:28:26 +0100 Subject: [PATCH 20/27] Test that a mark_unread action updates the right counter when using a slave store --- tests/replication/slave/storage/test_events.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index bc667454c..9837d4499 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -191,6 +191,21 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): {"highlight_count": 1, "notify_count": 2, "unread_count": 2}, ) + self.persist( + type="m.room.message", + msgtype="m.text", + body="world", + push_actions=[ + (USER_ID_2, ["org.matrix.msc2625.mark_unread"]) + ], + ) + self.replicate() + self.check( + "get_unread_event_push_actions_by_room_for_user", + [ROOM_ID, USER_ID_2, event1.event_id], + {"highlight_count": 1, "notify_count": 2, "unread_count": 3}, + ) + def test_get_rooms_for_user_with_stream_ordering(self): """Check that the cache on get_rooms_for_user_with_stream_ordering is invalidated by rows in the events stream From 7e80c84902f2d34aff1bb8b4c5833cb33d3dc653 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 11:31:11 +0100 Subject: [PATCH 21/27] Lint --- tests/replication/slave/storage/test_events.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index 9837d4499..cd8680e81 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -195,9 +195,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): type="m.room.message", msgtype="m.text", body="world", - push_actions=[ - (USER_ID_2, ["org.matrix.msc2625.mark_unread"]) - ], + push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])], ) self.replicate() self.check( From cf92fbb8aa6386cc2075efedb8c27b10d8584901 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 15:02:15 +0100 Subject: [PATCH 22/27] Use attr instead of a dict --- .../data_stores/main/event_push_actions.py | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index eb4ce2f76..688aef4d2 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -15,6 +15,7 @@ import logging +import attr from six import iteritems from canonicaljson import json @@ -37,6 +38,17 @@ DEFAULT_HIGHLIGHT_ACTION = [ ] +@attr.s +class EventPushSummary(object): + """Summary of pending event push actions for a given user in a given room.""" + user_id = attr.ib() + room_id = attr.ib() + unread_count = attr.ib() + stream_ordering = attr.ib() + old_user_id = attr.ib() + notif_count = attr.ib() + + def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. @@ -879,17 +891,15 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # user ID and room ID to make it easier to populate. 
summaries = {} for row in unread_rows: - summaries[(row[0], row[1])] = { - "unread_count": row[2], - "stream_ordering": row[3], - "old_user_id": row[4], - "notif_count": 0, - } + summaries[(row[0], row[1])] = EventPushSummary( + user_id=row[0], room_id=row[1], unread_count=row[2], + stream_ordering=row[3], old_user_id=row[4], notif_count=0, + ) # notif_rows is populated based on a subset of the query used to populate # unread_rows, so we can be sure that there will be no KeyError here. for row in notif_rows: - summaries[(row[0], row[1])]["notif_count"] = row[2] + summaries[(row[0], row[1])].notif_count = row[2] logger.info("Rotating notifications, handling %d rows", len(summaries)) @@ -901,14 +911,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": key[0], - "room_id": key[1], - "notif_count": summary["notif_count"], - "unread_count": summary["unread_count"], - "stream_ordering": summary["stream_ordering"], + "user_id": summary.user_id, + "room_id": summary.room_id, + "notif_count": summary.notif_count, + "unread_count": summary.unread_count, + "stream_ordering": summary.stream_ordering, } - for key, summary in summaries.items() - if summary["old_user_id"] is None + for summary in summaries.values() + if summary.old_user_id is None ], ) @@ -920,14 +930,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore): """, ( ( - summary["notif_count"], - summary["unread_count"], - summary["stream_ordering"], - key[0], - key[1], + summary.notif_count, + summary.unread_count, + summary.stream_ordering, + summary.user_id, + summary.room_id, ) - for key, summary in summaries.items() - if summary["old_user_id"] is not None + for summary in summaries.values() + if summary.old_user_id is not None ), ) From 9549d557ea37f5851bad12cca87ab4e9b610cec8 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 15:03:26 +0100 Subject: [PATCH 23/27] Don't update the schema version --- .../07push_summary_unread_count.sql} | 0 synapse/storage/prepare_database.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename synapse/storage/data_stores/main/schema/delta/{59/00push_summary_unread_count.sql => 58/07push_summary_unread_count.sql} (100%) diff --git a/synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql similarity index 100% rename from synapse/storage/data_stores/main/schema/delta/59/00push_summary_unread_count.sql rename to synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index bec8da7f6..9cc3b51fe 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -34,7 +34,7 @@ logger = logging.getLogger(__name__) # XXX: If you're about to bump this to 59 (or higher) please create an update # that drops the unused `cache_invalidation_stream` table, as per #7436! # XXX: Also add an update to drop `account_data_max_stream_id` as per #7656! 
-SCHEMA_VERSION = 59 +SCHEMA_VERSION = 58 dir_path = os.path.abspath(os.path.dirname(__file__)) From 1e5a50302f1b481e56019f2cc1c99b34183845af Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 15:05:47 +0100 Subject: [PATCH 24/27] Pre-populate the unread_count column --- .../main/schema/delta/58/07push_summary_unread_count.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql index 560858d88..f1459ef7f 100644 --- a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql +++ b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql @@ -16,3 +16,8 @@ -- Store the number of unread messages, i.e. messages that triggered either a notify -- action or a mark_unread one. ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0; + +-- Pre-populate the new column with the count of pending notifications. +-- We expect event_push_summary to be relatively small, so we can do this update +-- synchronously without impacting Synapse's startup time too much. +UPDATE event_push_summary SET unread_count = notif_count; \ No newline at end of file From e47e5a2dcd2e7210c3830c3f0b8420a8b0988133 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 15:11:01 +0100 Subject: [PATCH 25/27] Incorporate review bits --- changelog.d/7673.feature | 2 +- synapse/push/bulk_push_rule_evaluator.py | 13 +++++---- .../data_stores/main/event_push_actions.py | 27 +++++++++---------- 3 files changed, 20 insertions(+), 22 deletions(-) diff --git a/changelog.d/7673.feature b/changelog.d/7673.feature index 74e2059ad..ecc3ffd8d 100644 --- a/changelog.d/7673.feature +++ b/changelog.d/7673.feature @@ -1 +1 @@ -Add a per-room counter for unread messages in responses to `/sync` requests. +Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625). diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f7c3db582..3244d39c3 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -191,13 +191,12 @@ class BulkPushRuleEvaluator(object): ) if matches: actions = [x for x in rule["actions"] if x != "dont_notify"] - if actions: - if ( - "notify" in actions - or "org.matrix.msc2625.mark_unread" in actions - ): - # Push rules say we should act on this event. - actions_by_user[uid] = actions + if ( + "notify" in actions + or "org.matrix.msc2625.mark_unread" in actions + ): + # Push rules say we should act on this event. + actions_by_user[uid] = actions break # Mark in the DB staging area the push actions for users who should be diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 688aef4d2..4409e8791 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -14,6 +14,7 @@ # limitations under the License. 
import logging +from typing import Dict, Tuple import attr from six import iteritems @@ -857,11 +858,11 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, - coalesce(old.%s, 0) + upd.%s, + coalesce(old.%s, 0) + upd.cnt, upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as %s, + SELECT user_id, room_id, count(*) as cnt, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? @@ -874,31 +875,29 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # First get the count of unread messages. txn.execute( - sql % ("unread_count", "unread_count", "unread_count", ""), + sql % ("unread_count", ""), (old_rotate_stream_ordering, rotate_to_stream_ordering), ) - unread_rows = txn.fetchall() - - # Then get the count of notifications. - txn.execute( - sql % ("notif_count", "notif_count", "notif_count", "AND notif = 1"), - (old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - notif_rows = txn.fetchall() # We need to merge both lists into a single object because we might not have the # same amount of rows in each of them. In this case we use a dict indexed on the # user ID and room ID to make it easier to populate. - summaries = {} - for row in unread_rows: + summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] + for row in txn: summaries[(row[0], row[1])] = EventPushSummary( user_id=row[0], room_id=row[1], unread_count=row[2], stream_ordering=row[3], old_user_id=row[4], notif_count=0, ) + # Then get the count of notifications. + txn.execute( + sql % ("notif_count", "AND notif = 1"), + (old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + # notif_rows is populated based on a subset of the query used to populate # unread_rows, so we can be sure that there will be no KeyError here. 
- for row in notif_rows: + for row in txn: summaries[(row[0], row[1])].notif_count = row[2] logger.info("Rotating notifications, handling %d rows", len(summaries)) From e186c660b170ba31e6fe0f46efa393820ebe92ee Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Fri, 12 Jun 2020 15:31:59 +0100 Subject: [PATCH 26/27] Lint --- .../storage/data_stores/main/event_push_actions.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 4409e8791..382e0f61c 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -16,9 +16,9 @@ import logging from typing import Dict, Tuple -import attr from six import iteritems +import attr from canonicaljson import json from twisted.internet import defer @@ -42,6 +42,7 @@ DEFAULT_HIGHLIGHT_ACTION = [ @attr.s class EventPushSummary(object): """Summary of pending event push actions for a given user in a given room.""" + user_id = attr.ib() room_id = attr.ib() unread_count = attr.ib() @@ -885,8 +886,12 @@ class EventPushActionsStore(EventPushActionsWorkerStore): summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] for row in txn: summaries[(row[0], row[1])] = EventPushSummary( - user_id=row[0], room_id=row[1], unread_count=row[2], - stream_ordering=row[3], old_user_id=row[4], notif_count=0, + user_id=row[0], + room_id=row[1], + unread_count=row[2], + stream_ordering=row[3], + old_user_id=row[4], + notif_count=0, ) # Then get the count of notifications. From fed493c5fdbce5942a7339f10693b7685dcac90a Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 15 Jun 2020 09:58:55 +0100 Subject: [PATCH 27/27] Incorporate review --- .../data_stores/main/event_push_actions.py | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 382e0f61c..b1a2804b3 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -40,15 +40,13 @@ DEFAULT_HIGHLIGHT_ACTION = [ @attr.s -class EventPushSummary(object): +class EventPushSummary: """Summary of pending event push actions for a given user in a given room.""" - user_id = attr.ib() - room_id = attr.ib() - unread_count = attr.ib() - stream_ordering = attr.ib() - old_user_id = attr.ib() - notif_count = attr.ib() + unread_count = attr.ib(type=int) + stream_ordering = attr.ib(type=int) + old_user_id = attr.ib(type=str) + notif_count = attr.ib(type=int) def _serialize_action(actions, is_highlight): @@ -886,8 +884,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore): summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] for row in txn: summaries[(row[0], row[1])] = EventPushSummary( - user_id=row[0], - room_id=row[1], unread_count=row[2], stream_ordering=row[3], old_user_id=row[4], @@ -915,13 +911,13 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": summary.user_id, - "room_id": summary.room_id, + "user_id": user_id, + "room_id": room_id, "notif_count": summary.notif_count, "unread_count": summary.unread_count, "stream_ordering": summary.stream_ordering, } - for summary in summaries.values() + for ((user_id, room_id), summary) in summaries.items() if summary.old_user_id is None ], ) @@ -937,10 +933,10 @@ class 
EventPushActionsStore(EventPushActionsWorkerStore): summary.notif_count, summary.unread_count, summary.stream_ordering, - summary.user_id, - summary.room_id, + user_id, + room_id, ) - for summary in summaries.values() + for ((user_id, room_id), summary) in summaries.items() if summary.old_user_id is not None ), )
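
The relationship between the three counters that patches 04, 05, 16 and 17 iterate on can be summarised with a small standalone sketch. This is an illustration of the intended semantics only, not how the patched store literally computes the values (the real code also folds in pre-aggregated rows from event_push_summary during rotation). The rows below stand in for the (notif, highlight) flags of event_push_actions rows after the user's read receipt.

    def summarise(rows):
        """Derive the per-room counters from pending push-action rows.

        rows: iterable of (notif, highlight) flag pairs, one per action.
        """
        unread = notify = highlight = 0
        for notif, hl in rows:
            unread += 1        # every action contributes to unread_count
            if notif == 1:
                notify += 1    # only actions that actually push contribute to notify_count
            if hl == 1:
                highlight += 1
        return {
            "unread_count": unread,
            "notify_count": notify,
            "highlight_count": highlight,
        }

    # mark_unread stores notif=0; a plain notify stores notif=1; a highlight
    # stores notif=1 and highlight=1 (mirroring the fixtures in the updated tests).
    assert summarise([(0, 0), (1, 0), (1, 1)]) == {
        "unread_count": 3,
        "notify_count": 2,
        "highlight_count": 1,
    }

This matches the expectations encoded in test_count_aggregation after patch 18: a mark_unread action bumps only the unread count, a plain notify bumps unread and notify, and a highlight bumps all three.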
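For completeness, here is a minimal client-side sketch of how the feature in this series would be exercised end to end. It assumes a homeserver running this branch; the homeserver URL, access token and room ID are placeholders, and the org.matrix.msc2625.* identifiers are the unstable-prefixed names used throughout these patches, so they may change if MSC2625 evolves. The sketch installs a room-specific push rule whose only action is the experimental mark_unread action, then reads the per-room unread counter that patch 08 adds next to notification_count and highlight_count in /sync responses.

    import requests  # third-party HTTP client, assumed available
    from urllib.parse import quote

    # Placeholder values: substitute your own homeserver, token and room.
    HOMESERVER = "https://matrix.example.com"
    ACCESS_TOKEN = "placeholder_access_token"
    ROOM_ID = "!foo:example.com"

    auth = {"Authorization": f"Bearer {ACCESS_TOKEN}"}

    # Install a room-specific push rule whose only action is the experimental
    # mark_unread action: events in this room then increment the unread count
    # but do not trigger email/HTTP pushes or count as notifications.
    resp = requests.put(
        f"{HOMESERVER}/_matrix/client/r0/pushrules/global/room/{quote(ROOM_ID, safe='')}",
        headers=auth,
        json={"actions": ["org.matrix.msc2625.mark_unread"]},
    )
    resp.raise_for_status()

    # Read the per-room counters from a sync response.
    resp = requests.get(
        f"{HOMESERVER}/_matrix/client/r0/sync",
        headers=auth,
        params={"timeout": 0},
    )
    resp.raise_for_status()
    room = resp.json().get("rooms", {}).get("join", {}).get(ROOM_ID, {})
    counts = room.get("unread_notifications", {})
    print("unread:       ", counts.get("org.matrix.msc2625.unread_count", 0))
    print("notifications:", counts.get("notification_count", 0))
    print("highlights:   ", counts.get("highlight_count", 0))

On a homeserver without these patches the rule upsert would be rejected by _check_actions (patches 03 and 08 extend its whitelist with the prefixed action name), and the org.matrix.msc2625.unread_count field would simply be absent from the sync response.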