Refactor sync APIs to reuse pagination API

The sync API often returns events in a topological rather than stream
ordering, e.g. when the user joined the room or on initial sync. When
this happens we can reuse existing pagination storage functions.
This commit is contained in:
Erik Johnston 2018-05-09 11:59:45 +01:00
parent e5ab9cd24b
commit e2accd7f1d
2 changed files with 48 additions and 44 deletions

View File

@ -354,12 +354,19 @@ class SyncHandler(object):
since_key = since_token.room_key since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat: while limited and len(recents) < timeline_limit and max_repeat:
if since_key:
events, end_key = yield self.store.get_room_events_stream_for_room( events, end_key = yield self.store.get_room_events_stream_for_room(
room_id, room_id,
limit=load_limit + 1, limit=load_limit + 1,
from_key=since_key, from_key=since_key,
to_key=end_key, to_key=end_key,
) )
else:
events, end_key = yield self.store.get_recent_events_for_room(
room_id,
limit=load_limit + 1,
end_token=end_key,
)
loaded_recents = sync_config.filter_collection.filter_room_timeline( loaded_recents = sync_config.filter_collection.filter_room_timeline(
events events
) )

View File

@ -233,19 +233,31 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks @defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0, def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'): order='DESC'):
# Note: If from_key is None then we return in topological order. This
# is because in that case we're using this as a "get the last few messages
# in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
from_id = None
to_id = RoomStreamToken.parse_stream_token(to_key).stream
"""Get new room events in stream ordering since `from_key`.
Args:
room_id (str)
from_key (str): Token from which no events are returned before
to_key (str): Token from which no events are returned after. (This
is typically the current stream token)
limit (int): Maximum number of events to return
order (str): Either "DESC" or "ASC". Determines which events are
returned when the result is limited. If "DESC" then the most
recent `limit` events are returned, otherwise returns the
oldest `limit` events.
Returns:
Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
events (in ascending order) and the token from the start of
the chunk of events returned.
"""
if from_key == to_key: if from_key == to_key:
defer.returnValue(([], from_key)) defer.returnValue(([], from_key))
if from_id: from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
has_changed = yield self._events_stream_cache.has_entity_changed( has_changed = yield self._events_stream_cache.has_entity_changed(
room_id, from_id room_id, from_id
) )
@ -254,7 +266,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue(([], from_key)) defer.returnValue(([], from_key))
def f(txn): def f(txn):
if from_id is not None:
sql = ( sql = (
"SELECT event_id, stream_ordering FROM events WHERE" "SELECT event_id, stream_ordering FROM events WHERE"
" room_id = ?" " room_id = ?"
@ -265,20 +276,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, (room_id, from_id, to_id, limit)) txn.execute(sql, (room_id, from_id, to_id, limit))
rows = [_EventDictReturn(row[0], None, row[1]) for row in txn] rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
else:
sql = (
"SELECT event_id, topological_ordering, stream_ordering"
" FROM events"
" WHERE"
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
" ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
return rows return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f) rows = yield self.runInteraction("get_room_events_stream_for_room", f)