From ecd7e36047d090cdb027f500b0f95a375ba61811 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 13 Feb 2017 13:16:48 +0000 Subject: [PATCH 1/5] http txns: Do not cache error responses Previously we did. This meant that, amongst other errors, rate-limiting errors would be cached and prevent messages with that txn ID being sent. --- synapse/rest/client/transactions.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index efa77b8c5..6396a0803 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -81,7 +81,16 @@ class HttpTransactionCache(object): Deferred which resolves to a tuple of (response_code, response_dict). """ try: - return self.transactions[txn_key][0].observe() + observable = self.transactions[txn_key][0] + if not observable.has_called() or observable.has_succeeded(): + return observable.observe() + # if the request has already been called with a non-2xx status + # (a Twisted failure), remove it from the transaction map. + # This is done to ensure that we don't cache rate-limiting errors, etc. + res = observable.get_result() + if res.value.code >= 300: + del self.transactions[txn_key] + # fall through except (KeyError, IndexError): pass # execute the function instead. From feb15dc99f02e6cb0a84a53e397529c51743f114 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 13 Feb 2017 13:33:12 +0000 Subject: [PATCH 2/5] Don't cache errors at all --- synapse/rest/client/transactions.py | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 6396a0803..95376a2fb 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -81,16 +81,7 @@ class HttpTransactionCache(object): Deferred which resolves to a tuple of (response_code, response_dict). """ try: - observable = self.transactions[txn_key][0] - if not observable.has_called() or observable.has_succeeded(): - return observable.observe() - # if the request has already been called with a non-2xx status - # (a Twisted failure), remove it from the transaction map. - # This is done to ensure that we don't cache rate-limiting errors, etc. - res = observable.get_result() - if res.value.code >= 300: - del self.transactions[txn_key] - # fall through + return self.transactions[txn_key][0].observe() except (KeyError, IndexError): pass # execute the function instead. @@ -101,6 +92,14 @@ class HttpTransactionCache(object): # to the observers. observable = ObservableDeferred(deferred, consumeErrors=True) self.transactions[txn_key] = (observable, self.clock.time_msec()) + + # if the request fails with a Twisted failure, remove it + # from the transaction map. This is done to ensure that we don't + # cache transient errors like rate-limiting errors, etc. + def remove_from_map(err): + del self.transactions[txn_key] + return err + observable.addErrback(remove_from_map) return observable.observe() def _cleanup(self): From 808ddf0ae72ef45d887e00c07ba834d0873ceb8d Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 13 Feb 2017 13:36:15 +0000 Subject: [PATCH 3/5] Pop the txn from the map in case it has already been deleted somehow --- synapse/rest/client/transactions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 95376a2fb..7b742ca7a 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -97,7 +97,7 @@ class HttpTransactionCache(object): # from the transaction map. This is done to ensure that we don't # cache transient errors like rate-limiting errors, etc. def remove_from_map(err): - del self.transactions[txn_key] + self.transactions.pop(txn_key, None) return err observable.addErrback(remove_from_map) return observable.observe() From d0497425f81ed21b58046f5f8725fc70ffcc0544 Mon Sep 17 00:00:00 2001 From: Kegan Dougal Date: Mon, 13 Feb 2017 13:49:44 +0000 Subject: [PATCH 4/5] Ordering is important on errbacks so add the cleanup func before creating an ObservableDeferred --- synapse/rest/client/transactions.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 7b742ca7a..fceca2ede 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -87,19 +87,19 @@ class HttpTransactionCache(object): deferred = fn(*args, **kwargs) - # We don't add an errback to the raw deferred, so we ask ObservableDeferred - # to swallow the error. This is fine as the error will still be reported - # to the observers. - observable = ObservableDeferred(deferred, consumeErrors=True) - self.transactions[txn_key] = (observable, self.clock.time_msec()) - # if the request fails with a Twisted failure, remove it # from the transaction map. This is done to ensure that we don't # cache transient errors like rate-limiting errors, etc. def remove_from_map(err): self.transactions.pop(txn_key, None) return err - observable.addErrback(remove_from_map) + deferred.addErrback(remove_from_map) + + # We don't add any other errbacks to the raw deferred, so we ask + # ObservableDeferred to swallow the error. This is fine as the error will + # still be reported to the observers. + observable = ObservableDeferred(deferred, consumeErrors=True) + self.transactions[txn_key] = (observable, self.clock.time_msec()) return observable.observe() def _cleanup(self): From 9e617cd4c2da527caf93799e286b42352ad492d2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 13 Feb 2017 13:50:03 +0000 Subject: [PATCH 5/5] Cache get_presence storage --- synapse/handlers/presence.py | 2 +- synapse/replication/slave/storage/presence.py | 4 +++- synapse/storage/presence.py | 14 +++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fdfce2a88..da610e430 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -531,7 +531,7 @@ class PresenceHandler(object): # There are things not in our in memory cache. Lets pull them out of # the database. res = yield self.store.get_presence_for_users(missing) - states.update({state.user_id: state for state in res}) + states.update(res) missing = [user_id for user_id, state in states.items() if not state] if missing: diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py index 703f4a49b..40f6c9a38 100644 --- a/synapse/replication/slave/storage/presence.py +++ b/synapse/replication/slave/storage/presence.py @@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.storage import DataStore +from synapse.storage.presence import PresenceStore class SlavedPresenceStore(BaseSlavedStore): @@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore): _get_active_presence = DataStore._get_active_presence.__func__ take_presence_startup_info = DataStore.take_presence_startup_info.__func__ - get_presence_for_users = DataStore.get_presence_for_users.__func__ + _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] + get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] def get_current_presence_token(self): return self._presence_id_gen.get_current_token() diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py index 7460f98a1..4d1590d2b 100644 --- a/synapse/storage/presence.py +++ b/synapse/storage/presence.py @@ -15,7 +15,7 @@ from ._base import SQLBaseStore from synapse.api.constants import PresenceState -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks +from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList from collections import namedtuple from twisted.internet import defer @@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore): self.presence_stream_cache.entity_has_changed, state.user_id, stream_id, ) + self._invalidate_cache_and_stream( + txn, self._get_presence_for_user, (state.user_id,) + ) # Actually insert new rows self._simple_insert_many_txn( @@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore): "get_all_presence_updates", get_all_presence_updates_txn ) - @defer.inlineCallbacks + @cached() + def _get_presence_for_user(self, user_id): + raise NotImplementedError() + + @cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids", + num_args=1, inlineCallbacks=True) def get_presence_for_users(self, user_ids): rows = yield self._simple_select_many_batch( table="presence_stream", @@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore): for row in rows: row["currently_active"] = bool(row["currently_active"]) - defer.returnValue([UserPresenceState(**row) for row in rows]) + defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows}) def get_current_presence_token(self): return self._presence_id_gen.get_current_token()