Use stream rather depth ordering for push actions

This simplifies things as it is, but will also allow us to change the
way we traverse topologically without having to update the way push
actions work.
This commit is contained in:
Erik Johnston 2018-05-11 15:30:11 +01:00
parent bfe1f73855
commit 6406b70aeb
3 changed files with 18 additions and 40 deletions

View File

@ -18,8 +18,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer
from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
import logging
import simplejson as json
@ -99,7 +97,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id):
sql = (
"SELECT stream_ordering, topological_ordering"
"SELECT stream_ordering"
" FROM events"
" WHERE room_id = ? AND event_id = ?"
)
@ -111,17 +109,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0]
topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering
txn, room_id, user_id, stream_ordering
)
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
# First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have
@ -132,10 +125,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" WHERE"
" user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
" AND stream_ordering > ?"
)
txn.execute(sql, (user_id, room_id))
txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
notify_count = row[0] if row else 0
@ -155,10 +148,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" highlight = 1"
" AND user_id = ?"
" AND room_id = ?"
" AND %s"
) % (lower_bound(token, self.database_engine, inclusive=False),)
" AND stream_ordering > ?"
)
txn.execute(sql, (user_id, room_id))
txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone()
highlight_count = row[0] if row else 0
@ -209,7 +202,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight "
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
@ -219,13 +211,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" event_push_actions AS ep"
" WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
" OR ("
" ep.topological_ordering = rl.topological_ordering"
" AND ep.stream_ordering > rl.stream_ordering"
" )"
" )"
" AND ep.stream_ordering > rl.stream_ordering"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
@ -318,7 +304,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight, e.received_ts"
" FROM ("
" SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering"
" FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)"
@ -329,13 +314,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" INNER JOIN events AS e USING (room_id, event_id)"
" WHERE"
" ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
" OR ("
" ep.topological_ordering = rl.topological_ordering"
" AND ep.stream_ordering > rl.stream_ordering"
" )"
" )"
" AND ep.stream_ordering > rl.stream_ordering"
" AND ep.user_id = ?"
" AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?"
@ -762,10 +741,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
)
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering, stream_ordering):
stream_ordering):
"""
Purges old push actions for a user and room before a given
topological_ordering.
stream_ordering.
We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights.
@ -774,7 +753,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn: The transcation
room_id: Room ID to delete from
user_id: user ID to delete for
topological_ordering: The lowest topological ordering which will
stream_ordering: The lowest stream ordering which will
not be deleted.
"""
txn.call_after(
@ -793,9 +772,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute(
"DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND "
" topological_ordering <= ?"
" stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
(user_id, room_id, stream_ordering, self.stream_ordering_month_ago)
)
txn.execute("""

View File

@ -407,7 +407,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
txn,
room_id=room_id,
user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering,
)

View File

@ -55,7 +55,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
def _assert_counts(noitf_count, highlight_count):
counts = yield self.store.runInteraction(
"", self.store._get_unread_counts_by_pos_txn,
room_id, user_id, 0, 0
room_id, user_id, 0
)
self.assertEquals(
counts,
@ -86,7 +86,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
def _mark_read(stream, depth):
return self.store.runInteraction(
"", self.store._remove_old_push_actions_before_txn,
room_id, user_id, depth, stream
room_id, user_id, stream
)
yield _assert_counts(0, 0)