diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 9261984b7..b0b2441b9 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -97,30 +97,30 @@ class MessageHandler(BaseHandler): self.notifier.on_new_room_event(event, store_id) yield self.hs.get_federation().handle_new_event(event) -# -# @defer.inlineCallbacks -# def get_messages(self, user_id=None, room_id=None, pagin_config=None, -# feedback=False): -# """Get messages in a room. -# -# Args: -# user_id (str): The user requesting messages. -# room_id (str): The room they want messages from. -# pagin_config (synapse.api.streams.PaginationConfig): The pagination -# config rules to apply, if any. -# feedback (bool): True to get compressed feedback with the messages -# Returns: -# dict: Pagination API results -# """ -# yield self.auth.check_joined_room(room_id, user_id) -# -# data_source = [MessagesStreamData(self.hs, room_id=room_id, -# feedback=feedback)] -# event_stream = EventStream(user_id, data_source) -# pagin_config = yield event_stream.fix_tokens(pagin_config) -# data_chunk = yield event_stream.get_chunk(config=pagin_config) -# defer.returnValue(data_chunk) -# + + @defer.inlineCallbacks + def get_messages(self, user_id=None, room_id=None, pagin_config=None, + feedback=False): + """Get messages in a room. + + Args: + user_id (str): The user requesting messages. + room_id (str): The room they want messages from. + pagin_config (synapse.api.streams.PaginationConfig): The pagination + config rules to apply, if any. + feedback (bool): True to get compressed feedback with the messages + Returns: + dict: Pagination API results + """ + yield self.auth.check_joined_room(room_id, user_id) + + data_source = [EventsStreamData(self.hs, room_id=room_id, + feedback=feedback)] + event_stream = EventStream(user_id, data_source) + pagin_config = yield event_stream.fix_tokens(pagin_config) + data_chunk = yield event_stream.get_chunk(config=pagin_config) + defer.returnValue(data_chunk) + @defer.inlineCallbacks def store_room_data(self, event=None, stamp_event=True): """ Stores data for a room. diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 1300aee8b..6bfa00d59 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -53,18 +53,35 @@ class StreamStore(SQLBaseStore): else: limit = 1000 + # From and to keys should be integers from ordering. + from_key = int(from_key) + to_key = int(to_key) + + if from_key == to_key: + defer.returnValue(([], to_key)) + return + + sql = ( "SELECT * FROM events as e WHERE " "((room_id IN (%(current)s)) OR " "(event_id IN (%(invites)s))) " - " AND e.ordering > ? AND e.ordering < ? " - "ORDER BY ordering ASC LIMIT %(limit)d" ) % { "current": current_room_membership_sql, "invites": invites_sql, - "limit": limit, } + if from_key < to_key: + sql += ( + "AND e.ordering > ? AND e.ordering < ? " + "ORDER BY ordering ASC LIMIT %(limit)d " + ) % {"limit": limit} + else: + sql += ( + "AND e.ordering < ? AND e.ordering > ? " + "ORDER BY ordering DESC LIMIT %(limit)d " + ) % {"limit": int(limit)} + rows = yield self._execute_and_decode( sql, user_id, user_id, Membership.INVITE, from_key, to_key @@ -72,12 +89,12 @@ class StreamStore(SQLBaseStore): ret = [self._parse_event_from_row(r) for r in rows] - if ret: - max_id = max([r["ordering"] for r in rows]) + if from_key < to_key: + key = max([r["ordering"] for r in rows]) else: - max_id = to_key + key = min([r["ordering"] for r in rows]) - defer.returnValue((ret, max_id)) + defer.returnValue((ret, key)) @defer.inlineCallbacks def get_recent_events_for_room(self, room_id, limit, with_feedback=False):