Fix pagination to work with new db schema

This commit is contained in:
Erik Johnston 2014-08-15 15:53:06 +01:00
parent 01f089d9fb
commit 8d1f763209
2 changed files with 48 additions and 31 deletions

View File

@@ -97,30 +97,30 @@ class MessageHandler(BaseHandler):
self.notifier.on_new_room_event(event, store_id) self.notifier.on_new_room_event(event, store_id)
yield self.hs.get_federation().handle_new_event(event) yield self.hs.get_federation().handle_new_event(event)
#
# @defer.inlineCallbacks @defer.inlineCallbacks
# def get_messages(self, user_id=None, room_id=None, pagin_config=None, def get_messages(self, user_id=None, room_id=None, pagin_config=None,
# feedback=False): feedback=False):
# """Get messages in a room. """Get messages in a room.
#
# Args: Args:
# user_id (str): The user requesting messages. user_id (str): The user requesting messages.
# room_id (str): The room they want messages from. room_id (str): The room they want messages from.
# pagin_config (synapse.api.streams.PaginationConfig): The pagination pagin_config (synapse.api.streams.PaginationConfig): The pagination
# config rules to apply, if any. config rules to apply, if any.
# feedback (bool): True to get compressed feedback with the messages feedback (bool): True to get compressed feedback with the messages
# Returns: Returns:
# dict: Pagination API results dict: Pagination API results
# """ """
# yield self.auth.check_joined_room(room_id, user_id) yield self.auth.check_joined_room(room_id, user_id)
#
# data_source = [MessagesStreamData(self.hs, room_id=room_id, data_source = [EventsStreamData(self.hs, room_id=room_id,
# feedback=feedback)] feedback=feedback)]
# event_stream = EventStream(user_id, data_source) event_stream = EventStream(user_id, data_source)
# pagin_config = yield event_stream.fix_tokens(pagin_config) pagin_config = yield event_stream.fix_tokens(pagin_config)
# data_chunk = yield event_stream.get_chunk(config=pagin_config) data_chunk = yield event_stream.get_chunk(config=pagin_config)
# defer.returnValue(data_chunk) defer.returnValue(data_chunk)
#
@defer.inlineCallbacks @defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True): def store_room_data(self, event=None, stamp_event=True):
""" Stores data for a room. """ Stores data for a room.

View File

@@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore):
else: else:
limit = 1000 limit = 1000
# From and to keys should be integers from ordering.
from_key = int(from_key)
to_key = int(to_key)
if from_key == to_key:
defer.returnValue(([], to_key))
return
sql = ( sql = (
"SELECT * FROM events as e WHERE " "SELECT * FROM events as e WHERE "
"((room_id IN (%(current)s)) OR " "((room_id IN (%(current)s)) OR "
"(event_id IN (%(invites)s))) " "(event_id IN (%(invites)s))) "
" AND e.ordering > ? AND e.ordering < ? "
"ORDER BY ordering ASC LIMIT %(limit)d"
) % { ) % {
"current": current_room_membership_sql, "current": current_room_membership_sql,
"invites": invites_sql, "invites": invites_sql,
"limit": limit,
} }
if from_key < to_key:
sql += (
"AND e.ordering > ? AND e.ordering < ? "
"ORDER BY ordering ASC LIMIT %(limit)d "
) % {"limit": limit}
else:
sql += (
"AND e.ordering < ? AND e.ordering > ? "
"ORDER BY ordering DESC LIMIT %(limit)d "
) % {"limit": int(limit)}
rows = yield self._execute_and_decode( rows = yield self._execute_and_decode(
sql, sql,
user_id, user_id, Membership.INVITE, from_key, to_key user_id, user_id, Membership.INVITE, from_key, to_key
@@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore):
ret = [self._parse_event_from_row(r) for r in rows] ret = [self._parse_event_from_row(r) for r in rows]
if ret: if from_key < to_key:
max_id = max([r["ordering"] for r in rows]) key = max([r["ordering"] for r in rows])
else: else:
max_id = to_key key = min([r["ordering"] for r in rows])
defer.returnValue((ret, max_id)) defer.returnValue((ret, key))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, with_feedback=False): def get_recent_events_for_room(self, room_id, limit, with_feedback=False):