Merge commit 'd7457c7661' of github.com:matrix-org/synapse into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2017-02-15 10:17:01 +00:00
commit 3b42bb96e4
4 changed files with 26 additions and 8 deletions

View file

@ -531,7 +531,7 @@ class PresenceHandler(object):
# There are things not in our in memory cache. Lets pull them out of # There are things not in our in memory cache. Lets pull them out of
# the database. # the database.
res = yield self.store.get_presence_for_users(missing) res = yield self.store.get_presence_for_users(missing)
states.update({state.user_id: state for state in res}) states.update(res)
missing = [user_id for user_id, state in states.items() if not state] missing = [user_id for user_id, state in states.items() if not state]
if missing: if missing:

View file

@ -18,6 +18,7 @@ from ._slaved_id_tracker import SlavedIdTracker
from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.storage import DataStore from synapse.storage import DataStore
from synapse.storage.presence import PresenceStore
class SlavedPresenceStore(BaseSlavedStore): class SlavedPresenceStore(BaseSlavedStore):
@ -35,7 +36,8 @@ class SlavedPresenceStore(BaseSlavedStore):
_get_active_presence = DataStore._get_active_presence.__func__ _get_active_presence = DataStore._get_active_presence.__func__
take_presence_startup_info = DataStore.take_presence_startup_info.__func__ take_presence_startup_info = DataStore.take_presence_startup_info.__func__
get_presence_for_users = DataStore.get_presence_for_users.__func__ _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
def get_current_presence_token(self): def get_current_presence_token(self):
return self._presence_id_gen.get_current_token() return self._presence_id_gen.get_current_token()

View file

@ -87,9 +87,17 @@ class HttpTransactionCache(object):
deferred = fn(*args, **kwargs) deferred = fn(*args, **kwargs)
# We don't add an errback to the raw deferred, so we ask ObservableDeferred # if the request fails with a Twisted failure, remove it
# to swallow the error. This is fine as the error will still be reported # from the transaction map. This is done to ensure that we don't
# to the observers. # cache transient errors like rate-limiting errors, etc.
def remove_from_map(err):
self.transactions.pop(txn_key, None)
return err
deferred.addErrback(remove_from_map)
# We don't add any other errbacks to the raw deferred, so we ask
# ObservableDeferred to swallow the error. This is fine as the error will
# still be reported to the observers.
observable = ObservableDeferred(deferred, consumeErrors=True) observable = ObservableDeferred(deferred, consumeErrors=True)
self.transactions[txn_key] = (observable, self.clock.time_msec()) self.transactions[txn_key] = (observable, self.clock.time_msec())
return observable.observe() return observable.observe()

View file

@ -15,7 +15,7 @@
from ._base import SQLBaseStore from ._base import SQLBaseStore
from synapse.api.constants import PresenceState from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from collections import namedtuple from collections import namedtuple
from twisted.internet import defer from twisted.internet import defer
@ -85,6 +85,9 @@ class PresenceStore(SQLBaseStore):
self.presence_stream_cache.entity_has_changed, self.presence_stream_cache.entity_has_changed,
state.user_id, stream_id, state.user_id, stream_id,
) )
self._invalidate_cache_and_stream(
txn, self._get_presence_for_user, (state.user_id,)
)
# Actually insert new rows # Actually insert new rows
self._simple_insert_many_txn( self._simple_insert_many_txn(
@ -143,7 +146,12 @@ class PresenceStore(SQLBaseStore):
"get_all_presence_updates", get_all_presence_updates_txn "get_all_presence_updates", get_all_presence_updates_txn
) )
@defer.inlineCallbacks @cached()
def _get_presence_for_user(self, user_id):
raise NotImplementedError()
@cachedList(cached_method_name="_get_presence_for_user", list_name="user_ids",
num_args=1, inlineCallbacks=True)
def get_presence_for_users(self, user_ids): def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch( rows = yield self._simple_select_many_batch(
table="presence_stream", table="presence_stream",
@ -165,7 +173,7 @@ class PresenceStore(SQLBaseStore):
for row in rows: for row in rows:
row["currently_active"] = bool(row["currently_active"]) row["currently_active"] = bool(row["currently_active"])
defer.returnValue([UserPresenceState(**row) for row in rows]) defer.returnValue({row["user_id"]: UserPresenceState(**row) for row in rows})
def get_current_presence_token(self): def get_current_presence_token(self):
return self._presence_id_gen.get_current_token() return self._presence_id_gen.get_current_token()