From 901f56fa63c0f8cd4519e2a2f774724af9ea3431 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jun 2015 15:29:47 +0100 Subject: [PATCH 01/20] Add tables for receipts --- synapse/storage/__init__.py | 2 +- synapse/storage/schema/delta/21/receipts.sql | 35 ++++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/21/receipts.sql diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index c137f4782..275598add 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -51,7 +51,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 20 +SCHEMA_VERSION = 21 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql new file mode 100644 index 000000000..da9e18e90 --- /dev/null +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -0,0 +1,35 @@ +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +CREATE TABLE IF NOT EXISTS receipts_graph( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE INDEX receipts_graph_room_tuple ON receipts_graph( + room_id, receipt_type, user_id +); + +CREATE TABLE IF NOT EXISTS receipts_linearized ( + room_id TEXT NOT NULL, + receipt_type TEXT NOT NULL, + user_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +CREATE INDEX receipts_graph_room_tuple ON receipts_graph( + room_id, receipt_type, user_id +); From 80a61330ee794147b213b1d54f2292a1c9adc002 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jul 2015 11:41:55 +0100 Subject: [PATCH 02/20] Add basic storage functions for handling of receipts --- synapse/storage/_base.py | 3 +- synapse/storage/receipts.py | 162 +++++++++++++++++++ synapse/storage/schema/delta/21/receipts.sql | 31 ++-- synapse/storage/util/id_generators.py | 7 +- 4 files changed, 186 insertions(+), 17 deletions(-) create mode 100644 synapse/storage/receipts.py diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 8d33def6c..8f812f0fd 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -329,13 +329,14 @@ class SQLBaseStore(object): self.database_engine = hs.database_engine - self._stream_id_gen = StreamIdGenerator() + self._stream_id_gen = StreamIdGenerator("events", "stream_ordering") self._transaction_id_gen = IdGenerator("sent_transactions", "id", self) self._state_groups_id_gen = IdGenerator("state_groups", "id", self) self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self) self._pushers_id_gen = IdGenerator("pushers", "id", self) self._push_rule_id_gen = IdGenerator("push_rules", "id", self) self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self) + self._receipts_id_gen = StreamIdGenerator("receipts_linearized", "stream_id") def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py new file mode 100644 index 000000000..0168e74a0 --- /dev/null +++ b/synapse/storage/receipts.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import SQLBaseStore, cached + +from twisted.internet import defer + + +class ReceiptStore(SQLBaseStore): + + @cached + @defer.inlineCallbacks + def get_linearized_receipts_for_room(self, room_id): + rows = yield self._simple_select_list( + table="receipts_linearized", + keyvalues={"room_id": room_id}, + retcols=["receipt_type", "user_id", "event_id"], + desc="get_linearized_receipts_for_room", + ) + + result = {} + for row in rows: + result.setdefault( + row["event_id"], {} + ).setdefault( + row["receipt_type"], [] + ).append(row["user_id"]) + + defer.returnValue(result) + + @cached + @defer.inlineCallbacks + def get_graph_receipts_for_room(self, room_id): + rows = yield self._simple_select_list( + table="receipts_graph", + keyvalues={"room_id": room_id}, + retcols=["receipt_type", "user_id", "event_id"], + desc="get_linearized_receipts_for_room", + ) + + result = {} + for row in rows: + result.setdefault( + row["user_id"], {} + ).setdefault( + row["receipt_type"], [] + ).append(row["event_id"]) + + defer.returnValue(result) + + def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_id, stream_id): + self._simple_delete_txn( + txn, + table="receipts_linearized", + keyvalues={ + "stream_id": stream_id, + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + + self._simple_insert_txn( + txn, + table="receipts_linearized", + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_id": event_id, + } + ) + + @defer.inlineCallbacks + def insert_receipt(self, room_id, receipt_type, user_id, event_ids): + if not event_ids: + return + + if len(event_ids) == 1: + linearized_event_id = event_ids[0] + else: + # we need to points in graph -> linearized form. + def graph_to_linear(txn): + query = ( + "SELECT event_id WHERE room_id = ? AND stream_ordering IN (" + " SELECT max(stream_ordering) WHERE event_id IN (%s)" + ")" + ) % (",".join(["?"] * len(event_ids))) + + txn.execute(query, [room_id] + event_ids) + rows = txn.fetchall() + if rows: + return rows[0][0] + else: + # TODO: ARGH?! + return None + + linearized_event_id = yield self.runInteraction( + graph_to_linear, desc="insert_receipt_conv" + ) + + stream_id_manager = yield self._stream_id_gen.get_next(self) + with stream_id_manager() as stream_id: + yield self.runInteraction( + self.insert_linearized_receipt_txn, + room_id, receipt_type, user_id, linearized_event_id, + stream_id=stream_id, + desc="insert_linearized_receipt" + ) + + yield self.insert_graph_receipt( + room_id, receipt_type, user_id, event_ids + ) + + max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) + + def insert_graph_receipt(self, room_id, receipt_type, + user_id, event_ids): + return self.runInteraction( + self.insert_graph_receipt_txn, + room_id, receipt_type, user_id, event_ids, + desc="insert_graph_receipt" + ) + + def insert_graph_receipt_txn(self, txn, room_id, receipt_type, + user_id, event_ids): + self._simple_delete_txn( + txn, + table="receipts_graph", + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + } + ) + self._simple_insert_many_txn( + txn, + table="receipts_graph", + values=[ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_id": event_id, + } + for event_id in event_ids + ], + ) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index da9e18e90..ccd64ec7f 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -1,16 +1,18 @@ -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. +/* Copyright 2015 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + CREATE TABLE IF NOT EXISTS receipts_graph( room_id TEXT NOT NULL, @@ -24,12 +26,13 @@ CREATE INDEX receipts_graph_room_tuple ON receipts_graph( ); CREATE TABLE IF NOT EXISTS receipts_linearized ( + stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, event_id TEXT NOT NULL ); -CREATE INDEX receipts_graph_room_tuple ON receipts_graph( +CREATE INDEX receipts_linearized_room_tuple ON receipts_graph( room_id, receipt_type, user_id ); diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 89d1643f1..b39006315 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -72,7 +72,10 @@ class StreamIdGenerator(object): with stream_id_gen.get_next_txn(txn) as stream_id: # ... persist event ... """ - def __init__(self): + def __init__(self, table, column): + self.table = table + self.column = column + self._lock = threading.Lock() self._current_max = None @@ -126,7 +129,7 @@ class StreamIdGenerator(object): def _get_or_compute_current_max(self, txn): with self._lock: - txn.execute("SELECT MAX(stream_ordering) FROM events") + txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table)) rows = txn.fetchall() val, = rows[0] From 0862fed2a89723e8aa6a4df9f1dbad975a7fbffc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Jul 2015 17:19:31 +0100 Subject: [PATCH 03/20] Add basic ReceiptHandler --- synapse/handlers/receipts.py | 130 +++++++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 synapse/handlers/receipts.py diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py new file mode 100644 index 000000000..f3f705063 --- /dev/null +++ b/synapse/handlers/receipts.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Contains handlers for federation events.""" + +from ._base import BaseHandler + +from twisted.internet import defer + +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptsHandler(BaseHandler): + def __init__(self, hs): + super(ReceiptsHandler, self).__init__(hs) + + self.federation.register_edu_handler( + "m.receipt", self._received_remote_receipt + ) + + self._latest_serial = 0 + + @defer.inlineCallbacks + def received_client_receipt(self, room_id, receipt_type, user_id, + event_id): + # 1. Persist. + # 2. Notify local clients + # 3. Notify remote servers + + receipt = { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + } + + yield self._handle_new_receipts([receipt]) + self._push_remotes([receipt]) + + @defer.inlineCallbacks + def _received_remote_receipt(self, origin, content): + receipts = [ + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": [event_id], + } + for room_id, room_values in content.items() + for event_id, ev_values in room_values.items() + for receipt_type, users in ev_values.items() + for user_id in users + ] + + yield self._handle_new_receipts(receipts) + + @defer.inlineCallbacks + def _handle_new_receipts(self, receipts): + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + + stream_id, max_persisted_id = yield self.store.insert_receipt( + room_id, receipt_type, user_id, event_ids, + ) + + # TODO: Use max_persisted_id + + self._latest_serial = max(self._latest_serial, stream_id) + + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + "recei[t_key", self._latest_serial, rooms=[room_id] + ) + + localusers = set() + remotedomains = set() + + rm_handler = self.homeserver.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=localusers, remotedomains=remotedomains + ) + + receipt["remotedomains"] = remotedomains + + self.notifier.on_new_user_event( + "receipt_key", self._latest_room_serial, rooms=[room_id] + ) + + def _push_remotes(self, receipts): + # TODO: Some of this stuff should be coallesced. + for receipt in receipts: + room_id = receipt["room_id"] + receipt_type = receipt["receipt_type"] + user_id = receipt["user_id"] + event_ids = receipt["event_ids"] + remotedomains = receipt["remotedomains"] + + for domain in remotedomains: + self.federation.send_edu( + destination=domain, + edu_type="m.receipt", + content={ + room_id: { + event_id: { + receipt_type: [user_id] + } + for event_id in event_ids + }, + }, + ) From ddf7979531cf514ee54e280714bdf228d88a62d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 11:40:22 +0100 Subject: [PATCH 04/20] Add receipts_key to StreamToken --- synapse/notifier.py | 2 +- synapse/streams/events.py | 3 ++- synapse/types.py | 6 +++++- tests/rest/client/v1/test_presence.py | 4 ++-- 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index bdd03dcbe..f13164dbd 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -283,7 +283,7 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, - from_token=StreamToken("s0", "0", "0")): + from_token=StreamToken("s0", "0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ diff --git a/synapse/streams/events.py b/synapse/streams/events.py index dff7970be..0a1a3a3d0 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -62,7 +62,8 @@ class EventSources(object): ), typing_key=( yield self.sources["typing"].get_current_key() - ) + ), + receipt_key="0", ) defer.returnValue(token) diff --git a/synapse/types.py b/synapse/types.py index 1b21160c5..dd1b10d64 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -100,7 +100,7 @@ class EventID(DomainSpecificString): class StreamToken( namedtuple( "Token", - ("room_key", "presence_key", "typing_key") + ("room_key", "presence_key", "typing_key", "receipt_key") ) ): _SEPARATOR = "_" @@ -109,6 +109,9 @@ class StreamToken( def from_string(cls, string): try: keys = string.split(cls._SEPARATOR) + if len(keys) == len(cls._fields) - 1: + # i.e. old token from before receipt_key + keys.append("0") return cls(*keys) except: raise SynapseError(400, "Invalid Token") @@ -131,6 +134,7 @@ class StreamToken( (other_token.room_stream_id < self.room_stream_id) or (int(other_token.presence_key) < int(self.presence_key)) or (int(other_token.typing_key) < int(self.typing_key)) + or (int(other_token.receipt_key) < int(self.receipt_key)) ) def copy_and_advance(self, key, new_value): diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 4b32c7a20..089a71568 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -357,7 +357,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): # all be ours # I'll already get my own presence state change - self.assertEquals({"start": "0_1_0", "end": "0_1_0", "chunk": []}, + self.assertEquals({"start": "0_1_0_0", "end": "0_1_0_0", "chunk": []}, response ) @@ -376,7 +376,7 @@ class PresenceEventStreamTestCase(unittest.TestCase): "/events?from=s0_1_0&timeout=0", None) self.assertEquals(200, code) - self.assertEquals({"start": "s0_1_0", "end": "s0_2_0", "chunk": [ + self.assertEquals({"start": "s0_1_0_0", "end": "s0_2_0_0", "chunk": [ {"type": "m.presence", "content": { "user_id": "@banana:test", From bd1236c0ee5b3703de51dc773a02da92e0960d0f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 11:40:56 +0100 Subject: [PATCH 05/20] Consolidate duplicate code in notifier --- synapse/handlers/presence.py | 2 +- synapse/handlers/receipts.py | 4 ++-- synapse/handlers/typing.py | 2 +- synapse/notifier.py | 35 +++++++++++------------------------ tests/handlers/test_typing.py | 20 ++++++++++---------- 5 files changed, 25 insertions(+), 38 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7c0319831..341a516da 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler): room_ids([str]): List of room_ids to notify. """ with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "presence_key", self._user_cachemap_latest_serial, users_to_push, diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f3f705063..f0d12d35f 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -88,7 +88,7 @@ class ReceiptsHandler(BaseHandler): self._latest_serial = max(self._latest_serial, stream_id) with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "recei[t_key", self._latest_serial, rooms=[room_id] ) @@ -102,7 +102,7 @@ class ReceiptsHandler(BaseHandler): receipt["remotedomains"] = remotedomains - self.notifier.on_new_user_event( + self.notifier.on_new_event( "receipt_key", self._latest_room_serial, rooms=[room_id] ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a9895292c..026bd2b9d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler): self._room_serials[room_id] = self._latest_room_serial with PreserveLoggingContext(): - self.notifier.on_new_user_event( + self.notifier.on_new_event( "typing_key", self._latest_room_serial, rooms=[room_id] ) diff --git a/synapse/notifier.py b/synapse/notifier.py index f13164dbd..85ae34313 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -221,16 +221,7 @@ class Notifier(object): event ) - room_id = event.room_id - - room_user_streams = self.room_to_user_streams.get(room_id, set()) - - user_streams = room_user_streams.copy() - - for user in extra_users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + app_streams = set() for appservice in self.appservice_to_user_streams: # TODO (kegan): Redundant appservice listener checks? @@ -242,24 +233,20 @@ class Notifier(object): app_user_streams = self.appservice_to_user_streams.get( appservice, set() ) - user_streams |= app_user_streams + app_streams |= app_user_streams - logger.debug("on_new_room_event listeners %s", user_streams) - - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify( - "room_key", "s%d" % (room_stream_id,), time_now_ms - ) - except: - logger.exception("Failed to notify listener") + self.on_new_event( + "room_key", room_stream_id, + users=extra_users, + rooms=[event.room_id], + extra_streams=app_streams, + ) @defer.inlineCallbacks @log_function - def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend - presence/user event wise. + def on_new_event(self, stream_key, new_token, users=[], rooms=[], + extra_streams=set()): + """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 7ccbe2ea9..41bb08b7c 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -66,8 +66,8 @@ class TypingNotificationsTestCase(unittest.TestCase): self.mock_federation_resource = MockHttpResource() - mock_notifier = Mock(spec=["on_new_user_event"]) - self.on_new_user_event = mock_notifier.on_new_user_event + mock_notifier = Mock(spec=["on_new_event"]) + self.on_new_event = mock_notifier.on_new_event self.auth = Mock(spec=[]) @@ -182,7 +182,7 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=20000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -245,7 +245,7 @@ class TypingNotificationsTestCase(unittest.TestCase): ) ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -299,7 +299,7 @@ class TypingNotificationsTestCase(unittest.TestCase): room_id=self.room_id, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) @@ -331,10 +331,10 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=10000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 1, rooms=[self.room_id]), ]) - self.on_new_user_event.reset_mock() + self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 1) events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) @@ -351,7 +351,7 @@ class TypingNotificationsTestCase(unittest.TestCase): self.clock.advance_time(11) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 2, rooms=[self.room_id]), ]) @@ -377,10 +377,10 @@ class TypingNotificationsTestCase(unittest.TestCase): timeout=10000, ) - self.on_new_user_event.assert_has_calls([ + self.on_new_event.assert_has_calls([ call('typing_key', 3, rooms=[self.room_id]), ]) - self.on_new_user_event.reset_mock() + self.on_new_event.reset_mock() self.assertEquals(self.event_source.get_current_key(), 3) events = yield self.event_source.get_new_events_for_user(self.u_apple, 0, None) From ac78e60de6fd2e409d6abd806e8850d539bbe644 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 2 Jul 2015 13:18:41 +0100 Subject: [PATCH 06/20] Add stream_id index --- synapse/storage/schema/delta/21/receipts.sql | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index ccd64ec7f..ac7738e37 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -33,6 +33,10 @@ CREATE TABLE IF NOT EXISTS receipts_linearized ( event_id TEXT NOT NULL ); -CREATE INDEX receipts_linearized_room_tuple ON receipts_graph( +CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized( room_id, receipt_type, user_id ); + +CREATE INDEX receipts_linearized_id ON receipts_linearized( + stream_id +); From e8b2f6f8a13c1b419911a54f349d05fe97b5ace0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 10:55:12 +0100 Subject: [PATCH 07/20] Add a ReceiptServlet --- synapse/rest/client/v2_alpha/__init__.py | 4 +- synapse/rest/client/v2_alpha/receipts.py | 56 ++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 synapse/rest/client/v2_alpha/receipts.py diff --git a/synapse/rest/client/v2_alpha/__init__.py b/synapse/rest/client/v2_alpha/__init__.py index 7d1aff430..231e1dd97 100644 --- a/synapse/rest/client/v2_alpha/__init__.py +++ b/synapse/rest/client/v2_alpha/__init__.py @@ -18,7 +18,8 @@ from . import ( filter, account, register, - auth + auth, + receipts, ) from synapse.http.server import JsonResource @@ -38,3 +39,4 @@ class ClientV2AlphaRestResource(JsonResource): account.register_servlets(hs, client_resource) register.register_servlets(hs, client_resource) auth.register_servlets(hs, client_resource) + receipts.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py new file mode 100644 index 000000000..829427b7b --- /dev/null +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -0,0 +1,56 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.http.servlet import RestServlet +from ._base import client_v2_pattern + +import logging + + +logger = logging.getLogger(__name__) + + +class ReceiptRestServlet(RestServlet): + PATTERN = client_v2_pattern( + "/rooms/(?P[^/]*)" + "/receipt/(?P[^/]*)" + "/(?P[^/])*" + ) + + def __init__(self, hs): + super(ReceiptRestServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.receipts_handler = hs.get_handlers().receipts_handler + + @defer.inlineCallbacks + def on_POST(self, request, room_id, receipt_type, event_id): + user, client = yield self.auth.get_user_by_req(request) + + # TODO: STUFF + yield self.receipts_handler.received_client_receipt( + room_id, + receipt_type, + user_id=user.to_string(), + event_id=event_id + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReceiptRestServlet(hs).register(http_server) From 716e42693354553ee2878e3a8df6811226e91130 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 10:55:31 +0100 Subject: [PATCH 08/20] Fix various typos --- synapse/handlers/__init__.py | 2 ++ synapse/handlers/receipts.py | 6 ++++-- synapse/storage/__init__.py | 3 +++ synapse/storage/receipts.py | 13 +++++++------ 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py index 685792dbd..dc5b6ef79 100644 --- a/synapse/handlers/__init__.py +++ b/synapse/handlers/__init__.py @@ -32,6 +32,7 @@ from .appservice import ApplicationServicesHandler from .sync import SyncHandler from .auth import AuthHandler from .identity import IdentityHandler +from .receipts import ReceiptsHandler class Handlers(object): @@ -57,6 +58,7 @@ class Handlers(object): self.directory_handler = DirectoryHandler(hs) self.typing_notification_handler = TypingNotificationHandler(hs) self.admin_handler = AdminHandler(hs) + self.receipts_handler = ReceiptsHandler(hs) asapi = ApplicationServiceApi(hs) self.appservice_handler = ApplicationServicesHandler( hs, asapi, AppServiceScheduler( diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f0d12d35f..fc2f38c1c 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -31,6 +31,8 @@ class ReceiptsHandler(BaseHandler): def __init__(self, hs): super(ReceiptsHandler, self).__init__(hs) + self.hs = hs + self.federation = hs.get_replication_layer() self.federation.register_edu_handler( "m.receipt", self._received_remote_receipt ) @@ -89,13 +91,13 @@ class ReceiptsHandler(BaseHandler): with PreserveLoggingContext(): self.notifier.on_new_event( - "recei[t_key", self._latest_serial, rooms=[room_id] + "receipt_key", self._latest_serial, rooms=[room_id] ) localusers = set() remotedomains = set() - rm_handler = self.homeserver.get_handlers().room_member_handler + rm_handler = self.hs.get_handlers().room_member_handler yield rm_handler.fetch_room_distributions_into( room_id, localusers=localusers, remotedomains=remotedomains ) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 275598add..2bc88a795 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -38,6 +38,8 @@ from .state import StateStore from .signatures import SignatureStore from .filtering import FilteringStore +from .receipts import ReceiptsStore + import fnmatch import imp @@ -74,6 +76,7 @@ class DataStore(RoomMemberStore, RoomStore, PushRuleStore, ApplicationServiceTransactionStore, EventsStore, + ReceiptsStore, ): def __init__(self, hs): diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 0168e74a0..15c11fd41 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -18,7 +18,7 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer -class ReceiptStore(SQLBaseStore): +class ReceiptsStore(SQLBaseStore): @cached @defer.inlineCallbacks @@ -77,6 +77,7 @@ class ReceiptStore(SQLBaseStore): txn, table="receipts_linearized", values={ + "stream_id": stream_id, "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, @@ -109,16 +110,16 @@ class ReceiptStore(SQLBaseStore): return None linearized_event_id = yield self.runInteraction( - graph_to_linear, desc="insert_receipt_conv" + "insert_receipt_conv", graph_to_linear ) - stream_id_manager = yield self._stream_id_gen.get_next(self) - with stream_id_manager() as stream_id: + stream_id_manager = yield self._receipts_id_gen.get_next(self) + with stream_id_manager as stream_id: yield self.runInteraction( + "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, stream_id=stream_id, - desc="insert_linearized_receipt" ) yield self.insert_graph_receipt( @@ -131,9 +132,9 @@ class ReceiptStore(SQLBaseStore): def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids): return self.runInteraction( + "insert_graph_receipt", self.insert_graph_receipt_txn, room_id, receipt_type, user_id, event_ids, - desc="insert_graph_receipt" ) def insert_graph_receipt_txn(self, txn, room_id, receipt_type, From ca041d55267740214a2cfab95c44ee6f70cc6d0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 15:25:30 +0100 Subject: [PATCH 09/20] Wire together receipts and the notifer/federation --- synapse/handlers/receipts.py | 81 ++++++++++++++++++------ synapse/rest/client/v2_alpha/receipts.py | 3 +- synapse/storage/receipts.py | 69 +++++++++++++++++--- synapse/streams/events.py | 6 +- 4 files changed, 126 insertions(+), 33 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index fc2f38c1c..94f081005 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -37,7 +37,8 @@ class ReceiptsHandler(BaseHandler): "m.receipt", self._received_remote_receipt ) - self._latest_serial = 0 + # self._earliest_cached_serial = 0 + # self._rooms_to_latest_serial = {} @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, @@ -53,8 +54,10 @@ class ReceiptsHandler(BaseHandler): "event_ids": [event_id], } - yield self._handle_new_receipts([receipt]) - self._push_remotes([receipt]) + is_new = yield self._handle_new_receipts([receipt]) + + if is_new: + self._push_remotes([receipt]) @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): @@ -81,33 +84,24 @@ class ReceiptsHandler(BaseHandler): user_id = receipt["user_id"] event_ids = receipt["event_ids"] - stream_id, max_persisted_id = yield self.store.insert_receipt( + res = yield self.store.insert_receipt( room_id, receipt_type, user_id, event_ids, ) - # TODO: Use max_persisted_id + if not res: + # res will be None if this read receipt is 'old' + defer.returnValue(False) - self._latest_serial = max(self._latest_serial, stream_id) + stream_id, max_persisted_id = res with PreserveLoggingContext(): self.notifier.on_new_event( - "receipt_key", self._latest_serial, rooms=[room_id] + "receipt_key", max_persisted_id, rooms=[room_id] ) - localusers = set() - remotedomains = set() - - rm_handler = self.hs.get_handlers().room_member_handler - yield rm_handler.fetch_room_distributions_into( - room_id, localusers=localusers, remotedomains=remotedomains - ) - - receipt["remotedomains"] = remotedomains - - self.notifier.on_new_event( - "receipt_key", self._latest_room_serial, rooms=[room_id] - ) + defer.returnValue(True) + @defer.inlineCallbacks def _push_remotes(self, receipts): # TODO: Some of this stuff should be coallesced. for receipt in receipts: @@ -115,7 +109,15 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] - remotedomains = receipt["remotedomains"] + + remotedomains = set() + + rm_handler = self.hs.get_handlers().room_member_handler + yield rm_handler.fetch_room_distributions_into( + room_id, localusers=None, remotedomains=remotedomains + ) + + logger.debug("Sending receipt to: %r", remotedomains) for domain in remotedomains: self.federation.send_edu( @@ -130,3 +132,40 @@ class ReceiptsHandler(BaseHandler): }, }, ) + + +class ReceiptEventSource(object): + def __init__(self, hs): + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def get_new_events_for_user(self, user, from_key, limit): + from_key = int(from_key) + to_key = yield self.get_current_key() + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + content = {} + for room_id in rooms: + result = yield self.store.get_linearized_receipts_for_room( + room_id, from_key, to_key + ) + if result: + content[room_id] = result + + if not content: + defer.returnValue(([], to_key)) + + event = { + "type": "m.receipt", + "content": content, + } + + defer.returnValue(([event], to_key)) + + def get_current_key(self, direction='f'): + return self.store.get_max_receipt_stream_id() + + @defer.inlineCallbacks + def get_pagination_rows(self, user, config, key): + defer.returnValue(([{}], 0)) diff --git a/synapse/rest/client/v2_alpha/receipts.py b/synapse/rest/client/v2_alpha/receipts.py index 829427b7b..40406e2ed 100644 --- a/synapse/rest/client/v2_alpha/receipts.py +++ b/synapse/rest/client/v2_alpha/receipts.py @@ -28,7 +28,7 @@ class ReceiptRestServlet(RestServlet): PATTERN = client_v2_pattern( "/rooms/(?P[^/]*)" "/receipt/(?P[^/]*)" - "/(?P[^/])*" + "/(?P[^/]*)$" ) def __init__(self, hs): @@ -41,7 +41,6 @@ class ReceiptRestServlet(RestServlet): def on_POST(self, request, room_id, receipt_type, event_id): user, client = yield self.auth.get_user_by_req(request) - # TODO: STUFF yield self.receipts_handler.received_client_receipt( room_id, receipt_type, diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 15c11fd41..5a02c8025 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -17,17 +17,33 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +import logging + + +logger = logging.getLogger(__name__) + class ReceiptsStore(SQLBaseStore): - @cached @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id): - rows = yield self._simple_select_list( - table="receipts_linearized", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", + def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def f(txn): + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, from_key, to_key) + ) + + rows = self.cursor_to_dict(txn) + + return rows + + rows = yield self.runInteraction( + "get_linearized_receipts_for_room", f ) result = {} @@ -40,6 +56,9 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) + def get_max_receipt_stream_id(self): + return self._receipts_id_gen.get_max_token(self) + @cached @defer.inlineCallbacks def get_graph_receipts_for_room(self, room_id): @@ -62,11 +81,38 @@ class ReceiptsStore(SQLBaseStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, stream_id): + + # We don't want to clobber receipts for more recent events, so we + # have to compare orderings of existing receipts + sql = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " INNER JOIN receipts_linearized as r USING (event_id, room_id)" + " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?" + ) + + txn.execute(sql, (room_id, receipt_type, user_id)) + results = txn.fetchall() + + if results: + res = self._simple_select_one_txn( + txn, + table="events", + retcols=["topological_ordering", "stream_ordering"], + keyvalues={"event_id": event_id}, + ) + topological_ordering = int(res["topological_ordering"]) + stream_ordering = int(res["stream_ordering"]) + + for to, so, _ in results: + if int(to) > topological_ordering: + return False + elif int(to) == topological_ordering and int(so) >= stream_ordering: + return False + self._simple_delete_txn( txn, table="receipts_linearized", keyvalues={ - "stream_id": stream_id, "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, @@ -85,6 +131,8 @@ class ReceiptsStore(SQLBaseStore): } ) + return True + @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids): if not event_ids: @@ -115,13 +163,16 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: - yield self.runInteraction( + have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, stream_id=stream_id, ) + if not have_persisted: + defer.returnValue(None) + yield self.insert_graph_receipt( room_id, receipt_type, user_id, event_ids ) diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 0a1a3a3d0..aaa3609aa 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -20,6 +20,7 @@ from synapse.types import StreamToken from synapse.handlers.presence import PresenceEventSource from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource +from synapse.handlers.receipts import ReceiptEventSource class NullSource(object): @@ -43,6 +44,7 @@ class EventSources(object): "room": RoomEventSource, "presence": PresenceEventSource, "typing": TypingNotificationEventSource, + "receipt": ReceiptEventSource, } def __init__(self, hs): @@ -63,7 +65,9 @@ class EventSources(object): typing_key=( yield self.sources["typing"].get_current_key() ), - receipt_key="0", + receipt_key=( + yield self.sources["receipt"].get_current_key() + ), ) defer.returnValue(token) From f0dd6d4cbddd71f19b4bb20754e302eb48dd21b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 7 Jul 2015 16:18:36 +0100 Subject: [PATCH 10/20] Fix test. --- tests/rest/client/v1/test_events.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/tests/rest/client/v1/test_events.py b/tests/rest/client/v1/test_events.py index 445272e32..ac3b0b58a 100644 --- a/tests/rest/client/v1/test_events.py +++ b/tests/rest/client/v1/test_events.py @@ -183,7 +183,17 @@ class EventStreamPermissionsTestCase(RestTestCase): ) self.assertEquals(200, code, msg=str(response)) - self.assertEquals(0, len(response["chunk"])) + # We may get a presence event for ourselves down + self.assertEquals( + 0, + len([ + c for c in response["chunk"] + if not ( + c.get("type") == "m.presence" + and c["content"].get("user_id") == self.user_id + ) + ]) + ) # joined room (expect all content for room) yield self.join(room=room_id, user=self.user_id, tok=self.token) From 87311d1b8cc648400dfce5db8a7fed46abbeb963 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 10:54:01 +0100 Subject: [PATCH 11/20] Hook up receipts to v1 initialSync --- synapse/handlers/message.py | 16 ++++++++++--- synapse/handlers/receipts.py | 45 +++++++++++++++++++++++++++++++++++- synapse/storage/receipts.py | 27 +++++++++++++++------- 3 files changed, 76 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index e324662f1..7c1d6b548 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -278,6 +278,11 @@ class MessageHandler(BaseHandler): user, pagination_config.get_source_config("presence"), None ) + receipt_stream = self.hs.get_event_sources().sources["receipt"] + receipt, _ = yield receipt_stream.get_pagination_rows( + user, pagination_config.get_source_config("receipt"), None + ) + public_room_ids = yield self.store.get_public_room_ids() limit = pagin_config.limit @@ -344,7 +349,8 @@ class MessageHandler(BaseHandler): ret = { "rooms": rooms_ret, "presence": presence, - "end": now_token.to_string() + "receipts": receipt, + "end": now_token.to_string(), } defer.returnValue(ret) @@ -405,9 +411,12 @@ class MessageHandler(BaseHandler): defer.returnValue([p for success, p in presence_defs if success]) - presence, (messages, token) = yield defer.gatherResults( + receipts_handler = self.hs.get_handlers().receipts_handler + + presence, receipts, (messages, token) = yield defer.gatherResults( [ get_presence(), + receipts_handler.get_receipts_for_room(room_id, now_token.receipt_key), self.store.get_recent_events_for_room( room_id, limit=limit, @@ -431,5 +440,6 @@ class MessageHandler(BaseHandler): "end": end_token.to_string(), }, "state": state, - "presence": presence + "presence": presence, + "receipts": receipts, }) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 94f081005..f6cde30e6 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -133,6 +133,24 @@ class ReceiptsHandler(BaseHandler): }, ) + @defer.inlineCallbacks + def get_receipts_for_room(self, room_id, to_key): + result = yield self.store.get_linearized_receipts_for_room( + room_id, None, to_key + ) + + if not result: + defer.returnValue([]) + + event = { + "type": "m.receipt", + "content": { + room_id: result, + }, + } + + defer.returnValue([event]) + class ReceiptEventSource(object): def __init__(self, hs): @@ -168,4 +186,29 @@ class ReceiptEventSource(object): @defer.inlineCallbacks def get_pagination_rows(self, user, config, key): - defer.returnValue(([{}], 0)) + to_key = int(config.from_key) + + if config.to_key: + from_key = int(config.to_key) + else: + from_key = None + + rooms = yield self.store.get_rooms_for_user(user.to_string()) + rooms = [room.room_id for room in rooms] + content = {} + for room_id in rooms: + result = yield self.store.get_linearized_receipts_for_room( + room_id, from_key, to_key + ) + if result: + content[room_id] = result + + if not content: + defer.returnValue(([], to_key)) + + event = { + "type": "m.receipt", + "content": content, + } + + defer.returnValue(([event], to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 5a02c8025..07f8edaac 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -28,15 +28,26 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): def f(txn): - sql = ( - "SELECT * FROM receipts_linearized WHERE" - " room_id = ? AND stream_id > ? AND stream_id <= ?" - ) + if from_key: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id > ? AND stream_id <= ?" + ) - txn.execute( - sql, - (room_id, from_key, to_key) - ) + txn.execute( + sql, + (room_id, from_key, to_key) + ) + else: + sql = ( + "SELECT * FROM receipts_linearized WHERE" + " room_id = ? AND stream_id <= ?" + ) + + txn.execute( + sql, + (room_id, to_key) + ) rows = self.cursor_to_dict(txn) From d85ce8d89bc1fb6ff6f277fb424dddca7ab2f47e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 11:36:05 +0100 Subject: [PATCH 12/20] Split receipt events up into one per room --- synapse/handlers/receipts.py | 49 +++++++++++++++--------------------- 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f6cde30e6..b7567b9ea 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -144,9 +144,8 @@ class ReceiptsHandler(BaseHandler): event = { "type": "m.receipt", - "content": { - room_id: result, - }, + "room_id": room_id, + "content": result, } defer.returnValue([event]) @@ -163,23 +162,19 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - content = {} + events = [] for room_id in rooms: - result = yield self.store.get_linearized_receipts_for_room( + content = yield self.store.get_linearized_receipts_for_room( room_id, from_key, to_key ) - if result: - content[room_id] = result + if content: + events.append({ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }) - if not content: - defer.returnValue(([], to_key)) - - event = { - "type": "m.receipt", - "content": content, - } - - defer.returnValue(([event], to_key)) + defer.returnValue((events, to_key)) def get_current_key(self, direction='f'): return self.store.get_max_receipt_stream_id() @@ -195,20 +190,16 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - content = {} + events = [] for room_id in rooms: - result = yield self.store.get_linearized_receipts_for_room( + content = yield self.store.get_linearized_receipts_for_room( room_id, from_key, to_key ) - if result: - content[room_id] = result + if content: + events.append({ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }) - if not content: - defer.returnValue(([], to_key)) - - event = { - "type": "m.receipt", - "content": content, - } - - defer.returnValue(([event], to_key)) + defer.returnValue((events, to_key)) From af812b68ddbd1a69a8c98c463248d000633b075f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 15:35:00 +0100 Subject: [PATCH 13/20] Add a cache to fetching of receipt streams --- synapse/handlers/receipts.py | 31 +++--------- synapse/storage/receipts.py | 92 ++++++++++++++++++++++++++++++++++-- 2 files changed, 96 insertions(+), 27 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index b7567b9ea..053ed8480 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -37,8 +37,7 @@ class ReceiptsHandler(BaseHandler): "m.receipt", self._received_remote_receipt ) - # self._earliest_cached_serial = 0 - # self._rooms_to_latest_serial = {} + self._receipt_cache = None @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, @@ -162,17 +161,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - events = [] - for room_id in rooms: - content = yield self.store.get_linearized_receipts_for_room( - room_id, from_key, to_key - ) - if content: - events.append({ - "type": "m.receipt", - "room_id": room_id, - "content": content, - }) + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) defer.returnValue((events, to_key)) @@ -190,16 +181,8 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] - events = [] - for room_id in rooms: - content = yield self.store.get_linearized_receipts_for_room( - room_id, from_key, to_key - ) - if content: - events.append({ - "type": "m.receipt", - "room_id": room_id, - "content": content, - }) + events = yield self.store.get_linearized_receipts_for_rooms( + rooms, from_key, to_key + ) defer.returnValue((events, to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 07f8edaac..503f68f85 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -17,6 +17,9 @@ from ._base import SQLBaseStore, cached from twisted.internet import defer +from synapse.util import unwrapFirstError + +from blist import sorteddict import logging @@ -24,6 +27,29 @@ logger = logging.getLogger(__name__) class ReceiptsStore(SQLBaseStore): + def __init__(self, hs): + super(ReceiptsStore, self).__init__(hs) + + self._receipts_stream_cache = _RoomStreamChangeCache() + + @defer.inlineCallbacks + def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + room_ids = set(room_ids) + + if from_key: + room_ids = yield self._receipts_stream_cache.get_rooms_changed( + self, room_ids, from_key + ) + + results = yield defer.gatherResults( + [ + self.get_linearized_receipts_for_room(room_id, from_key, to_key) + for room_id in room_ids + ], + consumeErrors=True, + ).addErrback(unwrapFirstError) + + defer.returnValue([ev for res in results for ev in res]) @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): @@ -57,15 +83,22 @@ class ReceiptsStore(SQLBaseStore): "get_linearized_receipts_for_room", f ) - result = {} + if not rows: + defer.returnValue([]) + + content = {} for row in rows: - result.setdefault( + content.setdefault( row["event_id"], {} ).setdefault( row["receipt_type"], [] ).append(row["user_id"]) - defer.returnValue(result) + defer.returnValue([{ + "type": "m.receipt", + "room_id": room_id, + "content": content, + }]) def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) @@ -174,6 +207,9 @@ class ReceiptsStore(SQLBaseStore): stream_id_manager = yield self._receipts_id_gen.get_next(self) with stream_id_manager as stream_id: + yield self._receipts_stream_cache.room_has_changed( + self, room_id, stream_id + ) have_persisted = yield self.runInteraction( "insert_linearized_receipt", self.insert_linearized_receipt_txn, @@ -223,3 +259,53 @@ class ReceiptsStore(SQLBaseStore): for event_id in event_ids ], ) + + +class _RoomStreamChangeCache(object): + """Keeps track of the stream_id of the latest change in rooms. + + Given a list of rooms and stream key, it will give a subset of rooms that + may have changed since that key. If the key is too old then the cache + will simply return all rooms. + """ + def __init__(self, size_of_cache=1000): + self._size_of_cache = size_of_cache + self._room_to_key = {} + self._cache = sorteddict() + self._earliest_key = None + + @defer.inlineCallbacks + def get_rooms_changed(self, store, room_ids, key): + if key > (yield self._get_earliest_key(store)): + keys = self._cache.keys() + i = keys.bisect_right(key) + + result = set( + self._cache[k] for k in keys[i:] + ).intersection(room_ids) + else: + result = room_ids + + defer.returnValue(result) + + @defer.inlineCallbacks + def room_has_changed(self, store, room_id, key): + if key > (yield self._get_earliest_key(store)): + old_key = self._room_to_key.get(room_id, None) + if old_key: + key = max(key, old_key) + self._cache.pop(old_key, None) + self._cache[key] = room_id + + while len(self._cache) > self._size_of_cache: + k, r = self._cache.popitem() + self._earliest_key = max(k, self._earliest_key) + self._room_to_key.pop(r, None) + + @defer.inlineCallbacks + def _get_earliest_key(self, store): + if self._earliest_key is None: + self._earliest_key = yield store.get_max_receipt_stream_id() + self._earliest_key = int(self._earliest_key) + + defer.returnValue(self._earliest_key) From ce9e2f84ad6c62e64884fef1965b786eea3c365c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Jul 2015 15:41:59 +0100 Subject: [PATCH 14/20] Add blist to dependencies --- synapse/python_dependencies.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index f9e59dd91..daaa61b5f 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -31,6 +31,7 @@ REQUIREMENTS = { "pillow": ["PIL"], "pydenticon": ["pydenticon"], "ujson": ["ujson"], + "blist": ["blist"], } CONDITIONAL_REQUIREMENTS = { "web_client": { From 1af188209a03567dc4b5300b9a0fc8613ad176df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 11:39:30 +0100 Subject: [PATCH 15/20] Change format of receipts to allow inclusion of data --- synapse/handlers/receipts.py | 24 ++++++++---- synapse/storage/receipts.py | 39 ++++++++++---------- synapse/storage/schema/delta/21/receipts.sql | 16 +++----- 3 files changed, 42 insertions(+), 37 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 053ed8480..8a052f071 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -36,6 +36,7 @@ class ReceiptsHandler(BaseHandler): self.federation.register_edu_handler( "m.receipt", self._received_remote_receipt ) + self.clock = self.hs.get_clock() self._receipt_cache = None @@ -51,6 +52,9 @@ class ReceiptsHandler(BaseHandler): "receipt_type": receipt_type, "user_id": user_id, "event_ids": [event_id], + "data": { + "ts": self.clock.time_msec() + } } is_new = yield self._handle_new_receipts([receipt]) @@ -65,12 +69,12 @@ class ReceiptsHandler(BaseHandler): "room_id": room_id, "receipt_type": receipt_type, "user_id": user_id, - "event_ids": [event_id], + "event_ids": user_values["event_ids"], + "data": user_values.get("data", {}), } for room_id, room_values in content.items() - for event_id, ev_values in room_values.items() - for receipt_type, users in ev_values.items() - for user_id in users + for receipt_type, users in room_values.items() + for user_id, user_values in users.items() ] yield self._handle_new_receipts(receipts) @@ -82,9 +86,10 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] res = yield self.store.insert_receipt( - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) if not res: @@ -108,6 +113,7 @@ class ReceiptsHandler(BaseHandler): receipt_type = receipt["receipt_type"] user_id = receipt["user_id"] event_ids = receipt["event_ids"] + data = receipt["data"] remotedomains = set() @@ -124,10 +130,12 @@ class ReceiptsHandler(BaseHandler): edu_type="m.receipt", content={ room_id: { - event_id: { - receipt_type: [user_id] + receipt_type: { + user_id: { + "event_ids": event_ids, + "data": data, + } } - for event_id in event_ids }, }, ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 503f68f85..c4e6b02bd 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -21,6 +21,7 @@ from synapse.util import unwrapFirstError from blist import sorteddict import logging +import ujson as json logger = logging.getLogger(__name__) @@ -91,8 +92,8 @@ class ReceiptsStore(SQLBaseStore): content.setdefault( row["event_id"], {} ).setdefault( - row["receipt_type"], [] - ).append(row["user_id"]) + row["receipt_type"], {} + )[row["user_id"]] = json.loads(row["data"]) defer.returnValue([{ "type": "m.receipt", @@ -124,7 +125,7 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(result) def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_id, stream_id): + user_id, event_id, data, stream_id): # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -172,13 +173,14 @@ class ReceiptsStore(SQLBaseStore): "receipt_type": receipt_type, "user_id": user_id, "event_id": event_id, + "data": json.dumps(data), } ) return True @defer.inlineCallbacks - def insert_receipt(self, room_id, receipt_type, user_id, event_ids): + def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): if not event_ids: return @@ -214,6 +216,7 @@ class ReceiptsStore(SQLBaseStore): "insert_linearized_receipt", self.insert_linearized_receipt_txn, room_id, receipt_type, user_id, linearized_event_id, + data, stream_id=stream_id, ) @@ -221,22 +224,22 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue(None) yield self.insert_graph_receipt( - room_id, receipt_type, user_id, event_ids + room_id, receipt_type, user_id, event_ids, data ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) defer.returnValue((stream_id, max_persisted_id)) - def insert_graph_receipt(self, room_id, receipt_type, - user_id, event_ids): + def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, + data): return self.runInteraction( "insert_graph_receipt", self.insert_graph_receipt_txn, - room_id, receipt_type, user_id, event_ids, + room_id, receipt_type, user_id, event_ids, data ) def insert_graph_receipt_txn(self, txn, room_id, receipt_type, - user_id, event_ids): + user_id, event_ids, data): self._simple_delete_txn( txn, table="receipts_graph", @@ -246,18 +249,16 @@ class ReceiptsStore(SQLBaseStore): "user_id": user_id, } ) - self._simple_insert_many_txn( + self._simple_insert_txn( txn, table="receipts_graph", - values=[ - { - "room_id": room_id, - "receipt_type": receipt_type, - "user_id": user_id, - "event_id": event_id, - } - for event_id in event_ids - ], + values={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "event_ids": json.dumps(event_ids), + "data": json.dumps(data), + } ) diff --git a/synapse/storage/schema/delta/21/receipts.sql b/synapse/storage/schema/delta/21/receipts.sql index ac7738e37..2f64d609f 100644 --- a/synapse/storage/schema/delta/21/receipts.sql +++ b/synapse/storage/schema/delta/21/receipts.sql @@ -18,11 +18,9 @@ CREATE TABLE IF NOT EXISTS receipts_graph( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_graph_room_tuple ON receipts_graph( - room_id, receipt_type, user_id + event_ids TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_graph_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE TABLE IF NOT EXISTS receipts_linearized ( @@ -30,11 +28,9 @@ CREATE TABLE IF NOT EXISTS receipts_linearized ( room_id TEXT NOT NULL, receipt_type TEXT NOT NULL, user_id TEXT NOT NULL, - event_id TEXT NOT NULL -); - -CREATE INDEX receipts_linearized_room_tuple ON receipts_linearized( - room_id, receipt_type, user_id + event_id TEXT NOT NULL, + data TEXT NOT NULL, + CONSTRAINT receipts_linearized_uniqueness UNIQUE (room_id, receipt_type, user_id) ); CREATE INDEX receipts_linearized_id ON receipts_linearized( From c2d08ca62aebdfa689ae7d39ff74b2a43e4b5e3a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 13:15:34 +0100 Subject: [PATCH 16/20] Integer timestamps --- synapse/handlers/receipts.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 8a052f071..403c1c849 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -53,7 +53,7 @@ class ReceiptsHandler(BaseHandler): "user_id": user_id, "event_ids": [event_id], "data": { - "ts": self.clock.time_msec() + "ts": int(self.clock.time_msec()), } } From f0979afdb0b0b8f96730a97cf0b51de6024cabda Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 16:02:07 +0100 Subject: [PATCH 17/20] Remove spurious comment --- synapse/handlers/receipts.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 403c1c849..f847360d0 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Contains handlers for federation events.""" - from ._base import BaseHandler from twisted.internet import defer From ed887209520cc3268885c918738734f8b9874c81 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 9 Jul 2015 16:14:46 +0100 Subject: [PATCH 18/20] Handle error slightly better --- synapse/storage/receipts.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c4e6b02bd..593032713 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -188,6 +188,7 @@ class ReceiptsStore(SQLBaseStore): linearized_event_id = event_ids[0] else: # we need to points in graph -> linearized form. + # TODO: Make this better. def graph_to_linear(txn): query = ( "SELECT event_id WHERE room_id = ? AND stream_ordering IN (" @@ -200,8 +201,7 @@ class ReceiptsStore(SQLBaseStore): if rows: return rows[0][0] else: - # TODO: ARGH?! - return None + raise RuntimeError("Unrecognized event_ids: %r" % (event_ids,)) linearized_event_id = yield self.runInteraction( "insert_receipt_conv", graph_to_linear From e5991af629df2e63c20c5f10e4589a9faf8305cb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Jul 2015 13:30:43 +0100 Subject: [PATCH 19/20] Comments --- synapse/handlers/receipts.py | 16 ++++++++++++---- synapse/storage/receipts.py | 11 +++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index f847360d0..1925a4803 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -41,10 +41,9 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def received_client_receipt(self, room_id, receipt_type, user_id, event_id): - # 1. Persist. - # 2. Notify local clients - # 3. Notify remote servers - + """Called when a client tells us a local user has read up to the given + event_id in the room. + """ receipt = { "room_id": room_id, "receipt_type": receipt_type, @@ -62,6 +61,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _received_remote_receipt(self, origin, content): + """Called when we receive an EDU of type m.receipt from a remote HS. + """ receipts = [ { "room_id": room_id, @@ -79,6 +80,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _handle_new_receipts(self, receipts): + """Takes a list of receipts, stores them and informs the notifier. + """ for receipt in receipts: room_id = receipt["room_id"] receipt_type = receipt["receipt_type"] @@ -105,6 +108,9 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def _push_remotes(self, receipts): + """Given a list of receipts, works out which remote servers should be + poked and pokes them. + """ # TODO: Some of this stuff should be coallesced. for receipt in receipts: room_id = receipt["room_id"] @@ -140,6 +146,8 @@ class ReceiptsHandler(BaseHandler): @defer.inlineCallbacks def get_receipts_for_room(self, room_id, to_key): + """Gets all receipts for a room, upto the given key. + """ result = yield self.store.get_linearized_receipts_for_room( room_id, None, to_key ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 593032713..56b9fedfd 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -35,6 +35,8 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + """Get receipts for multiple rooms for sending to clients. + """ room_ids = set(room_ids) if from_key: @@ -54,6 +56,8 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + """Get receipts for a single room for sending to clients. + """ def f(txn): if from_key: sql = ( @@ -107,6 +111,8 @@ class ReceiptsStore(SQLBaseStore): @cached @defer.inlineCallbacks def get_graph_receipts_for_room(self, room_id): + """Get receipts for sending to remote servers. + """ rows = yield self._simple_select_list( table="receipts_graph", keyvalues={"room_id": room_id}, @@ -181,6 +187,11 @@ class ReceiptsStore(SQLBaseStore): @defer.inlineCallbacks def insert_receipt(self, room_id, receipt_type, user_id, event_ids, data): + """Insert a receipt, either from local client or remote server. + + Automatically does conversion between linearized and graph + representations. + """ if not event_ids: return From 4624d6035e28c4ee05e38234f2aa1671b4ac701a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Jul 2015 10:19:07 +0100 Subject: [PATCH 20/20] Docs --- synapse/handlers/receipts.py | 11 ++++++++--- synapse/storage/receipts.py | 31 ++++++++++++++++++++++++++++--- 2 files changed, 36 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index 1925a4803..5b3df6932 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -149,7 +149,8 @@ class ReceiptsHandler(BaseHandler): """Gets all receipts for a room, upto the given key. """ result = yield self.store.get_linearized_receipts_for_room( - room_id, None, to_key + room_id, + to_key=to_key, ) if not result: @@ -176,7 +177,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) @@ -196,7 +199,9 @@ class ReceiptEventSource(object): rooms = yield self.store.get_rooms_for_user(user.to_string()) rooms = [room.room_id for room in rooms] events = yield self.store.get_linearized_receipts_for_rooms( - rooms, from_key, to_key + rooms, + from_key=from_key, + to_key=to_key, ) defer.returnValue((events, to_key)) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 56b9fedfd..d515a0a15 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -34,8 +34,17 @@ class ReceiptsStore(SQLBaseStore): self._receipts_stream_cache = _RoomStreamChangeCache() @defer.inlineCallbacks - def get_linearized_receipts_for_rooms(self, room_ids, from_key, to_key): + def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. + + Args: + room_ids (list): List of room_ids. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ room_ids = set(room_ids) @@ -46,7 +55,9 @@ class ReceiptsStore(SQLBaseStore): results = yield defer.gatherResults( [ - self.get_linearized_receipts_for_room(room_id, from_key, to_key) + self.get_linearized_receipts_for_room( + room_id, to_key, from_key=from_key + ) for room_id in room_ids ], consumeErrors=True, @@ -55,8 +66,17 @@ class ReceiptsStore(SQLBaseStore): defer.returnValue([ev for res in results for ev in res]) @defer.inlineCallbacks - def get_linearized_receipts_for_room(self, room_id, from_key, to_key): + def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None): """Get receipts for a single room for sending to clients. + + Args: + room_ids (str): The room id. + to_key (int): Max stream id to fetch receipts upto. + from_key (int): Min stream id to fetch receipts from. None fetches + from the start. + + Returns: + list: A list of receipts. """ def f(txn): if from_key: @@ -288,6 +308,9 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def get_rooms_changed(self, store, room_ids, key): + """Returns subset of room ids that have had new receipts since the + given key. If the key is too old it will just return the given list. + """ if key > (yield self._get_earliest_key(store)): keys = self._cache.keys() i = keys.bisect_right(key) @@ -302,6 +325,8 @@ class _RoomStreamChangeCache(object): @defer.inlineCallbacks def room_has_changed(self, store, room_id, key): + """Informs the cache that the room has been changed at the given key. + """ if key > (yield self._get_earliest_key(store)): old_key = self._room_to_key.get(room_id, None) if old_key: