Merge pull request #3195 from matrix-org/erikj/pagination_refactor

Refactor recent events func to use pagination func
This commit is contained in:
Erik Johnston 2018-05-09 14:12:24 +01:00 committed by GitHub
commit 0461ef01b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -38,7 +38,6 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@ -347,9 +346,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue(ret) defer.returnValue(ret)
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None): def get_recent_events_for_room(self, room_id, limit, end_token):
rows, token = yield self.get_recent_event_ids_for_room( rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token room_id, limit, end_token,
) )
logger.debug("stream before") logger.debug("stream before")
@ -363,60 +362,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token)) defer.returnValue((events, token))
@cached(num_args=4) @defer.inlineCallbacks
def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None): def get_recent_event_ids_for_room(self, room_id, limit, end_token):
"""Get the most recent events in the room in topological ordering.
Args:
room_id (str)
limit (int)
end_token (str): The stream token representing now.
Returns:
Deferred[tuple[list[dict], tuple[str, str]]]: Returns a list of
dicts (which include event_ids, etc), and a tuple for
`(start_token, end_token)` representing the range of rows
returned.
The returned events are in ascending order.
"""
# Allow a zero limit here, and no-op.
if limit == 0:
defer.returnValue(([], (end_token, end_token)))
end_token = RoomStreamToken.parse_stream_token(end_token) end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None: rows, token = yield self.runInteraction(
sql = ( "get_recent_event_ids_for_room", self._paginate_room_events_txn,
"SELECT stream_ordering, topological_ordering, event_id" room_id, from_token=end_token, limit=limit,
" FROM events"
" WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
)
else:
from_token = RoomStreamToken.parse_stream_token(from_token)
sql = (
"SELECT stream_ordering, topological_ordering, event_id"
" FROM events"
" WHERE room_id = ? AND stream_ordering > ?"
" AND stream_ordering <= ? AND outlier = ?"
" ORDER BY topological_ordering DESC, stream_ordering DESC"
" LIMIT ?"
) )
def get_recent_events_for_room_txn(txn): # We want to return the results in ascending order.
if from_token is None: rows.reverse()
txn.execute(sql, (room_id, end_token.stream, False, limit,))
else:
txn.execute(sql, (
room_id, from_token.stream, end_token.stream, False, limit
))
rows = self.cursor_to_dict(txn) defer.returnValue((rows, (token, str(end_token))))
rows.reverse() # As we selected with reverse ordering
if rows:
# Tokens are positions between events.
# This token points *after* the last event in the chunk.
# We need it to point to the event before it in the chunk
# since we are going backwards so we subtract one from the
# stream part.
topo = rows[0]["topological_ordering"]
toke = rows[0]["stream_ordering"] - 1
start_token = str(RoomStreamToken(topo, toke))
token = (start_token, str(end_token))
else:
token = (str(end_token), str(end_token))
return rows, token
return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
def get_room_event_after_stream_ordering(self, room_id, stream_ordering): def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering """Gets details of the first event in a room at or after a stream ordering