Merge pull request #3212 from matrix-org/erikj/epa_stream

Use stream rather depth ordering for push actions
This commit is contained in:
Erik Johnston 2018-05-17 12:01:21 +01:00 committed by GitHub
commit f7906203f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 18 additions and 40 deletions

View File

@ -18,8 +18,6 @@ from synapse.storage._base import SQLBaseStore, LoggingTransaction
from twisted.internet import defer from twisted.internet import defer
from synapse.util.async import sleep from synapse.util.async import sleep
from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.types import RoomStreamToken
from .stream import lower_bound
import logging import logging
import simplejson as json import simplejson as json
@ -99,7 +97,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
last_read_event_id): last_read_event_id):
sql = ( sql = (
"SELECT stream_ordering, topological_ordering" "SELECT stream_ordering"
" FROM events" " FROM events"
" WHERE room_id = ? AND event_id = ?" " WHERE room_id = ? AND event_id = ?"
) )
@ -111,17 +109,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
return {"notify_count": 0, "highlight_count": 0} return {"notify_count": 0, "highlight_count": 0}
stream_ordering = results[0][0] stream_ordering = results[0][0]
topological_ordering = results[0][1]
return self._get_unread_counts_by_pos_txn( return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, topological_ordering, stream_ordering txn, room_id, user_id, stream_ordering
) )
def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering, def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
stream_ordering):
token = RoomStreamToken(
topological_ordering, stream_ordering
)
# First get number of notifications. # First get number of notifications.
# We don't need to put a notif=1 clause as all rows always have # We don't need to put a notif=1 clause as all rows always have
@ -132,10 +125,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" WHERE" " WHERE"
" user_id = ?" " user_id = ?"
" AND room_id = ?" " AND room_id = ?"
" AND %s" " AND stream_ordering > ?"
) % (lower_bound(token, self.database_engine, inclusive=False),) )
txn.execute(sql, (user_id, room_id)) txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone() row = txn.fetchone()
notify_count = row[0] if row else 0 notify_count = row[0] if row else 0
@ -155,10 +148,10 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" highlight = 1" " highlight = 1"
" AND user_id = ?" " AND user_id = ?"
" AND room_id = ?" " AND room_id = ?"
" AND %s" " AND stream_ordering > ?"
) % (lower_bound(token, self.database_engine, inclusive=False),) )
txn.execute(sql, (user_id, room_id)) txn.execute(sql, (user_id, room_id, stream_ordering))
row = txn.fetchone() row = txn.fetchone()
highlight_count = row[0] if row else 0 highlight_count = row[0] if row else 0
@ -209,7 +202,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight " " ep.highlight "
" FROM (" " FROM ("
" SELECT room_id," " SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering" " MAX(stream_ordering) as stream_ordering"
" FROM events" " FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)" " INNER JOIN receipts_linearized USING (room_id, event_id)"
@ -219,13 +211,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" event_push_actions AS ep" " event_push_actions AS ep"
" WHERE" " WHERE"
" ep.room_id = rl.room_id" " ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
" OR ("
" ep.topological_ordering = rl.topological_ordering"
" AND ep.stream_ordering > rl.stream_ordering" " AND ep.stream_ordering > rl.stream_ordering"
" )"
" )"
" AND ep.user_id = ?" " AND ep.user_id = ?"
" AND ep.stream_ordering > ?" " AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?" " AND ep.stream_ordering <= ?"
@ -318,7 +304,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" ep.highlight, e.received_ts" " ep.highlight, e.received_ts"
" FROM (" " FROM ("
" SELECT room_id," " SELECT room_id,"
" MAX(topological_ordering) as topological_ordering,"
" MAX(stream_ordering) as stream_ordering" " MAX(stream_ordering) as stream_ordering"
" FROM events" " FROM events"
" INNER JOIN receipts_linearized USING (room_id, event_id)" " INNER JOIN receipts_linearized USING (room_id, event_id)"
@ -329,13 +314,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
" INNER JOIN events AS e USING (room_id, event_id)" " INNER JOIN events AS e USING (room_id, event_id)"
" WHERE" " WHERE"
" ep.room_id = rl.room_id" " ep.room_id = rl.room_id"
" AND ("
" ep.topological_ordering > rl.topological_ordering"
" OR ("
" ep.topological_ordering = rl.topological_ordering"
" AND ep.stream_ordering > rl.stream_ordering" " AND ep.stream_ordering > rl.stream_ordering"
" )"
" )"
" AND ep.user_id = ?" " AND ep.user_id = ?"
" AND ep.stream_ordering > ?" " AND ep.stream_ordering > ?"
" AND ep.stream_ordering <= ?" " AND ep.stream_ordering <= ?"
@ -762,10 +741,10 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
) )
def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
topological_ordering, stream_ordering): stream_ordering):
""" """
Purges old push actions for a user and room before a given Purges old push actions for a user and room before a given
topological_ordering. stream_ordering.
We however keep a months worth of highlighted notifications, so that We however keep a months worth of highlighted notifications, so that
users can still get a list of recent highlights. users can still get a list of recent highlights.
@ -774,7 +753,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn: The transcation txn: The transcation
room_id: Room ID to delete from room_id: Room ID to delete from
user_id: user ID to delete for user_id: user ID to delete for
topological_ordering: The lowest topological ordering which will stream_ordering: The lowest stream ordering which will
not be deleted. not be deleted.
""" """
txn.call_after( txn.call_after(
@ -793,9 +772,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
txn.execute( txn.execute(
"DELETE FROM event_push_actions " "DELETE FROM event_push_actions "
" WHERE user_id = ? AND room_id = ? AND " " WHERE user_id = ? AND room_id = ? AND "
" topological_ordering <= ?" " stream_ordering <= ?"
" AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
(user_id, room_id, topological_ordering, self.stream_ordering_month_ago) (user_id, room_id, stream_ordering, self.stream_ordering_month_ago)
) )
txn.execute(""" txn.execute("""

View File

@ -407,7 +407,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
txn, txn,
room_id=room_id, room_id=room_id,
user_id=user_id, user_id=user_id,
topological_ordering=topological_ordering,
stream_ordering=stream_ordering, stream_ordering=stream_ordering,
) )

View File

@ -55,7 +55,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
def _assert_counts(noitf_count, highlight_count): def _assert_counts(noitf_count, highlight_count):
counts = yield self.store.runInteraction( counts = yield self.store.runInteraction(
"", self.store._get_unread_counts_by_pos_txn, "", self.store._get_unread_counts_by_pos_txn,
room_id, user_id, 0, 0 room_id, user_id, 0
) )
self.assertEquals( self.assertEquals(
counts, counts,
@ -86,7 +86,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
def _mark_read(stream, depth): def _mark_read(stream, depth):
return self.store.runInteraction( return self.store.runInteraction(
"", self.store._remove_old_push_actions_before_txn, "", self.store._remove_old_push_actions_before_txn,
room_id, user_id, depth, stream room_id, user_id, stream
) )
yield _assert_counts(0, 0) yield _assert_counts(0, 0)