Initial cut

This commit is contained in:
Erik Johnston 2016-02-15 17:10:40 +00:00
parent 9e7900da1e
commit e5999bfb1a
16 changed files with 1004 additions and 1207 deletions

View file

@ -14,73 +14,128 @@
# limitations under the License.
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedList
from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
from collections import namedtuple
from twisted.internet import defer
class UserPresenceState(namedtuple("UserPresenceState",
("user_id", "state", "last_active", "last_federation_update",
"last_user_sync", "status_msg", "currently_active"))):
"""Represents the current presence state of the user.
user_id (str)
last_active (int): Time in msec that the user last interacted with server.
last_federation_update (int): Time in msec since either a) we sent a presence
update to other servers or b) we received a presence update, depending
on if is a local user or not.
last_user_sync (int): Time in msec that the user last *completed* a sync
(or event stream).
status_msg (str): User set status message.
"""
def copy_and_replace(self, **kwargs):
return self._replace(**kwargs)
@classmethod
def default(cls, user_id):
"""Returns a default presence state.
"""
return cls(
user_id=user_id,
state=PresenceState.OFFLINE,
last_active=0,
last_federation_update=0,
last_user_sync=0,
status_msg=None,
currently_active=False,
)
class PresenceStore(SQLBaseStore):
def create_presence(self, user_localpart):
res = self._simple_insert(
table="presence",
values={"user_id": user_localpart},
desc="create_presence",
@defer.inlineCallbacks
def update_presence(self, presence_states):
stream_id_manager = yield self._presence_id_gen.get_next(self)
with stream_id_manager as stream_id:
yield self.runInteraction(
"update_presence",
self._update_presence_txn, stream_id, presence_states,
)
defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
def _update_presence_txn(self, txn, stream_id, presence_states):
for state in presence_states:
txn.call_after(
self.presence_stream_cache.entity_has_changed,
state.user_id, stream_id,
)
# Actually insert new rows
self._simple_insert_many_txn(
txn,
table="presence_stream",
values=[
{
"stream_id": stream_id,
"user_id": state.user_id,
"state": state.state,
"last_active": state.last_active,
"last_federation_update": state.last_federation_update,
"last_user_sync": state.last_user_sync,
"status_msg": state.status_msg,
"currently_active": state.currently_active,
}
for state in presence_states
],
)
self.get_presence_state.invalidate((user_localpart,))
return res
def has_presence_state(self, user_localpart):
return self._simple_select_one(
table="presence",
keyvalues={"user_id": user_localpart},
retcols=["user_id"],
allow_none=True,
desc="has_presence_state",
# Delete old rows to stop database from getting really big
sql = (
"DELETE FROM presence_stream WHERE"
" stream_id < ?"
" AND user_id IN (%s)"
)
@cached(max_entries=2000)
def get_presence_state(self, user_localpart):
return self._simple_select_one(
table="presence",
keyvalues={"user_id": user_localpart},
retcols=["state", "status_msg", "mtime"],
desc="get_presence_state",
batches = (
presence_states[i:i + 50]
for i in xrange(0, len(presence_states), 50)
)
@cachedList(get_presence_state.cache, list_name="user_localparts",
inlineCallbacks=True)
def get_presence_states(self, user_localparts):
rows = yield self._simple_select_many_batch(
table="presence",
column="user_id",
iterable=user_localparts,
retcols=("user_id", "state", "status_msg", "mtime",),
desc="get_presence_states",
)
defer.returnValue({
row["user_id"]: {
"state": row["state"],
"status_msg": row["status_msg"],
"mtime": row["mtime"],
}
for row in rows
})
for states in batches:
args = [stream_id]
args.extend(s.user_id for s in states)
txn.execute(
sql % (",".join("?" for _ in states),),
args
)
@defer.inlineCallbacks
def set_presence_state(self, user_localpart, new_state):
res = yield self._simple_update_one(
table="presence",
keyvalues={"user_id": user_localpart},
updatevalues={"state": new_state["state"],
"status_msg": new_state["status_msg"],
"mtime": self._clock.time_msec()},
desc="set_presence_state",
def get_presence_for_users(self, user_ids):
rows = yield self._simple_select_many_batch(
table="presence_stream",
column="user_id",
iterable=user_ids,
keyvalues={},
retcols=(
"user_id",
"state",
"last_active",
"last_federation_update",
"last_user_sync",
"status_msg",
"currently_active",
),
)
self.get_presence_state.invalidate((user_localpart,))
defer.returnValue(res)
for row in rows:
row["currently_active"] = bool(row["currently_active"])
defer.returnValue([UserPresenceState(**row) for row in rows])
def get_current_presence_token(self):
return self._presence_id_gen.get_max_token()
def allow_presence_visible(self, observed_localpart, observer_userid):
return self._simple_insert(
@ -128,6 +183,7 @@ class PresenceStore(SQLBaseStore):
desc="set_presence_list_accepted",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
self.get_presence_list_observers_accepted.invalidate((observed_userid,))
defer.returnValue(result)
def get_presence_list(self, observer_localpart, accepted=None):
@ -154,6 +210,19 @@ class PresenceStore(SQLBaseStore):
desc="get_presence_list_accepted",
)
@cachedInlineCallbacks()
def get_presence_list_observers_accepted(self, observed_userid):
user_localparts = yield self._simple_select_onecol(
table="presence_list",
keyvalues={"observed_user_id": observed_userid, "accepted": True},
retcol="user_id",
desc="get_presence_list_accepted",
)
defer.returnValue([
"@%s:%s" % (u, self.hs.hostname,) for u in user_localparts
])
@defer.inlineCallbacks
def del_presence_list(self, observer_localpart, observed_userid):
yield self._simple_delete_one(
@ -163,3 +232,4 @@ class PresenceStore(SQLBaseStore):
desc="del_presence_list",
)
self.get_presence_list_accepted.invalidate((observer_localpart,))
self.get_presence_list_observers_accepted.invalidate((observed_userid,))