Aggregate event push actions

This commit is contained in:
Erik Johnston 2017-02-03 18:12:53 +00:00
parent 795f8e3fe7
commit 095b45c165
5 changed files with 342 additions and 59 deletions

View File

@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
get_unread_event_push_actions_by_room_for_user = (
EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
)
_get_unread_counts_by_receipt_txn = (
DataStore._get_unread_counts_by_receipt_txn.__func__
)
_get_unread_counts_by_pos_txn = (
DataStore._get_unread_counts_by_pos_txn.__func__
)
_get_state_group_for_events = (
StateStore.__dict__["_get_state_group_for_events"]
)

View File

@ -15,6 +15,7 @@
from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
@ -29,7 +30,6 @@ class EventPushActionsStore(SQLBaseStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
def __init__(self, hs):
self.stream_ordering_month_ago = None
super(EventPushActionsStore, self).__init__(hs)
self.register_background_index_update(
@ -47,6 +47,9 @@ class EventPushActionsStore(SQLBaseStore):
where_clause="highlight=1"
)
self._doing_notif_rotation = False
self._clock.looping_call(self._rotate_notifs, 60 * 1000)
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@ -77,7 +80,15 @@ class EventPushActionsStore(SQLBaseStore):
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
def _get_unread_event_push_actions_by_room(txn):
ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
self._get_unread_counts_by_receipt_txn,
room_id, user_id, last_read_event_id
)
defer.returnValue(ret)
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
"SELECT stream_ordering, topological_ordering"
" FROM events"
@ -92,6 +103,13 @@ class EventPushActionsStore(SQLBaseStore):
stream_ordering = results[0][0]
topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering
)
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)
@ -112,6 +130,20 @@ class EventPushActionsStore(SQLBaseStore):
row = txn.fetchone()
notify_count = row[0] if row else 0
summary_notif_count = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary",
keyvalues={
"user_id": user_id,
"room_id": room_id,
},
retcol="notif_count",
allow_none=True,
)
if summary_notif_count:
notify_count += summary_notif_count
# Now get the number of highlights
sql = (
"SELECT count(*)"
@ -132,12 +164,6 @@ class EventPushActionsStore(SQLBaseStore):
"highlight_count": highlight_count,
}
ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
_get_unread_event_push_actions_by_room
)
defer.returnValue(ret)
@defer.inlineCallbacks
def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
def f(txn):
@ -448,7 +474,7 @@ class EventPushActionsStore(SQLBaseStore):
)
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering):
topological_ordering, stream_ordering):
"""
Purges old push actions for a user and room before a given
topological_ordering.
@ -479,11 +505,16 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering < ?"
" topological_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
)
txn.execute("""
DELETE FROM event_push_summary
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
yield self.runInteraction(
@ -500,6 +531,14 @@ class EventPushActionsStore(SQLBaseStore):
"Found stream ordering 1 month ago: it's %d",
self.stream_ordering_month_ago
)
logger.info("Searching for stream ordering 1 day ago")
self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
)
logger.info(
"Found stream ordering 1 day ago: it's %d",
self.stream_ordering_day_ago
)
def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
"""
@ -539,6 +578,120 @@ class EventPushActionsStore(SQLBaseStore):
return range_end
@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
return
self._doing_notif_rotation = True
try:
while True:
logger.info("Rotating notifications")
caught_up = yield self.runInteraction(
"_rotate_notifs",
self._rotate_notifs_txn
)
if caught_up:
break
yield sleep(1)
finally:
self._doing_notif_rotation = False
def _rotate_notifs_txn(self, txn):
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
"""
# We want to make sure that we only ever do this one at a time
self.database_engine.lock_table(txn, "event_push_summary")
# We don't to try and rotate millions of rows at once, so we cap the
# maximum stream ordering we'll rotate before.
txn.execute("""
SELECT stream_ordering FROM event_push_actions
ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
""")
stream_row = txn.fetchone()
if stream_row:
offset_stream_ordering, = stream_row
rotate_to_stream_ordering = min(
self.stream_ordering_day_ago, offset_stream_ordering
)
caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
else:
rotate_to_stream_ordering = self.stream_ordering_day_ago
caught_up = True
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
# We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up
def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
txn,
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id,
coalesce(old.notif_count, 0) + upd.notif_count,
upd.stream_ordering,
old.user_id
FROM (
SELECT user_id, room_id, count(*) as notif_count,
max(stream_ordering) as stream_ordering
FROM event_push_actions
WHERE ? <= stream_ordering AND stream_ordering < ?
AND highlight = 0
GROUP BY user_id, room_id
) AS upd
LEFT JOIN event_push_summary AS old USING (user_id, room_id)
"""
txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
rows = txn.fetchall()
# If the `old.user_id` above is NULL then we know there isn't already an
# entry in the table, so we simply insert it. Otherwise we update the
# existing table.
self._simple_insert_many_txn(
txn,
table="event_push_summary",
values=[
{
"user_id": row[0],
"room_id": row[1],
"notif_count": row[2],
"stream_ordering": row[3],
}
for row in rows if row[4] is None
]
)
txn.executemany(
"""
UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
WHERE user_id = ? AND room_id = ?
""",
((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
)
txn.execute(
"DELETE FROM event_push_actions"
" WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
(old_rotate_stream_ordering, rotate_to_stream_ordering,)
)
txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,)
)
def _action_has_highlight(actions):
for action in actions:

View File

@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)
return True

View File

@ -0,0 +1,37 @@
/* Copyright 2017 OpenMarket Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Aggregate of old notification counts that have been deleted out of the
-- main event_push_actions table. This count does not include those that were
-- highlights, as they remain in the event_push_actions table.
CREATE TABLE event_push_summary (
user_id TEXT NOT NULL,
room_id TEXT NOT NULL,
notif_count BIGINT NOT NULL,
stream_ordering BIGINT NOT NULL
);
CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
-- The stream ordering up to which we have aggregated the event_push_actions
-- table into event_push_summary
CREATE TABLE event_push_summary_stream_ordering (
Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row.
stream_ordering BIGINT NOT NULL,
CHECK (Lock='X')
);
INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);

View File

@ -17,9 +17,15 @@ from twisted.internet import defer
import tests.unittest
import tests.utils
from mock import Mock
USER_ID = "@user:example.com"
PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
HIGHLIGHT = [
"notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
]
class EventPushActionsStoreTestCase(tests.unittest.TestCase):
@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
yield self.store.get_unread_push_actions_for_user_in_range_for_email(
USER_ID, 0, 1000, 20
)
@defer.inlineCallbacks
def test_count_aggregation(self):
room_id = "!foo:example.com"
user_id = "@user1235:example.com"
@defer.inlineCallbacks
def _assert_counts(noitf_count, highlight_count):
counts = yield self.store.runInteraction(
"", self.store._get_unread_counts_by_pos_txn,
room_id, user_id, 0, 0
)
self.assertEquals(
counts,
{"notify_count": noitf_count, "highlight_count": highlight_count}
)
def _inject_actions(stream, action):
event = Mock()
event.room_id = room_id
event.event_id = "$test:example.com"
event.internal_metadata.stream_ordering = stream
event.depth = stream
tuples = [(user_id, action)]
return self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
event, tuples
)
def _rotate(stream):
return self.store.runInteraction(
"", self.store._rotate_notifs_before_txn, stream
)
def _mark_read(stream, depth):
return self.store.runInteraction(
"", self.store._remove_old_push_actions_before_txn,
room_id, user_id, depth, stream
)
yield _assert_counts(0, 0)
yield _inject_actions(1, PlAIN_NOTIF)
yield _assert_counts(1, 0)
yield _rotate(2)
yield _assert_counts(1, 0)
yield _inject_actions(3, PlAIN_NOTIF)
yield _assert_counts(2, 0)
yield _rotate(4)
yield _assert_counts(2, 0)
yield _inject_actions(5, PlAIN_NOTIF)
yield _mark_read(3, 3)
yield _assert_counts(1, 0)
yield _mark_read(5, 5)
yield _assert_counts(0, 0)
yield _inject_actions(6, PlAIN_NOTIF)
yield _rotate(7)
yield self.store._simple_delete(
table="event_push_actions",
keyvalues={"1": 1},
desc="",
)
yield _assert_counts(1, 0)
yield _mark_read(7, 7)
yield _assert_counts(0, 0)
yield _inject_actions(8, HIGHLIGHT)
yield _assert_counts(1, 1)
yield _rotate(9)
yield _assert_counts(1, 1)
yield _rotate(10)
yield _assert_counts(1, 1)