From 7e77a82c5f4c2dbda1d00dace74a1aece2ab5b78 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 13 Aug 2015 16:58:10 +0100
Subject: [PATCH 01/18] Re-enable receipts

---
 synapse/handlers/receipts.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 415dd339f..86c911c4b 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -171,7 +171,6 @@ class ReceiptEventSource(object):

     @defer.inlineCallbacks
     def get_new_events_for_user(self, user, from_key, limit):
-        defer.returnValue(([], from_key))
         from_key = int(from_key)
         to_key = yield self.get_current_key()

@@ -194,7 +193,6 @@ class ReceiptEventSource(object):
     @defer.inlineCallbacks
     def get_pagination_rows(self, user, config, key):
         to_key = int(config.from_key)
-        defer.returnValue(([], to_key))

         if config.to_key:
             from_key = int(config.to_key)

From 5db56779699a3c9a95701e6373ac8bd5cace7860 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 13 Aug 2015 16:58:23 +0100
Subject: [PATCH 02/18] Add metrics to the receipts cache

---
 synapse/metrics/metric.py   | 7 +++++++
 synapse/storage/receipts.py | 6 ++++++
 2 files changed, 13 insertions(+)

diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 21b37748f..5b1e9c3dd 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -151,5 +151,12 @@ class CacheMetric(object):
     def inc_misses(self, *values):
         self.total.inc(*values)

+    def inc_hits_by(self, inc, *values):
+        self.hits.inc_by(inc, *values)
+        self.total.inc_by(inc, *values)
+
+    def inc_misses_by(self, inc, *values):
+        self.total.inc_by(inc, *values)
+
     def render(self):
         return self.hits.render() + self.total.render() + self.size.render()
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index cac1a5657..42f4b5f78 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -15,6 +15,7 @@

 from ._base import SQLBaseStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches import cache_counter, caches_by_name

 from twisted.internet import defer

@@ -305,6 +306,8 @@ class _RoomStreamChangeCache(object):
         self._room_to_key = {}
         self._cache = sorteddict()
         self._earliest_key = None
+        self.name = "ReceiptsRoomChangeCache"
+        caches_by_name[self.name] = self._cache

     @defer.inlineCallbacks
     def get_rooms_changed(self, store, room_ids, key):
@@ -321,6 +324,9 @@ class _RoomStreamChangeCache(object):
         else:
             result = room_ids

+        cache_counter.inc_hits_by(len(result), self.name)
+        cache_counter.inc_misses_by(len(room_ids) - len(result), self.name)
+
         defer.returnValue(result)

     @defer.inlineCallbacks
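
For context on the accounting introduced in the patch above: a batched cache lookup is scored with one hit per room answered from the cache window and one miss per room that has to fall through to the database, so hits/total gives a per-key hit ratio rather than a per-call one. A minimal standalone sketch of that bookkeeping (not Synapse code; every name below is invented for illustration):

    class BatchCacheStats(object):
        """Hypothetical counter mirroring CacheMetric's hits/total split."""

        def __init__(self):
            self.hits = 0
            self.total = 0

        def inc_hits_by(self, n):
            self.hits += n
            self.total += n

        def inc_misses_by(self, n):
            self.total += n


    def lookup(cache, keys, stats):
        # Keys present in the cache are hits; the rest must go to the database.
        found = {k: cache[k] for k in keys if k in cache}
        stats.inc_hits_by(len(found))
        stats.inc_misses_by(len(keys) - len(found))
        return found


    if __name__ == "__main__":
        stats = BatchCacheStats()
        lookup({"!a:hs": 3, "!b:hs": 7}, ["!a:hs", "!b:hs", "!c:hs"], stats)
        print("hit ratio: %.2f" % (float(stats.hits) / stats.total))  # 0.67
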
From 68b255c5a19cca4fce6a4225447e143b5ed7814b Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 14 Aug 2015 15:06:22 +0100
Subject: [PATCH 03/18] Batch _get_linearized_receipts_for_rooms

---
 synapse/storage/receipts.py | 79 ++++++++++++++++++++++++++++++-------
 1 file changed, 64 insertions(+), 15 deletions(-)

diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 42f4b5f78..fb35472ad 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -14,7 +14,7 @@
 # limitations under the License.

 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches import cache_counter, caches_by_name

 from twisted.internet import defer
@@ -55,19 +55,13 @@ class ReceiptsStore(SQLBaseStore):
             self, room_ids, from_key
         )

-        results = yield defer.gatherResults(
-            [
-                self.get_linearized_receipts_for_room(
-                    room_id, to_key, from_key=from_key
-                )
-                for room_id in room_ids
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
+        results = yield self._get_linearized_receipts_for_rooms(
+            room_ids, to_key, from_key=from_key
+        )

-        defer.returnValue([ev for res in results for ev in res])
+        defer.returnValue([ev for res in results.values() for ev in res])

-    @defer.inlineCallbacks
+    @cachedInlineCallbacks(num_args=3, max_entries=5000)
     def get_linearized_receipts_for_room(self, room_id, to_key, from_key=None):
         """Get receipts for a single room for sending to clients.

@@ -127,6 +121,61 @@ class ReceiptsStore(SQLBaseStore):
                 "content": content,
             }])

+    @cachedList(cache=get_linearized_receipts_for_room.cache, list_name="room_ids",
+                num_args=3, inlineCallbacks=True)
+    def _get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
+        if not room_ids:
+            defer.returnValue({})
+
+        def f(txn):
+            if from_key:
+                sql = (
+                    "SELECT * FROM receipts_linearized WHERE"
+                    " room_id IN (%s) AND stream_id > ? AND stream_id <= ?"
+                ) % (
+                    ",".join(["?"] * len(room_ids))
+                )
+                args = list(room_ids)
+                args.extend([from_key, to_key])
+
+                txn.execute(sql, args)
+            else:
+                sql = (
+                    "SELECT * FROM receipts_linearized WHERE"
+                    " room_id IN (%s) AND stream_id <= ?"
+                ) % (
+                    ",".join(["?"] * len(room_ids))
+                )
+
+                args = list(room_ids)
+                args.append(to_key)
+
+                txn.execute(sql, args)
+
+            return self.cursor_to_dict(txn)
+
+        txn_results = yield self.runInteraction(
+            "_get_linearized_receipts_for_rooms", f
+        )
+
+        results = {}
+        for row in txn_results:
+            results.setdefault(row["room_id"], {
+                "type": "m.receipt",
+                "room_id": row["room_id"],
+                "content": {},
+            })["content"].setdefault(
+                row["event_id"], {}
+            ).setdefault(
+                row["receipt_type"], {}
+            )[row["user_id"]] = json.loads(row["data"])
+
+        results = {
+            room_id: [results[room_id]] if room_id in results else []
+            for room_id in room_ids
+        }
+        defer.returnValue(results)
+
     def get_max_receipt_stream_id(self):
         return self._receipts_id_gen.get_max_token(self)

@@ -321,11 +370,11 @@ class _RoomStreamChangeCache(object):
             result = set(
                 self._cache[k] for k in keys[i:]
             ).intersection(room_ids)
+
+            cache_counter.inc_hits(self.name)
         else:
             result = room_ids
-
-        cache_counter.inc_hits_by(len(result), self.name)
-        cache_counter.inc_misses_by(len(room_ids) - len(result), self.name)
+            cache_counter.inc_misses(self.name)

         defer.returnValue(result)
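
The batching in the patch above replaces one SELECT per room with a single query whose IN (...) clause is sized to the number of rooms, after which the rows are regrouped per room so that every requested room gets an entry (possibly empty). A rough standalone sketch of the same idea (not Synapse code; the cut-down schema, sample rows and helper name are assumptions):

    import sqlite3


    def receipts_for_rooms(conn, room_ids, to_key):
        if not room_ids:
            return {}

        placeholders = ",".join(["?"] * len(room_ids))
        sql = (
            "SELECT room_id, event_id, user_id FROM receipts_linearized"
            " WHERE room_id IN (%s) AND stream_id <= ?"
        ) % (placeholders,)

        rows = conn.execute(sql, list(room_ids) + [to_key]).fetchall()

        # Group rows by room, making sure every requested room gets an entry
        # even if it had no receipts, so "empty" results can be cached too.
        results = {room_id: [] for room_id in room_ids}
        for room_id, event_id, user_id in rows:
            results[room_id].append({"event_id": event_id, "user_id": user_id})
        return results


    if __name__ == "__main__":
        conn = sqlite3.connect(":memory:")
        conn.execute(
            "CREATE TABLE receipts_linearized"
            " (room_id TEXT, event_id TEXT, user_id TEXT, stream_id INTEGER)"
        )
        conn.execute(
            "INSERT INTO receipts_linearized VALUES ('!a:hs', '$ev1', '@u:hs', 1)"
        )
        print(receipts_for_rooms(conn, ["!a:hs", "!b:hs"], to_key=10))

Returning an explicit empty list for rooms with no receipts is what allows negative results to be cached alongside positive ones in a per-key cache like the one the patch wires up.
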
From 891dfd90bd97ad485d54dfae3e510c640de8e978 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 14 Aug 2015 15:42:52 +0100
Subject: [PATCH 04/18] Fix pending_calls metric to not lie

---
 synapse/metrics/__init__.py | 21 ++++++++++++++++++---
 1 file changed, 18 insertions(+), 3 deletions(-)

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 0be977299..7b8a20108 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -158,18 +158,33 @@ def runUntilCurrentTimer(func):

     @functools.wraps(func)
     def f(*args, **kwargs):
-        pending_calls = len(reactor.getDelayedCalls())
+        now = reactor.seconds()
+        num_pending = 0
+
+        # _newTimedCalls is one long list of *all* pending calls. Below loop
+        # is based off of impl of reactor.runUntilCurrent
+        for p in reactor._newTimedCalls:
+            if p.time > now:
+                break
+
+            if p.delayed_time > 0:
+                continue
+
+            num_pending += 1
+
+        num_pending += len(reactor.threadCallQueue)
+
         start = time.time() * 1000
         ret = func(*args, **kwargs)
         end = time.time() * 1000
         tick_time.inc_by(end - start)
-        pending_calls_metric.inc_by(pending_calls)
+        pending_calls_metric.inc_by(num_pending)
         return ret

     return f


-if hasattr(reactor, "runUntilCurrent"):
+if hasattr(reactor, "runUntilCurrent") and hasattr(reactor, "_newTimedCalls"):
     # runUntilCurrent is called when we have pending calls. It is called once
     # per iteratation after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
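
The corrected metric above counts only the work the reactor would actually perform on this pass: timed calls that are already due and have not been pushed back, plus calls queued from other threads, rather than every delayed call that happens to exist. A standalone sketch of that counting rule (not Twisted internals; the namedtuple merely stands in for Twisted's DelayedCall):

    import collections

    DelayedCall = collections.namedtuple("DelayedCall", ["time", "delayed_time"])


    def count_due_calls(timed_calls, thread_queue_len, now):
        # timed_calls is assumed to be sorted by fire time, as Twisted keeps it.
        num_pending = 0
        for call in timed_calls:
            if call.time > now:
                break                  # everything after this fires in the future
            if call.delayed_time > 0:
                continue               # pushed back via delay(); not due this pass
            num_pending += 1
        return num_pending + thread_queue_len


    if __name__ == "__main__":
        calls = [
            DelayedCall(time=1.0, delayed_time=0),
            DelayedCall(time=2.0, delayed_time=5),
            DelayedCall(time=99.0, delayed_time=0),
        ]
        print(count_due_calls(calls, thread_queue_len=2, now=2.5))  # 3
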
From 4d8e1e1f9e6fb143aa1317f31e561ef7629bfbf2 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 17 Aug 2015 13:36:07 +0100
Subject: [PATCH 05/18] Remove added unused methods

---
 synapse/metrics/metric.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 5b1e9c3dd..21b37748f 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -151,12 +151,5 @@ class CacheMetric(object):
     def inc_misses(self, *values):
         self.total.inc(*values)

-    def inc_hits_by(self, inc, *values):
-        self.hits.inc_by(inc, *values)
-        self.total.inc_by(inc, *values)
-
-    def inc_misses_by(self, inc, *values):
-        self.total.inc_by(inc, *values)
-
     def render(self):
         return self.hits.render() + self.total.render() + self.size.render()

From d3d582bc1c1d6d497f72d6b248e4f15722f6e18d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 17 Aug 2015 13:38:09 +0100
Subject: [PATCH 06/18] Remove unused import

---
 synapse/storage/receipts.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fb35472ad..46c06e42b 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -19,8 +19,6 @@ from synapse.util.caches import cache_counter, caches_by_name

 from twisted.internet import defer

-from synapse.util import unwrapFirstError
-
 from blist import sorteddict
 import logging
 import ujson as json

From 8f4165628b7e8fa5510bd3cffe01bb93cb7e2df7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 17 Aug 2015 14:43:54 +0100
Subject: [PATCH 07/18] Add index receipts_linearized_room_stream

---
 synapse/storage/__init__.py                    |  2 +-
 .../storage/schema/delta/22/receipts_index.sql | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
 create mode 100644 synapse/storage/schema/delta/22/receipts_index.sql

diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index c6ce65b4c..f154b1c8a 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -54,7 +54,7 @@ logger = logging.getLogger(__name__)

 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 21
+SCHEMA_VERSION = 22

 dir_path = os.path.abspath(os.path.dirname(__file__))

diff --git a/synapse/storage/schema/delta/22/receipts_index.sql b/synapse/storage/schema/delta/22/receipts_index.sql
new file mode 100644
index 000000000..b182b2b66
--- /dev/null
+++ b/synapse/storage/schema/delta/22/receipts_index.sql
@@ -0,0 +1,18 @@
+/* Copyright 2015 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized(
+    room_id, stream_id
+);

From cfc503681f3febbd80faeda0da4134b98bcbcbbc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 10:49:23 +0100
Subject: [PATCH 08/18] Comments

---
 synapse/storage/receipts.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 46c06e42b..0ed9f45ec 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -158,15 +158,25 @@ class ReceiptsStore(SQLBaseStore):

         results = {}
         for row in txn_results:
-            results.setdefault(row["room_id"], {
+            # We want a single event per room, since we want to batch the
+            # receipts by room, event and type.
+            room_event = results.setdefault(row["room_id"], {
                 "type": "m.receipt",
                 "room_id": row["room_id"],
                 "content": {},
-            })["content"].setdefault(
+            })
+
+            # The content is of the form:
+            # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
+            event_id = room_event["content"].setdefault(
                 row["event_id"], {}
-            ).setdefault(
+            )
+
+            receipt_type = event_id.setdefault(
                 row["receipt_type"], {}
-            )[row["user_id"]] = json.loads(row["data"])
+            )
+
+            receipt_type[row["user_id"]] = json.loads(row["data"])

         results = {
             room_id: [results[room_id]] if room_id in results else []
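
The comments added above describe receipts being rolled up into a single m.receipt event per room, whose content maps event_id -> receipt type -> user_id -> receipt data. A small standalone sketch of building that shape (not Synapse code; the helper name and sample identifiers are invented):

    def add_receipt(results, room_id, event_id, receipt_type, user_id, data):
        # One m.receipt event per room; receipts are batched inside its content.
        room_event = results.setdefault(room_id, {
            "type": "m.receipt",
            "room_id": room_id,
            "content": {},
        })
        event_entry = room_event["content"].setdefault(event_id, {})
        event_entry.setdefault(receipt_type, {})[user_id] = data
        return results


    if __name__ == "__main__":
        results = {}
        add_receipt(results, "!room:hs", "$event1:hs", "m.read", "@alice:hs",
                    {"ts": 1439900000000})
        add_receipt(results, "!room:hs", "$event1:hs", "m.read", "@bob:hs",
                    {"ts": 1439900001000})
        # {'!room:hs': {'type': 'm.receipt', 'room_id': '!room:hs', 'content':
        #   {'$event1:hs': {'m.read': {'@alice:hs': {...}, '@bob:hs': {...}}}}}
        print(results)
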
From 8199475ce062c8ed7b050ddecd51c6c3ec22e560 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 11:44:10 +0100
Subject: [PATCH 09/18] Ensure we never return a None event from _get_state_for_groups

---
 synapse/storage/state.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ab3ad5a07..b32141281 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -405,16 +405,21 @@ class StateStore(SQLBaseStore):
                 state_event = state_events[event_id]
                 state_dict[(state_event.type, state_event.state_key)] = state_event

+            results[group] = state_dict
+
             self._state_group_cache.update(
                 cache_seq_num,
                 key=group,
-                value=state_dict,
+                value=results[group],
                 full=(types is None),
             )

-            # We replace here to remove all the entries with None values.
+        # Remove all the entries with None values. The None values were just
+        # used for bookkeeping in the cache.
+        for group, state_dict in results.items():
             results[group] = {
-                key: value for key, value in state_dict.items() if value
+                key: event for key, event in state_dict.items()
+                if event
             }

         defer.returnValue(results)

From d3da63f7667f93e30f6071c1f5a0a0ffb9419e79 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 11:47:00 +0100
Subject: [PATCH 10/18] Use more helpful variable names

---
 synapse/metrics/__init__.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 7b8a20108..2e307a03a 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -163,11 +163,11 @@ def runUntilCurrentTimer(func):

         # _newTimedCalls is one long list of *all* pending calls. Below loop
         # is based off of impl of reactor.runUntilCurrent
-        for p in reactor._newTimedCalls:
-            if p.time > now:
+        for delayed_call in reactor._newTimedCalls:
+            if delayed_call.time > now:
                 break

-            if p.delayed_time > 0:
+            if delayed_call.delayed_time > 0:
                 continue

             num_pending += 1

From 6e7d36a72c5778be6d1a79d296b1fcdce667839d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 11:51:08 +0100
Subject: [PATCH 11/18] Also check for presence of 'threadCallQueue' in reactor

---
 synapse/metrics/__init__.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2e307a03a..d7bcad8a8 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -184,7 +184,14 @@ def runUntilCurrentTimer(func):
     return f


-if hasattr(reactor, "runUntilCurrent") and hasattr(reactor, "_newTimedCalls"):
+try:
+    # Ensure the reactor has all the attributes we expect
+    reactor.runUntilCurrent
+    reactor._newTimedCalls
+    reactor.threadCallQueue
+
     # runUntilCurrent is called when we have pending calls. It is called once
     # per iteratation after fd polling.
     reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+except AttributeError:
+    pass

From f704c10f29ac1b013966cc8e43e74d38449cee6a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 11:54:03 +0100
Subject: [PATCH 12/18] Rename unhelpful variable name

---
 synapse/storage/receipts.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 0ed9f45ec..a53506354 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -168,13 +168,8 @@ class ReceiptsStore(SQLBaseStore):

             # The content is of the form:
             # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. }
-            event_id = room_event["content"].setdefault(
-                row["event_id"], {}
-            )
-
-            receipt_type = event_id.setdefault(
-                row["receipt_type"], {}
-            )
+            event_entry = room_event["content"].setdefault(row["event_id"], {})
+            receipt_type = event_entry.setdefault(row["receipt_type"], {})

             receipt_type[row["user_id"]] = json.loads(row["data"])

From ee59af9ac0ede0efe97cb70edbe6ed9e21ff8db4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 15:16:28 +0100
Subject: [PATCH 13/18] Set request.authenticated_entity for application services

---
 synapse/api/auth.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index a7f428a96..1e3b0fbfb 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -352,6 +352,8 @@ class Auth(object):
             if not user_id:
                 raise KeyError

+            request.authenticated_entity = user_id
+
             defer.returnValue(
                 (UserID.from_string(user_id), ClientInfo("", ""))
             )
@@ -425,6 +427,7 @@ class Auth(object):
                 "Unrecognised access token.",
                 errcode=Codes.UNKNOWN_TOKEN
             )
+            request.authenticated_entity = service.sender
             defer.returnValue(service)
         except KeyError:
             raise AuthError(

From 128ed32e6b64b7c590e288c29dfb993330a0ff8e Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 15:51:23 +0100
Subject: [PATCH 14/18] Bump size of get_presence_state cache

---
 synapse/storage/presence.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 9b136f311..34ca3b9a5 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -39,7 +39,7 @@ class PresenceStore(SQLBaseStore):
             desc="has_presence_state",
         )

-    @cached()
+    @cached(max_entries=2000)
     def get_presence_state(self, user_localpart):
         return self._simple_select_one(
             table="presence",

From 0bfdaf1f4fe73c6afd211a7744f82d8d0b2b1e9d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 16:26:07 +0100
Subject: [PATCH 15/18] Rejig the code to make it nicer

---
 synapse/storage/state.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b32141281..c831d8d38 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -398,6 +398,7 @@ class StateStore(SQLBaseStore):
                 # for them again.
                 state_dict = {key: None for key in types}
                 state_dict.update(results[group])
+                results[group] = state_dict
             else:
                 state_dict = results[group]

@@ -405,12 +406,10 @@ class StateStore(SQLBaseStore):
                 state_event = state_events[event_id]
                 state_dict[(state_event.type, state_event.state_key)] = state_event

-            results[group] = state_dict
-
             self._state_group_cache.update(
                 cache_seq_num,
                 key=group,
-                value=results[group],
+                value=state_dict,
                 full=(types is None),
             )

From a82938416db87adde7c7e7965b93c2eecf86695a Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 18 Aug 2015 16:28:13 +0100
Subject: [PATCH 16/18] Remove newline because vertical whitespace makes mjark sad

---
 synapse/storage/state.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c831d8d38..c9110e630 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -417,8 +417,7 @@ class StateStore(SQLBaseStore):
         # used for bookkeeping in the cache.
         for group, state_dict in results.items():
             results[group] = {
-                key: event for key, event in state_dict.items()
-                if event
+                key: event for key, event in state_dict.items() if event
             }

         defer.returnValue(results)
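
A recurring detail in the state.py changes above is that the state-group cache deliberately stores None for keys that were requested but absent, so a later request can be answered as "known missing" without another database query, and the sentinels are stripped out before results are handed back. A standalone sketch of that bookkeeping (not Synapse code; all names are invented):

    def fill_from_cache(cache, group, requested_keys):
        # Keys cached as None are "known missing": they count as answered.
        cached = cache.get(group, {})
        known = {k: cached[k] for k in requested_keys if k in cached}
        missing = [k for k in requested_keys if k not in cached]
        return known, missing


    def update_cache(cache, group, requested_keys, fetched):
        # Record a None for every requested key the database did not return.
        state_dict = {key: None for key in requested_keys}
        state_dict.update(fetched)
        cache.setdefault(group, {}).update(state_dict)


    if __name__ == "__main__":
        cache = {}
        known, missing = fill_from_cache(cache, 1, ["a", "b"])
        print(missing)                             # ['a', 'b'] -> both need the db
        update_cache(cache, 1, missing, {"a": "event_a"})
        known, missing = fill_from_cache(cache, 1, ["a", "b"])
        print(missing)                             # [] -> fully answered from cache
        print({k: v for k, v in known.items() if v is not None})  # {'a': 'event_a'}
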
From abc6986a24e7f843ffdfc1610833feb96462d5a8 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 19 Aug 2015 09:30:52 +0100
Subject: [PATCH 17/18] Fix regression where we incorrectly responded with a 200 to /login

---
 synapse/handlers/auth.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 98d99dd0a..4947c4051 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -283,7 +283,7 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem storing the token.
             LoginError if there was an authentication problem.
         """
-        self._check_password(user_id, password)
+        yield self._check_password(user_id, password)

         reg_handler = self.hs.get_handlers().registration_handler
         access_token = reg_handler.generate_token(user_id)
@@ -291,6 +291,7 @@ class AuthHandler(BaseHandler):
         yield self.store.add_access_token_to_user(user_id, access_token)
         defer.returnValue(access_token)

+    @defer.inlineCallbacks
     def _check_password(self, user_id, password):
         """Checks that user_id has passed password, raises LoginError if not."""
         user_info = yield self.store.get_user_by_id(user_id=user_id)
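
The regression fixed above is a classic inlineCallbacks pitfall: calling a decorated function without yield merely returns a Deferred, and any exception it produced stays trapped inside that Deferred instead of propagating, so a failed password check never aborted the login. A standalone sketch of the failure mode (not the Synapse handler; the function names and token are invented):

    from twisted.internet import defer


    @defer.inlineCallbacks
    def check_password(password):
        yield defer.succeed(None)           # stand-in for a database lookup
        if password != "s3cret":
            raise ValueError("bad password")


    def generate_token():
        return defer.succeed("access_token")


    @defer.inlineCallbacks
    def login_broken(password):
        check_password(password)            # Deferred dropped; the failure is lost
        token = yield generate_token()
        defer.returnValue(token)            # fires with a token even for a bad password


    @defer.inlineCallbacks
    def login_fixed(password):
        yield check_password(password)      # the failure propagates to the caller
        token = yield generate_token()
        defer.returnValue(token)

Calling login_broken("wrong") still fires with "access_token", while login_fixed("wrong") returns a Deferred that fails with ValueError, which is exactly the difference the added yield makes.
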
From 40da1f200d6a7778acc76562fcecbbcd3f97eee3 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 19 Aug 2015 09:41:07 +0100
Subject: [PATCH 18/18] Remove an access token log line

---
 synapse/handlers/auth.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 4947c4051..be2baeaec 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -287,7 +287,7 @@ class AuthHandler(BaseHandler):

         reg_handler = self.hs.get_handlers().registration_handler
         access_token = reg_handler.generate_token(user_id)
-        logger.info("Adding token %s for user %s", access_token, user_id)
+        logger.info("Logging in user %s", user_id)
         yield self.store.add_access_token_to_user(user_id, access_token)
         defer.returnValue(access_token)