From 06c0d0ed081b56716135c4881e600861b2e8cad5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 15:45:38 +0100 Subject: [PATCH 1/5] Split paginate_room_events storage function --- synapse/storage/stream.py | 100 +++++++++++++++++++++++++++----------- 1 file changed, 72 insertions(+), 28 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f0784ba13..b57a8a7ef 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -738,17 +738,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) + def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None, + direction='b', limit=-1, event_filter=None): + """Returns list of events before or after a given token. -class StreamStore(StreamWorkerStore): - def get_room_max_stream_ordering(self): - return self._stream_id_gen.get_current_token() + Args: + txn + room_id (str) + from_key (str): The token used to stream from + to_key (str|None): A token which if given limits the results to + only those before + direction(char): Either 'b' or 'f' to indicate whether we are + paginating forwards or backwards from `from_key`. + limit (int): The maximum number of events to return. Zero or less + means no limit. + event_filter (Filter|None): If provided filters the events to + those that match the filter. - def get_room_min_stream_ordering(self): - return self._backfill_id_gen.get_current_token() - - @defer.inlineCallbacks - def paginate_room_events(self, room_id, from_key, to_key=None, - direction='b', limit=-1, event_filter=None): + Returns: + tuple[list[dict], str]: Returns the results as a list of dicts and + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_orderign". + """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence # we have a bit of asymmetry when it comes to equalities. @@ -795,29 +806,54 @@ class StreamStore(StreamWorkerStore): "limit": limit_str } - def f(txn): - txn.execute(sql, args) + txn.execute(sql, args) - rows = self.cursor_to_dict(txn) + rows = self.cursor_to_dict(txn) - if rows: - topo = rows[-1]["topological_ordering"] - toke = rows[-1]["stream_ordering"] - if direction == 'b': - # Tokens are positions between events. - # This token points *after* the last event in the chunk. - # We need it to point to the event before it in the chunk - # when we are going backwards so we subtract one from the - # stream part. - toke -= 1 - next_token = str(RoomStreamToken(topo, toke)) - else: - # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + if rows: + topo = rows[-1]["topological_ordering"] + toke = rows[-1]["stream_ordering"] + if direction == 'b': + # Tokens are positions between events. + # This token points *after* the last event in the chunk. + # We need it to point to the event before it in the chunk + # when we are going backwards so we subtract one from the + # stream part. + toke -= 1 + next_token = str(RoomStreamToken(topo, toke)) + else: + # TODO (erikj): We should work out what to do here instead. + next_token = to_key if to_key else from_key - return rows, next_token, + return rows, next_token, - rows, token = yield self.runInteraction("paginate_room_events", f) + @defer.inlineCallbacks + def paginate_room_events(self, room_id, from_key, to_key=None, + direction='b', limit=-1, event_filter=None): + """Returns list of events before or after a given token. + + Args: + room_id (str) + from_key (str): The token used to stream from + to_key (str|None): A token which if given limits the results to + only those before + direction(char): Either 'b' or 'f' to indicate whether we are + paginating forwards or backwards from `from_key`. + limit (int): The maximum number of events to return. Zero or less + means no limit. + event_filter (Filter|None): If provided filters the events to + those that match the filter. + + Returns: + tuple[list[dict], str]: Returns the results as a list of dicts and + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_orderign". + """ + + rows, token = yield self.runInteraction( + "paginate_room_events", self.paginate_room_events_txn, + room_id, from_key, to_key, direction, limit, event_filter, + ) events = yield self._get_events( [r["event_id"] for r in rows], @@ -827,3 +863,11 @@ class StreamStore(StreamWorkerStore): self._set_before_and_after(events, rows) defer.returnValue((events, token)) + + +class StreamStore(StreamWorkerStore): + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + + def get_room_min_stream_ordering(self): + return self._backfill_id_gen.get_current_token() From 274b8c6025e15eede0137bcb6a73ac00bf370ee8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:15:07 +0100 Subject: [PATCH 2/5] Only fetch required fields from database --- synapse/storage/stream.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index b57a8a7ef..d3adb0bf3 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -796,7 +796,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): limit_str = "" sql = ( - "SELECT * FROM events" + "SELECT event_id, topological_ordering, stream_ordering" + " FROM events" " WHERE outlier = ? AND room_id = ? AND %(bounds)s" " ORDER BY topological_ordering %(order)s," " stream_ordering %(order)s %(limit)s" From 3e6d306e94327983b1f6b66cec9faace642d3f16 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:18:58 +0100 Subject: [PATCH 3/5] Parse tokens before calling DB function --- synapse/storage/stream.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d3adb0bf3..ce9858760 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -738,16 +738,16 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def paginate_room_events_txn(self, txn, room_id, from_key, to_key=None, + def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, direction='b', limit=-1, event_filter=None): """Returns list of events before or after a given token. Args: txn room_id (str) - from_key (str): The token used to stream from - to_key (str|None): A token which if given limits the results to - only those before + from_token (RoomStreamToken): The token used to stream from + to_token (RoomStreamToken|None): A token which if given limits the + results to only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. limit (int): The maximum number of events to return. Zero or less @@ -757,7 +757,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have + a token that points to the end of the result set. The dicts haveq the keys "event_id", "toplogical_ordering" and "stream_orderign". """ # Tokens really represent positions between elements, but we use @@ -767,20 +767,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if direction == 'b': order = "DESC" bounds = upper_bound( - RoomStreamToken.parse(from_key), self.database_engine + from_token, self.database_engine ) - if to_key: + if to_token: bounds = "%s AND %s" % (bounds, lower_bound( - RoomStreamToken.parse(to_key), self.database_engine + to_token, self.database_engine )) else: order = "ASC" bounds = lower_bound( - RoomStreamToken.parse(from_key), self.database_engine + from_token, self.database_engine ) - if to_key: + if to_token: bounds = "%s AND %s" % (bounds, upper_bound( - RoomStreamToken.parse(to_key), self.database_engine + to_token, self.database_engine )) filter_clause, filter_args = filter_to_clause(event_filter) @@ -821,12 +821,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # when we are going backwards so we subtract one from the # stream part. toke -= 1 - next_token = str(RoomStreamToken(topo, toke)) + next_token = RoomStreamToken(topo, toke) else: # TODO (erikj): We should work out what to do here instead. - next_token = to_key if to_key else from_key + next_token = to_token if to_token else from_token - return rows, next_token, + return rows, str(next_token), @defer.inlineCallbacks def paginate_room_events(self, room_id, from_key, to_key=None, @@ -851,6 +851,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): the keys "event_id", "toplogical_ordering" and "stream_orderign". """ + from_key = RoomStreamToken.parse(from_key) + if to_key: + to_key = RoomStreamToken.parse(to_key) + rows, token = yield self.runInteraction( "paginate_room_events", self.paginate_room_events_txn, room_id, from_key, to_key, direction, limit, event_filter, From 696f5324539b014a5a9613c3a534abb2b3725dac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 8 May 2018 16:19:33 +0100 Subject: [PATCH 4/5] Reuse existing pagination code for context API --- synapse/storage/stream.py | 90 +++++++-------------------------------- 1 file changed, 15 insertions(+), 75 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index ce9858760..938a8809d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -42,7 +42,7 @@ from synapse.util.caches.descriptors import cached from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background -from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.engines import PostgresEngine import abc import logging @@ -595,88 +595,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=["stream_ordering", "topological_ordering"], ) - token = RoomStreamToken( + # Paginating backwards includes the event at the token, but paginating + # forward doesn't. + before_token = RoomStreamToken( + results["topological_ordering"] - 1, + results["stream_ordering"], + ) + + after_token = RoomStreamToken( results["topological_ordering"], results["stream_ordering"], ) - if isinstance(self.database_engine, Sqlite3Engine): - # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` - # So we give pass it to SQLite3 as the UNION ALL of the two queries. - - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering < ?" - " UNION ALL" - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) - before_args = ( - room_id, token.topological, - room_id, token.topological, token.stream, - before_limit, - ) - - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering > ?" - " UNION ALL" - " SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) - after_args = ( - room_id, token.topological, - room_id, token.topological, token.stream, - after_limit, - ) - else: - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" - ) % (upper_bound(token, self.database_engine, inclusive=False),) - - before_args = (room_id, before_limit) - - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND %s" - " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" - ) % (lower_bound(token, self.database_engine, inclusive=False),) - - after_args = (room_id, after_limit) - - txn.execute(query_before, before_args) - - rows = self.cursor_to_dict(txn) + rows, start_token = self.paginate_room_events_txn( + txn, room_id, before_token, direction='b', limit=before_limit, + ) events_before = [r["event_id"] for r in rows] - if rows: - start_token = str(RoomStreamToken( - rows[0]["topological_ordering"], - rows[0]["stream_ordering"] - 1, - )) - else: - start_token = str(RoomStreamToken( - token.topological, - token.stream - 1, - )) - - txn.execute(query_after, after_args) - - rows = self.cursor_to_dict(txn) + rows, end_token = self.paginate_room_events_txn( + txn, room_id, after_token, direction='f', limit=after_limit, + ) events_after = [r["event_id"] for r in rows] - if rows: - end_token = str(RoomStreamToken( - rows[-1]["topological_ordering"], - rows[-1]["stream_ordering"], - )) - else: - end_token = str(token) - return { "before": { "event_ids": events_before, From 23ec51c94ce90fd70931fbe98cadb7dd51a9db4e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 9 May 2018 09:55:19 +0100 Subject: [PATCH 5/5] Fix up comments and make function private --- synapse/storage/stream.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 938a8809d..54be02540 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -607,12 +607,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): results["stream_ordering"], ) - rows, start_token = self.paginate_room_events_txn( + rows, start_token = self._paginate_room_events_txn( txn, room_id, before_token, direction='b', limit=before_limit, ) events_before = [r["event_id"] for r in rows] - rows, end_token = self.paginate_room_events_txn( + rows, end_token = self._paginate_room_events_txn( txn, room_id, after_token, direction='f', limit=after_limit, ) events_after = [r["event_id"] for r in rows] @@ -678,8 +678,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id, stream_id): return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, - direction='b', limit=-1, event_filter=None): + def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None, + direction='b', limit=-1, event_filter=None): """Returns list of events before or after a given token. Args: @@ -697,8 +697,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts haveq - the keys "event_id", "toplogical_ordering" and "stream_orderign". + a token that points to the end of the result set. The dicts have + the keys "event_id", "toplogical_ordering" and "stream_ordering". """ # Tokens really represent positions between elements, but we use # the convention of pointing to the event before the gap. Hence @@ -796,7 +796,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): to_key = RoomStreamToken.parse(to_key) rows, token = yield self.runInteraction( - "paginate_room_events", self.paginate_room_events_txn, + "paginate_room_events", self._paginate_room_events_txn, room_id, from_key, to_key, direction, limit, event_filter, )